You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2012/03/08 01:57:13 UTC

svn commit: r1298245 - in /hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common: ./ dev-support/ src/main/java/org/apache/hadoop/ipc/ src/main/java/org/apache/hadoop/util/ src/main/proto/

Author: suresh
Date: Thu Mar  8 00:57:13 2012
New Revision: 1298245

URL: http://svn.apache.org/viewvc?rev=1298245&view=rev
Log:
HADOOP-7557. Merge r1295261 from trunk to 0.23

Added:
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/IpcException.java
      - copied unchanged from r1295261, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/IpcException.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/proto/IpcConnectionContext.proto
      - copied unchanged from r1295261, hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/proto/IpcConnectionContext.proto
Removed:
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ConnectionHeader.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1298245&r1=1298244&r2=1298245&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/CHANGES.txt Thu Mar  8 00:57:13 2012
@@ -79,6 +79,8 @@ Release 0.23.3 - UNRELEASED
     HADOOP-8108. Move method getHostPortString() from NameNode to NetUtils.
     (Brandon Li via jitendra)
 
+    HADOOP-7557 Make IPC header be extensible (sanjay radia)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml?rev=1298245&r1=1298244&r2=1298245&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml Thu Mar  8 00:57:13 2012
@@ -278,4 +278,8 @@
       <!-- protobuf generated code -->
       <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtocolInfoProtos.*"/>
     </Match>
