Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/12/11 08:21:14 UTC

svn commit: r725603 [1/2] - in /hadoop/core/trunk: ./ bin/ conf/ src/core/org/apache/hadoop/fs/permission/ src/core/org/apache/hadoop/ipc/ src/core/org/apache/hadoop/security/ src/core/org/apache/hadoop/security/authorize/ src/hdfs/org/apache/hadoop/hd...

Author: acmurthy
Date: Wed Dec 10 23:21:13 2008
New Revision: 725603

URL: http://svn.apache.org/viewvc?rev=725603&view=rev
Log:
HADOOP-4348. Add service-level authorization for Hadoop.

Added:
    hadoop/core/trunk/conf/hadoop-policy.xml.template
    hadoop/core/trunk/src/core/org/apache/hadoop/ipc/ConnectionHeader.java
    hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Status.java
    hadoop/core/trunk/src/core/org/apache/hadoop/security/Group.java
    hadoop/core/trunk/src/core/org/apache/hadoop/security/SecurityUtil.java
    hadoop/core/trunk/src/core/org/apache/hadoop/security/User.java
    hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/
    hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/AuthorizationException.java
    hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/ConfiguredPolicy.java
    hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/ConnectionPermission.java
    hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/PolicyProvider.java
    hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/RefreshAuthorizationPolicyProtocol.java
    hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/Service.java
    hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/ServiceAuthorizationManager.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapReducePolicyProvider.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/tools/
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java
    hadoop/core/trunk/src/test/hadoop-policy.xml
    hadoop/core/trunk/src/test/org/apache/hadoop/security/TestAccessControlList.java
    hadoop/core/trunk/src/test/org/apache/hadoop/security/authorize/
    hadoop/core/trunk/src/test/org/apache/hadoop/security/authorize/HadoopPolicyProvider.java
    hadoop/core/trunk/src/test/org/apache/hadoop/security/authorize/TestConfiguredPolicy.java
    hadoop/core/trunk/src/test/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java
Modified:
    hadoop/core/trunk/.gitignore
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/bin/hadoop
    hadoop/core/trunk/build.xml
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/permission/AccessControlException.java
    hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java
    hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java
    hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Server.java
    hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessControlException.java
    hadoop/core/trunk/src/core/org/apache/hadoop/security/UnixUserGroupInformation.java
    hadoop/core/trunk/src/core/org/apache/hadoop/security/UserGroupInformation.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/tools/DFSAdmin.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/QueueManager.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/test/org/apache/hadoop/cli/TestCLI.java
    hadoop/core/trunk/src/test/org/apache/hadoop/cli/testConf.xml
    hadoop/core/trunk/src/test/org/apache/hadoop/cli/util/CLITestData.java
    hadoop/core/trunk/src/test/org/apache/hadoop/cli/util/CommandExecutor.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
    hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPC.java
    hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestIPCServerResponder.java
    hadoop/core/trunk/src/test/org/apache/hadoop/ipc/TestRPC.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

Modified: hadoop/core/trunk/.gitignore
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/.gitignore?rev=725603&r1=725602&r2=725603&view=diff
==============================================================================
--- hadoop/core/trunk/.gitignore (original)
+++ hadoop/core/trunk/.gitignore Wed Dec 10 23:21:13 2008
@@ -23,6 +23,7 @@
 conf/slaves
 conf/hadoop-env.sh
 conf/hadoop-site.xml
+conf/hadoop-policy.xml
 conf/capacity-scheduler.xml
 docs/api/
 logs/

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=725603&r1=725602&r2=725603&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Dec 10 23:21:13 2008
@@ -82,6 +82,8 @@
     move DataNode information to a separated page. (Boris Shkolnik via
     szetszwo)
 
+    HADOOP-4348. Add service-level authorization for Hadoop. (acmurthy) 
+
   IMPROVEMENTS
 
     HADOOP-4749. Added a new counter REDUCE_INPUT_BYTES. (Yongqiang He via 

Modified: hadoop/core/trunk/bin/hadoop
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/bin/hadoop?rev=725603&r1=725602&r2=725603&view=diff
==============================================================================
--- hadoop/core/trunk/bin/hadoop (original)
+++ hadoop/core/trunk/bin/hadoop Wed Dec 10 23:21:13 2008
@@ -60,6 +60,7 @@
   echo "  namenode             run the DFS namenode"
   echo "  datanode             run a DFS datanode"
   echo "  dfsadmin             run a DFS admin client"
+  echo "  mradmin              run a Map-Reduce admin client"
   echo "  fsck                 run a DFS filesystem checking utility"
   echo "  fs                   run a generic filesystem user client"
   echo "  balancer             run a cluster balancing utility"
@@ -166,6 +167,11 @@
   HADOOP_LOGFILE='hadoop.log'
 fi
 
+# default policy file for service-level authorization
+if [ "$HADOOP_POLICYFILE" = "" ]; then
+  HADOOP_POLICYFILE="hadoop-policy.xml"
+fi
+
 # restore ordinary behaviour
 unset IFS
 
@@ -188,6 +194,9 @@
 elif [ "$COMMAND" = "dfsadmin" ] ; then
   CLASS=org.apache.hadoop.hdfs.tools.DFSAdmin
   HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
+elif [ "$COMMAND" = "mradmin" ] ; then
+  CLASS=org.apache.hadoop.mapred.tools.MRAdmin
+  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
 elif [ "$COMMAND" = "fsck" ] ; then
   CLASS=org.apache.hadoop.hdfs.tools.DFSck
   HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
@@ -268,6 +277,7 @@
 if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then
   HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$JAVA_LIBRARY_PATH"
 fi  
+HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.policy.file=$HADOOP_POLICYFILE"
 
 # run it
 exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS -classpath "$CLASSPATH" $CLASS "$@"

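The launcher change above exports the policy file name to the JVM as the hadoop.policy.file system property, defaulting to hadoop-policy.xml unless HADOOP_POLICYFILE is set in the environment. A minimal sketch of how server-side code could pick the value up (the actual consumer of this property is not shown in this hunk):

    // Hedged sketch: recover the policy file name exported by bin/hadoop.
    // The default mirrors the launcher's fallback.
    String policyFile =
        System.getProperty("hadoop.policy.file", "hadoop-policy.xml");
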
Modified: hadoop/core/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/build.xml?rev=725603&r1=725602&r2=725603&view=diff
==============================================================================
--- hadoop/core/trunk/build.xml (original)
+++ hadoop/core/trunk/build.xml Wed Dec 10 23:21:13 2008
@@ -96,6 +96,7 @@
   <property name="test.build.classes" value="${test.build.dir}/classes"/>
   <property name="test.build.testjar" value="${test.build.dir}/testjar"/>
   <property name="test.build.testshell" value="${test.build.dir}/testshell"/>
+  <property name="test.build.extraconf" value="${test.build.dir}/extraconf"/>
   <property name="test.build.javadoc" value="${test.build.dir}/docs/api"/>
   <property name="test.build.javadoc.dev" value="${test.build.dir}/docs/dev-api"/>
   <property name="test.include" value="Test*"/>
@@ -168,6 +169,7 @@
 
   <!-- the unit test classpath: uses test.src.dir for configuration -->
   <path id="test.classpath">
+    <pathelement location="${test.build.extraconf}"/>
     <pathelement location="${test.build.classes}" />
     <pathelement location="${test.src.dir}"/>
     <pathelement location="${build.dir}"/>
@@ -226,6 +228,7 @@
     <mkdir dir="${test.build.classes}"/>
     <mkdir dir="${test.build.testjar}"/>
     <mkdir dir="${test.build.testshell}"/>
+    <mkdir dir="${test.build.extraconf}"/>
     <tempfile property="touch.temp.file" destDir="${java.io.tmpdir}"/>
     <touch millis="0" file="${touch.temp.file}">
       <fileset dir="${conf.dir}" includes="**/*.template"/>
@@ -685,6 +688,8 @@
     <mkdir dir="${test.build.data}"/>
     <delete dir="${test.log.dir}"/>
     <mkdir dir="${test.log.dir}"/>
+  	<copy file="${test.src.dir}/hadoop-policy.xml" 
+  	  todir="${test.build.extraconf}" />
     <junit showoutput="${test.output}"
       printsummary="${test.junit.printsummary}"
       haltonfailure="${test.junit.haltonfailure}"
@@ -698,6 +703,8 @@
       <sysproperty key="test.debug.data" value="${test.debug.data}"/>
       <sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>
       <sysproperty key="test.src.dir" value="${test.src.dir}"/>
+      <sysproperty key="test.build.extraconf" value="${test.build.extraconf}" />
+      <sysproperty key="hadoop.policy.file" value="hadoop-policy.xml"/>
       <sysproperty key="java.library.path"
        value="${build.native}/lib:${lib.dir}/native/${build.platform}"/>
       <sysproperty key="install.c++.examples" value="${install.c++.examples}"/>

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=725603&r1=725602&r2=725603&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Wed Dec 10 23:21:13 2008
@@ -31,6 +31,12 @@
   ordering of the filters.</description>
 </property>
 
+<property>
+  <name>hadoop.security.authorization</name>
+  <value>false</value>
+  <description>Is service-level authorization enabled?</description>
+</property>
+
 <!--- logging properties -->
 
 <property>

Added: hadoop/core/trunk/conf/hadoop-policy.xml.template
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-policy.xml.template?rev=725603&view=auto
==============================================================================
--- hadoop/core/trunk/conf/hadoop-policy.xml.template (added)
+++ hadoop/core/trunk/conf/hadoop-policy.xml.template Wed Dec 10 23:21:13 2008
@@ -0,0 +1,97 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+  <property>
+    <name>security.client.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for ClientProtocol, which is used by user code 
+    via the DistributedFileSystem. 
+    The ACL is a comma-separated list of user and group names. The user list
+    and the group list are separated by a blank, e.g. "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.client.datanode.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for ClientDatanodeProtocol, the client-to-datanode protocol 
+    for block recovery.
+    The ACL is a comma-separated list of user and group names. The user list
+    and the group list are separated by a blank, e.g. "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.datanode.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for DatanodeProtocol, which is used by datanodes to 
+    communicate with the namenode.
+    The ACL is a comma-separated list of user and group names. The user list
+    and the group list are separated by a blank, e.g. "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.inter.datanode.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for InterDatanodeProtocol, the inter-datanode protocol
+    for updating generation timestamp.
+    The ACL is a comma-separated list of user and group names. The user list
+    and the group list are separated by a blank, e.g. "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.namenode.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for NamenodeProtocol, the protocol used by the secondary
+    namenode to communicate with the namenode.
+    The ACL is a comma-separated list of user and group names. The user list
+    and the group list are separated by a blank, e.g. "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.inter.tracker.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for InterTrackerProtocol, used by the tasktrackers to 
+    communicate with the jobtracker.
+    The ACL is a comma-separated list of user and group names. The user list
+    and the group list are separated by a blank, e.g. "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.job.submission.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for JobSubmissionProtocol, used by job clients to 
+    communicate with the jobtracker for job submission, querying job status etc.
+    The ACL is a comma-separated list of user and group names. The user list
+    and the group list are separated by a blank, e.g. "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.task.umbilical.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for TaskUmbilicalProtocol, used by the map and reduce 
+    tasks to communicate with the parent tasktracker. 
+    The ACL is a comma-separated list of user and group names. The user list
+    and the group list are separated by a blank, e.g. "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+  <property>
+    <name>security.refresh.policy.protocol.acl</name>
+    <value>*</value>
+    <description>ACL for RefreshAuthorizationPolicyProtocol, used by the 
+    dfsadmin and mradmin commands to refresh the security policy in effect.
+    The ACL is a comma-separated list of user and group names. The user list
+    and the group list are separated by a blank, e.g. "alice,bob users,wheel".
+    A special value of "*" means all users are allowed.</description>
+  </property>
+
+</configuration>

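All nine ACLs above share one format: a comma-separated user list, then a single blank, then a comma-separated group list, with "*" standing for all users. A standalone parsing sketch of that format, for illustration only (the parser this commit actually ships is SecurityUtil.AccessControlList, shown further below):

    // Illustrative only: split "alice,bob users,wheel" into its parts.
    String acl = "alice,bob users,wheel";
    boolean allAllowed = acl.trim().equals("*");        // wildcard check
    String[] parts  = acl.split(" ", 2);
    String[] users  = parts[0].split(",");              // {"alice", "bob"}
    String[] groups = parts.length > 1
        ? parts[1].split(",") : new String[0];          // {"users", "wheel"}
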
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/permission/AccessControlException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/permission/AccessControlException.java?rev=725603&r1=725602&r2=725603&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/permission/AccessControlException.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/permission/AccessControlException.java Wed Dec 10 23:21:13 2008
@@ -40,4 +40,17 @@
   public AccessControlException(String s) {
     super(s);
   }
+  
+  /**
+   * Constructs a new exception with the specified cause and a detail
+   * message of <tt>(cause==null ? null : cause.toString())</tt> (which
+   * typically contains the class and detail message of <tt>cause</tt>).
+   * @param  cause the cause (which is saved for later retrieval by the
+   *         {@link #getCause()} method).  (A <tt>null</tt> value is
+   *         permitted, and indicates that the cause is nonexistent or
+   *         unknown.)
+   */
+  public AccessControlException(Throwable cause) {
+    super(cause);
+  }
 }

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java?rev=725603&r1=725602&r2=725603&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Client.java Wed Dec 10 23:21:13 2008
@@ -44,7 +44,6 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -175,7 +174,10 @@
    * socket connected to a remote address.  Calls are multiplexed through this
    * socket: responses may be delivered out of order. */
   private class Connection extends Thread {
-    private ConnectionId remoteId;
+    private InetSocketAddress server;             // server ip:port
+    private ConnectionHeader header;              // connection header
+    private ConnectionId remoteId;                // connection id
+    
     private Socket socket = null;                 // connected socket
     private DataInputStream in;
     private DataOutputStream out;
@@ -186,17 +188,19 @@
     private AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closed
     private IOException closeException; // close reason
 
-    public Connection(InetSocketAddress address) throws IOException {
-      this(new ConnectionId(address, null));
-    }
-    
     public Connection(ConnectionId remoteId) throws IOException {
-      if (remoteId.getAddress().isUnresolved()) {
+      this.remoteId = remoteId;
+      this.server = remoteId.getAddress();
+      if (server.isUnresolved()) {
         throw new UnknownHostException("unknown host: " + 
                                        remoteId.getAddress().getHostName());
       }
-      this.remoteId = remoteId;
+      
       UserGroupInformation ticket = remoteId.getTicket();
+      Class<?> protocol = remoteId.getProtocol();
+      header = 
+        new ConnectionHeader(protocol == null ? null : protocol.getName(), ticket);
+      
       this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
           remoteId.getAddress().toString() +
           " from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
@@ -290,7 +294,7 @@
       short timeoutFailures = 0;
       try {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Connecting to "+remoteId.getAddress());
+          LOG.debug("Connecting to "+server);
         }
         while (true) {
           try {
@@ -362,7 +366,7 @@
         Thread.sleep(1000);
       } catch (InterruptedException ignored) {}
       
-      LOG.info("Retrying connect to server: " + remoteId.getAddress() + 
+      LOG.info("Retrying connect to server: " + server + 
           ". Already tried " + curRetries + " time(s).");
     }
 
@@ -370,12 +374,15 @@
      * Out is not synchronized because only the first thread does this.
      */
     private void writeHeader() throws IOException {
+      // Write out the header and version
       out.write(Server.HEADER.array());
       out.write(Server.CURRENT_VERSION);
-      //When there are more fields we can have ConnectionHeader Writable.
+
+      // Write out the ConnectionHeader
       DataOutputBuffer buf = new DataOutputBuffer();
-      ObjectWritable.writeObject(buf, remoteId.getTicket(), 
-                                 UserGroupInformation.class, conf);
+      header.write(buf);
+      
+      // Write out the payload length
       int bufLen = buf.getLength();
       out.writeInt(bufLen);
       out.write(buf.getData(), 0, bufLen);
@@ -413,7 +420,7 @@
     }
 
     public InetSocketAddress getRemoteAddress() {
-      return remoteId.getAddress();
+      return server;
     }
 
     /* Send a ping to the server if the time elapsed 
@@ -498,14 +505,18 @@
 
         Call call = calls.remove(id);
 
-        boolean isError = in.readBoolean();     // read if error
-        if (isError) {
-          call.setException(new RemoteException( WritableUtils.readString(in),
-              WritableUtils.readString(in)));
-        } else {
+        int state = in.readInt();     // read call status
+        if (state == Status.SUCCESS.state) {
           Writable value = ReflectionUtils.newInstance(valueClass, conf);
           value.readFields(in);                 // read value
           call.setValue(value);
+        } else if (state == Status.ERROR.state) {
+          call.setException(new RemoteException(WritableUtils.readString(in),
+                                                WritableUtils.readString(in)));
+        } else if (state == Status.FATAL.state) {
+          // Close the connection
+          markClosed(new RemoteException(WritableUtils.readString(in), 
+                                         WritableUtils.readString(in)));
         }
       } catch (IOException e) {
         markClosed(e);
@@ -551,7 +562,7 @@
       } else {
         // log the info
         if (LOG.isDebugEnabled()) {
-          LOG.debug("closing ipc connection to " + remoteId.address + ": " +
+          LOG.debug("closing ipc connection to " + server + ": " +
               closeException.getMessage(),closeException);
         }
 
@@ -673,17 +684,39 @@
 
   /** Make a call, passing <code>param</code>, to the IPC server running at
    * <code>address</code>, returning the value.  Throws exceptions if there are
-   * network problems or if the remote code threw an exception. */
+   * network problems or if the remote code threw an exception.
+   * @deprecated Use {@link #call(Writable, InetSocketAddress, Class, UserGroupInformation)} instead 
+   */
+  @Deprecated
   public Writable call(Writable param, InetSocketAddress address)
   throws InterruptedException, IOException {
       return call(param, address, null);
   }
   
+  /** Make a call, passing <code>param</code>, to the IPC server running at
+   * <code>address</code> with the <code>ticket</code> credentials, returning 
+   * the value.  
+   * Throws exceptions if there are network problems or if the remote code 
+   * threw an exception.
+   * @deprecated Use {@link #call(Writable, InetSocketAddress, Class, UserGroupInformation)} instead 
+   */
+  @Deprecated
   public Writable call(Writable param, InetSocketAddress addr, 
-                       UserGroupInformation ticket)  
+      UserGroupInformation ticket)  
+      throws InterruptedException, IOException {
+    return call(param, addr, null, ticket);
+  }
+  
+  /** Make a call, passing <code>param</code>, to the IPC server running at
+   * <code>address</code> which is servicing the <code>protocol</code> protocol, 
+   * with the <code>ticket</code> credentials, returning the value.  
+   * Throws exceptions if there are network problems or if the remote code 
+   * threw an exception. */
+  public Writable call(Writable param, InetSocketAddress addr, 
+                       Class<?> protocol, UserGroupInformation ticket)  
                        throws InterruptedException, IOException {
     Call call = new Call(param);
-    Connection connection = getConnection(addr, ticket, call);
+    Connection connection = getConnection(addr, protocol, ticket, call);
     connection.sendParam(call);                 // send the parameter
     synchronized (call) {
       while (!call.done) {
@@ -736,11 +769,25 @@
     }
   }
 
+  /** 
+   * Makes a set of calls in parallel.  Each parameter is sent to the
+   * corresponding address.  When all values are available, or have timed out
+   * or errored, the collected results are returned in an array.  The array
+   * contains nulls for calls that timed out or errored.
+   * @deprecated Use {@link #call(Writable[], InetSocketAddress[], Class, UserGroupInformation)} instead 
+   */
+  @Deprecated
+  public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
+    throws IOException {
+    return call(params, addresses, null, null);
+  }
+  
   /** Makes a set of calls in parallel.  Each parameter is sent to the
    * corresponding address.  When all values are available, or have timed out
    * or errored, the collected results are returned in an array.  The array
    * contains nulls for calls that timed out or errored.  */
-  public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
+  public Writable[] call(Writable[] params, InetSocketAddress[] addresses, 
+                         Class<?> protocol, UserGroupInformation ticket)
     throws IOException {
     if (addresses.length == 0) return new Writable[0];
 
@@ -749,7 +796,8 @@
       for (int i = 0; i < params.length; i++) {
         ParallelCall call = new ParallelCall(params[i], results, i);
         try {
-          Connection connection = getConnection(addresses[i], null, call);
+          Connection connection = 
+            getConnection(addresses[i], protocol, ticket, call);
           connection.sendParam(call);             // send each parameter
         } catch (IOException e) {
           // log errors
@@ -770,7 +818,8 @@
 
   /** Get a connection from the pool, or create a new one and add it to the
    * pool.  Connections to a given host/port are reused. */
-  private Connection getConnection(InetSocketAddress addr, 
+  private Connection getConnection(InetSocketAddress addr,
+                                   Class<?> protocol,
                                    UserGroupInformation ticket,
                                    Call call)
                                    throws IOException {
@@ -783,7 +832,7 @@
      * connectionsId object and with set() method. We need to manage the
      * refs for keys in HashMap properly. For now its ok.
      */
-    ConnectionId remoteId = new ConnectionId(addr, ticket);
+    ConnectionId remoteId = new ConnectionId(addr, protocol, ticket);
     do {
       synchronized (connections) {
         connection = connections.get(remoteId);
@@ -804,13 +853,17 @@
 
   /**
    * This class holds the address and the user ticket. The client connections
-   * to servers are uniquely identified by <remoteAddress, ticket>
+   * to servers are uniquely identified by <remoteAddress, protocol, ticket>
    */
   private static class ConnectionId {
     InetSocketAddress address;
     UserGroupInformation ticket;
+    Class<?> protocol;
+    private static final int PRIME = 16777619;
     
-    ConnectionId(InetSocketAddress address, UserGroupInformation ticket) {
+    ConnectionId(InetSocketAddress address, Class<?> protocol, 
+                 UserGroupInformation ticket) {
+      this.protocol = protocol;
       this.address = address;
       this.ticket = ticket;
     }
@@ -818,15 +871,22 @@
     InetSocketAddress getAddress() {
       return address;
     }
+    
+    Class<?> getProtocol() {
+      return protocol;
+    }
+    
     UserGroupInformation getTicket() {
       return ticket;
     }
     
+    
     @Override
     public boolean equals(Object obj) {
      if (obj instanceof ConnectionId) {
        ConnectionId id = (ConnectionId) obj;
-       return address.equals(id.address) && ticket == id.ticket;
+       return address.equals(id.address) && protocol == id.protocol && 
+              ticket == id.ticket;
        //Note : ticket is a ref comparision.
      }
      return false;
@@ -834,7 +894,8 @@
     
     @Override
     public int hashCode() {
-      return address.hashCode() ^ System.identityHashCode(ticket);
+      return (address.hashCode() + PRIME * System.identityHashCode(protocol)) ^ 
+             System.identityHashCode(ticket);
     }
   }  
 }

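Since a ConnectionId is now the triple <remoteAddress, protocol, ticket>, two proxies for different protocols to the same server no longer share a connection. A hedged sketch of the new public overload ('client', 'invocation', 'address' and 'ticket' are assumed surrounding variables; ClientProtocol stands in for any protocol interface):

    // Sketch: the deprecated two- and three-argument call() overloads now
    // delegate to this protocol-aware variant.
    Writable value = client.call(invocation, address,
                                 ClientProtocol.class, ticket);
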
Added: hadoop/core/trunk/src/core/org/apache/hadoop/ipc/ConnectionHeader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/ipc/ConnectionHeader.java?rev=725603&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/ipc/ConnectionHeader.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/ipc/ConnectionHeader.java Wed Dec 10 23:21:13 2008
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * The IPC connection header sent by the client to the server
+ * on connection establishment.
+ */
+class ConnectionHeader implements Writable {
+  public static final Log LOG = LogFactory.getLog(ConnectionHeader.class);
+  
+  private String protocol;
+  private UserGroupInformation ugi = new UnixUserGroupInformation();
+  
+  public ConnectionHeader() {}
+  
+  /**
+   * Create a new {@link ConnectionHeader} with the given <code>protocol</code>
+   * and {@link UserGroupInformation}. 
+   * @param protocol protocol used for communication between the IPC client
+   *                 and the server
+   * @param ugi {@link UserGroupInformation} of the client communicating with
+   *            the server
+   */
+  public ConnectionHeader(String protocol, UserGroupInformation ugi) {
+    this.protocol = protocol;
+    this.ugi = ugi;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    protocol = Text.readString(in);
+    if (protocol.isEmpty()) {
+      protocol = null;
+    }
+    
+    boolean ugiPresent = in.readBoolean();
+    if (ugiPresent) {
+      ugi.readFields(in);
+    } else {
+      ugi = null;
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, (protocol == null) ? "" : protocol);
+    if (ugi != null) {
+      out.writeBoolean(true);
+      ugi.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  public String getProtocol() {
+    return protocol;
+  }
+
+  public UserGroupInformation getUgi() {
+    return ugi;
+  }
+
+  public String toString() {
+    return protocol + "-" + ugi;
+  }
+}

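A hedged round-trip for the new header, assuming code that lives in org.apache.hadoop.ipc (the class is package-private) and that UnixUserGroupInformation can be built from a user name and groups:

    // Serialize a header the way Client.writeHeader() does...
    ConnectionHeader sent = new ConnectionHeader(
        "org.apache.hadoop.hdfs.protocol.ClientProtocol",
        new UnixUserGroupInformation("alice", new String[] {"users"}));
    DataOutputBuffer buf = new DataOutputBuffer();
    sent.write(buf);

    // ...and read it back the way Server.processHeader() does.
    ConnectionHeader received = new ConnectionHeader();
    received.readFields(new DataInputStream(
        new ByteArrayInputStream(buf.getData(), 0, buf.getLength())));
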
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java?rev=725603&r1=725602&r2=725603&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/ipc/RPC.java Wed Dec 10 23:21:13 2008
@@ -30,15 +30,18 @@
 import java.io.*;
 import java.util.Map;
 import java.util.HashMap;
-import java.util.Collection;
 
 import javax.net.SocketFactory;
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginException;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 
@@ -213,8 +216,10 @@
       if (logDebug) {
         startTime = System.currentTimeMillis();
       }
+
       ObjectWritable value = (ObjectWritable)
-        client.call(new Invocation(method, args), address, ticket);
+        client.call(new Invocation(method, args), address, 
+                    method.getDeclaringClass(), ticket);
       if (logDebug) {
         long callTime = System.currentTimeMillis() - startTime;
         LOG.debug("Call: " + method.getName() + " " + callTime);
@@ -332,7 +337,13 @@
   public static VersionedProtocol getProxy(Class<?> protocol,
       long clientVersion, InetSocketAddress addr, Configuration conf,
       SocketFactory factory) throws IOException {
-    return getProxy(protocol, clientVersion, addr, null, conf, factory);
+    UserGroupInformation ugi = null;
+    try {
+      ugi = UserGroupInformation.login(conf);
+    } catch (LoginException le) {
+      throw new RuntimeException("Couldn't login!", le);
+    }
+    return getProxy(protocol, clientVersion, addr, ugi, conf, factory);
   }
   
   /** Construct a client-side proxy object that implements the named protocol,
@@ -383,17 +394,29 @@
     }
   }
 
-  /** Expert: Make multiple, parallel calls to a set of servers. */
+  /** 
+   * Expert: Make multiple, parallel calls to a set of servers.
+   * @deprecated Use {@link #call(Method, Object[][], InetSocketAddress[], UserGroupInformation, Configuration)} instead 
+   */
   public static Object[] call(Method method, Object[][] params,
                               InetSocketAddress[] addrs, Configuration conf)
     throws IOException {
+    return call(method, params, addrs, null, conf);
+  }
+  
+  /** Expert: Make multiple, parallel calls to a set of servers. */
+  public static Object[] call(Method method, Object[][] params,
+                              InetSocketAddress[] addrs, 
+                              UserGroupInformation ticket, Configuration conf)
+    throws IOException {
 
     Invocation[] invocations = new Invocation[params.length];
     for (int i = 0; i < params.length; i++)
       invocations[i] = new Invocation(method, params[i]);
     Client client = CLIENTS.getClient(conf);
     try {
-    Writable[] wrappedValues = client.call(invocations, addrs);
+    Writable[] wrappedValues = 
+      client.call(invocations, addrs, method.getDeclaringClass(), ticket);
     
     if (method.getReturnType() == Void.TYPE) {
       return null;
@@ -430,8 +453,8 @@
   /** An RPC Server. */
   public static class Server extends org.apache.hadoop.ipc.Server {
     private Object instance;
-    private Class<?> implementation;
     private boolean verbose;
+    private boolean authorize = false;
 
     /** Construct an RPC server.
      * @param instance the instance whose methods will be called
@@ -464,26 +487,32 @@
                   int numHandlers, boolean verbose) throws IOException {
       super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()));
       this.instance = instance;
-      this.implementation = instance.getClass();
       this.verbose = verbose;
+      this.authorize = 
+        conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, 
+                        false);
     }
 
-    public Writable call(Writable param, long receivedTime) throws IOException {
+    public Writable call(Class<?> protocol, Writable param, long receivedTime) 
+    throws IOException {
       try {
         Invocation call = (Invocation)param;
         if (verbose) log("Call: " + call);
-        
+
         Method method =
-          implementation.getMethod(call.getMethodName(),
+          protocol.getMethod(call.getMethodName(),
                                    call.getParameterClasses());
+        method.setAccessible(true);
 
         long startTime = System.currentTimeMillis();
         Object value = method.invoke(instance, call.getParameters());
         int processingTime = (int) (System.currentTimeMillis() - startTime);
         int qTime = (int) (startTime-receivedTime);
-        LOG.debug("Served: " + call.getMethodName() +
-            " queueTime= " + qTime +
-            " procesingTime= " + processingTime);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Served: " + call.getMethodName() +
+                    " queueTime= " + qTime +
+                    " procesingTime= " + processingTime);
+        }
         rpcMetrics.rpcQueueTime.inc(qTime);
         rpcMetrics.rpcProcessingTime.inc(processingTime);
 
@@ -517,6 +546,21 @@
         throw ioe;
       }
     }
+
+    @Override
+    public void authorize(Subject user, ConnectionHeader connection) 
+    throws AuthorizationException {
+      if (authorize) {
+        Class<?> protocol = null;
+        try {
+          protocol = getProtocolClass(connection.getProtocol(), getConf());
+        } catch (ClassNotFoundException cfne) {
+          throw new AuthorizationException("Unknown protocol: " + 
+                                           connection.getProtocol());
+        }
+        ServiceAuthorizationManager.authorize(user, protocol);
+      }
+    }
   }
 
   private static void log(String value) {

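Putting the server-side pieces together: RPC.Server only enforces ACLs when hadoop.security.authorization is true in its Configuration, and authorization decisions come from the JVM-wide Policy. A hedged start-up sketch (HDFSPolicyProvider is the provider added by this commit; whether ConfiguredPolicy itself reads hadoop-policy.xml is not visible in this mail):

    Configuration conf = new Configuration();
    conf.setBoolean(
        ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, true);

    // Install the global policy for the HDFS protocols.
    SecurityUtil.setPolicy(new ConfiguredPolicy(conf, new HDFSPolicyProvider()));

    // 'impl' is a hypothetical object implementing the served protocols.
    RPC.Server server = RPC.getServer(impl, "0.0.0.0", 9000, conf);
    server.start();
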
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Server.java?rev=725603&r1=725602&r2=725603&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Server.java Wed Dec 10 23:21:13 2008
@@ -40,25 +40,31 @@
 import java.net.SocketException;
 import java.net.UnknownHostException;
 
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import javax.security.auth.Subject;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -74,18 +80,31 @@
   public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
   
   // 1 : Introduce ping and server does not throw away RPCs
-  public static final byte CURRENT_VERSION = 2;
+  // 3 : Introduce the protocol into the RPC connection header
+  public static final byte CURRENT_VERSION = 3;
   
   /**
    * How many calls/handler are allowed in the queue.
    */
   private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
   
-  public static final Log LOG =
-    LogFactory.getLog(Server.class);
+  public static final Log LOG = LogFactory.getLog(Server.class);
 
   private static final ThreadLocal<Server> SERVER = new ThreadLocal<Server>();
 
+  private static final Map<String, Class<?>> PROTOCOL_CACHE = 
+    new ConcurrentHashMap<String, Class<?>>();
+  
+  static Class<?> getProtocolClass(String protocolName, Configuration conf) 
+  throws ClassNotFoundException {
+    Class<?> protocol = PROTOCOL_CACHE.get(protocolName);
+    if (protocol == null) {
+      protocol = conf.getClassByName(protocolName);
+      PROTOCOL_CACHE.put(protocolName, protocol);
+    }
+    return protocol;
+  }
+  
   /** Returns the server instance called under or null.  May be called under
    * {@link #call(Writable, long)} implementations, and under {@link Writable}
    * methods of paramters and return values.  Permits applications to access
@@ -191,7 +210,7 @@
                                    // the time served when response is not null
     private ByteBuffer response;                      // the response for this call
 
-    public Call(int id, Writable param, Connection connection) {
+    public Call(int id, Writable param, Connection connection) { 
       this.id = id;
       this.param = param;
       this.connection = connection;
@@ -397,9 +416,10 @@
       try {
         count = c.readAndProcess();
       } catch (InterruptedException ieo) {
+        LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
         throw ieo;
       } catch (Exception e) {
-        LOG.debug(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
+        LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
         count = -1; //so that the (count < 0) block is executed
       }
       if (count < 0) {
@@ -679,6 +699,7 @@
                                          //version are read
     private boolean headerRead = false;  //if the connection header that
                                          //follows version is read.
+
     private SocketChannel channel;
     private ByteBuffer data;
     private ByteBuffer dataLengthBuffer;
@@ -691,8 +712,18 @@
     // disconnected, we can say where it used to connect to.
     private String hostAddress;
     private int remotePort;
-    private UserGroupInformation ticket = null;
+    
+    ConnectionHeader header = new ConnectionHeader();
+    Class<?> protocol;
+    
+    Subject user = null;
 
+    // Fake 'call' for failed authorization response
+    private final int AUTHORIZATION_FAILED_CALLID = -1;
+    private final Call authFailedCall = 
+      new Call(AUTHORIZATION_FAILED_CALLID, null, null);
+    private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
+    
     public Connection(SelectionKey key, SocketChannel channel, 
                       long lastContact) {
       this.channel = channel;
@@ -816,6 +847,25 @@
             processHeader();
             headerRead = true;
             data = null;
+            
+            // Authorize the connection
+            try {
+              authorize(user, header);
+              
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Successfully authorized " + header);
+              }
+            } catch (AuthorizationException ae) {
+              authFailedCall.connection = this;
+              setupResponse(authFailedResponse, authFailedCall, 
+                            Status.FATAL, null, 
+                            ae.getClass().getName(), ae.getMessage());
+              responder.doRespond(authFailedCall);
+              
+              // Close this connection
+              return -1;
+            }
+
             continue;
           }
         } 
@@ -823,14 +873,23 @@
       }
     }
 
-    /// Reads the header following version
+    /// Reads the connection header following version
     private void processHeader() throws IOException {
-      /* In the current version, it is just a ticket.
-       * Later we could introduce a "ConnectionHeader" class.
-       */
       DataInputStream in =
         new DataInputStream(new ByteArrayInputStream(data.array()));
-      ticket = (UserGroupInformation) ObjectWritable.readObject(in, conf);
+      header.readFields(in);
+      try {
+        String protocolClassName = header.getProtocol();
+        if (protocolClassName != null) {
+          protocol = getProtocolClass(header.getProtocol(), conf);
+        }
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException("Unknown protocol: " + header.getProtocol());
+      }
+      
+      // TODO: Get the user name from the GSS API for Kerberos-based security
+      // Create the user subject
+      user = SecurityUtil.getSubject(header.getUgi());
     }
     
     private void processData() throws  IOException, InterruptedException {
@@ -840,7 +899,7 @@
         
       if (LOG.isDebugEnabled())
         LOG.debug(" got #" + id);
-            
+
       Writable param = ReflectionUtils.newInstance(paramClass, conf);           // read param
       param.readFields(dis);        
         
@@ -875,7 +934,7 @@
       ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
       while (running) {
         try {
-          Call call = callQueue.take(); // pop the queue; maybe blocked here
+          final Call call = callQueue.take(); // pop the queue; maybe blocked here
 
           if (LOG.isDebugEnabled())
             LOG.debug(getName() + ": has #" + call.id + " from " +
@@ -884,32 +943,39 @@
           String errorClass = null;
           String error = null;
           Writable value = null;
-          
+
           CurCall.set(call);
-          UserGroupInformation previous = UserGroupInformation.getCurrentUGI();
-          UserGroupInformation.setCurrentUGI(call.connection.ticket);
           try {
-            value = call(call.param, call.timestamp);             // make the call
+            // Make the call as the user via Subject.doAs, thus associating
+            // the call with the Subject
+            value = 
+              Subject.doAs(call.connection.user, 
+                           new PrivilegedExceptionAction<Writable>() {
+                              @Override
+                              public Writable run() throws Exception {
+                                // make the call
+                                return call(call.connection.protocol, 
+                                            call.param, call.timestamp);
+
+                              }
+                           }
+                          );
+              
+          } catch (PrivilegedActionException pae) {
+            Exception e = pae.getException();
+            LOG.info(getName()+", call "+call+": error: " + e, e);
+            errorClass = e.getClass().getName();
+            error = StringUtils.stringifyException(e);
           } catch (Throwable e) {
             LOG.info(getName()+", call "+call+": error: " + e, e);
             errorClass = e.getClass().getName();
             error = StringUtils.stringifyException(e);
           }
-          UserGroupInformation.setCurrentUGI(previous);
           CurCall.set(null);
 
-          buf.reset();
-          DataOutputStream out = new DataOutputStream(buf);
-          out.writeInt(call.id);                // write call id
-          out.writeBoolean(error != null);      // write error flag
-
-          if (error == null) {
-            value.write(out);
-          } else {
-            WritableUtils.writeString(out, errorClass);
-            WritableUtils.writeString(out, error);
-          }
-          call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
+          setupResponse(buf, call, 
+                        (error == null) ? Status.SUCCESS : Status.ERROR, 
+                        value, errorClass, error);
           responder.doRespond(call);
         } catch (InterruptedException e) {
           if (running) {                          // unexpected -- log it
@@ -977,6 +1043,39 @@
     }
   }
   
+  /**
+   * Setup response for the IPC Call.
+   * 
+   * @param response buffer to serialize the response into
+   * @param call {@link Call} to which we are setting up the response
+   * @param status {@link Status} of the IPC call
+   * @param rv return value for the IPC Call, if the call was successful
+   * @param errorClass error class, if the call failed
+   * @param error error message, if the call failed
+   * @throws IOException
+   */
+  private void setupResponse(ByteArrayOutputStream response, 
+                             Call call, Status status, 
+                             Writable rv, String errorClass, String error) 
+  throws IOException {
+    response.reset();
+    DataOutputStream out = new DataOutputStream(response);
+    out.writeInt(call.id);                // write call id
+    out.writeInt(status.state);           // write status
+
+    if (status == Status.SUCCESS) {
+      rv.write(out);
+    } else {
+      WritableUtils.writeString(out, errorClass);
+      WritableUtils.writeString(out, error);
+    }
+    call.setResponse(ByteBuffer.wrap(response.toByteArray()));
+  }
+  
+  Configuration getConf() {
+    return conf;
+  }
+  
   /** Sets the socket buffer size used for responding to RPCs */
   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
 
@@ -1030,10 +1129,29 @@
     return listener.getAddress();
   }
   
+  /** 
+   * Called for each call. 
+   * @deprecated Use {@link #call(Class, Writable, long)} instead
+   */
+  @Deprecated
+  public Writable call(Writable param, long receiveTime) throws IOException {
+    return call(null, param, receiveTime);
+  }
+  
   /** Called for each call. */
-  public abstract Writable call(Writable param, long receiveTime)
-                                                throws IOException;
+  public abstract Writable call(Class<?> protocol,
+                               Writable param, long receiveTime)
+  throws IOException;
   
+  /**
+   * Authorize the incoming client connection.
+   * 
+   * @param user client user
+   * @param connection incoming connection
+   * @throws AuthorizationException when the client isn't authorized to use the protocol
+   */
+  public void authorize(Subject user, ConnectionHeader connection) 
+  throws AuthorizationException {}
   
   /**
    * The number of open RPC conections

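The abstract Server now hands the protocol class to every call() and exposes authorize() as an overridable hook. A skeletal subclass, hedged and assuming it sits in the org.apache.hadoop.ipc package (ConnectionHeader is package-private); RPC.Server above is the real implementation:

    public class EchoServer extends Server {
      public EchoServer(String bindAddress, int port, Configuration conf)
          throws IOException {
        // Parameter class and handler count here are illustrative.
        super(bindAddress, port, LongWritable.class, 1, conf);
      }

      @Override
      public Writable call(Class<?> protocol, Writable param, long receiveTime) {
        return param;   // echo the parameter back to the caller
      }

      @Override
      public void authorize(Subject user, ConnectionHeader connection)
          throws AuthorizationException {
        // Accept everyone; RPC.Server instead consults
        // ServiceAuthorizationManager.authorize(user, protocol).
      }
    }
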
Added: hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Status.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Status.java?rev=725603&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Status.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/ipc/Status.java Wed Dec 10 23:21:13 2008
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+/**
+ * Status of a Hadoop IPC call.
+ */
+enum Status {
+  SUCCESS (0),
+  ERROR (1),
+  FATAL (-1);
+  
+  int state;
+  private Status(int state) {
+    this.state = state;
+  }
+}

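Each response frame now begins with the call id and one of these status ints instead of the old boolean error flag. A hedged fragment mirroring Client.receiveResponse() above ('in' is the connection's DataInputStream and 'value' a Writable of the expected type; Status is package-private, so this assumes code inside org.apache.hadoop.ipc):

    int id = in.readInt();                // call id
    int state = in.readInt();             // one of Status.*.state
    if (state == Status.SUCCESS.state) {
      value.readFields(in);               // serialized return value
    } else {                              // ERROR, or FATAL (connection is closed)
      String errorClass = WritableUtils.readString(in);
      String error = WritableUtils.readString(in);
    }
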
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessControlException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessControlException.java?rev=725603&r1=725602&r2=725603&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessControlException.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/AccessControlException.java Wed Dec 10 23:21:13 2008
@@ -40,4 +40,17 @@
    * @param s the detail message.
    */
   public AccessControlException(String s) {super(s);}
+  
+  /**
+   * Constructs a new exception with the specified cause and a detail
+   * message of <tt>(cause==null ? null : cause.toString())</tt> (which
+   * typically contains the class and detail message of <tt>cause</tt>).
+   * @param  cause the cause (which is saved for later retrieval by the
+   *         {@link #getCause()} method).  (A <tt>null</tt> value is
+   *         permitted, and indicates that the cause is nonexistent or
+   *         unknown.)
+   */
+  public AccessControlException(Throwable cause) {
+    super(cause);
+  }
 }

Added: hadoop/core/trunk/src/core/org/apache/hadoop/security/Group.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/Group.java?rev=725603&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/Group.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/Group.java Wed Dec 10 23:21:13 2008
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import java.security.Principal;
+
+/**
+ * A group to which a user belongs.
+ */
+public class Group implements Principal {
+  final String group;
+
+  /**
+   * Create a new <code>Group</code> with the given groupname.
+   * @param group group name
+   */
+  public Group(String group) {
+    this.group = group;
+  }
+
+  @Override
+  public String getName() {
+    return group;
+  }
+
+  @Override
+  public String toString() {
+    return group;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((group == null) ? 0 : group.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    Group other = (Group) obj;
+    if (group == null) {
+      if (other.group != null)
+        return false;
+    } else if (!group.equals(other.group))
+      return false;
+    return true;
+  }
+}

Added: hadoop/core/trunk/src/core/org/apache/hadoop/security/SecurityUtil.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/SecurityUtil.java?rev=725603&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/SecurityUtil.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/SecurityUtil.java Wed Dec 10 23:21:13 2008
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import java.security.Policy;
+import java.security.Principal;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import javax.security.auth.Subject;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.authorize.ConfiguredPolicy;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+
+public class SecurityUtil {
+
+  private static final Log LOG = LogFactory.getLog(SecurityUtil.class);
+  
+  static {
+    // Set an empty default policy
+    setPolicy(new ConfiguredPolicy(new Configuration(), 
+                                   PolicyProvider.DEFAULT_POLICY_PROVIDER));
+  }
+  
+  /**
+   * Set the global security policy for Hadoop.
+   * 
+   * @param policy {@link Policy} used for authorization.
+   */
+  public static void setPolicy(Policy policy) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Setting Hadoop security policy");
+    }
+    Policy.setPolicy(policy);
+  }
+
+  /**
+   * Get the current global security policy for Hadoop.
+   * @return the current {@link Policy}
+   */
+  public static Policy getPolicy() {
+    return Policy.getPolicy();
+  }
+  
+  /**
+   * Get the {@link Subject} for the user identified by <code>ugi</code>.
+   * @param ugi user
+   * @return the {@link Subject} for the user identified by <code>ugi</code>
+   */
+  public static Subject getSubject(UserGroupInformation ugi) {
+    if (ugi == null) {
+      return null;
+    }
+    
+    Set<Principal> principals =     // username + #groups + the ugi itself
+      new HashSet<Principal>(ugi.getGroupNames().length + 2);
+    User userPrincipal = new User(ugi.getUserName()); 
+    principals.add(userPrincipal);
+    for (String group : ugi.getGroupNames()) {
+      Group groupPrincipal = new Group(group);
+      principals.add(groupPrincipal);
+    }
+    principals.add(ugi);
+    Subject user = 
+      new Subject(false, principals, new HashSet<Object>(), new HashSet<Object>());
+    
+    return user;
+  }
+  
+  /**
+   * Class representing a configured access control list.
+   */
+  public static class AccessControlList {
+    
+    // Indicates an ACL string that represents access to all users
+    public static final String WILDCARD_ACL_VALUE = "*";
+
+    // Set of users who are granted access.
+    private Set<String> users;
+    // Set of groups which are granted access
+    private Set<String> groups;
+    // Whether all users are granted access.
+    private boolean allAllowed;
+    
+    /**
+     * Construct a new ACL from a String representation of the same.
+     * 
+     * The String is a comma-separated list of users, optionally followed
+     * by a single space and a comma-separated list of groups,
+     * e.g. "user1,user2 group1,group2".
+     * 
+     * @param aclString String representation of the ACL
+     */
+    public AccessControlList(String aclString) {
+      users = new TreeSet<String>();
+      groups = new TreeSet<String>();
+      // A lone "*" (modulo surrounding whitespace) grants everyone access.
+      if (aclString.trim().equals(WILDCARD_ACL_VALUE)) {
+        allAllowed = true;
+      } else {
+        String[] userGroupStrings = aclString.split(" ", 2);
+        
+        if (userGroupStrings.length >= 1) {
+          String[] usersStr = userGroupStrings[0].split(",");
+          if (usersStr.length >= 1) {
+            addToSet(users, usersStr);
+          }
+        }
+        
+        if (userGroupStrings.length == 2) {
+          String[] groupsStr = userGroupStrings[1].split(",");
+          if (groupsStr.length >= 1) {
+            addToSet(groups, groupsStr);
+          }
+        }
+      }
+    }
+    
+    public boolean allAllowed() {
+      return allAllowed;
+    }
+    
+    public Set<String> getUsers() {
+      return users;
+    }
+    
+    public Set<String> getGroups() {
+      return groups;
+    }
+    
+    private static final void addToSet(Set<String> set, String[] strings) {
+      for (String s : strings) {
+        s = s.trim();
+        if (s.length() > 0) {
+          set.add(s);
+        }
+      }
+    }
+  }
+}

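The AccessControlList string format is easiest to see in code; the driver class below is hypothetical, not part of this change:

    import java.util.Set;
    import org.apache.hadoop.security.SecurityUtil.AccessControlList;

    public class AclDemo {
      public static void main(String[] args) {
        // Users come first, then a single space, then groups.
        AccessControlList acl = new AccessControlList("alice,bob datascience,ops");
        Set<String> users = acl.getUsers();    // {alice, bob}
        Set<String> groups = acl.getGroups();  // {datascience, ops}
        System.out.println(users + " / " + groups);

        // A lone "*" grants access to all users.
        System.out.println(new AccessControlList("*").allAllowed());  // true
      }
    }
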
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/security/UnixUserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/UnixUserGroupInformation.java?rev=725603&r1=725602&r2=725603&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/UnixUserGroupInformation.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/UnixUserGroupInformation.java Wed Dec 10 23:21:13 2008
@@ -424,4 +424,9 @@
     }
     return buf.toString();
   }
+
+  @Override
+  public String getName() {
+    return toString();
+  }
 }

Added: hadoop/core/trunk/src/core/org/apache/hadoop/security/User.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/User.java?rev=725603&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/User.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/User.java Wed Dec 10 23:21:13 2008
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import java.security.Principal;
+
+/**
+ * A {@link Principal} identifying a user by username.
+ */
+public class User implements Principal {
+  final String user;
+
+  /**
+   * Create a new <code>User</code> with the given username.
+   * @param user user name
+   */
+  public User(String user) {
+    this.user = user;
+  }
+  
+  @Override
+  public String getName() {
+    return user;
+  }
+
+  @Override
+  public String toString() {
+    return user;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((user == null) ? 0 : user.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    User other = (User) obj;
+    if (user == null) {
+      if (other.user != null)
+        return false;
+    } else if (!user.equals(other.user))
+      return false;
+    return true;
+  }
+}

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/security/UserGroupInformation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/UserGroupInformation.java?rev=725603&r1=725602&r2=725603&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/UserGroupInformation.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/UserGroupInformation.java Wed Dec 10 23:21:13 2008
@@ -18,7 +18,11 @@
 package org.apache.hadoop.security;
 
 import java.io.IOException;
+import java.security.AccessController;
+import java.security.Principal;
+import java.util.Set;
 
+import javax.security.auth.Subject;
 import javax.security.auth.login.LoginException;
 
 import org.apache.commons.logging.Log;
@@ -28,26 +32,69 @@
 
 /** A {@link Writable} abstract class for storing user and groups information.
  */
-public abstract class UserGroupInformation implements Writable {
+public abstract class UserGroupInformation implements Writable, Principal {
   public static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
   private static UserGroupInformation LOGIN_UGI = null;
-
-  private static final ThreadLocal<UserGroupInformation> currentUGI
-    = new ThreadLocal<UserGroupInformation>();
-
+  
+  private static final ThreadLocal<Subject> currentUser =
+    new ThreadLocal<Subject>();
+  
   /** @return the {@link UserGroupInformation} for the current thread */ 
   public static UserGroupInformation getCurrentUGI() {
-    return currentUGI.get();
+    Subject user = getCurrentUser();
+    
+    if (user == null) {
+      user = currentUser.get();
+      if (user == null) {
+        return null;
+      }
+    }
+    
+    Set<UserGroupInformation> ugiPrincipals = 
+      user.getPrincipals(UserGroupInformation.class);
+    
+    UserGroupInformation ugi = null;
+    if (ugiPrincipals != null && ugiPrincipals.size() == 1) {
+      ugi = ugiPrincipals.iterator().next();
+      if (ugi == null) {
+        throw new RuntimeException("Cannot find _current user_ UGI in the Subject!");
+      }
+    } else {
+      throw new RuntimeException("Cannot resolve current user from subject, " +
+                                 "which had " + ugiPrincipals.size() +
+                                 " UGI principals!");
+    }
+    return ugi;
   }
 
-  /** Set the {@link UserGroupInformation} for the current thread */ 
+  /** 
+   * Set the {@link UserGroupInformation} for the current thread
+   * @deprecated Use {@link #setCurrentUser(UserGroupInformation)} 
+   */ 
+  @Deprecated
   public static void setCurrentUGI(UserGroupInformation ugi) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(Thread.currentThread().getName() + ", ugi=" + ugi);
-    }
-    currentUGI.set(ugi);
+    setCurrentUser(ugi);
   }
 
+  /**
+   * Return the current user <code>Subject</code>.
+   * @return the current user <code>Subject</code>
+   */
+  static Subject getCurrentUser() {
+    return Subject.getSubject(AccessController.getContext());
+  }
+  
+  /**
+   * Set the {@link UserGroupInformation} for the current thread.
+   * WARNING: this method should be used only in test cases and other
+   * exceptional cases!
+   * @param ugi {@link UserGroupInformation} for the current thread
+   */
+  public static void setCurrentUser(UserGroupInformation ugi) {
+    Subject user = SecurityUtil.getSubject(ugi);
+    currentUser.set(user);
+  }
+  
   /** Get username
    * 
    * @return the user's name

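Since getCurrentUGI() now resolves the user from the JAAS AccessControlContext, the expected pattern is to wrap work in Subject.doAs with a Subject built by SecurityUtil.getSubject. A minimal sketch (user and group names are made up):

    import java.security.PrivilegedExceptionAction;
    import javax.security.auth.Subject;
    import org.apache.hadoop.security.SecurityUtil;
    import org.apache.hadoop.security.UnixUserGroupInformation;
    import org.apache.hadoop.security.UserGroupInformation;

    public class DoAsDemo {
      public static void main(String[] args) throws Exception {
        UserGroupInformation ugi =
          new UnixUserGroupInformation("alice", new String[] {"ops"});
        // The Subject carries User, Group and UGI principals (see SecurityUtil).
        Subject subject = SecurityUtil.getSubject(ugi);
        Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
          public Object run() {
            // Inside doAs the Subject is on the access-control context,
            // so the UGI principal can be recovered from it.
            System.out.println(UserGroupInformation.getCurrentUGI());
            return null;
          }
        });
      }
    }
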
Added: hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/AuthorizationException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/AuthorizationException.java?rev=725603&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/AuthorizationException.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/AuthorizationException.java Wed Dec 10 23:21:13 2008
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.authorize;
+
+import java.io.PrintStream;
+import java.io.PrintWriter;
+
+import org.apache.hadoop.security.AccessControlException;
+
+/**
+ * An exception class for authorization-related issues.
+ * 
+ * This class <em>does not</em> provide the stack trace for security purposes.
+ */
+public class AuthorizationException extends AccessControlException {
+  private static final long serialVersionUID = 1L;
+
+  public AuthorizationException() {
+    super();
+  }
+
+  public AuthorizationException(String message) {
+    super(message);
+  }
+  
+  /**
+   * Constructs a new exception with the specified cause and a detail
+   * message of <tt>(cause==null ? null : cause.toString())</tt> (which
+   * typically contains the class and detail message of <tt>cause</tt>).
+   * @param  cause the cause (which is saved for later retrieval by the
+   *         {@link #getCause()} method).  (A <tt>null</tt> value is
+   *         permitted, and indicates that the cause is nonexistent or
+   *         unknown.)
+   */
+  public AuthorizationException(Throwable cause) {
+    super(cause);
+  }
+  
+  private static StackTraceElement[] stackTrace = new StackTraceElement[0];
+  @Override
+  public StackTraceElement[] getStackTrace() {
+    // Do not provide the stack-trace
+    return stackTrace;
+  }
+
+  @Override
+  public void printStackTrace() {
+    // Do not provide the stack-trace
+  }
+
+  @Override
+  public void printStackTrace(PrintStream s) {
+    // Do not provide the stack-trace
+  }
+
+  @Override
+  public void printStackTrace(PrintWriter s) {
+    // Do not provide the stack-trace
+  }
+  
+}

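The net effect of the overrides is that an AuthorizationException carries a message but deliberately no frames, so nothing about server internals leaks to the client; for example (demo class is hypothetical):

    import org.apache.hadoop.security.authorize.AuthorizationException;

    public class NoTraceDemo {
      public static void main(String[] args) {
        AuthorizationException e =
          new AuthorizationException("user alice is not authorized");
        e.printStackTrace();                           // prints nothing
        System.out.println(e.getStackTrace().length);  // 0
        System.out.println(e.getMessage());            // message is preserved
      }
    }
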
Added: hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/ConfiguredPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/ConfiguredPolicy.java?rev=725603&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/ConfiguredPolicy.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/ConfiguredPolicy.java Wed Dec 10 23:21:13 2008
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.authorize;
+
+import java.security.Permission;
+import java.security.PermissionCollection;
+import java.security.Policy;
+import java.security.Principal;
+import java.security.ProtectionDomain;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Group;
+import org.apache.hadoop.security.User;
+import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+
+/**
+ * A {@link Configuration} based security {@link Policy} for Hadoop.
+ *
+ * {@link ConfiguredPolicy} works in conjunction with a {@link PolicyProvider}
+ * for providing service-level authorization for Hadoop.
+ */
+public class ConfiguredPolicy extends Policy implements Configurable {
+  public static final String HADOOP_POLICY_FILE = "hadoop-policy.xml";
+  private static final Log LOG = LogFactory.getLog(ConfiguredPolicy.class);
+      
+  private Configuration conf;
+  private PolicyProvider policyProvider;
+  private volatile Map<Principal, Set<Permission>> permissions;
+  private volatile Set<Permission> allowedPermissions;
+
+  public ConfiguredPolicy(Configuration conf, PolicyProvider policyProvider) {
+    this.conf = conf;      
+    this.policyProvider = policyProvider;
+    refresh();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    refresh();
+  }
+
+  @Override
+  public boolean implies(ProtectionDomain domain, Permission permission) {
+    // Only make checks for domains having principals 
+    if (domain.getPrincipals().length == 0) {
+      return true; 
+    }
+
+    return super.implies(domain, permission);
+  }
+
+  @Override
+  public PermissionCollection getPermissions(ProtectionDomain domain) {
+    PermissionCollection permissionCollection = super.getPermissions(domain);
+    for (Principal principal : domain.getPrincipals()) {
+      Set<Permission> principalPermissions = permissions.get(principal);
+      if (principalPermissions != null) {
+        for (Permission permission : principalPermissions) {
+          permissionCollection.add(permission);
+        }
+      }
+
+      for (Permission permission : allowedPermissions) {
+        permissionCollection.add(permission);
+      }
+    }
+    return permissionCollection;
+  }
+
+  @Override
+  public void refresh() {
+    // Get the system property 'hadoop.policy.file'
+    String policyFile = 
+      System.getProperty("hadoop.policy.file", HADOOP_POLICY_FILE);
+    
+    // Make a copy of the original config, and load the policy file
+    Configuration policyConf = new Configuration(conf);
+    policyConf.addResource(policyFile);
+    
+    Map<Principal, Set<Permission>> newPermissions = 
+      new HashMap<Principal, Set<Permission>>();
+    Set<Permission> newAllowPermissions = new HashSet<Permission>();
+
+    // Parse the config file
+    Service[] services = policyProvider.getServices();
+    if (services != null) {
+      for (Service service : services) {
+        AccessControlList acl = 
+          new AccessControlList(
+              policyConf.get(service.getServiceKey(), 
+                             AccessControlList.WILDCARD_ACL_VALUE)
+              );
+        
+        if (acl.allAllowed()) {
+          newAllowPermissions.add(service.getPermission());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Policy - " + service.getPermission() + " * ");
+          }
+        } else {
+          for (String user : acl.getUsers()) {
+            addPermission(newPermissions, new User(user), service.getPermission());
+          }
+
+          for (String group : acl.getGroups()) {
+            addPermission(newPermissions, new Group(group), service.getPermission());
+          }
+        }
+      }
+    }
+
+    // Flip to the newly parsed permissions
+    allowedPermissions = newAllowPermissions;
+    permissions = newPermissions;
+  }
+
+  private void addPermission(Map<Principal, Set<Permission>> permissions,
+                             Principal principal, Permission permission) {
+    Set<Permission> principalPermissions = permissions.get(principal);
+    if (principalPermissions == null) {
+      principalPermissions = new HashSet<Permission>();
+      permissions.put(principal, principalPermissions);
+    }
+    principalPermissions.add(permission);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Policy - Adding  " + permission + " to " + principal);
+    }
+  }
+}

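Wiring it together: a ConfiguredPolicy is built from a Configuration plus a PolicyProvider and installed JVM-wide via SecurityUtil.setPolicy. A sketch, assuming hadoop-policy.xml (added by this change as a template) is on the classpath; the ACL value below is made up, and values in hadoop-policy.xml override the in-memory setting:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HDFSPolicyProvider;
    import org.apache.hadoop.security.SecurityUtil;
    import org.apache.hadoop.security.authorize.ConfiguredPolicy;

    public class PolicySetup {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Grant users alice,bob and group ops access to the client protocol.
        conf.set("security.client.protocol.acl", "alice,bob ops");
        SecurityUtil.setPolicy(
          new ConfiguredPolicy(conf, new HDFSPolicyProvider()));
      }
    }
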
Added: hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/ConnectionPermission.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/ConnectionPermission.java?rev=725603&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/ConnectionPermission.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/ConnectionPermission.java Wed Dec 10 23:21:13 2008
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.authorize;
+
+import java.security.Permission;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/**
+ * {@link Permission} to initiate a connection to a given service.
+ */
+public class ConnectionPermission extends Permission {
+
+  private static final long serialVersionUID = 1L;
+  private final Class<?> protocol;
+
+  /**
+   * {@link ConnectionPermission} for a given service.
+   * @param protocol service to be accessed
+   */
+  public ConnectionPermission(Class<?> protocol) {
+    super(protocol.getName());
+    this.protocol = protocol;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof ConnectionPermission) {
+      return protocol == ((ConnectionPermission)obj).protocol;
+    }
+    return false;
+  }
+
+  @Override
+  public String getActions() {
+    return "ALLOW";
+  }
+
+  @Override
+  public int hashCode() {
+    return protocol.hashCode();
+  }
+
+  @Override
+  public boolean implies(Permission permission) {
+    if (permission instanceof ConnectionPermission) {
+      ConnectionPermission that = (ConnectionPermission)permission;
+      if (that.protocol.equals(VersionedProtocol.class)) {
+        return true;
+      }
+      return this.protocol.equals(that.protocol);
+    }
+    return false;
+  }
+
+  public String toString() {
+    return "ConnectionPermission(" + protocol.getName() + ")";
+  }
+}

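The implies() rule makes VersionedProtocol the most general target: any held ConnectionPermission satisfies a check against VersionedProtocol itself, while concrete protocols must match exactly. For example (demo class is hypothetical):

    import java.security.Permission;
    import org.apache.hadoop.ipc.VersionedProtocol;
    import org.apache.hadoop.security.authorize.ConnectionPermission;
    import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;

    public class ImpliesDemo {
      public static void main(String[] args) {
        Permission held =
          new ConnectionPermission(RefreshAuthorizationPolicyProtocol.class);
        // Exact protocol match:
        System.out.println(held.implies(new ConnectionPermission(
            RefreshAuthorizationPolicyProtocol.class)));            // true
        // Any connection permission implies the VersionedProtocol base:
        System.out.println(held.implies(new ConnectionPermission(
            VersionedProtocol.class)));                             // true
      }
    }
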
Added: hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/PolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/PolicyProvider.java?rev=725603&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/PolicyProvider.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/PolicyProvider.java Wed Dec 10 23:21:13 2008
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.authorize;
+
+import java.security.Policy;
+
+/**
+ * {@link PolicyProvider} provides the {@link Service} definitions to the
+ * security {@link Policy} in effect for Hadoop.
+ *
+ */
+public abstract class PolicyProvider {
+
+  /**
+   * Configuration key for the {@link PolicyProvider} implementation.
+   */
+  public static final String POLICY_PROVIDER_CONFIG = 
+    "hadoop.security.authorization.policyprovider";
+  
+  /**
+   * A default {@link PolicyProvider} without any defined services.
+   */
+  public static final PolicyProvider DEFAULT_POLICY_PROVIDER =
+    new PolicyProvider() {
+    public Service[] getServices() {
+      return null;
+    }
+  };
+  
+  /**
+   * Get the {@link Service} definitions from the {@link PolicyProvider}.
+   * @return the {@link Service} definitions
+   */
+  public abstract Service[] getServices();
+}

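A concrete provider just enumerates (configuration key, protocol) pairs; the subclass and key below are hypothetical:

    import org.apache.hadoop.security.authorize.PolicyProvider;
    import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
    import org.apache.hadoop.security.authorize.Service;

    public class MyPolicyProvider extends PolicyProvider {
      @Override
      public Service[] getServices() {
        return new Service[] {
          new Service("security.my.protocol.acl",
                      RefreshAuthorizationPolicyProtocol.class)
        };
      }
    }
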
Added: hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/RefreshAuthorizationPolicyProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/RefreshAuthorizationPolicyProtocol.java?rev=725603&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/RefreshAuthorizationPolicyProtocol.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/RefreshAuthorizationPolicyProtocol.java Wed Dec 10 23:21:13 2008
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.authorize;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/**
+ * Protocol used to refresh the service-level authorization policy currently in effect.
+ */
+public interface RefreshAuthorizationPolicyProtocol extends VersionedProtocol {
+  
+  /**
+   * Version 1: Initial version
+   */
+  public static final long versionID = 1L;
+
+  /**
+   * Refresh the service-level authorization policy in effect.
+   * @throws IOException
+   */
+  void refreshServiceAcl() throws IOException;
+}

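Server-side, a daemon implementing this protocol can refresh the installed policy in place; a minimal sketch assuming ConfiguredPolicy is the active java.security.Policy (the class below is hypothetical):

    import java.io.IOException;
    import java.security.Policy;
    import org.apache.hadoop.security.SecurityUtil;
    import org.apache.hadoop.security.authorize.ConfiguredPolicy;
    import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;

    public class PolicyRefresher implements RefreshAuthorizationPolicyProtocol {
      public long getProtocolVersion(String protocol, long clientVersion) {
        return versionID;
      }
      public void refreshServiceAcl() throws IOException {
        Policy policy = SecurityUtil.getPolicy();
        if (policy instanceof ConfiguredPolicy) {
          ((ConfiguredPolicy) policy).refresh();  // re-reads hadoop-policy.xml
        }
      }
    }
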
Added: hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/Service.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/Service.java?rev=725603&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/Service.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/security/authorize/Service.java Wed Dec 10 23:21:13 2008
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security.authorize;
+
+import java.security.Permission;
+
+/**
+ * An abstract definition of <em>service</em> as related to 
+ * Service Level Authorization for Hadoop.
+ * 
+ * Each service defines its configuration key and the
+ * {@link Permission} required to access the service.
+ */
+public class Service {
+  private String key;
+  private Permission permission;
+  
+  public Service(String key, Class<?> protocol) {
+    this.key = key;
+    this.permission = new ConnectionPermission(protocol);
+  }
+  
+  /**
+   * Get the configuration key for the service.
+   * @return the configuration key for the service
+   */
+  public String getServiceKey() {
+    return key;
+  }
+  
+  /**
+   * Get the {@link Permission} required to access the service.
+   * @return the {@link Permission} required to access the service
+   */
+  public Permission getPermission() {
+    return permission;
+  }
+}