You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2017/02/08 22:03:10 UTC

[6/7] kudu git commit: Rename SecureRpcHelper to Negotiator

Rename SecureRpcHelper to Negotiator

Change-Id: Ia5790a8dbf1a147fd5e6bbc793bc3707456121d2
Reviewed-on: http://gerrit.cloudera.org:8080/5944
Reviewed-by: Dan Burkert <da...@apache.org>
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/67f24868
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/67f24868
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/67f24868

Branch: refs/heads/master
Commit: 67f2486891378c8344ac3f68f9e8f344b74881af
Parents: 3642e1f
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Feb 7 17:30:24 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Feb 8 21:56:30 2017 +0000

----------------------------------------------------------------------
 .../java/org/apache/kudu/client/Negotiator.java | 308 +++++++++++++++++++
 .../org/apache/kudu/client/SecureRpcHelper.java | 303 ------------------
 .../org/apache/kudu/client/TabletClient.java    |  10 +-
 3 files changed, 313 insertions(+), 308 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/67f24868/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
new file mode 100644
index 0000000..307145c
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -0,0 +1,308 @@
+/*
+ * Copyright (C) 2010-2012  The Async HBase Authors.  All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *   - Redistributions of source code must retain the aabove copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   - Redistributions in binary form must reproduce the above copyright notice,
+ *     this list of conditions and the following disclaimer in the documentation
+ *     and/or other materials provided with the distribution.
+ *   - Neither the name of the StumbleUpon nor the names of its contributors
+ *     may be used to endorse or promote products derived from this software
+ *     without specific prior written permission.
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package org.apache.kudu.client;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.Set;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ZeroCopyLiteralByteString;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.rpc.RpcHeader;
+import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
+
+/**
+ * Netty Pipeline handler which runs connection negotiation with
+ * the server. When negotiation is complete, this removes itself
+ * from the pipeline and fires a Negotiator.Result upstream.
+ */
+@InterfaceAudience.Private
+public class Negotiator extends SimpleChannelUpstreamHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TabletClient.class);
+
+  private static final Map<String, String> SASL_PROPS = ImmutableMap.of();
+  private static final SaslClientCallbackHandler SASL_CALLBACK = new SaslClientCallbackHandler();
+
+  /**
+   * List of SASL mechanisms supported by the client, in descending priority order.
+   * The client will pick the first of these mechanisms that is supported by
+   * the server and also succeeds to initialize.
+   */
+  private static final String[] PRIORITIZED_MECHS = new String[] { "GSSAPI", "PLAIN" };
+
+  static final String USER_AND_PASSWORD = "java_client";
+
+  private boolean finished;
+  private SaslClient saslClient;
+  public static final int CONNECTION_CTX_CALL_ID = -3;
+  private static final int SASL_CALL_ID = -33;
+  private static final Set<RpcHeader.RpcFeatureFlag> SUPPORTED_RPC_FEATURES =
+      ImmutableSet.of(RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS);
+  private Set<RpcHeader.RpcFeatureFlag> serverFeatures;
+
+  private final Subject subject;
+  private final String remoteHostname;
+
+  public Negotiator(Subject subject, String remoteHostname) {
+    this.subject = subject;
+    this.remoteHostname = remoteHostname;
+  }
+
+  public void sendHello(Channel channel) {
+    sendNegotiateMessage(channel);
+  }
+
+  private void sendNegotiateMessage(Channel channel) {
+    RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
+
+    // Advertise our supported features
+    for (RpcHeader.RpcFeatureFlag flag : SUPPORTED_RPC_FEATURES) {
+      builder.addSupportedFeatures(flag);
+    }
+
+    builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.NEGOTIATE);
+    sendSaslMessage(channel, builder.build());
+  }
+
+  private void sendSaslMessage(Channel channel, RpcHeader.NegotiatePB msg) {
+    Preconditions.checkNotNull(channel);
+    RpcHeader.RequestHeader.Builder builder = RpcHeader.RequestHeader.newBuilder();
+    builder.setCallId(SASL_CALL_ID);
+    RpcHeader.RequestHeader header = builder.build();
+
+    ChannelBuffer buffer = KuduRpc.toChannelBuffer(header, msg);
+    Channels.write(channel, buffer);
+  }
+
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+      throws Exception {
+    Object m = evt.getMessage();
+    if (!(m instanceof CallResponse)) {
+      ctx.sendUpstream(evt);
+      return;
+    }
+    handleResponse(ctx.getChannel(), (CallResponse)m);
+  }
+
+  private void handleResponse(Channel chan, CallResponse callResponse) throws SaslException {
+    // TODO(todd): this needs to handle error responses, not just success responses.
+    Preconditions.checkState(!finished, "received a response after negotiation was complete");
+    RpcHeader.NegotiatePB response = parseSaslMsgResponse(callResponse);
+    switch (response.getStep()) {
+      case NEGOTIATE:
+        handleNegotiateResponse(chan, response);
+        break;
+      case SASL_CHALLENGE:
+        handleChallengeResponse(chan, response);
+        break;
+      case SASL_SUCCESS:
+        handleSuccessResponse(chan);
+        break;
+      default:
+        LOG.error(String.format("Wrong negotiation step: %s", response.getStep()));
+    }
+  }
+
+
+  private RpcHeader.NegotiatePB parseSaslMsgResponse(CallResponse response) {
+    RpcHeader.ResponseHeader responseHeader = response.getHeader();
+    int id = responseHeader.getCallId();
+    if (id != SASL_CALL_ID) {
+      throw new IllegalStateException("Received a call that wasn't for SASL");
+    }
+
+    RpcHeader.NegotiatePB.Builder saslBuilder = RpcHeader.NegotiatePB.newBuilder();
+    KuduRpc.readProtobuf(response.getPBMessage(), saslBuilder);
+    return saslBuilder.build();
+  }
+
+  private void handleNegotiateResponse(Channel chan, RpcHeader.NegotiatePB response) throws
+      SaslException {
+    // Store the supported features advertised by the server.
+    ImmutableSet.Builder<RpcHeader.RpcFeatureFlag> features = ImmutableSet.builder();
+    for (RpcHeader.RpcFeatureFlag feature : response.getSupportedFeaturesList()) {
+      if (SUPPORTED_RPC_FEATURES.contains(feature)) {
+        features.add(feature);
+      }
+    }
+    serverFeatures = features.build();
+
+    // Gather the set of server-supported mechanisms.
+    Set<String> serverMechs = Sets.newHashSet();
+    for (RpcHeader.NegotiatePB.SaslMechanism mech : response.getSaslMechanismsList()) {
+      serverMechs.add(mech.getMechanism());
+    }
+
+    // For each of our own mechanisms, in descending priority, check if
+    // the server also supports them. If so, try to initialize saslClient.
+    // If we find a common mechanism that also can be successfully initialized,
+    // choose that mech.
+    byte[] initialResponse = null;
+    String chosenMech = null;
+    Map<String, String> errorsByMech = Maps.newHashMap();
+    for (String clientMech : PRIORITIZED_MECHS) {
+      if (!serverMechs.contains(clientMech)) {
+        errorsByMech.put(clientMech, "not advertised by server");
+        continue;
+      }
+      try {
+        saslClient = Sasl.createSaslClient(new String[]{ clientMech },
+                                           null,
+                                           "kudu",
+                                           remoteHostname,
+                                           SASL_PROPS,
+                                           SASL_CALLBACK);
+        if (saslClient.hasInitialResponse()) {
+          initialResponse = evaluateChallenge(new byte[0]);
+        }
+        chosenMech = clientMech;
+        break;
+      } catch (SaslException e) {
+        errorsByMech.put(clientMech, e.getMessage());
+        saslClient = null;
+      }
+    }
+
+    if (chosenMech == null) {
+      throw new SaslException("unable to negotiate a matching mechanism. Errors: [" +
+                              Joiner.on(",").withKeyValueSeparator(": ").join(errorsByMech) +
+                              "]");
+    }
+
+    RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
+    if (initialResponse != null) {
+      builder.setToken(ZeroCopyLiteralByteString.wrap(initialResponse));
+    }
+    builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.SASL_INITIATE);
+    builder.addSaslMechanismsBuilder().setMechanism(chosenMech);
+    sendSaslMessage(chan, builder.build());
+  }
+
+  private void handleChallengeResponse(Channel chan, RpcHeader.NegotiatePB response) throws
+      SaslException {
+    byte[] saslToken = evaluateChallenge(response.getToken().toByteArray());
+    if (saslToken == null) {
+      throw new IllegalStateException("Not expecting an empty token");
+    }
+    RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
+    builder.setToken(ZeroCopyLiteralByteString.wrap(saslToken));
+    builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.SASL_RESPONSE);
+    sendSaslMessage(chan, builder.build());
+  }
+
+  private void handleSuccessResponse(Channel chan) {
+    finished = true;
+    chan.getPipeline().remove(this);
+
+    Channels.write(chan, makeConnectionContext());
+    Channels.fireMessageReceived(chan, new Result(serverFeatures));
+  }
+
+  private ChannelBuffer makeConnectionContext() {
+    RpcHeader.ConnectionContextPB.Builder builder = RpcHeader.ConnectionContextPB.newBuilder();
+
+    // The UserInformationPB is deprecated, but used by servers prior to Kudu 1.1.
+    RpcHeader.UserInformationPB.Builder userBuilder = RpcHeader.UserInformationPB.newBuilder();
+    userBuilder.setEffectiveUser(Negotiator.USER_AND_PASSWORD);
+    userBuilder.setRealUser(Negotiator.USER_AND_PASSWORD);
+    builder.setDEPRECATEDUserInfo(userBuilder.build());
+    RpcHeader.ConnectionContextPB pb = builder.build();
+    RpcHeader.RequestHeader header =
+        RpcHeader.RequestHeader.newBuilder().setCallId(CONNECTION_CTX_CALL_ID).build();
+    return KuduRpc.toChannelBuffer(header, pb);
+  }
+
+  private byte[] evaluateChallenge(final byte[] challenge) throws SaslException {
+    try {
+      return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
+          @Override
+          public byte[] run() throws Exception {
+            return saslClient.evaluateChallenge(challenge);
+          }
+        });
+    } catch (Exception e) {
+      Throwables.propagateIfInstanceOf(e, SaslException.class);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private static class SaslClientCallbackHandler implements CallbackHandler {
+    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+      for (Callback callback : callbacks) {
+        if (callback instanceof NameCallback) {
+          ((NameCallback) callback).setName(USER_AND_PASSWORD);
+        } else if (callback instanceof PasswordCallback) {
+          ((PasswordCallback) callback).setPassword(USER_AND_PASSWORD.toCharArray());
+        } else {
+          throw new UnsupportedCallbackException(callback,
+                                                 "Unrecognized SASL client callback");
+        }
+      }
+    }
+  }
+
+  /**
+   * The results of a successful negotiation. This is sent to upstream handlers in the
+   * Netty pipeline after negotiation completes.
+   */
+  static class Result {
+    final Set<RpcFeatureFlag> serverFeatures;
+
+    public Result(Set<RpcFeatureFlag> serverFeatures) {
+      this.serverFeatures = serverFeatures;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/67f24868/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
deleted file mode 100644
index bf2fece..0000000
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/SecureRpcHelper.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Copyright (C) 2010-2012  The Async HBase Authors.  All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *   - Redistributions of source code must retain the aabove copyright notice,
- *     this list of conditions and the following disclaimer.
- *   - Redistributions in binary form must reproduce the above copyright notice,
- *     this list of conditions and the following disclaimer in the documentation
- *     and/or other materials provided with the distribution.
- *   - Neither the name of the StumbleUpon nor the names of its contributors
- *     may be used to endorse or promote products derived from this software
- *     without specific prior written permission.
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-package org.apache.kudu.client;
-
-import java.security.PrivilegedExceptionAction;
-import java.util.Map;
-import java.util.Set;
-
-import javax.security.auth.Subject;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.protobuf.ZeroCopyLiteralByteString;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kudu.annotations.InterfaceAudience;
-import org.apache.kudu.rpc.RpcHeader;
-import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
-
-@InterfaceAudience.Private
-public class SecureRpcHelper extends SimpleChannelUpstreamHandler {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TabletClient.class);
-
-  private static final Map<String, String> SASL_PROPS = ImmutableMap.of();
-  private static final SaslClientCallbackHandler SASL_CALLBACK = new SaslClientCallbackHandler();
-
-  /**
-   * List of SASL mechanisms supported by the client, in descending priority order.
-   * The client will pick the first of these mechanisms that is supported by
-   * the server and also succeeds to initialize.
-   */
-  private static final String[] PRIORITIZED_MECHS = new String[] { "GSSAPI", "PLAIN" };
-
-  static final String USER_AND_PASSWORD = "java_client";
-
-  private boolean finished;
-  private SaslClient saslClient;
-  public static final int CONNECTION_CTX_CALL_ID = -3;
-  private static final int SASL_CALL_ID = -33;
-  private static final Set<RpcHeader.RpcFeatureFlag> SUPPORTED_RPC_FEATURES =
-      ImmutableSet.of(RpcHeader.RpcFeatureFlag.APPLICATION_FEATURE_FLAGS);
-  private Set<RpcHeader.RpcFeatureFlag> serverFeatures;
-
-  private final Subject subject;
-  private final String remoteHostname;
-
-  public SecureRpcHelper(Subject subject, String remoteHostname) {
-    this.subject = subject;
-    this.remoteHostname = remoteHostname;
-  }
-
-  public void sendHello(Channel channel) {
-    sendNegotiateMessage(channel);
-  }
-
-  private void sendNegotiateMessage(Channel channel) {
-    RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
-
-    // Advertise our supported features
-    for (RpcHeader.RpcFeatureFlag flag : SUPPORTED_RPC_FEATURES) {
-      builder.addSupportedFeatures(flag);
-    }
-
-    builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.NEGOTIATE);
-    sendSaslMessage(channel, builder.build());
-  }
-
-  private void sendSaslMessage(Channel channel, RpcHeader.NegotiatePB msg) {
-    Preconditions.checkNotNull(channel);
-    RpcHeader.RequestHeader.Builder builder = RpcHeader.RequestHeader.newBuilder();
-    builder.setCallId(SASL_CALL_ID);
-    RpcHeader.RequestHeader header = builder.build();
-
-    ChannelBuffer buffer = KuduRpc.toChannelBuffer(header, msg);
-    Channels.write(channel, buffer);
-  }
-
-  @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
-      throws Exception {
-    Object m = evt.getMessage();
-    if (!(m instanceof CallResponse)) {
-      ctx.sendUpstream(evt);
-      return;
-    }
-    handleResponse(ctx.getChannel(), (CallResponse)m);
-  }
-
-  private void handleResponse(Channel chan, CallResponse callResponse) throws SaslException {
-    // TODO(todd): this needs to handle error responses, not just success responses.
-    Preconditions.checkState(!finished, "received a response after negotiation was complete");
-    RpcHeader.NegotiatePB response = parseSaslMsgResponse(callResponse);
-    switch (response.getStep()) {
-      case NEGOTIATE:
-        handleNegotiateResponse(chan, response);
-        break;
-      case SASL_CHALLENGE:
-        handleChallengeResponse(chan, response);
-        break;
-      case SASL_SUCCESS:
-        handleSuccessResponse(chan);
-        break;
-      default:
-        LOG.error(String.format("Wrong negotiation step: %s", response.getStep()));
-    }
-  }
-
-
-  private RpcHeader.NegotiatePB parseSaslMsgResponse(CallResponse response) {
-    RpcHeader.ResponseHeader responseHeader = response.getHeader();
-    int id = responseHeader.getCallId();
-    if (id != SASL_CALL_ID) {
-      throw new IllegalStateException("Received a call that wasn't for SASL");
-    }
-
-    RpcHeader.NegotiatePB.Builder saslBuilder = RpcHeader.NegotiatePB.newBuilder();
-    KuduRpc.readProtobuf(response.getPBMessage(), saslBuilder);
-    return saslBuilder.build();
-  }
-
-  private void handleNegotiateResponse(Channel chan, RpcHeader.NegotiatePB response) throws
-      SaslException {
-    // Store the supported features advertised by the server.
-    ImmutableSet.Builder<RpcHeader.RpcFeatureFlag> features = ImmutableSet.builder();
-    for (RpcHeader.RpcFeatureFlag feature : response.getSupportedFeaturesList()) {
-      if (SUPPORTED_RPC_FEATURES.contains(feature)) {
-        features.add(feature);
-      }
-    }
-    serverFeatures = features.build();
-
-    // Gather the set of server-supported mechanisms.
-    Set<String> serverMechs = Sets.newHashSet();
-    for (RpcHeader.NegotiatePB.SaslMechanism mech : response.getSaslMechanismsList()) {
-      serverMechs.add(mech.getMechanism());
-    }
-
-    // For each of our own mechanisms, in descending priority, check if
-    // the server also supports them. If so, try to initialize saslClient.
-    // If we find a common mechanism that also can be successfully initialized,
-    // choose that mech.
-    byte[] initialResponse = null;
-    String chosenMech = null;
-    Map<String, String> errorsByMech = Maps.newHashMap();
-    for (String clientMech : PRIORITIZED_MECHS) {
-      if (!serverMechs.contains(clientMech)) {
-        errorsByMech.put(clientMech, "not advertised by server");
-        continue;
-      }
-      try {
-        saslClient = Sasl.createSaslClient(new String[]{ clientMech },
-                                           null,
-                                           "kudu",
-                                           remoteHostname,
-                                           SASL_PROPS,
-                                           SASL_CALLBACK);
-        if (saslClient.hasInitialResponse()) {
-          initialResponse = evaluateChallenge(new byte[0]);
-        }
-        chosenMech = clientMech;
-        break;
-      } catch (SaslException e) {
-        errorsByMech.put(clientMech, e.getMessage());
-        saslClient = null;
-      }
-    }
-
-    if (chosenMech == null) {
-      throw new SaslException("unable to negotiate a matching mechanism. Errors: [" +
-                              Joiner.on(",").withKeyValueSeparator(": ").join(errorsByMech) +
-                              "]");
-    }
-
-    RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
-    if (initialResponse != null) {
-      builder.setToken(ZeroCopyLiteralByteString.wrap(initialResponse));
-    }
-    builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.SASL_INITIATE);
-    builder.addSaslMechanismsBuilder().setMechanism(chosenMech);
-    sendSaslMessage(chan, builder.build());
-  }
-
-  private void handleChallengeResponse(Channel chan, RpcHeader.NegotiatePB response) throws
-      SaslException {
-    byte[] saslToken = evaluateChallenge(response.getToken().toByteArray());
-    if (saslToken == null) {
-      throw new IllegalStateException("Not expecting an empty token");
-    }
-    RpcHeader.NegotiatePB.Builder builder = RpcHeader.NegotiatePB.newBuilder();
-    builder.setToken(ZeroCopyLiteralByteString.wrap(saslToken));
-    builder.setStep(RpcHeader.NegotiatePB.NegotiateStep.SASL_RESPONSE);
-    sendSaslMessage(chan, builder.build());
-  }
-
-  private void handleSuccessResponse(Channel chan) {
-    finished = true;
-    chan.getPipeline().remove(this);
-
-    Channels.write(chan, makeConnectionContext());
-    Channels.fireMessageReceived(chan, new NegotiationResult(serverFeatures));
-  }
-
-  private ChannelBuffer makeConnectionContext() {
-    RpcHeader.ConnectionContextPB.Builder builder = RpcHeader.ConnectionContextPB.newBuilder();
-
-    // The UserInformationPB is deprecated, but used by servers prior to Kudu 1.1.
-    RpcHeader.UserInformationPB.Builder userBuilder = RpcHeader.UserInformationPB.newBuilder();
-    userBuilder.setEffectiveUser(SecureRpcHelper.USER_AND_PASSWORD);
-    userBuilder.setRealUser(SecureRpcHelper.USER_AND_PASSWORD);
-    builder.setDEPRECATEDUserInfo(userBuilder.build());
-    RpcHeader.ConnectionContextPB pb = builder.build();
-    RpcHeader.RequestHeader header =
-        RpcHeader.RequestHeader.newBuilder().setCallId(CONNECTION_CTX_CALL_ID).build();
-    return KuduRpc.toChannelBuffer(header, pb);
-  }
-
-  private byte[] evaluateChallenge(final byte[] challenge) throws SaslException {
-    try {
-      return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
-          @Override
-          public byte[] run() throws Exception {
-            return saslClient.evaluateChallenge(challenge);
-          }
-        });
-    } catch (Exception e) {
-      Throwables.propagateIfInstanceOf(e, SaslException.class);
-      throw Throwables.propagate(e);
-    }
-  }
-
-  private static class SaslClientCallbackHandler implements CallbackHandler {
-    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
-      for (Callback callback : callbacks) {
-        if (callback instanceof NameCallback) {
-          ((NameCallback) callback).setName(USER_AND_PASSWORD);
-        } else if (callback instanceof PasswordCallback) {
-          ((PasswordCallback) callback).setPassword(USER_AND_PASSWORD.toCharArray());
-        } else {
-          throw new UnsupportedCallbackException(callback,
-                                                 "Unrecognized SASL client callback");
-        }
-      }
-    }
-  }
-
-  /**
-   * The results of a successful negotiation. This is sent to upstream handlers in the
-   * Netty pipeline after negotiation completes.
-   */
-  static class NegotiationResult {
-    final Set<RpcFeatureFlag> serverFeatures;
-
-    public NegotiationResult(Set<RpcFeatureFlag> serverFeatures) {
-      this.serverFeatures = serverFeatures;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/kudu/blob/67f24868/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index 33ec662..cf2aef8 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -56,7 +56,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.WireProtocol;
 import org.apache.kudu.annotations.InterfaceAudience;
-import org.apache.kudu.client.SecureRpcHelper.NegotiationResult;
+import org.apache.kudu.client.Negotiator.Result;
 import org.apache.kudu.master.Master;
 import org.apache.kudu.rpc.RpcHeader;
 import org.apache.kudu.tserver.Tserver;
@@ -143,7 +143,7 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
   // differently by also clearing the caches.
   private volatile boolean gotUncaughtException = false;
 
-  private NegotiationResult negotiationResult;
+  private Result negotiationResult;
 
   public TabletClient(AsyncKuduClient client, ServerInfo serverInfo) {
     this.kuduClient = client;
@@ -378,8 +378,8 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
   @SuppressWarnings("unchecked")
   public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
     Object m = evt.getMessage();
-    if (m instanceof SecureRpcHelper.NegotiationResult) {
-      this.negotiationResult = (NegotiationResult) m;
+    if (m instanceof Negotiator.Result) {
+      this.negotiationResult = (Result) m;
       this.chan = ctx.getChannel();
       sendQueuedRpcs();
       return;
@@ -603,7 +603,7 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
     final Channel chan = e.getChannel();
     Channels.write(chan, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER));
 
-    SecureRpcHelper secureRpcHelper = new SecureRpcHelper(getSubject(), serverInfo.getHostname());
+    Negotiator secureRpcHelper = new Negotiator(getSubject(), serverInfo.getHostname());
     ctx.getPipeline().addBefore(ctx.getName(), "negotiation", secureRpcHelper);
     secureRpcHelper.sendHello(chan);
   }