You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/10/04 01:32:00 UTC

svn commit: r1393814 - in /giraph/trunk: ./ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/graph/ src/test/java/org/apache/giraph/

Author: aching
Date: Wed Oct  3 23:32:00 2012
New Revision: 1393814

URL: http://svn.apache.org/viewvc?rev=1393814&view=rev
Log:
GIRAPH-212: Security is busted since GIRAPH-168

Added:
    giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java
      - copied, changed from r1393275, giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/pom.xml
    giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1393814&r1=1393813&r2=1393814&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Wed Oct  3 23:32:00 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-212: Security is busted since GIRAPH-168. (ekoontz via
+  aching)
+
   GIRAPH-315: giraph-site.xml isn't read on time. (majakabiljo via
   aching)
 

Modified: giraph/trunk/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/pom.xml?rev=1393814&r1=1393813&r2=1393814&view=diff
==============================================================================
--- giraph/trunk/pom.xml (original)
+++ giraph/trunk/pom.xml Wed Oct  3 23:32:00 2012
@@ -551,8 +551,9 @@ under the License.
                 </goals>
               </execution>
             </executions>
+	    <!-- profile: hadoop_0.20.203 -->
             <configuration>
-              <symbols>HADOOP_NON_SASL_RPC,HADOOP_NON_INTERVERSIONED_RPC</symbols>
+              <symbols>HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
             </configuration>
           </plugin>
         </plugins>
@@ -594,8 +595,9 @@ under the License.
                 </goals>
               </execution>
             </executions>
+	    <!-- profile: hadoop_1.0 -->
             <configuration>
-              <symbols>HADOOP_NON_SASL_RPC,HADOOP_NON_INTERVERSIONED_RPC</symbols>
+              <symbols>HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
             </configuration>
           </plugin>
         </plugins>
@@ -620,6 +622,12 @@ under the License.
           <version>${hadoop.version}</version>
           <scope>provided</scope>
         </dependency>
+	<dependency>
+	  <groupId>commons-collections</groupId>
+	  <artifactId>commons-collections</artifactId>
+	  <scope>provided</scope>
+	  <version>3.2.1</version>
+	</dependency>
       </dependencies>
       <build>
         <plugins>
@@ -637,7 +645,8 @@ under the License.
               </execution>
             </executions>
             <configuration>
-              <symbols>HADOOP_NON_SECURE,HADOOP_NON_SASL_RPC,HADOOP_NON_INTERVERSIONED_RPC</symbols>
+	      <!-- profile: hadoop_non_secure -->
+              <symbols>HADOOP_NON_SECURE,HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
             </configuration>
           </plugin>
           <plugin>
@@ -647,6 +656,7 @@ under the License.
             <configuration>
               <excludes>
                 <exclude>**/BspTokenSelector.java</exclude>
+                <exclude>**/SecureRPCCommunications.java</exclude>
               </excludes>
               <source>${compileSource}</source>
               <target>${compileSource}</target>
@@ -695,6 +705,7 @@ under the License.
             <directory>${basedir}/src/main/java/org/apache/giraph/hadoop</directory>
             <excludes>
               <exclude>BspTokenSelector.java</exclude>
+	      <exclude>SecureRPCCommunications.java</exclude>
             </excludes>
           </resource>
         </resources>
@@ -706,6 +717,7 @@ under the License.
             <configuration>
               <excludes>
                 <exclude>**/BspTokenSelector.java</exclude>
+		<exclude>**/SecureRPCCommunications.java</exclude>
               </excludes>
               <source>${compileSource}</source>
               <target>${compileSource}</target>
@@ -724,8 +736,9 @@ under the License.
                 </goals>
               </execution>
             </executions>
+	    <!-- profile: hadoop_facebook -->
             <configuration>
-              <symbols>HADOOP_NON_SECURE,HADOOP_NON_SASL_RPC</symbols>
+              <symbols>HADOOP_NON_SECURE,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
             </configuration>
           </plugin>
           <plugin>
@@ -745,6 +758,9 @@ under the License.
       </build>
     </profile>
 
