You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/02/16 07:28:28 UTC

[2/2] kudu git commit: java: rename GetMasterRegistration -> ConnectToCluster/ConnectToMaster

java: rename GetMasterRegistration -> ConnectToCluster/ConnectToMaster

This is just a straight Eclipse-driven rename. Actual functional changes
to follow.

Change-Id: Ia8c326f1c0e664f0a2098179af359b2735a442eb
Reviewed-on: http://gerrit.cloudera.org:8080/6018
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7027a168
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7027a168
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7027a168

Branch: refs/heads/master
Commit: 7027a168b2f2eeba7f22e3ec5d3b1d3eaadf2df2
Parents: fc7255a
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Feb 15 12:53:58 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Feb 16 07:15:24 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AsyncKuduClient.java |  16 +-
 .../apache/kudu/client/ConnectToCluster.java    | 247 +++++++++++++++++++
 .../kudu/client/ConnectToClusterResponse.java   |  89 +++++++
 .../kudu/client/ConnectToMasterRequest.java     |  76 ++++++
 .../client/GetMasterRegistrationReceived.java   | 247 -------------------
 .../client/GetMasterRegistrationRequest.java    |  76 ------
 .../client/GetMasterRegistrationResponse.java   |  89 -------
 .../org/apache/kudu/client/TabletClient.java    |   2 +-
 .../kudu/client/TestConnectToCluster.java       | 208 ++++++++++++++++
 .../TestGetMasterRegistrationReceived.java      | 208 ----------------
 10 files changed, 629 insertions(+), 629 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index c1c2fbf..72df1b1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -1066,10 +1066,10 @@ public class AsyncKuduClient implements AutoCloseable {
    */
   Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB(KuduRpc<?> parentRpc) {
     final Deferred<Master.GetTableLocationsResponsePB> responseD = new Deferred<>();
-    final GetMasterRegistrationReceived received =
-        new GetMasterRegistrationReceived(masterAddresses, responseD);
+    final ConnectToCluster connector =
+        new ConnectToCluster(masterAddresses, responseD);
     for (HostAndPort hostAndPort : masterAddresses) {
-      Deferred<GetMasterRegistrationResponse> d;
+      Deferred<ConnectToClusterResponse> d;
       // Note: we need to create a client for that host first, as there's a
       // chicken and egg problem: since there is no source of truth beyond
       // the master, the only way to get information about a master host is
@@ -1083,7 +1083,7 @@ public class AsyncKuduClient implements AutoCloseable {
       } else {
         d = getMasterRegistration(clientForHostAndPort, parentRpc);
       }
-      d.addCallbacks(received.callbackForNode(hostAndPort), received.errbackForNode(hostAndPort));
+      d.addCallbacks(connector.callbackForNode(hostAndPort), connector.errbackForNode(hostAndPort));
     }
     return responseD;
   }
@@ -1484,24 +1484,24 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * Retrieve the master registration (see {@link GetMasterRegistrationResponse}
+   * Retrieve the master registration (see {@link ConnectToClusterResponse}
    * for a replica.
    * @param masterClient an initialized client for the master replica
    * @param parentRpc RPC that prompted a master lookup, can be null
    * @return a Deferred object for the master replica's current registration
    */
-  Deferred<GetMasterRegistrationResponse> getMasterRegistration(
+  Deferred<ConnectToClusterResponse> getMasterRegistration(
       TabletClient masterClient, KuduRpc<?> parentRpc) {
     // TODO: Handle the situation when multiple in-flight RPCs all want to query the masters,
     // basically reuse in some way the master permits.
-    GetMasterRegistrationRequest rpc = new GetMasterRegistrationRequest(masterTable);
+    ConnectToMasterRequest rpc = new ConnectToMasterRequest(masterTable);
     if (parentRpc != null) {
       rpc.setTimeoutMillis(parentRpc.deadlineTracker.getMillisBeforeDeadline());
       rpc.setParentRpc(parentRpc);
     } else {
       rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
     }
-    Deferred<GetMasterRegistrationResponse> d = rpc.getDeferred();
+    Deferred<ConnectToClusterResponse> d = rpc.getDeferred();
     rpc.attempt++;
     masterClient.sendRpc(rpc);
     return d;

http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
new file mode 100644
index 0000000..1de40d0
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.net.HostAndPort;
+import com.google.protobuf.ByteString;
+import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.Common;
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.consensus.Metadata;
+import org.apache.kudu.master.Master;
+import org.apache.kudu.util.NetUtil;
+
+/**
+ * Class responsible for fanning out RPCs to all of the configured masters,
+ * finding a leader, and responding when the leader has been located.
+ */
+@InterfaceAudience.Private
+final class ConnectToCluster {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ConnectToCluster.class);
+
+  private final List<HostAndPort> masterAddrs;
+  private final Deferred<Master.GetTableLocationsResponsePB> responseD;
+  private final int numMasters;
+
+  // Used to avoid calling 'responseD' twice.
+  private final AtomicBoolean responseDCalled = new AtomicBoolean(false);
+
+  // Number of responses we've receives: used to tell whether or not we've received
+  // errors/replies from all of the masters, or if there are any
+  // GetMasterRegistrationRequests still pending.
+  private final AtomicInteger countResponsesReceived = new AtomicInteger(0);
+
+  // Exceptions received so far: kept for debugging purposes.
+  private final List<Exception> exceptionsReceived =
+      Collections.synchronizedList(new ArrayList<Exception>());
+
+  /**
+   * Creates an object that holds the state needed to retrieve master table's location.
+   * @param masterAddrs Addresses of all master replicas that we want to retrieve the
+   *                    registration from.
+   * @param responseD Deferred object that will hold the GetTableLocationsResponsePB object for
+   *                  the master table.
+   */
+  public ConnectToCluster(List<HostAndPort> masterAddrs,
+                          Deferred<Master.GetTableLocationsResponsePB> responseD) {
+    this.masterAddrs = masterAddrs;
+    this.responseD = responseD;
+    this.numMasters = masterAddrs.size();
+  }
+
+  /**
+   * Creates a callback for a GetMasterRegistrationRequest that was sent to 'hostAndPort'.
+   * @see GetMasterRegistrationCB
+   * @param hostAndPort Host and part for the RPC we're attaching this to. Host and port must
+   *                    be valid.
+   * @return The callback object that can be added to the RPC request.
+   */
+  public Callback<Void, ConnectToClusterResponse> callbackForNode(HostAndPort hostAndPort) {
+    return new GetMasterRegistrationCB(hostAndPort);
+  }
+
+  /**
+   * Creates an errback for a GetMasterRegistrationRequest that was sent to 'hostAndPort'.
+   * @see GetMasterRegistrationErrCB
+   * @param hostAndPort Host and port for the RPC we're attaching this to. Used for debugging
+   *                    purposes.
+   * @return The errback object that can be added to the RPC request.
+   */
+  public Callback<Void, Exception> errbackForNode(HostAndPort hostAndPort) {
+    return new GetMasterRegistrationErrCB(hostAndPort);
+  }
+
+  /**
+   * Checks if we've already received a response or an exception from every master that
+   * we've sent a GetMasterRegistrationRequest to. If so -- and no leader has been found
+   * (that is, 'responseD' was never called) -- pass a {@link NoLeaderFoundException}
+   * to responseD.
+   */
+  private void incrementCountAndCheckExhausted() {
+    if (countResponsesReceived.incrementAndGet() == numMasters) {
+      if (responseDCalled.compareAndSet(false, true)) {
+
+        // We want `allUnrecoverable` to only be true if all the masters came back with
+        // NonRecoverableException so that we know for sure we can't retry anymore. Just one master
+        // that replies with RecoverableException or with an ok response but is a FOLLOWER is
+        // enough to keep us retrying.
+        boolean allUnrecoverable = true;
+        if (exceptionsReceived.size() == countResponsesReceived.get()) {
+          for (Exception ex : exceptionsReceived) {
+            if (!(ex instanceof NonRecoverableException)) {
+              allUnrecoverable = false;
+              break;
+            }
+          }
+        } else {
+          allUnrecoverable = false;
+        }
+
+        String allHosts = NetUtil.hostsAndPortsToString(masterAddrs);
+        if (allUnrecoverable) {
+          // This will stop retries.
+          String msg = String.format("Couldn't find a valid master in (%s). " +
+              "Exceptions received: %s", allHosts,
+              Joiner.on(",").join(Lists.transform(
+                  exceptionsReceived, Functions.toStringFunction())));
+          Status s = Status.ServiceUnavailable(msg);
+          responseD.callback(new NonRecoverableException(s));
+        } else {
+          String message = String.format("Master config (%s) has no leader.",
+              allHosts);
+          Exception ex;
+          if (exceptionsReceived.isEmpty()) {
+            LOG.warn(String.format(
+                "None of the provided masters (%s) is a leader, will retry.",
+                allHosts));
+            ex = new NoLeaderFoundException(Status.ServiceUnavailable(message));
+          } else {
+            LOG.warn(String.format(
+                "Unable to find the leader master (%s), will retry",
+                allHosts));
+            String joinedMsg = message + " Exceptions received: " +
+                Joiner.on(",").join(Lists.transform(
+                    exceptionsReceived, Functions.toStringFunction()));
+            Status s = Status.ServiceUnavailable(joinedMsg);
+            ex = new NoLeaderFoundException(s,
+                exceptionsReceived.get(exceptionsReceived.size() - 1));
+          }
+          responseD.callback(ex);
+        }
+      }
+    }
+  }
+
+  /**
+   * Callback for each GetMasterRegistrationRequest sent in getMasterTableLocations() above.
+   * If a request (paired to a specific master) returns a reply that indicates it's a leader,
+   * the callback in 'responseD' is invoked with an initialized GetTableLocationResponsePB
+   * object containing the leader's RPC address.
+   * If the master is not a leader, increment 'countResponsesReceived': if the count equals to
+   * the number of masters, pass {@link NoLeaderFoundException} into
+   * 'responseD' if no one else had called 'responseD' before; otherwise, do nothing.
+   */
+  final class GetMasterRegistrationCB implements Callback<Void, ConnectToClusterResponse> {
+    private final HostAndPort hostAndPort;
+
+    public GetMasterRegistrationCB(HostAndPort hostAndPort) {
+      this.hostAndPort = hostAndPort;
+    }
+
+    @Override
+    public Void call(ConnectToClusterResponse r) throws Exception {
+      Master.TabletLocationsPB.ReplicaPB.Builder replicaBuilder =
+          Master.TabletLocationsPB.ReplicaPB.newBuilder();
+
+      Master.TSInfoPB.Builder tsInfoBuilder = Master.TSInfoPB.newBuilder();
+      tsInfoBuilder.addRpcAddresses(ProtobufHelper.hostAndPortToPB(hostAndPort));
+      tsInfoBuilder.setPermanentUuid(r.getInstanceId().getPermanentUuid());
+      replicaBuilder.setTsInfo(tsInfoBuilder);
+      if (r.getRole().equals(Metadata.RaftPeerPB.Role.LEADER)) {
+        replicaBuilder.setRole(r.getRole());
+        Master.TabletLocationsPB.Builder locationBuilder = Master.TabletLocationsPB.newBuilder();
+        locationBuilder.setPartition(
+            Common.PartitionPB.newBuilder().setPartitionKeyStart(ByteString.EMPTY)
+                                           .setPartitionKeyEnd(ByteString.EMPTY));
+        locationBuilder.setTabletId(
+            ByteString.copyFromUtf8(AsyncKuduClient.MASTER_TABLE_NAME_PLACEHOLDER));
+        locationBuilder.addReplicas(replicaBuilder);
+        // No one else has called this before us.
+        if (responseDCalled.compareAndSet(false, true)) {
+          responseD.callback(
+              Master.GetTableLocationsResponsePB.newBuilder().addTabletLocations(
+                  locationBuilder.build()).build()
+          );
+        } else {
+          LOG.debug("Callback already invoked, discarding response(" + r.toString() + ") from " +
+              hostAndPort.toString());
+        }
+      } else {
+        incrementCountAndCheckExhausted();
+      }
+      return null;
+    }
+
+    @Override
+    public String toString() {
+      return "get master registration for " + hostAndPort.toString();
+    }
+  }
+
+  /**
+   * Errback for each GetMasterRegistrationRequest sent in getMasterTableLocations() above.
+   * Stores each exception in 'exceptionsReceived'. Increments 'countResponseReceived': if
+   * the count is equal to the number of masters and no one else had called 'responseD' before,
+   * pass a {@link NoLeaderFoundException} into 'responseD'; otherwise, do
+   * nothing.
+   */
+  final class GetMasterRegistrationErrCB implements Callback<Void, Exception> {
+    private final HostAndPort hostAndPort;
+
+    public GetMasterRegistrationErrCB(HostAndPort hostAndPort) {
+      this.hostAndPort = hostAndPort;
+    }
+
+    @Override
+    public Void call(Exception e) throws Exception {
+      LOG.warn("Error receiving a response from: " + hostAndPort, e);
+      exceptionsReceived.add(e);
+      incrementCountAndCheckExhausted();
+      return null;
+    }
+
+    @Override
+    public String toString() {
+      return "get master registration errback for " + hostAndPort.toString();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
new file mode 100644
index 0000000..daac6c4
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import org.apache.kudu.WireProtocol;
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.consensus.Metadata;
+import org.apache.kudu.master.Master;
+
+/**
+ * Response for {@link ConnectToMasterRequest}.
+ */
+@InterfaceAudience.Private
+public class ConnectToClusterResponse extends KuduRpcResponse {
+
+  private final Metadata.RaftPeerPB.Role role;
+  private final WireProtocol.ServerRegistrationPB serverRegistration;
+  private final WireProtocol.NodeInstancePB instanceId;
+
+  /**
+   * Describes a response to a {@link ConnectToMasterRequest}, built from
+   * {@link Master.GetMasterRegistrationResponsePB}.
+   *
+   * @param role Master's role in the config.
+   * @param serverRegistration server registration (RPC and HTTP addresses) for this master.
+   * @param instanceId Node instance (permanent uuid and
+   */
+  public ConnectToClusterResponse(long elapsedMillis, String tsUUID,
+                                       Metadata.RaftPeerPB.Role role,
+                                       WireProtocol.ServerRegistrationPB serverRegistration,
+                                       WireProtocol.NodeInstancePB instanceId) {
+    super(elapsedMillis, tsUUID);
+    this.role = role;
+    this.serverRegistration = serverRegistration;
+    this.instanceId = instanceId;
+  }
+
+  /**
+   * Returns this master's role in the config.
+   *
+   * @see Metadata.RaftPeerPB.Role
+   * @return Node's role in the cluster, or FOLLOWER if the node is not initialized.
+   */
+  public Metadata.RaftPeerPB.Role getRole() {
+    return role;
+  }
+
+  /**
+   * Returns the server registration (list of RPC and HTTP ports) for this master.
+   *
+   * @return The {@link WireProtocol.ServerRegistrationPB} object for this master.
+   */
+  public WireProtocol.ServerRegistrationPB getServerRegistration() {
+    return serverRegistration;
+  }
+
+  /**
+   * The node instance (initial sequence number and permanent uuid) for this master.
+   *
+   * @return The {@link WireProtocol.NodeInstancePB} object for this master.
+   */
+  public WireProtocol.NodeInstancePB getInstanceId() {
+    return instanceId;
+  }
+
+  @Override
+  public String toString() {
+    return "GetMasterRegistrationResponse{" +
+        "role=" + role +
+        ", serverRegistration=" + serverRegistration +
+        ", instanceId=" + instanceId +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
new file mode 100644
index 0000000..a98ac38
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import static org.apache.kudu.consensus.Metadata.RaftPeerPB;
+import static org.apache.kudu.master.Master.GetMasterRegistrationRequestPB;
+import static org.apache.kudu.master.Master.GetMasterRegistrationResponsePB;
+import static org.apache.kudu.master.Master.MasterErrorPB;
+
+import com.google.protobuf.Message;
+
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.util.Pair;
+
+/**
+ * Package-private RPC that can only go to master.
+ */
+@InterfaceAudience.Private
+public class ConnectToMasterRequest extends KuduRpc<ConnectToClusterResponse> {
+  private static final String GET_MASTER_REGISTRATION = "GetMasterRegistration";
+
+  public ConnectToMasterRequest(KuduTable masterTable) {
+    super(masterTable);
+  }
+
+  @Override
+  Message createRequestPB() {
+    return GetMasterRegistrationRequestPB.getDefaultInstance();
+  }
+
+  @Override
+  String serviceName() {
+    return MASTER_SERVICE_NAME;
+  }
+
+  @Override
+  String method() {
+    return GET_MASTER_REGISTRATION;
+  }
+
+  @Override
+  Pair<ConnectToClusterResponse, Object> deserialize(CallResponse callResponse,
+                                                     String tsUUID) throws KuduException {
+    final GetMasterRegistrationResponsePB.Builder respBuilder =
+        GetMasterRegistrationResponsePB.newBuilder();
+    readProtobuf(callResponse.getPBMessage(), respBuilder);
+    RaftPeerPB.Role role = RaftPeerPB.Role.FOLLOWER;
+    if (!respBuilder.hasError() || respBuilder.getError().getCode() !=
+        MasterErrorPB.Code.CATALOG_MANAGER_NOT_INITIALIZED) {
+      role = respBuilder.getRole();
+    }
+    ConnectToClusterResponse response = new ConnectToClusterResponse(
+        deadlineTracker.getElapsedMillis(),
+        tsUUID,
+        role,
+        respBuilder.getRegistration(),
+        respBuilder.getInstanceId());
+    return new Pair<ConnectToClusterResponse, Object>(
+        response, respBuilder.hasError() ? respBuilder.getError() : null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationReceived.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationReceived.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationReceived.java
deleted file mode 100644
index 9bd71fc..0000000
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationReceived.java
+++ /dev/null
@@ -1,247 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.kudu.client;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.common.base.Functions;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.net.HostAndPort;
-import com.google.protobuf.ByteString;
-import com.stumbleupon.async.Callback;
-import com.stumbleupon.async.Deferred;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kudu.Common;
-import org.apache.kudu.annotations.InterfaceAudience;
-import org.apache.kudu.consensus.Metadata;
-import org.apache.kudu.master.Master;
-import org.apache.kudu.util.NetUtil;
-
-/**
- * Class grouping the callback and the errback for GetMasterRegistration calls
- * made in getMasterTableLocationsPB.
- */
-@InterfaceAudience.Private
-final class GetMasterRegistrationReceived {
-
-  private static final Logger LOG = LoggerFactory.getLogger(GetMasterRegistrationReceived.class);
-
-  private final List<HostAndPort> masterAddrs;
-  private final Deferred<Master.GetTableLocationsResponsePB> responseD;
-  private final int numMasters;
-
-  // Used to avoid calling 'responseD' twice.
-  private final AtomicBoolean responseDCalled = new AtomicBoolean(false);
-
-  // Number of responses we've receives: used to tell whether or not we've received
-  // errors/replies from all of the masters, or if there are any
-  // GetMasterRegistrationRequests still pending.
-  private final AtomicInteger countResponsesReceived = new AtomicInteger(0);
-
-  // Exceptions received so far: kept for debugging purposes.
-  private final List<Exception> exceptionsReceived =
-      Collections.synchronizedList(new ArrayList<Exception>());
-
-  /**
-   * Creates an object that holds the state needed to retrieve master table's location.
-   * @param masterAddrs Addresses of all master replicas that we want to retrieve the
-   *                    registration from.
-   * @param responseD Deferred object that will hold the GetTableLocationsResponsePB object for
-   *                  the master table.
-   */
-  public GetMasterRegistrationReceived(List<HostAndPort> masterAddrs,
-                                       Deferred<Master.GetTableLocationsResponsePB> responseD) {
-    this.masterAddrs = masterAddrs;
-    this.responseD = responseD;
-    this.numMasters = masterAddrs.size();
-  }
-
-  /**
-   * Creates a callback for a GetMasterRegistrationRequest that was sent to 'hostAndPort'.
-   * @see GetMasterRegistrationCB
-   * @param hostAndPort Host and part for the RPC we're attaching this to. Host and port must
-   *                    be valid.
-   * @return The callback object that can be added to the RPC request.
-   */
-  public Callback<Void, GetMasterRegistrationResponse> callbackForNode(HostAndPort hostAndPort) {
-    return new GetMasterRegistrationCB(hostAndPort);
-  }
-
-  /**
-   * Creates an errback for a GetMasterRegistrationRequest that was sent to 'hostAndPort'.
-   * @see GetMasterRegistrationErrCB
-   * @param hostAndPort Host and port for the RPC we're attaching this to. Used for debugging
-   *                    purposes.
-   * @return The errback object that can be added to the RPC request.
-   */
-  public Callback<Void, Exception> errbackForNode(HostAndPort hostAndPort) {
-    return new GetMasterRegistrationErrCB(hostAndPort);
-  }
-
-  /**
-   * Checks if we've already received a response or an exception from every master that
-   * we've sent a GetMasterRegistrationRequest to. If so -- and no leader has been found
-   * (that is, 'responseD' was never called) -- pass a {@link NoLeaderFoundException}
-   * to responseD.
-   */
-  private void incrementCountAndCheckExhausted() {
-    if (countResponsesReceived.incrementAndGet() == numMasters) {
-      if (responseDCalled.compareAndSet(false, true)) {
-
-        // We want `allUnrecoverable` to only be true if all the masters came back with
-        // NonRecoverableException so that we know for sure we can't retry anymore. Just one master
-        // that replies with RecoverableException or with an ok response but is a FOLLOWER is
-        // enough to keep us retrying.
-        boolean allUnrecoverable = true;
-        if (exceptionsReceived.size() == countResponsesReceived.get()) {
-          for (Exception ex : exceptionsReceived) {
-            if (!(ex instanceof NonRecoverableException)) {
-              allUnrecoverable = false;
-              break;
-            }
-          }
-        } else {
-          allUnrecoverable = false;
-        }
-
-        String allHosts = NetUtil.hostsAndPortsToString(masterAddrs);
-        if (allUnrecoverable) {
-          // This will stop retries.
-          String msg = String.format("Couldn't find a valid master in (%s). " +
-              "Exceptions received: %s", allHosts,
-              Joiner.on(",").join(Lists.transform(
-                  exceptionsReceived, Functions.toStringFunction())));
-          Status s = Status.ServiceUnavailable(msg);
-          responseD.callback(new NonRecoverableException(s));
-        } else {
-          String message = String.format("Master config (%s) has no leader.",
-              allHosts);
-          Exception ex;
-          if (exceptionsReceived.isEmpty()) {
-            LOG.warn(String.format(
-                "None of the provided masters (%s) is a leader, will retry.",
-                allHosts));
-            ex = new NoLeaderFoundException(Status.ServiceUnavailable(message));
-          } else {
-            LOG.warn(String.format(
-                "Unable to find the leader master (%s), will retry",
-                allHosts));
-            String joinedMsg = message + " Exceptions received: " +
-                Joiner.on(",").join(Lists.transform(
-                    exceptionsReceived, Functions.toStringFunction()));
-            Status s = Status.ServiceUnavailable(joinedMsg);
-            ex = new NoLeaderFoundException(s,
-                exceptionsReceived.get(exceptionsReceived.size() - 1));
-          }
-          responseD.callback(ex);
-        }
-      }
-    }
-  }
-
-  /**
-   * Callback for each GetMasterRegistrationRequest sent in getMasterTableLocations() above.
-   * If a request (paired to a specific master) returns a reply that indicates it's a leader,
-   * the callback in 'responseD' is invoked with an initialized GetTableLocationResponsePB
-   * object containing the leader's RPC address.
-   * If the master is not a leader, increment 'countResponsesReceived': if the count equals to
-   * the number of masters, pass {@link NoLeaderFoundException} into
-   * 'responseD' if no one else had called 'responseD' before; otherwise, do nothing.
-   */
-  final class GetMasterRegistrationCB implements Callback<Void, GetMasterRegistrationResponse> {
-    private final HostAndPort hostAndPort;
-
-    public GetMasterRegistrationCB(HostAndPort hostAndPort) {
-      this.hostAndPort = hostAndPort;
-    }
-
-    @Override
-    public Void call(GetMasterRegistrationResponse r) throws Exception {
-      Master.TabletLocationsPB.ReplicaPB.Builder replicaBuilder =
-          Master.TabletLocationsPB.ReplicaPB.newBuilder();
-
-      Master.TSInfoPB.Builder tsInfoBuilder = Master.TSInfoPB.newBuilder();
-      tsInfoBuilder.addRpcAddresses(ProtobufHelper.hostAndPortToPB(hostAndPort));
-      tsInfoBuilder.setPermanentUuid(r.getInstanceId().getPermanentUuid());
-      replicaBuilder.setTsInfo(tsInfoBuilder);
-      if (r.getRole().equals(Metadata.RaftPeerPB.Role.LEADER)) {
-        replicaBuilder.setRole(r.getRole());
-        Master.TabletLocationsPB.Builder locationBuilder = Master.TabletLocationsPB.newBuilder();
-        locationBuilder.setPartition(
-            Common.PartitionPB.newBuilder().setPartitionKeyStart(ByteString.EMPTY)
-                                           .setPartitionKeyEnd(ByteString.EMPTY));
-        locationBuilder.setTabletId(
-            ByteString.copyFromUtf8(AsyncKuduClient.MASTER_TABLE_NAME_PLACEHOLDER));
-        locationBuilder.addReplicas(replicaBuilder);
-        // No one else has called this before us.
-        if (responseDCalled.compareAndSet(false, true)) {
-          responseD.callback(
-              Master.GetTableLocationsResponsePB.newBuilder().addTabletLocations(
-                  locationBuilder.build()).build()
-          );
-        } else {
-          LOG.debug("Callback already invoked, discarding response(" + r.toString() + ") from " +
-              hostAndPort.toString());
-        }
-      } else {
-        incrementCountAndCheckExhausted();
-      }
-      return null;
-    }
-
-    @Override
-    public String toString() {
-      return "get master registration for " + hostAndPort.toString();
-    }
-  }
-
-  /**
-   * Errback for each GetMasterRegistrationRequest sent in getMasterTableLocations() above.
-   * Stores each exception in 'exceptionsReceived'. Increments 'countResponseReceived': if
-   * the count is equal to the number of masters and no one else had called 'responseD' before,
-   * pass a {@link NoLeaderFoundException} into 'responseD'; otherwise, do
-   * nothing.
-   */
-  final class GetMasterRegistrationErrCB implements Callback<Void, Exception> {
-    private final HostAndPort hostAndPort;
-
-    public GetMasterRegistrationErrCB(HostAndPort hostAndPort) {
-      this.hostAndPort = hostAndPort;
-    }
-
-    @Override
-    public Void call(Exception e) throws Exception {
-      LOG.warn("Error receiving a response from: " + hostAndPort, e);
-      exceptionsReceived.add(e);
-      incrementCountAndCheckExhausted();
-      return null;
-    }
-
-    @Override
-    public String toString() {
-      return "get master registration errback for " + hostAndPort.toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationRequest.java
deleted file mode 100644
index bf5c864..0000000
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationRequest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.kudu.client;
-
-import static org.apache.kudu.consensus.Metadata.RaftPeerPB;
-import static org.apache.kudu.master.Master.GetMasterRegistrationRequestPB;
-import static org.apache.kudu.master.Master.GetMasterRegistrationResponsePB;
-import static org.apache.kudu.master.Master.MasterErrorPB;
-
-import com.google.protobuf.Message;
-
-import org.apache.kudu.annotations.InterfaceAudience;
-import org.apache.kudu.util.Pair;
-
-/**
- * Package-private RPC that can only go to master.
- */
-@InterfaceAudience.Private
-public class GetMasterRegistrationRequest extends KuduRpc<GetMasterRegistrationResponse> {
-  private static final String GET_MASTER_REGISTRATION = "GetMasterRegistration";
-
-  public GetMasterRegistrationRequest(KuduTable masterTable) {
-    super(masterTable);
-  }
-
-  @Override
-  Message createRequestPB() {
-    return GetMasterRegistrationRequestPB.getDefaultInstance();
-  }
-
-  @Override
-  String serviceName() {
-    return MASTER_SERVICE_NAME;
-  }
-
-  @Override
-  String method() {
-    return GET_MASTER_REGISTRATION;
-  }
-
-  @Override
-  Pair<GetMasterRegistrationResponse, Object> deserialize(CallResponse callResponse,
-                                                          String tsUUID) throws KuduException {
-    final GetMasterRegistrationResponsePB.Builder respBuilder =
-        GetMasterRegistrationResponsePB.newBuilder();
-    readProtobuf(callResponse.getPBMessage(), respBuilder);
-    RaftPeerPB.Role role = RaftPeerPB.Role.FOLLOWER;
-    if (!respBuilder.hasError() || respBuilder.getError().getCode() !=
-        MasterErrorPB.Code.CATALOG_MANAGER_NOT_INITIALIZED) {
-      role = respBuilder.getRole();
-    }
-    GetMasterRegistrationResponse response = new GetMasterRegistrationResponse(
-        deadlineTracker.getElapsedMillis(),
-        tsUUID,
-        role,
-        respBuilder.getRegistration(),
-        respBuilder.getInstanceId());
-    return new Pair<GetMasterRegistrationResponse, Object>(
-        response, respBuilder.hasError() ? respBuilder.getError() : null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationResponse.java
deleted file mode 100644
index 7fa9f0c..0000000
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationResponse.java
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.kudu.client;
-
-import org.apache.kudu.WireProtocol;
-import org.apache.kudu.annotations.InterfaceAudience;
-import org.apache.kudu.consensus.Metadata;
-import org.apache.kudu.master.Master;
-
-/**
- * Response for {@link GetMasterRegistrationRequest}.
- */
-@InterfaceAudience.Private
-public class GetMasterRegistrationResponse extends KuduRpcResponse {
-
-  private final Metadata.RaftPeerPB.Role role;
-  private final WireProtocol.ServerRegistrationPB serverRegistration;
-  private final WireProtocol.NodeInstancePB instanceId;
-
-  /**
-   * Describes a response to a {@link GetMasterRegistrationRequest}, built from
-   * {@link Master.GetMasterRegistrationResponsePB}.
-   *
-   * @param role Master's role in the config.
-   * @param serverRegistration server registration (RPC and HTTP addresses) for this master.
-   * @param instanceId Node instance (permanent uuid and
-   */
-  public GetMasterRegistrationResponse(long elapsedMillis, String tsUUID,
-                                       Metadata.RaftPeerPB.Role role,
-                                       WireProtocol.ServerRegistrationPB serverRegistration,
-                                       WireProtocol.NodeInstancePB instanceId) {
-    super(elapsedMillis, tsUUID);
-    this.role = role;
-    this.serverRegistration = serverRegistration;
-    this.instanceId = instanceId;
-  }
-
-  /**
-   * Returns this master's role in the config.
-   *
-   * @see Metadata.RaftPeerPB.Role
-   * @return Node's role in the cluster, or FOLLOWER if the node is not initialized.
-   */
-  public Metadata.RaftPeerPB.Role getRole() {
-    return role;
-  }
-
-  /**
-   * Returns the server registration (list of RPC and HTTP ports) for this master.
-   *
-   * @return The {@link WireProtocol.ServerRegistrationPB} object for this master.
-   */
-  public WireProtocol.ServerRegistrationPB getServerRegistration() {
-    return serverRegistration;
-  }
-
-  /**
-   * The node instance (initial sequence number and permanent uuid) for this master.
-   *
-   * @return The {@link WireProtocol.NodeInstancePB} object for this master.
-   */
-  public WireProtocol.NodeInstancePB getInstanceId() {
-    return instanceId;
-  }
-
-  @Override
-  public String toString() {
-    return "GetMasterRegistrationResponse{" +
-        "role=" + role +
-        ", serverRegistration=" + serverRegistration +
-        ", instanceId=" + instanceId +
-        '}';
-  }
-}

http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index b107fbb..af4e2b3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -536,7 +536,7 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
     if (error.getCode() == Master.MasterErrorPB.Code.NOT_THE_LEADER) {
       kuduClient.handleNotLeader(rpc, new RecoverableException(status), this);
     } else if (code == WireProtocol.AppStatusPB.ErrorCode.SERVICE_UNAVAILABLE) {
-      if (rpc instanceof GetMasterRegistrationRequest) {
+      if (rpc instanceof ConnectToMasterRequest) {
         // Special case:
         // We never want to retry this RPC, we only use it to poke masters to learn where the leader
         // is. If the error is truly non recoverable, it'll be handled later.

http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
new file mode 100644
index 0000000..eb10443
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import static org.apache.kudu.consensus.Metadata.RaftPeerPB.Role.FOLLOWER;
+import static org.apache.kudu.consensus.Metadata.RaftPeerPB.Role.LEADER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.net.HostAndPort;
+import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
+import org.junit.Test;
+
+import org.apache.kudu.WireProtocol;
+import org.apache.kudu.consensus.Metadata;
+import org.apache.kudu.master.Master;
+
+public class TestConnectToCluster {
+
+  private static final List<HostAndPort> MASTERS = ImmutableList.of(
+      HostAndPort.fromParts("0", 9000),
+      HostAndPort.fromParts("1", 9000),
+      HostAndPort.fromParts("2", 9000));
+
+  @Test(timeout = 10000)
+  public void test() throws Exception {
+    NonRecoverableException reusableNRE = new NonRecoverableException(
+        Status.RuntimeError(""));
+    RecoverableException reusableRE = new RecoverableException(
+        Status.RuntimeError(""));
+    NoLeaderFoundException retryResponse =
+        new NoLeaderFoundException(Status.RuntimeError(""));
+    // We don't test for a particular good response, so as long as we pass something that's not an
+    // exception to runTest() we're good.
+    Object successResponse = new Object();
+
+    // Success cases.
+
+    // Normal case.
+    runTest(
+        makeGMRR(LEADER),
+        makeGMRR(FOLLOWER),
+        makeGMRR(FOLLOWER),
+        successResponse);
+
+    // Permutation works too.
+    runTest(
+        makeGMRR(FOLLOWER),
+        makeGMRR(LEADER),
+        makeGMRR(FOLLOWER),
+        successResponse);
+
+    // Multiple leaders, that's fine since it might be a TOCTOU situation, or one master
+    // is confused. Raft handles this if the client then tries to do something that requires a
+    // replication on the master-side.
+    runTest(
+        makeGMRR(LEADER),
+        makeGMRR(LEADER),
+        makeGMRR(FOLLOWER),
+        successResponse);
+
+    // Mixed bag, still works because there's a leader.
+    runTest(
+        reusableNRE,
+        makeGMRR(FOLLOWER),
+        makeGMRR(LEADER),
+        successResponse);
+
+    // All unreachable except one leader, still good.
+    runTest(
+        reusableNRE,
+        reusableNRE,
+        makeGMRR(LEADER),
+        successResponse);
+
+    // Permutation of the previous.
+    runTest(
+        reusableNRE,
+        makeGMRR(LEADER),
+        reusableNRE,
+        successResponse);
+
+    // Retry cases.
+
+    // Just followers means we retry.
+    runTest(
+        makeGMRR(FOLLOWER),
+        makeGMRR(FOLLOWER),
+        makeGMRR(FOLLOWER),
+        retryResponse);
+
+    // One NRE but we have responsive masters, retry.
+    runTest(
+        makeGMRR(FOLLOWER),
+        makeGMRR(FOLLOWER),
+        reusableNRE,
+        retryResponse);
+
+    // One good master but no leader, retry.
+    runTest(
+        reusableNRE,
+        makeGMRR(FOLLOWER),
+        reusableNRE,
+        retryResponse);
+
+    // Different case but same outcome.
+    runTest(
+        reusableRE,
+        reusableNRE,
+        makeGMRR(FOLLOWER),
+        retryResponse);
+
+    // All recoverable means retry.
+    runTest(
+        reusableRE,
+        reusableRE,
+        reusableRE,
+        retryResponse);
+
+    // Just one recoverable still means retry.
+    runTest(
+        reusableRE,
+        reusableNRE,
+        reusableNRE,
+        retryResponse);
+
+    // Failure case.
+
+    // Can't recover anything, give up.
+    runTest(
+        reusableNRE,
+        reusableNRE,
+        reusableNRE,
+        reusableNRE);
+  }
+
+  private void runTest(Object response0,
+                       Object response1,
+                       Object response2,
+                       Object expectedResponse) throws Exception {
+
+    // Here we basically do what AsyncKuduClient would do, add all the callbacks and then we also
+    // add the responses. We then check for the right response.
+
+    Deferred<Master.GetTableLocationsResponsePB> d = new Deferred<>();
+
+    ConnectToCluster grrm = new ConnectToCluster(MASTERS, d);
+
+    Callback<Void, ConnectToClusterResponse> cb0 = grrm.callbackForNode(MASTERS.get(0));
+    Callback<Void, ConnectToClusterResponse> cb1 = grrm.callbackForNode(MASTERS.get(1));
+    Callback<Void, ConnectToClusterResponse> cb2 = grrm.callbackForNode(MASTERS.get(2));
+
+    Callback<Void, Exception> eb0 = grrm.errbackForNode(MASTERS.get(0));
+    Callback<Void, Exception> eb1 = grrm.errbackForNode(MASTERS.get(1));
+    Callback<Void, Exception> eb2 = grrm.errbackForNode(MASTERS.get(2));
+
+    callTheRightCallback(cb0, eb0, response0);
+    callTheRightCallback(cb1, eb1, response1);
+    callTheRightCallback(cb2, eb2, response2);
+
+    try {
+      d.join(); // Don't care about the response.
+      if ((expectedResponse instanceof Exception)) {
+        fail("Should not work " + expectedResponse.getClass());
+      } else {
+        // ok
+      }
+    } catch (Exception ex) {
+      assertEquals(expectedResponse.getClass(), ex.getClass());
+    }
+  }
+
+  // Helper method that determines if the callback or errback should be called.
+  private static void callTheRightCallback(
+      Callback<Void, ConnectToClusterResponse> cb,
+      Callback<Void, Exception> eb,
+      Object response) throws Exception {
+    if (response instanceof Exception) {
+      eb.call((Exception) response);
+    } else {
+      cb.call((ConnectToClusterResponse) response);
+    }
+  }
+
+  private static ConnectToClusterResponse makeGMRR(Metadata.RaftPeerPB.Role role) {
+    return new ConnectToClusterResponse(0, "", role, null,
+        WireProtocol.NodeInstancePB.getDefaultInstance());
+  }
+}

http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/test/java/org/apache/kudu/client/TestGetMasterRegistrationReceived.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestGetMasterRegistrationReceived.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestGetMasterRegistrationReceived.java
deleted file mode 100644
index b57a18c..0000000
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestGetMasterRegistrationReceived.java
+++ /dev/null
@@ -1,208 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.kudu.client;
-
-import static org.apache.kudu.consensus.Metadata.RaftPeerPB.Role.FOLLOWER;
-import static org.apache.kudu.consensus.Metadata.RaftPeerPB.Role.LEADER;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.net.HostAndPort;
-import com.stumbleupon.async.Callback;
-import com.stumbleupon.async.Deferred;
-import org.junit.Test;
-
-import org.apache.kudu.WireProtocol;
-import org.apache.kudu.consensus.Metadata;
-import org.apache.kudu.master.Master;
-
-public class TestGetMasterRegistrationReceived {
-
-  private static final List<HostAndPort> MASTERS = ImmutableList.of(
-      HostAndPort.fromParts("0", 9000),
-      HostAndPort.fromParts("1", 9000),
-      HostAndPort.fromParts("2", 9000));
-
-  @Test(timeout = 10000)
-  public void test() throws Exception {
-    NonRecoverableException reusableNRE = new NonRecoverableException(
-        Status.RuntimeError(""));
-    RecoverableException reusableRE = new RecoverableException(
-        Status.RuntimeError(""));
-    NoLeaderFoundException retryResponse =
-        new NoLeaderFoundException(Status.RuntimeError(""));
-    // We don't test for a particular good response, so as long as we pass something that's not an
-    // exception to runTest() we're good.
-    Object successResponse = new Object();
-
-    // Success cases.
-
-    // Normal case.
-    runTest(
-        makeGMRR(LEADER),
-        makeGMRR(FOLLOWER),
-        makeGMRR(FOLLOWER),
-        successResponse);
-
-    // Permutation works too.
-    runTest(
-        makeGMRR(FOLLOWER),
-        makeGMRR(LEADER),
-        makeGMRR(FOLLOWER),
-        successResponse);
-
-    // Multiple leaders, that's fine since it might be a TOCTOU situation, or one master
-    // is confused. Raft handles this if the client then tries to do something that requires a
-    // replication on the master-side.
-    runTest(
-        makeGMRR(LEADER),
-        makeGMRR(LEADER),
-        makeGMRR(FOLLOWER),
-        successResponse);
-
-    // Mixed bag, still works because there's a leader.
-    runTest(
-        reusableNRE,
-        makeGMRR(FOLLOWER),
-        makeGMRR(LEADER),
-        successResponse);
-
-    // All unreachable except one leader, still good.
-    runTest(
-        reusableNRE,
-        reusableNRE,
-        makeGMRR(LEADER),
-        successResponse);
-
-    // Permutation of the previous.
-    runTest(
-        reusableNRE,
-        makeGMRR(LEADER),
-        reusableNRE,
-        successResponse);
-
-    // Retry cases.
-
-    // Just followers means we retry.
-    runTest(
-        makeGMRR(FOLLOWER),
-        makeGMRR(FOLLOWER),
-        makeGMRR(FOLLOWER),
-        retryResponse);
-
-    // One NRE but we have responsive masters, retry.
-    runTest(
-        makeGMRR(FOLLOWER),
-        makeGMRR(FOLLOWER),
-        reusableNRE,
-        retryResponse);
-
-    // One good master but no leader, retry.
-    runTest(
-        reusableNRE,
-        makeGMRR(FOLLOWER),
-        reusableNRE,
-        retryResponse);
-
-    // Different case but same outcome.
-    runTest(
-        reusableRE,
-        reusableNRE,
-        makeGMRR(FOLLOWER),
-        retryResponse);
-
-    // All recoverable means retry.
-    runTest(
-        reusableRE,
-        reusableRE,
-        reusableRE,
-        retryResponse);
-
-    // Just one recoverable still means retry.
-    runTest(
-        reusableRE,
-        reusableNRE,
-        reusableNRE,
-        retryResponse);
-
-    // Failure case.
-
-    // Can't recover anything, give up.
-    runTest(
-        reusableNRE,
-        reusableNRE,
-        reusableNRE,
-        reusableNRE);
-  }
-
-  private void runTest(Object response0,
-                       Object response1,
-                       Object response2,
-                       Object expectedResponse) throws Exception {
-
-    // Here we basically do what AsyncKuduClient would do, add all the callbacks and then we also
-    // add the responses. We then check for the right response.
-
-    Deferred<Master.GetTableLocationsResponsePB> d = new Deferred<>();
-
-    GetMasterRegistrationReceived grrm = new GetMasterRegistrationReceived(MASTERS, d);
-
-    Callback<Void, GetMasterRegistrationResponse> cb0 = grrm.callbackForNode(MASTERS.get(0));
-    Callback<Void, GetMasterRegistrationResponse> cb1 = grrm.callbackForNode(MASTERS.get(1));
-    Callback<Void, GetMasterRegistrationResponse> cb2 = grrm.callbackForNode(MASTERS.get(2));
-
-    Callback<Void, Exception> eb0 = grrm.errbackForNode(MASTERS.get(0));
-    Callback<Void, Exception> eb1 = grrm.errbackForNode(MASTERS.get(1));
-    Callback<Void, Exception> eb2 = grrm.errbackForNode(MASTERS.get(2));
-
-    callTheRightCallback(cb0, eb0, response0);
-    callTheRightCallback(cb1, eb1, response1);
-    callTheRightCallback(cb2, eb2, response2);
-
-    try {
-      d.join(); // Don't care about the response.
-      if ((expectedResponse instanceof Exception)) {
-        fail("Should not work " + expectedResponse.getClass());
-      } else {
-        // ok
-      }
-    } catch (Exception ex) {
-      assertEquals(expectedResponse.getClass(), ex.getClass());
-    }
-  }
-
-  // Helper method that determines if the callback or errback should be called.
-  private static void callTheRightCallback(
-      Callback<Void, GetMasterRegistrationResponse> cb,
-      Callback<Void, Exception> eb,
-      Object response) throws Exception {
-    if (response instanceof Exception) {
-      eb.call((Exception) response);
-    } else {
-      cb.call((GetMasterRegistrationResponse) response);
-    }
-  }
-
-  private static GetMasterRegistrationResponse makeGMRR(Metadata.RaftPeerPB.Role role) {
-    return new GetMasterRegistrationResponse(0, "", role, null,
-        WireProtocol.NodeInstancePB.getDefaultInstance());
-  }
-}