+		<Match>
+      <!-- protobuf generated code -->
+      <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.IpcConnectionContextProtos.*"/>
+    </Match>
  </FindBugsFilter>

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1298245&r1=1298244&r2=1298245&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java Thu Mar  8 00:57:13 2012
@@ -51,6 +51,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.RpcPayloadHeader.*;
+import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -66,6 +67,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
@@ -211,7 +213,7 @@ public class Client {
   private class Connection extends Thread {
     private InetSocketAddress server;             // server ip:port
     private String serverPrincipal;  // server's krb5 principal name
-    private ConnectionHeader header;              // connection header
+    private IpcConnectionContextProto connectionContext;   // connection context
     private final ConnectionId remoteId;                // connection id
     private AuthMethod authMethod; // authentication method
     private boolean useSasl;
@@ -292,8 +294,8 @@ public class Client {
         authMethod = AuthMethod.KERBEROS;
       }
       
-      header = 
-        new ConnectionHeader(RPC.getProtocolName(protocol), ticket, authMethod);
+      connectionContext = ProtoUtil.makeIpcConnectionContext(
+          RPC.getProtocolName(protocol), ticket, authMethod);
       
       if (LOG.isDebugEnabled())
         LOG.debug("Use " + authMethod + " authentication for protocol "
@@ -563,7 +565,7 @@ public class Client {
           setupConnection();
           InputStream inStream = NetUtils.getInputStream(socket);
           OutputStream outStream = NetUtils.getOutputStream(socket);
-          writeRpcHeader(outStream);
+          writeConnectionHeader(outStream);
           if (useSasl) {
             final InputStream in2 = inStream;
             final OutputStream out2 = outStream;
@@ -597,8 +599,11 @@ public class Client {
             } else {
               // fall back to simple auth because server told us so.
               authMethod = AuthMethod.SIMPLE;
-              header = new ConnectionHeader(header.getProtocol(), header
-                  .getUgi(), authMethod);
+              // remake the connectionContext             
+              connectionContext = ProtoUtil.makeIpcConnectionContext(
+                  connectionContext.getProtocol(), 
+                  ProtoUtil.getUgi(connectionContext.getUserInfo()),
+                  authMethod);
               useSasl = false;
             }
           }
@@ -678,13 +683,26 @@ public class Client {
           ". Already tried " + curRetries + " time(s).");
     }
 
-    /* Write the RPC header */
-    private void writeRpcHeader(OutputStream outStream) throws IOException {
+    /**
+     * Write the connection header - this is sent when connection is established
+     * +----------------------------------+
+     * |  "hrpc" 4 bytes                  |      
+     * +----------------------------------+
+     * |  Version (1 bytes)               |      
+     * +----------------------------------+
+     * |  Authmethod (1 byte)             |      
+     * +----------------------------------+
+     * |  IpcSerializationType (1 byte)   |      
+     * +----------------------------------+
+     */
+    private void writeConnectionHeader(OutputStream outStream)
+        throws IOException {
       DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
       // Write out the header, version and authentication method
       out.write(Server.HEADER.array());
       out.write(Server.CURRENT_VERSION);
       authMethod.write(out);
+      Server.IpcSerializationType.PROTOBUF.write(out);
       out.flush();
     }
     
@@ -694,7 +712,7 @@ public class Client {
     private void writeHeader() throws IOException {
       // Write out the ConnectionHeader
       DataOutputBuffer buf = new DataOutputBuffer();
-      header.write(buf);
+      connectionContext.writeTo(buf);
       
       // Write out the payload length
       int bufLen = buf.getLength();
@@ -1261,16 +1279,16 @@ public class Client {
   public static class ConnectionId {
     InetSocketAddress address;
     UserGroupInformation ticket;
-    Class<?> protocol;
+    final Class<?> protocol;
     private static final int PRIME = 16777619;
-    private int rpcTimeout;
-    private String serverPrincipal;
-    private int maxIdleTime; //connections will be culled if it was idle for 
+    private final int rpcTimeout;
+    private final String serverPrincipal;
+    private final int maxIdleTime; //connections will be culled if it was idle for 
     //maxIdleTime msecs
-    private int maxRetries; //the max. no. of retries for socket connections
-    private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
-    private boolean doPing; //do we need to send ping message
-    private int pingInterval; // how often sends ping to the server in msecs
+    private final int maxRetries; //the max. no. of retries for socket connections
+    private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
+    private final boolean doPing; //do we need to send ping message
+    private final int pingInterval; // how often sends ping to the server in msecs
     
     ConnectionId(InetSocketAddress address, Class<?> protocol, 
                  UserGroupInformation ticket, int rpcTimeout,

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java?rev=1298245&r1=1298244&r2=1298245&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java Thu Mar  8 00:57:13 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.ipc;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.BindException;
@@ -74,6 +75,7 @@ import org.apache.hadoop.ipc.RpcPayloadH
 import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
 import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
+import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SaslRpcServer;
@@ -90,6 +92,7 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ProtoUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -111,6 +114,22 @@ public abstract class Server {
   public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
   
   /**
+   * Serialization type for ConnectionContext and RpcPayloadHeader
+   */
+  public enum IpcSerializationType {
+    // Add new serialization type to the end without affecting the enum order
+    PROTOBUF;
+    
+    void write(DataOutput out) throws IOException {
+      out.writeByte(this.ordinal());
+    }
+    
+    static IpcSerializationType fromByte(byte b) {
+      return IpcSerializationType.values()[b];
+    }
+  }
+  
+  /**
    * If the user accidentally sends an HTTP GET to an IPC port, we detect this
    * and send back a nicer response.
    */
@@ -133,7 +152,8 @@ public abstract class Server {
   // 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
   //     in ObjectWritable to efficiently transmit arrays of primitives
   // 6 : Made RPC payload header explicit
-  public static final byte CURRENT_VERSION = 6;
+  // 7 : Changed Ipc Connection Header to use Protocol buffers
+  public static final byte CURRENT_VERSION = 7;
 
   /**
    * Initial and max size of response buffer
@@ -968,9 +988,9 @@ public abstract class Server {
 
   /** Reads calls from a connection and queues them for handling. */
   public class Connection {
-    private boolean rpcHeaderRead = false; // if initial rpc header is read
-    private boolean headerRead = false;  //if the connection header that
-                                         //follows version is read.
+    private boolean connectionHeaderRead = false; // connection  header is read?
+    private boolean connectionContextRead = false; //if connection context that
+                                            //follows connection header is read
 
     private SocketChannel channel;
     private ByteBuffer data;
@@ -986,14 +1006,14 @@ public abstract class Server {
     private int remotePort;
     private InetAddress addr;
     
-    ConnectionHeader header = new ConnectionHeader();
+    IpcConnectionContextProto connectionContext;
     String protocolName;
     boolean useSasl;
     SaslServer saslServer;
     private AuthMethod authMethod;
     private boolean saslContextEstablished;
     private boolean skipInitialSaslHandshake;
-    private ByteBuffer rpcHeaderBuffer;
+    private ByteBuffer connectionHeaderBuf = null;
     private ByteBuffer unwrappedData;
     private ByteBuffer unwrappedDataLengthBuffer;
     
@@ -1241,17 +1261,17 @@ public abstract class Server {
             return count;
         }
         
-        if (!rpcHeaderRead) {
+        if (!connectionHeaderRead) {
           //Every connection is expected to send the header.
-          if (rpcHeaderBuffer == null) {
-            rpcHeaderBuffer = ByteBuffer.allocate(2);
+          if (connectionHeaderBuf == null) {
+            connectionHeaderBuf = ByteBuffer.allocate(3);
           }
-          count = channelRead(channel, rpcHeaderBuffer);
-          if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
+          count = channelRead(channel, connectionHeaderBuf);
+          if (count < 0 || connectionHeaderBuf.remaining() > 0) {
             return count;
           }
-          int version = rpcHeaderBuffer.get(0);
-          byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
+          int version = connectionHeaderBuf.get(0);
+          byte[] method = new byte[] {connectionHeaderBuf.get(1)};
           authMethod = AuthMethod.read(new DataInputStream(
               new ByteArrayInputStream(method)));
           dataLengthBuffer.flip();
@@ -1273,6 +1293,14 @@ public abstract class Server {
             setupBadVersionResponse(version);
             return -1;
           }
+          
+          IpcSerializationType serializationType = IpcSerializationType
+              .fromByte(connectionHeaderBuf.get(2));
+          if (serializationType != IpcSerializationType.PROTOBUF) {
+            respondUnsupportedSerialization(serializationType);
+            return -1;
+          }
+          
           dataLengthBuffer.clear();
           if (authMethod == null) {
             throw new IOException("Unable to read authentication method");
@@ -1302,8 +1330,8 @@ public abstract class Server {
             useSasl = true;
           }
           
-          rpcHeaderBuffer = null;
-          rpcHeaderRead = true;
+          connectionHeaderBuf = null;
+          connectionHeaderRead = true;
           continue;
         }
         
@@ -1334,7 +1362,7 @@ public abstract class Server {
             skipInitialSaslHandshake = false;
             continue;
           }
-          boolean isHeaderRead = headerRead;
+          boolean isHeaderRead = connectionContextRead;
           if (useSasl) {
             saslReadAndProcess(data.array());
           } else {
@@ -1383,6 +1411,17 @@ public abstract class Server {
       }
     }
     
+    private void respondUnsupportedSerialization(IpcSerializationType st) throws IOException {
+      String errMsg = "Server IPC version " + CURRENT_VERSION
+          + " do not support serilization " + st.toString();
+      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+      Call fakeCall = new Call(-1, null, this);
+      setupResponse(buffer, fakeCall, Status.FATAL, null,
+          IpcException.class.getName(), errMsg);
+      responder.doRespond(fakeCall);
+    }
+    
     private void setupHttpRequestOnIpcPortResponse() throws IOException {
       Call fakeCall =  new Call(0, null, this);
       fakeCall.setResponse(ByteBuffer.wrap(
@@ -1390,15 +1429,15 @@ public abstract class Server {
       responder.doRespond(fakeCall);
     }
 
-    /// Reads the connection header following version
-    private void processHeader(byte[] buf) throws IOException {
+    /** Reads the connection context following the connection header */
+    private void processConnectionContext(byte[] buf) throws IOException {
       DataInputStream in =
         new DataInputStream(new ByteArrayInputStream(buf));
-      header.readFields(in);
-      protocolName = header.getProtocol();
+      connectionContext = IpcConnectionContextProto.parseFrom(in);
+      protocolName = connectionContext.hasProtocol() ? connectionContext
+          .getProtocol() : null;
 
-      
-      UserGroupInformation protocolUser = header.getUgi();
+      UserGroupInformation protocolUser = ProtoUtil.getUgi(connectionContext);
       if (!useSasl) {
         user = protocolUser;
         if (user != null) {
@@ -1472,15 +1511,15 @@ public abstract class Server {
     
     private void processOneRpc(byte[] buf) throws IOException,
         InterruptedException {
-      if (headerRead) {
+      if (connectionContextRead) {
         processData(buf);
       } else {
-        processHeader(buf);
-        headerRead = true;
+        processConnectionContext(buf);
+        connectionContextRead = true;
         if (!authorizeConnection()) {
           throw new AccessControlException("Connection from " + this
-              + " for protocol " + header.getProtocol()
-              + " is unauthorized for user " + user);
+              + " for protocol " + connectionContext.getProtocol()
+              + " is unauthorized for user " + user);      
         }
       }
     }
@@ -1549,9 +1588,9 @@ public abstract class Server {
             && (authMethod != AuthMethod.DIGEST)) {
           ProxyUsers.authorize(user, this.getHostAddress(), conf);
         }
-        authorize(user, header, getHostInetAddress());
+        authorize(user, protocolName, getHostInetAddress());
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Successfully authorized " + header);
+          LOG.debug("Successfully authorized " + connectionContext);
         }
         rpcMetrics.incrAuthorizationSuccesses();
       } catch (AuthorizationException ae) {
@@ -1596,11 +1635,10 @@ public abstract class Server {
       while (running) {
         try {
           final Call call = callQueue.take(); // pop the queue; maybe blocked here
-
-          if (LOG.isDebugEnabled())
+          if (LOG.isDebugEnabled()) {
             LOG.debug(getName() + ": has Call#" + call.callId + 
                 "for RpcKind " + call.rpcKind + " from " + call.connection);
-          
+          }
           String errorClass = null;
           String error = null;
           Writable value = null;
@@ -1921,21 +1959,22 @@ public abstract class Server {
    * Authorize the incoming client connection.
    * 
    * @param user client user
-   * @param connection incoming connection
+   * @param protocolName - the protocol
    * @param addr InetAddress of incoming connection
    * @throws AuthorizationException when the client isn't authorized to talk the protocol
    */
-  public void authorize(UserGroupInformation user, 
-                        ConnectionHeader connection,
-                        InetAddress addr
-                        ) throws AuthorizationException {
+  private void authorize(UserGroupInformation user, String protocolName,
+      InetAddress addr) throws AuthorizationException {
     if (authorize) {
+      if (protocolName == null) {
+        throw new AuthorizationException("Null protocol not authorized");
+      }
       Class<?> protocol = null;
       try {
-        protocol = getProtocolClass(connection.getProtocol(), getConf());
+        protocol = getProtocolClass(protocolName, getConf());
       } catch (ClassNotFoundException cfne) {
         throw new AuthorizationException("Unknown protocol: " + 
-                                         connection.getProtocol());
+                                         protocolName);
       }
       serviceAuthorizationManager.authorize(user, protocol, getConf(), addr);
     }

Modified: hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java?rev=1298245&r1=1298244&r2=1298245&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java Thu Mar  8 00:57:13 2012
@@ -21,6 +21,11 @@ package org.apache.hadoop.util;
 import java.io.DataInput;
 import java.io.IOException;
 
+import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
+import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.UserInformationProto;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.UserGroupInformation;
+
 public abstract class ProtoUtil {
 
   /**
@@ -63,4 +68,71 @@ public abstract class ProtoUtil {
     return result;
   }
 
+  
+  /** 
+   * This method creates the connection context  using exactly the same logic
+   * as the old connection context as was done for writable where
+   * the effective and real users are set based on the auth method.
+   *
+   */
+  public static IpcConnectionContextProto makeIpcConnectionContext(
+      final String protocol,
+      final UserGroupInformation ugi, final AuthMethod authMethod) {
+    IpcConnectionContextProto.Builder result = IpcConnectionContextProto.newBuilder();
+    if (protocol != null) {
+      result.setProtocol(protocol);
+    }
+    UserInformationProto.Builder ugiProto =  UserInformationProto.newBuilder();
+    if (ugi != null) {
+      /*
+       * In the connection context we send only additional user info that
+       * is not derived from the authentication done during connection setup.
+       */
+      if (authMethod == AuthMethod.KERBEROS) {
+        // Real user was established as part of the connection.
+        // Send effective user only.
+        ugiProto.setEffectiveUser(ugi.getUserName());
+      } else if (authMethod == AuthMethod.DIGEST) {
+        // With token, the connection itself establishes 
+        // both real and effective user. Hence send none in header.
+      } else {  // Simple authentication
+        // No user info is established as part of the connection.
+        // Send both effective user and real user
+        ugiProto.setEffectiveUser(ugi.getUserName());
+        if (ugi.getRealUser() != null) {
+          ugiProto.setRealUser(ugi.getRealUser().getUserName());
+        }
+      }
+    }   
+    result.setUserInfo(ugiProto);
+    return result.build();
+  }
+  
+  public static UserGroupInformation getUgi(IpcConnectionContextProto context) {
+    if (context.hasUserInfo()) {
+      UserInformationProto userInfo = context.getUserInfo();
+        return getUgi(userInfo);
+    } else {
+      return null;
+    }
+  }
+  
+  public static UserGroupInformation getUgi(UserInformationProto userInfo) {
+    UserGroupInformation ugi = null;
+    String effectiveUser = userInfo.hasEffectiveUser() ? userInfo
+        .getEffectiveUser() : null;
+    String realUser = userInfo.hasRealUser() ? userInfo.getRealUser() : null;
+    if (effectiveUser != null) {
+      if (realUser != null) {
+        UserGroupInformation realUserUgi = UserGroupInformation
+            .createRemoteUser(realUser);
+        ugi = UserGroupInformation
+            .createProxyUser(effectiveUser, realUserUgi);
+      } else {
+        ugi = org.apache.hadoop.security.UserGroupInformation
+            .createRemoteUser(effectiveUser);
+      }
+    }
+    return ugi;
+  }
 }