+    <!-- Help keep future Hadoop versions munge-free: none of the
+         profiles below uses munge flags, so avoid introducing any
+         munge flags in the following profiles. -->
     <profile>
       <id>hadoop_0.23</id>
        <activation>

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1393814&r1=1393813&r2=1393814&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Wed Oct  3 23:32:00 2012
@@ -33,6 +33,10 @@ import org.apache.giraph.graph.partition
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+/*if[HADOOP_NON_INTERVERSIONED_RPC]
+else[HADOOP_NON_INTERVERSIONED_RPC]*/
+import org.apache.hadoop.ipc.ProtocolSignature;
+/*end[HADOOP_NON_INTERVERSIONED_RPC]*/
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -60,11 +64,6 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-/*if[HADOOP_NON_INTERVERSIONED_RPC]
-else[HADOOP_NON_INTERVERSIONED_RPC]*/
-import org.apache.hadoop.ipc.ProtocolSignature;
-/*end[HADOOP_NON_INTERVERSIONED_RPC]*/
-
 /**
  * Basic RPC communications object that implements the lower level operations
  * for RPC communication.
@@ -237,7 +236,7 @@ public abstract class BasicRPCCommunicat
     /**
      * Constructor.
      *
-     * @param peerConnection Connection to send the messsages to.
+     * @param peerConnection Connection to send the messages to.
      * @param context Context of the mapper.
      */
     PeerFlushExecutor(PeerConnection peerConnection,
@@ -1302,4 +1301,11 @@ public abstract class BasicRPCCommunicat
   public String getName() {
     return myName;
   }
+
+  @Override
+  public ServerData<I, V, E, M> getServerData() {
+    // ServerData is not provided on the Hadoop RPC path, so fail fast.
+    throw new IllegalStateException("getServerData() called " +
+        "while using Hadoop RPC: should only be used by Netty RPC.");
+  }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java?rev=1393814&r1=1393813&r2=1393814&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java Wed Oct  3 23:32:00 2012
@@ -22,34 +22,12 @@ import java.io.IOException;
 
 import java.net.InetSocketAddress;
 
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
-import java.security.PrivilegedExceptionAction;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
-import org.apache.hadoop.security.token.Token;
-/*end[HADOOP_NON_SECURE]*/
-
 import org.apache.log4j.Logger;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.graph.GraphState;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
-import org.apache.giraph.hadoop.BspPolicyProvider;
-/*end[HADOOP_NON_SECURE]*/
 import org.apache.hadoop.conf.Configuration;
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
-import org.apache.hadoop.io.Text;
-/*end[HADOOP_NON_SECURE]*/
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.ipc.RPC;
@@ -67,11 +45,7 @@ import org.apache.hadoop.mapreduce.Mappe
 @SuppressWarnings("rawtypes")
 public class RPCCommunications<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
-  /*if[HADOOP_NON_SASL_RPC]
     extends BasicRPCCommunications<I, V, E, M, Object> {
-    else[HADOOP_NON_SASL_RPC]*/
-    extends BasicRPCCommunications<I, V, E, M, Token<JobTokenIdentifier>> {
-  /*end[HADOOP_NON_SASL_RPC]*/
 
   /** Class logger */
   public static final Logger LOG = Logger.getLogger(RPCCommunications.class);
@@ -99,23 +73,7 @@ public class RPCCommunications<I extends
     *
     * @return Job token.
     */
-  protected
-  /*if[HADOOP_NON_SECURE]
-  Object createJobToken() throws IOException {
-  else[HADOOP_NON_SECURE]*/
-  Token<JobTokenIdentifier> createJobToken() throws IOException {
-  /*end[HADOOP_NON_SECURE]*/
-  /*if[HADOOP_NON_SECURE]
-  else[HADOOP_NON_SECURE]*/
-    String localJobTokenFile = System.getenv().get(
-        UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-    if (localJobTokenFile != null) {
-      JobConf jobConf = new JobConf(conf);
-      Credentials credentials =
-          TokenCache.loadTokens(localJobTokenFile, jobConf);
-      return TokenCache.getJobToken(credentials);
-    }
-  /*end[HADOOP_NON_SECURE]*/
+  protected Object createJobToken() throws IOException {
     return null;
   }
 
@@ -131,34 +89,12 @@ public class RPCCommunications<I extends
   @Override
   protected Server getRPCServer(
       InetSocketAddress myAddress, int numHandlers, String jobId,
-      /*if[HADOOP_NON_SASL_RPC]
       Object jt) throws IOException {
-    return RPC.getServer(this, myAddress.getHostName(), myAddress.getPort(),
-        numHandlers, false, conf);
-    else[HADOOP_NON_SASL_RPC]*/
-      Token<JobTokenIdentifier> jt) throws IOException {
-    @SuppressWarnings("deprecation")
-    JobTokenSecretManager jobTokenSecretManager =
-        new JobTokenSecretManager();
-    if (jt != null) { //could be null in the case of some unit tests
-      jobTokenSecretManager.addTokenForJob(jobId, jt);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("getRPCServer: Added jobToken " + jt);
-      }
-    }
-    Server server = RPC.getServer(RPCCommunications.class, this,
-      myAddress.getHostName(), myAddress.getPort(),
-      numHandlers, false, conf, jobTokenSecretManager);
-    String hadoopSecurityAuthorization =
-      ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG;
-    if (conf.getBoolean(hadoopSecurityAuthorization, false)) {
-      server.refreshServiceAcl(conf, new BspPolicyProvider());
-    }
+    Server server = RPC.getServer(this, myAddress.getHostName(),
+        myAddress.getPort(), numHandlers, false, conf);
     return server;
-    /*end[HADOOP_NON_SASL_RPC]*/
   }
 
-
   /**
    * Get the RPC proxy.
    *
@@ -169,47 +105,10 @@ public class RPCCommunications<I extends
    */
   @SuppressWarnings("unchecked")
   protected CommunicationsInterface<I, V, E, M> getRPCProxy(
-    final InetSocketAddress addr,
-    String jobId,
-    /*if[HADOOP_NON_SASL_RPC]
-    Object jt)
-      else[HADOOP_NON_SASL_RPC]*/
-    Token<JobTokenIdentifier> jt)
-    /*end[HADOOP_NON_SASL_RPC]*/
+    final InetSocketAddress addr, String jobId, Object jt)
     throws IOException, InterruptedException {
     final Configuration config = new Configuration(conf);
-    /*if[HADOOP_NON_SASL_RPC]
-        return (CommunicationsInterface<I, V, E, M>)RPC.getProxy(
-                 CommunicationsInterface.class, VERSION_ID, addr, config);
-      else[HADOOP_NON_SASL_RPC]*/
-    if (jt == null) {
-      return (CommunicationsInterface<I, V, E, M>) RPC.getProxy(
-          CommunicationsInterface.class, VERSION_ID, addr, config);
-    }
-    jt.setService(new Text(addr.getAddress().getHostAddress() + ":" +
-        addr.getPort()));
-    UserGroupInformation current = UserGroupInformation.getCurrentUser();
-    current.addToken(jt);
-    UserGroupInformation owner =
-        UserGroupInformation.createRemoteUser(jobId);
-    owner.addToken(jt);
-    return
-      owner.doAs(new PrivilegedExceptionAction<
-        CommunicationsInterface<I, V, E, M>>() {
-        @Override
-        @SuppressWarnings("unchecked")
-        public CommunicationsInterface<I, V, E, M> run() throws Exception {
-          // All methods in CommunicationsInterface will be used for RPC
-          return (CommunicationsInterface<I, V, E, M>) RPC.getProxy(
-            CommunicationsInterface.class, VERSION_ID, addr, config);
-        }
-      });
-    /*end[HADOOP_NON_SASL_RPC]*/
-  }
-
-  @Override
-  public ServerData<I, V, E, M> getServerData() {
-    throw new IllegalStateException(
-        "getServerData: Tried to get ServerData while using RPC");
+    return (CommunicationsInterface<I, V, E, M>) RPC.getProxy(
+      CommunicationsInterface.class, VERSION_ID, addr, config);
   }
 }

Copied: giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java (from r1393275, giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java)
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java?p2=giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java&p1=giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java&r1=1393275&r2=1393814&rev=1393814&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java Wed Oct  3 23:32:00 2012
@@ -21,11 +21,17 @@ package org.apache.giraph.comm;
 import java.io.IOException;
 
 import java.net.InetSocketAddress;
-
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
 import java.security.PrivilegedExceptionAction;
+
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -33,28 +39,18 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.token.Token;
-/*end[HADOOP_NON_SECURE]*/
 
 import org.apache.log4j.Logger;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.graph.GraphState;
-import org.apache.giraph.ImmutableClassesGiraphConfiguration;
-
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
 import org.apache.giraph.hadoop.BspPolicyProvider;
-/*end[HADOOP_NON_SECURE]*/
-import org.apache.hadoop.conf.Configuration;
-/*if[HADOOP_NON_SECURE]
-else[HADOOP_NON_SECURE]*/
-import org.apache.hadoop.io.Text;
-/*end[HADOOP_NON_SECURE]*/
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RPC.Server;
-import org.apache.hadoop.mapreduce.Mapper;
+
+
+/*if[HADOOP_NON_INTERVERSIONED_RPC]
+else[HADOOP_NON_INTERVERSIONED_RPC]*/
+import org.apache.hadoop.ipc.ProtocolSignature;
+/*end[HADOOP_NON_INTERVERSIONED_RPC]*/
 
 /**
  * Used to implement abstract {@link BasicRPCCommunications} methods.
@@ -64,61 +60,68 @@ import org.apache.hadoop.mapreduce.Mappe
  * @param <E> Edge data
  * @param <M> Message data
  */
-@SuppressWarnings("rawtypes")
-public class RPCCommunications<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-  /*if[HADOOP_NON_SASL_RPC]
-    extends BasicRPCCommunications<I, V, E, M, Object> {
-    else[HADOOP_NON_SASL_RPC]*/
-    extends BasicRPCCommunications<I, V, E, M, Token<JobTokenIdentifier>> {
-  /*end[HADOOP_NON_SASL_RPC]*/
+public class SecureRPCCommunications<I extends WritableComparable,
+  V extends Writable, E extends Writable, M extends Writable>
+   extends BasicRPCCommunications<I, V, E, M, Token<JobTokenIdentifier>> {
 
   /** Class logger */
-  public static final Logger LOG = Logger.getLogger(RPCCommunications.class);
+  public static final Logger LOG =
+    Logger.getLogger(SecureRPCCommunications.class);
 
   /**
    * Constructor.
    *
    * @param context Context to be saved.
-   * @param configuration Configuration
    * @param service Server worker.
+   * @param configuration Configuration.
    * @param graphState Graph state from infrastructure.
    * @throws IOException
    * @throws InterruptedException
    */
-  public RPCCommunications(Mapper<?, ?, ?, ?>.Context context,
-      CentralizedServiceWorker<I, V, E, M> service,
-      ImmutableClassesGiraphConfiguration configuration,
-      GraphState<I, V, E, M> graphState) throws
-      IOException, InterruptedException {
+  public SecureRPCCommunications(Mapper<?, ?, ?, ?>.Context context,
+                           CentralizedServiceWorker<I, V, E, M> service,
+                           ImmutableClassesGiraphConfiguration configuration,
+                           GraphState<I, V, E, M> graphState) throws
+    IOException, InterruptedException {
     super(context, configuration, service);
   }
 
   /**
-    * Create the job token.
-    *
-    * @return Job token.
-    */
-  protected
-  /*if[HADOOP_NON_SECURE]
-  Object createJobToken() throws IOException {
-  else[HADOOP_NON_SECURE]*/
-  Token<JobTokenIdentifier> createJobToken() throws IOException {
-  /*end[HADOOP_NON_SECURE]*/
-  /*if[HADOOP_NON_SECURE]
-  else[HADOOP_NON_SECURE]*/
+   * Create the job token.
+   *
+   * @return Job token.
+   */
+  protected Token<JobTokenIdentifier> createJobToken() throws IOException {
     String localJobTokenFile = System.getenv().get(
-        UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+      UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
     if (localJobTokenFile != null) {
       JobConf jobConf = new JobConf(conf);
       Credentials credentials =
-          TokenCache.loadTokens(localJobTokenFile, jobConf);
+        TokenCache.loadTokens(localJobTokenFile, jobConf);
       return TokenCache.getJobToken(credentials);
     }
-  /*end[HADOOP_NON_SECURE]*/
     return null;
   }
 
+  /*if[HADOOP_NON_INTERVERSIONED_RPC]
+  else[HADOOP_NON_INTERVERSIONED_RPC]*/
+  /**
+   * Get the Protocol Signature for the given protocol,
+   * client version and method.
+   *
+   * @param protocol Protocol.
+   * @param clientVersion Version of Client.
+   * @param clientMethodsHash Hash of Client methods.
+   * @return ProtocolSignature for input parameters.
+   */
+  public ProtocolSignature getProtocolSignature(
+    String protocol,
+    long clientVersion,
+    int clientMethodsHash) throws IOException {
+    return new ProtocolSignature(VERSION_ID, null);
+  }
+  /*end[HADOOP_NON_INTERVERSIONED_RPC]*/
+
   /**
    * Get the RPC server.
    *
@@ -129,36 +132,50 @@ public class RPCCommunications<I extends
    * @return RPC server.
    */
   @Override
-  protected Server getRPCServer(
-      InetSocketAddress myAddress, int numHandlers, String jobId,
-      /*if[HADOOP_NON_SASL_RPC]
-      Object jt) throws IOException {
-    return RPC.getServer(this, myAddress.getHostName(), myAddress.getPort(),
-        numHandlers, false, conf);
-    else[HADOOP_NON_SASL_RPC]*/
-      Token<JobTokenIdentifier> jt) throws IOException {
+  protected RPC.Server getRPCServer(
+    InetSocketAddress myAddress, int numHandlers, String jobId,
+    Token<JobTokenIdentifier> jt) throws IOException {
     @SuppressWarnings("deprecation")
     JobTokenSecretManager jobTokenSecretManager =
-        new JobTokenSecretManager();
-    if (jt != null) { //could be null in the case of some unit tests
+      new JobTokenSecretManager();
+    if (jt != null) { //could be null in the case of some unit tests:
+      // TODO: unit tests should use SecureRPCCommunications or
+      // RPCCommunications
+      // TODO: remove jt from RPCCommunications.
       jobTokenSecretManager.addTokenForJob(jobId, jt);
       if (LOG.isInfoEnabled()) {
         LOG.info("getRPCServer: Added jobToken " + jt);
       }
     }
-    Server server = RPC.getServer(RPCCommunications.class, this,
+
+    // TODO: make munge tag more specific: just use HADOOP_1 maybe.
+    /*if[HADOOP_1_AUTHORIZATION]
+    // Hadoop 1-style authorization.
+    Server server = RPC.getServer(this,
       myAddress.getHostName(), myAddress.getPort(),
       numHandlers, false, conf, jobTokenSecretManager);
+
+    String hadoopSecurityAuthorization =
+      ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG;
+    if (conf.getBoolean(hadoopSecurityAuthorization, false)) {
+      ServiceAuthorizationManager.refresh(conf, new BspPolicyProvider());
+    }
+    else[HADOOP_1_AUTHORIZATION]*/
+    // Hadoop 2+-style authorization.
+    Server server = RPC.getServer(this,
+      myAddress.getHostName(), myAddress.getPort(),
+      numHandlers, false, conf);
+
     String hadoopSecurityAuthorization =
       ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG;
     if (conf.getBoolean(hadoopSecurityAuthorization, false)) {
-      server.refreshServiceAcl(conf, new BspPolicyProvider());
+      ServiceAuthorizationManager sam = new ServiceAuthorizationManager();
+      sam.refresh(conf, new BspPolicyProvider());
     }
+    /*end[HADOOP_1_AUTHORIZATION]*/
     return server;
-    /*end[HADOOP_NON_SASL_RPC]*/
   }
 
-
   /**
    * Get the RPC proxy.
    *
@@ -167,31 +184,24 @@ public class RPCCommunications<I extends
    * @param jt Job token.
    * @return Proxy of the RPC server.
    */
+  @Override
   @SuppressWarnings("unchecked")
   protected CommunicationsInterface<I, V, E, M> getRPCProxy(
     final InetSocketAddress addr,
     String jobId,
-    /*if[HADOOP_NON_SASL_RPC]
-    Object jt)
-      else[HADOOP_NON_SASL_RPC]*/
     Token<JobTokenIdentifier> jt)
-    /*end[HADOOP_NON_SASL_RPC]*/
     throws IOException, InterruptedException {
     final Configuration config = new Configuration(conf);
-    /*if[HADOOP_NON_SASL_RPC]
-        return (CommunicationsInterface<I, V, E, M>)RPC.getProxy(
-                 CommunicationsInterface.class, VERSION_ID, addr, config);
-      else[HADOOP_NON_SASL_RPC]*/
     if (jt == null) {
       return (CommunicationsInterface<I, V, E, M>) RPC.getProxy(
-          CommunicationsInterface.class, VERSION_ID, addr, config);
+        CommunicationsInterface.class, VERSION_ID, addr, config);
     }
     jt.setService(new Text(addr.getAddress().getHostAddress() + ":" +
-        addr.getPort()));
+      addr.getPort()));
     UserGroupInformation current = UserGroupInformation.getCurrentUser();
     current.addToken(jt);
     UserGroupInformation owner =
-        UserGroupInformation.createRemoteUser(jobId);
+      UserGroupInformation.createRemoteUser(jobId);
     owner.addToken(jt);
     return
       owner.doAs(new PrivilegedExceptionAction<
@@ -204,12 +214,5 @@ public class RPCCommunications<I extends
             CommunicationsInterface.class, VERSION_ID, addr, config);
         }
       });
-    /*end[HADOOP_NON_SASL_RPC]*/
-  }
-
-  @Override
-  public ServerData<I, V, E, M> getServerData() {
-    throw new IllegalStateException(
-        "getServerData: Tried to get ServerData while using RPC");
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1393814&r1=1393813&r2=1393814&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Oct  3 23:32:00 2012
@@ -21,10 +21,14 @@ package org.apache.giraph.graph;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.RPCCommunications;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClientServer;
 import org.apache.giraph.comm.netty.NettyWorkerClientServer;
+/*if[HADOOP_NON_SECURE]
+import org.apache.giraph.comm.RPCCommunications;
+else[HADOOP_NON_SECURE]*/
+import org.apache.giraph.comm.SecureRPCCommunications;
+/*end[HADOOP_NON_SECURE]*/
 import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionExchange;
 import org.apache.giraph.graph.partition.PartitionOwner;
@@ -159,10 +163,15 @@ public class BspServiceWorker<I extends 
       commService =  new NettyWorkerClientServer<I, V, E, M>(
           context, getConfiguration(), this);
     } else {
+/*if[HADOOP_NON_SECURE]
+      commService =
+          new RPCCommunications<I, V, E, M>(context, this, getConfiguration(),
+          graphState);
+else[HADOOP_NON_SECURE]*/
       commService =
-          new RPCCommunications<I, V, E, M>(context, this,
-              getConfiguration(),
-              graphState);
+        new SecureRPCCommunications<I, V, E, M>(context, this,
+          getConfiguration(), graphState);
+/*end[HADOOP_NON_SECURE]*/
     }
     if (LOG.isInfoEnabled()) {
       LOG.info("BspServiceWorker: maxVerticesPerTransfer = " +

Modified: giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1393814&r1=1393813&r2=1393814&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Wed Oct  3 23:32:00 2012
@@ -30,6 +30,7 @@ import org.apache.giraph.examples.Simple
 import org.apache.giraph.examples.SimpleSuperstepVertex;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
 import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.LocalityInfoSorter;
@@ -53,10 +54,10 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobContext;
-/*if[HADOOP_NON_SASL_RPC]
-else[HADOOP_NON_SASL_RPC]*/
+/*if[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]
+else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
-/*end[HADOOP_NON_SASL_RPC]*/
+/*end[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
 import org.apache.zookeeper.KeeperException;
 import org.junit.Test;
 
@@ -127,15 +128,15 @@ public class TestBspBasic extends BspCas
         ", graphState" + gs);
     VertexInputFormat<LongWritable, IntWritable, FloatWritable, IntWritable>
     inputFormat = configuration.createVertexInputFormat();
-    /*if[HADOOP_NON_SASL_RPC]
+/*if[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]
       List<InputSplit> splitArray =
           inputFormat.getSplits(
               new JobContext(new Configuration(), new JobID()), 1);
-    else[HADOOP_NON_SASL_RPC]*/
+else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
       List<InputSplit> splitArray =
           inputFormat.getSplits(
               new JobContextImpl(new Configuration(), new JobID()), 1);
-      /*end[HADOOP_NON_SASL_RPC]*/
+/*end[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
     ByteArrayOutputStream byteArrayOutputStream =
         new ByteArrayOutputStream();
     DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);