You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/02/16 07:28:28 UTC
[2/2] kudu git commit: java: rename GetMasterRegistration ->
ConnectToCluster/ConnectToMaster
java: rename GetMasterRegistration -> ConnectToCluster/ConnectToMaster
This is just a straight Eclipse-driven rename. Actual functional changes
to follow.
Change-Id: Ia8c326f1c0e664f0a2098179af359b2735a442eb
Reviewed-on: http://gerrit.cloudera.org:8080/6018
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7027a168
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7027a168
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7027a168
Branch: refs/heads/master
Commit: 7027a168b2f2eeba7f22e3ec5d3b1d3eaadf2df2
Parents: fc7255a
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Feb 15 12:53:58 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Feb 16 07:15:24 2017 +0000
----------------------------------------------------------------------
.../org/apache/kudu/client/AsyncKuduClient.java | 16 +-
.../apache/kudu/client/ConnectToCluster.java | 247 +++++++++++++++++++
.../kudu/client/ConnectToClusterResponse.java | 89 +++++++
.../kudu/client/ConnectToMasterRequest.java | 76 ++++++
.../client/GetMasterRegistrationReceived.java | 247 -------------------
.../client/GetMasterRegistrationRequest.java | 76 ------
.../client/GetMasterRegistrationResponse.java | 89 -------
.../org/apache/kudu/client/TabletClient.java | 2 +-
.../kudu/client/TestConnectToCluster.java | 208 ++++++++++++++++
.../TestGetMasterRegistrationReceived.java | 208 ----------------
10 files changed, 629 insertions(+), 629 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index c1c2fbf..72df1b1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -1066,10 +1066,10 @@ public class AsyncKuduClient implements AutoCloseable {
*/
Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB(KuduRpc<?> parentRpc) {
final Deferred<Master.GetTableLocationsResponsePB> responseD = new Deferred<>();
- final GetMasterRegistrationReceived received =
- new GetMasterRegistrationReceived(masterAddresses, responseD);
+ final ConnectToCluster connector =
+ new ConnectToCluster(masterAddresses, responseD);
for (HostAndPort hostAndPort : masterAddresses) {
- Deferred<GetMasterRegistrationResponse> d;
+ Deferred<ConnectToClusterResponse> d;
// Note: we need to create a client for that host first, as there's a
// chicken and egg problem: since there is no source of truth beyond
// the master, the only way to get information about a master host is
@@ -1083,7 +1083,7 @@ public class AsyncKuduClient implements AutoCloseable {
} else {
d = getMasterRegistration(clientForHostAndPort, parentRpc);
}
- d.addCallbacks(received.callbackForNode(hostAndPort), received.errbackForNode(hostAndPort));
+ d.addCallbacks(connector.callbackForNode(hostAndPort), connector.errbackForNode(hostAndPort));
}
return responseD;
}
@@ -1484,24 +1484,24 @@ public class AsyncKuduClient implements AutoCloseable {
}
/**
- * Retrieve the master registration (see {@link GetMasterRegistrationResponse}
+ * Retrieve the master registration (see {@link ConnectToClusterResponse}
* for a replica.
* @param masterClient an initialized client for the master replica
* @param parentRpc RPC that prompted a master lookup, can be null
* @return a Deferred object for the master replica's current registration
*/
- Deferred<GetMasterRegistrationResponse> getMasterRegistration(
+ Deferred<ConnectToClusterResponse> getMasterRegistration(
TabletClient masterClient, KuduRpc<?> parentRpc) {
// TODO: Handle the situation when multiple in-flight RPCs all want to query the masters,
// basically reuse in some way the master permits.
- GetMasterRegistrationRequest rpc = new GetMasterRegistrationRequest(masterTable);
+ ConnectToMasterRequest rpc = new ConnectToMasterRequest(masterTable);
if (parentRpc != null) {
rpc.setTimeoutMillis(parentRpc.deadlineTracker.getMillisBeforeDeadline());
rpc.setParentRpc(parentRpc);
} else {
rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
}
- Deferred<GetMasterRegistrationResponse> d = rpc.getDeferred();
+ Deferred<ConnectToClusterResponse> d = rpc.getDeferred();
rpc.attempt++;
masterClient.sendRpc(rpc);
return d;
http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
new file mode 100644
index 0000000..1de40d0
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.net.HostAndPort;
+import com.google.protobuf.ByteString;
+import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.Common;
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.consensus.Metadata;
+import org.apache.kudu.master.Master;
+import org.apache.kudu.util.NetUtil;
+
+/**
+ * Class responsible for fanning out RPCs to all of the configured masters,
+ * finding a leader, and responding when the leader has been located.
+ */
+@InterfaceAudience.Private
+final class ConnectToCluster {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConnectToCluster.class);
+
+ private final List<HostAndPort> masterAddrs;
+ private final Deferred<Master.GetTableLocationsResponsePB> responseD;
+ private final int numMasters;
+
+ // Used to avoid calling 'responseD' twice.
+ private final AtomicBoolean responseDCalled = new AtomicBoolean(false);
+
+ // Number of responses we've receives: used to tell whether or not we've received
+ // errors/replies from all of the masters, or if there are any
+ // GetMasterRegistrationRequests still pending.
+ private final AtomicInteger countResponsesReceived = new AtomicInteger(0);
+
+ // Exceptions received so far: kept for debugging purposes.
+ private final List<Exception> exceptionsReceived =
+ Collections.synchronizedList(new ArrayList<Exception>());
+
+ /**
+ * Creates an object that holds the state needed to retrieve master table's location.
+ * @param masterAddrs Addresses of all master replicas that we want to retrieve the
+ * registration from.
+ * @param responseD Deferred object that will hold the GetTableLocationsResponsePB object for
+ * the master table.
+ */
+ public ConnectToCluster(List<HostAndPort> masterAddrs,
+ Deferred<Master.GetTableLocationsResponsePB> responseD) {
+ this.masterAddrs = masterAddrs;
+ this.responseD = responseD;
+ this.numMasters = masterAddrs.size();
+ }
+
+ /**
+ * Creates a callback for a GetMasterRegistrationRequest that was sent to 'hostAndPort'.
+ * @see GetMasterRegistrationCB
+ * @param hostAndPort Host and part for the RPC we're attaching this to. Host and port must
+ * be valid.
+ * @return The callback object that can be added to the RPC request.
+ */
+ public Callback<Void, ConnectToClusterResponse> callbackForNode(HostAndPort hostAndPort) {
+ return new GetMasterRegistrationCB(hostAndPort);
+ }
+
+ /**
+ * Creates an errback for a GetMasterRegistrationRequest that was sent to 'hostAndPort'.
+ * @see GetMasterRegistrationErrCB
+ * @param hostAndPort Host and port for the RPC we're attaching this to. Used for debugging
+ * purposes.
+ * @return The errback object that can be added to the RPC request.
+ */
+ public Callback<Void, Exception> errbackForNode(HostAndPort hostAndPort) {
+ return new GetMasterRegistrationErrCB(hostAndPort);
+ }
+
+ /**
+ * Checks if we've already received a response or an exception from every master that
+ * we've sent a GetMasterRegistrationRequest to. If so -- and no leader has been found
+ * (that is, 'responseD' was never called) -- pass a {@link NoLeaderFoundException}
+ * to responseD.
+ */
+ private void incrementCountAndCheckExhausted() {
+ if (countResponsesReceived.incrementAndGet() == numMasters) {
+ if (responseDCalled.compareAndSet(false, true)) {
+
+ // We want `allUnrecoverable` to only be true if all the masters came back with
+ // NonRecoverableException so that we know for sure we can't retry anymore. Just one master
+ // that replies with RecoverableException or with an ok response but is a FOLLOWER is
+ // enough to keep us retrying.
+ boolean allUnrecoverable = true;
+ if (exceptionsReceived.size() == countResponsesReceived.get()) {
+ for (Exception ex : exceptionsReceived) {
+ if (!(ex instanceof NonRecoverableException)) {
+ allUnrecoverable = false;
+ break;
+ }
+ }
+ } else {
+ allUnrecoverable = false;
+ }
+
+ String allHosts = NetUtil.hostsAndPortsToString(masterAddrs);
+ if (allUnrecoverable) {
+ // This will stop retries.
+ String msg = String.format("Couldn't find a valid master in (%s). " +
+ "Exceptions received: %s", allHosts,
+ Joiner.on(",").join(Lists.transform(
+ exceptionsReceived, Functions.toStringFunction())));
+ Status s = Status.ServiceUnavailable(msg);
+ responseD.callback(new NonRecoverableException(s));
+ } else {
+ String message = String.format("Master config (%s) has no leader.",
+ allHosts);
+ Exception ex;
+ if (exceptionsReceived.isEmpty()) {
+ LOG.warn(String.format(
+ "None of the provided masters (%s) is a leader, will retry.",
+ allHosts));
+ ex = new NoLeaderFoundException(Status.ServiceUnavailable(message));
+ } else {
+ LOG.warn(String.format(
+ "Unable to find the leader master (%s), will retry",
+ allHosts));
+ String joinedMsg = message + " Exceptions received: " +
+ Joiner.on(",").join(Lists.transform(
+ exceptionsReceived, Functions.toStringFunction()));
+ Status s = Status.ServiceUnavailable(joinedMsg);
+ ex = new NoLeaderFoundException(s,
+ exceptionsReceived.get(exceptionsReceived.size() - 1));
+ }
+ responseD.callback(ex);
+ }
+ }
+ }
+ }
+
+ /**
+ * Callback for each GetMasterRegistrationRequest sent in getMasterTableLocations() above.
+ * If a request (paired to a specific master) returns a reply that indicates it's a leader,
+ * the callback in 'responseD' is invoked with an initialized GetTableLocationResponsePB
+ * object containing the leader's RPC address.
+ * If the master is not a leader, increment 'countResponsesReceived': if the count equals to
+ * the number of masters, pass {@link NoLeaderFoundException} into
+ * 'responseD' if no one else had called 'responseD' before; otherwise, do nothing.
+ */
+ final class GetMasterRegistrationCB implements Callback<Void, ConnectToClusterResponse> {
+ private final HostAndPort hostAndPort;
+
+ public GetMasterRegistrationCB(HostAndPort hostAndPort) {
+ this.hostAndPort = hostAndPort;
+ }
+
+ @Override
+ public Void call(ConnectToClusterResponse r) throws Exception {
+ Master.TabletLocationsPB.ReplicaPB.Builder replicaBuilder =
+ Master.TabletLocationsPB.ReplicaPB.newBuilder();
+
+ Master.TSInfoPB.Builder tsInfoBuilder = Master.TSInfoPB.newBuilder();
+ tsInfoBuilder.addRpcAddresses(ProtobufHelper.hostAndPortToPB(hostAndPort));
+ tsInfoBuilder.setPermanentUuid(r.getInstanceId().getPermanentUuid());
+ replicaBuilder.setTsInfo(tsInfoBuilder);
+ if (r.getRole().equals(Metadata.RaftPeerPB.Role.LEADER)) {
+ replicaBuilder.setRole(r.getRole());
+ Master.TabletLocationsPB.Builder locationBuilder = Master.TabletLocationsPB.newBuilder();
+ locationBuilder.setPartition(
+ Common.PartitionPB.newBuilder().setPartitionKeyStart(ByteString.EMPTY)
+ .setPartitionKeyEnd(ByteString.EMPTY));
+ locationBuilder.setTabletId(
+ ByteString.copyFromUtf8(AsyncKuduClient.MASTER_TABLE_NAME_PLACEHOLDER));
+ locationBuilder.addReplicas(replicaBuilder);
+ // No one else has called this before us.
+ if (responseDCalled.compareAndSet(false, true)) {
+ responseD.callback(
+ Master.GetTableLocationsResponsePB.newBuilder().addTabletLocations(
+ locationBuilder.build()).build()
+ );
+ } else {
+ LOG.debug("Callback already invoked, discarding response(" + r.toString() + ") from " +
+ hostAndPort.toString());
+ }
+ } else {
+ incrementCountAndCheckExhausted();
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "get master registration for " + hostAndPort.toString();
+ }
+ }
+
+ /**
+ * Errback for each GetMasterRegistrationRequest sent in getMasterTableLocations() above.
+ * Stores each exception in 'exceptionsReceived'. Increments 'countResponseReceived': if
+ * the count is equal to the number of masters and no one else had called 'responseD' before,
+ * pass a {@link NoLeaderFoundException} into 'responseD'; otherwise, do
+ * nothing.
+ */
+ final class GetMasterRegistrationErrCB implements Callback<Void, Exception> {
+ private final HostAndPort hostAndPort;
+
+ public GetMasterRegistrationErrCB(HostAndPort hostAndPort) {
+ this.hostAndPort = hostAndPort;
+ }
+
+ @Override
+ public Void call(Exception e) throws Exception {
+ LOG.warn("Error receiving a response from: " + hostAndPort, e);
+ exceptionsReceived.add(e);
+ incrementCountAndCheckExhausted();
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "get master registration errback for " + hostAndPort.toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
new file mode 100644
index 0000000..daac6c4
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToClusterResponse.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import org.apache.kudu.WireProtocol;
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.consensus.Metadata;
+import org.apache.kudu.master.Master;
+
+/**
+ * Response for {@link ConnectToMasterRequest}.
+ */
+@InterfaceAudience.Private
+public class ConnectToClusterResponse extends KuduRpcResponse {
+
+ private final Metadata.RaftPeerPB.Role role;
+ private final WireProtocol.ServerRegistrationPB serverRegistration;
+ private final WireProtocol.NodeInstancePB instanceId;
+
+ /**
+ * Describes a response to a {@link ConnectToMasterRequest}, built from
+ * {@link Master.GetMasterRegistrationResponsePB}.
+ *
+ * @param role Master's role in the config.
+ * @param serverRegistration server registration (RPC and HTTP addresses) for this master.
+ * @param instanceId Node instance (permanent uuid and
+ */
+ public ConnectToClusterResponse(long elapsedMillis, String tsUUID,
+ Metadata.RaftPeerPB.Role role,
+ WireProtocol.ServerRegistrationPB serverRegistration,
+ WireProtocol.NodeInstancePB instanceId) {
+ super(elapsedMillis, tsUUID);
+ this.role = role;
+ this.serverRegistration = serverRegistration;
+ this.instanceId = instanceId;
+ }
+
+ /**
+ * Returns this master's role in the config.
+ *
+ * @see Metadata.RaftPeerPB.Role
+ * @return Node's role in the cluster, or FOLLOWER if the node is not initialized.
+ */
+ public Metadata.RaftPeerPB.Role getRole() {
+ return role;
+ }
+
+ /**
+ * Returns the server registration (list of RPC and HTTP ports) for this master.
+ *
+ * @return The {@link WireProtocol.ServerRegistrationPB} object for this master.
+ */
+ public WireProtocol.ServerRegistrationPB getServerRegistration() {
+ return serverRegistration;
+ }
+
+ /**
+ * The node instance (initial sequence number and permanent uuid) for this master.
+ *
+ * @return The {@link WireProtocol.NodeInstancePB} object for this master.
+ */
+ public WireProtocol.NodeInstancePB getInstanceId() {
+ return instanceId;
+ }
+
+ @Override
+ public String toString() {
+ return "GetMasterRegistrationResponse{" +
+ "role=" + role +
+ ", serverRegistration=" + serverRegistration +
+ ", instanceId=" + instanceId +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
new file mode 100644
index 0000000..a98ac38
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToMasterRequest.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import static org.apache.kudu.consensus.Metadata.RaftPeerPB;
+import static org.apache.kudu.master.Master.GetMasterRegistrationRequestPB;
+import static org.apache.kudu.master.Master.GetMasterRegistrationResponsePB;
+import static org.apache.kudu.master.Master.MasterErrorPB;
+
+import com.google.protobuf.Message;
+
+import org.apache.kudu.annotations.InterfaceAudience;
+import org.apache.kudu.util.Pair;
+
+/**
+ * Package-private RPC that can only go to master.
+ */
+@InterfaceAudience.Private
+public class ConnectToMasterRequest extends KuduRpc<ConnectToClusterResponse> {
+ private static final String GET_MASTER_REGISTRATION = "GetMasterRegistration";
+
+ public ConnectToMasterRequest(KuduTable masterTable) {
+ super(masterTable);
+ }
+
+ @Override
+ Message createRequestPB() {
+ return GetMasterRegistrationRequestPB.getDefaultInstance();
+ }
+
+ @Override
+ String serviceName() {
+ return MASTER_SERVICE_NAME;
+ }
+
+ @Override
+ String method() {
+ return GET_MASTER_REGISTRATION;
+ }
+
+ @Override
+ Pair<ConnectToClusterResponse, Object> deserialize(CallResponse callResponse,
+ String tsUUID) throws KuduException {
+ final GetMasterRegistrationResponsePB.Builder respBuilder =
+ GetMasterRegistrationResponsePB.newBuilder();
+ readProtobuf(callResponse.getPBMessage(), respBuilder);
+ RaftPeerPB.Role role = RaftPeerPB.Role.FOLLOWER;
+ if (!respBuilder.hasError() || respBuilder.getError().getCode() !=
+ MasterErrorPB.Code.CATALOG_MANAGER_NOT_INITIALIZED) {
+ role = respBuilder.getRole();
+ }
+ ConnectToClusterResponse response = new ConnectToClusterResponse(
+ deadlineTracker.getElapsedMillis(),
+ tsUUID,
+ role,
+ respBuilder.getRegistration(),
+ respBuilder.getInstanceId());
+ return new Pair<ConnectToClusterResponse, Object>(
+ response, respBuilder.hasError() ? respBuilder.getError() : null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationReceived.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationReceived.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationReceived.java
deleted file mode 100644
index 9bd71fc..0000000
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationReceived.java
+++ /dev/null
@@ -1,247 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.kudu.client;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.common.base.Functions;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.net.HostAndPort;
-import com.google.protobuf.ByteString;
-import com.stumbleupon.async.Callback;
-import com.stumbleupon.async.Deferred;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kudu.Common;
-import org.apache.kudu.annotations.InterfaceAudience;
-import org.apache.kudu.consensus.Metadata;
-import org.apache.kudu.master.Master;
-import org.apache.kudu.util.NetUtil;
-
-/**
- * Class grouping the callback and the errback for GetMasterRegistration calls
- * made in getMasterTableLocationsPB.
- */
-@InterfaceAudience.Private
-final class GetMasterRegistrationReceived {
-
- private static final Logger LOG = LoggerFactory.getLogger(GetMasterRegistrationReceived.class);
-
- private final List<HostAndPort> masterAddrs;
- private final Deferred<Master.GetTableLocationsResponsePB> responseD;
- private final int numMasters;
-
- // Used to avoid calling 'responseD' twice.
- private final AtomicBoolean responseDCalled = new AtomicBoolean(false);
-
- // Number of responses we've receives: used to tell whether or not we've received
- // errors/replies from all of the masters, or if there are any
- // GetMasterRegistrationRequests still pending.
- private final AtomicInteger countResponsesReceived = new AtomicInteger(0);
-
- // Exceptions received so far: kept for debugging purposes.
- private final List<Exception> exceptionsReceived =
- Collections.synchronizedList(new ArrayList<Exception>());
-
- /**
- * Creates an object that holds the state needed to retrieve master table's location.
- * @param masterAddrs Addresses of all master replicas that we want to retrieve the
- * registration from.
- * @param responseD Deferred object that will hold the GetTableLocationsResponsePB object for
- * the master table.
- */
- public GetMasterRegistrationReceived(List<HostAndPort> masterAddrs,
- Deferred<Master.GetTableLocationsResponsePB> responseD) {
- this.masterAddrs = masterAddrs;
- this.responseD = responseD;
- this.numMasters = masterAddrs.size();
- }
-
- /**
- * Creates a callback for a GetMasterRegistrationRequest that was sent to 'hostAndPort'.
- * @see GetMasterRegistrationCB
- * @param hostAndPort Host and part for the RPC we're attaching this to. Host and port must
- * be valid.
- * @return The callback object that can be added to the RPC request.
- */
- public Callback<Void, GetMasterRegistrationResponse> callbackForNode(HostAndPort hostAndPort) {
- return new GetMasterRegistrationCB(hostAndPort);
- }
-
- /**
- * Creates an errback for a GetMasterRegistrationRequest that was sent to 'hostAndPort'.
- * @see GetMasterRegistrationErrCB
- * @param hostAndPort Host and port for the RPC we're attaching this to. Used for debugging
- * purposes.
- * @return The errback object that can be added to the RPC request.
- */
- public Callback<Void, Exception> errbackForNode(HostAndPort hostAndPort) {
- return new GetMasterRegistrationErrCB(hostAndPort);
- }
-
- /**
- * Checks if we've already received a response or an exception from every master that
- * we've sent a GetMasterRegistrationRequest to. If so -- and no leader has been found
- * (that is, 'responseD' was never called) -- pass a {@link NoLeaderFoundException}
- * to responseD.
- */
- private void incrementCountAndCheckExhausted() {
- if (countResponsesReceived.incrementAndGet() == numMasters) {
- if (responseDCalled.compareAndSet(false, true)) {
-
- // We want `allUnrecoverable` to only be true if all the masters came back with
- // NonRecoverableException so that we know for sure we can't retry anymore. Just one master
- // that replies with RecoverableException or with an ok response but is a FOLLOWER is
- // enough to keep us retrying.
- boolean allUnrecoverable = true;
- if (exceptionsReceived.size() == countResponsesReceived.get()) {
- for (Exception ex : exceptionsReceived) {
- if (!(ex instanceof NonRecoverableException)) {
- allUnrecoverable = false;
- break;
- }
- }
- } else {
- allUnrecoverable = false;
- }
-
- String allHosts = NetUtil.hostsAndPortsToString(masterAddrs);
- if (allUnrecoverable) {
- // This will stop retries.
- String msg = String.format("Couldn't find a valid master in (%s). " +
- "Exceptions received: %s", allHosts,
- Joiner.on(",").join(Lists.transform(
- exceptionsReceived, Functions.toStringFunction())));
- Status s = Status.ServiceUnavailable(msg);
- responseD.callback(new NonRecoverableException(s));
- } else {
- String message = String.format("Master config (%s) has no leader.",
- allHosts);
- Exception ex;
- if (exceptionsReceived.isEmpty()) {
- LOG.warn(String.format(
- "None of the provided masters (%s) is a leader, will retry.",
- allHosts));
- ex = new NoLeaderFoundException(Status.ServiceUnavailable(message));
- } else {
- LOG.warn(String.format(
- "Unable to find the leader master (%s), will retry",
- allHosts));
- String joinedMsg = message + " Exceptions received: " +
- Joiner.on(",").join(Lists.transform(
- exceptionsReceived, Functions.toStringFunction()));
- Status s = Status.ServiceUnavailable(joinedMsg);
- ex = new NoLeaderFoundException(s,
- exceptionsReceived.get(exceptionsReceived.size() - 1));
- }
- responseD.callback(ex);
- }
- }
- }
- }
-
- /**
- * Callback for each GetMasterRegistrationRequest sent in getMasterTableLocations() above.
- * If a request (paired to a specific master) returns a reply that indicates it's a leader,
- * the callback in 'responseD' is invoked with an initialized GetTableLocationResponsePB
- * object containing the leader's RPC address.
- * If the master is not a leader, increment 'countResponsesReceived': if the count equals to
- * the number of masters, pass {@link NoLeaderFoundException} into
- * 'responseD' if no one else had called 'responseD' before; otherwise, do nothing.
- */
- final class GetMasterRegistrationCB implements Callback<Void, GetMasterRegistrationResponse> {
- private final HostAndPort hostAndPort;
-
- public GetMasterRegistrationCB(HostAndPort hostAndPort) {
- this.hostAndPort = hostAndPort;
- }
-
- @Override
- public Void call(GetMasterRegistrationResponse r) throws Exception {
- Master.TabletLocationsPB.ReplicaPB.Builder replicaBuilder =
- Master.TabletLocationsPB.ReplicaPB.newBuilder();
-
- Master.TSInfoPB.Builder tsInfoBuilder = Master.TSInfoPB.newBuilder();
- tsInfoBuilder.addRpcAddresses(ProtobufHelper.hostAndPortToPB(hostAndPort));
- tsInfoBuilder.setPermanentUuid(r.getInstanceId().getPermanentUuid());
- replicaBuilder.setTsInfo(tsInfoBuilder);
- if (r.getRole().equals(Metadata.RaftPeerPB.Role.LEADER)) {
- replicaBuilder.setRole(r.getRole());
- Master.TabletLocationsPB.Builder locationBuilder = Master.TabletLocationsPB.newBuilder();
- locationBuilder.setPartition(
- Common.PartitionPB.newBuilder().setPartitionKeyStart(ByteString.EMPTY)
- .setPartitionKeyEnd(ByteString.EMPTY));
- locationBuilder.setTabletId(
- ByteString.copyFromUtf8(AsyncKuduClient.MASTER_TABLE_NAME_PLACEHOLDER));
- locationBuilder.addReplicas(replicaBuilder);
- // No one else has called this before us.
- if (responseDCalled.compareAndSet(false, true)) {
- responseD.callback(
- Master.GetTableLocationsResponsePB.newBuilder().addTabletLocations(
- locationBuilder.build()).build()
- );
- } else {
- LOG.debug("Callback already invoked, discarding response(" + r.toString() + ") from " +
- hostAndPort.toString());
- }
- } else {
- incrementCountAndCheckExhausted();
- }
- return null;
- }
-
- @Override
- public String toString() {
- return "get master registration for " + hostAndPort.toString();
- }
- }
-
- /**
- * Errback for each GetMasterRegistrationRequest sent in getMasterTableLocations() above.
- * Stores each exception in 'exceptionsReceived'. Increments 'countResponseReceived': if
- * the count is equal to the number of masters and no one else had called 'responseD' before,
- * pass a {@link NoLeaderFoundException} into 'responseD'; otherwise, do
- * nothing.
- */
- final class GetMasterRegistrationErrCB implements Callback<Void, Exception> {
- private final HostAndPort hostAndPort;
-
- public GetMasterRegistrationErrCB(HostAndPort hostAndPort) {
- this.hostAndPort = hostAndPort;
- }
-
- @Override
- public Void call(Exception e) throws Exception {
- LOG.warn("Error receiving a response from: " + hostAndPort, e);
- exceptionsReceived.add(e);
- incrementCountAndCheckExhausted();
- return null;
- }
-
- @Override
- public String toString() {
- return "get master registration errback for " + hostAndPort.toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationRequest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationRequest.java
deleted file mode 100644
index bf5c864..0000000
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationRequest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.kudu.client;
-
-import static org.apache.kudu.consensus.Metadata.RaftPeerPB;
-import static org.apache.kudu.master.Master.GetMasterRegistrationRequestPB;
-import static org.apache.kudu.master.Master.GetMasterRegistrationResponsePB;
-import static org.apache.kudu.master.Master.MasterErrorPB;
-
-import com.google.protobuf.Message;
-
-import org.apache.kudu.annotations.InterfaceAudience;
-import org.apache.kudu.util.Pair;
-
-/**
- * Package-private RPC that can only go to master.
- */
-@InterfaceAudience.Private
-public class GetMasterRegistrationRequest extends KuduRpc<GetMasterRegistrationResponse> {
- private static final String GET_MASTER_REGISTRATION = "GetMasterRegistration";
-
- public GetMasterRegistrationRequest(KuduTable masterTable) {
- super(masterTable);
- }
-
- @Override
- Message createRequestPB() {
- return GetMasterRegistrationRequestPB.getDefaultInstance();
- }
-
- @Override
- String serviceName() {
- return MASTER_SERVICE_NAME;
- }
-
- @Override
- String method() {
- return GET_MASTER_REGISTRATION;
- }
-
- @Override
- Pair<GetMasterRegistrationResponse, Object> deserialize(CallResponse callResponse,
- String tsUUID) throws KuduException {
- final GetMasterRegistrationResponsePB.Builder respBuilder =
- GetMasterRegistrationResponsePB.newBuilder();
- readProtobuf(callResponse.getPBMessage(), respBuilder);
- RaftPeerPB.Role role = RaftPeerPB.Role.FOLLOWER;
- if (!respBuilder.hasError() || respBuilder.getError().getCode() !=
- MasterErrorPB.Code.CATALOG_MANAGER_NOT_INITIALIZED) {
- role = respBuilder.getRole();
- }
- GetMasterRegistrationResponse response = new GetMasterRegistrationResponse(
- deadlineTracker.getElapsedMillis(),
- tsUUID,
- role,
- respBuilder.getRegistration(),
- respBuilder.getInstanceId());
- return new Pair<GetMasterRegistrationResponse, Object>(
- response, respBuilder.hasError() ? respBuilder.getError() : null);
- }
-}
http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationResponse.java
deleted file mode 100644
index 7fa9f0c..0000000
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetMasterRegistrationResponse.java
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.kudu.client;
-
-import org.apache.kudu.WireProtocol;
-import org.apache.kudu.annotations.InterfaceAudience;
-import org.apache.kudu.consensus.Metadata;
-import org.apache.kudu.master.Master;
-
-/**
- * Response for {@link GetMasterRegistrationRequest}.
- */
-@InterfaceAudience.Private
-public class GetMasterRegistrationResponse extends KuduRpcResponse {
-
- private final Metadata.RaftPeerPB.Role role;
- private final WireProtocol.ServerRegistrationPB serverRegistration;
- private final WireProtocol.NodeInstancePB instanceId;
-
- /**
- * Describes a response to a {@link GetMasterRegistrationRequest}, built from
- * {@link Master.GetMasterRegistrationResponsePB}.
- *
- * @param role Master's role in the config.
- * @param serverRegistration server registration (RPC and HTTP addresses) for this master.
- * @param instanceId Node instance (permanent uuid and
- */
- public GetMasterRegistrationResponse(long elapsedMillis, String tsUUID,
- Metadata.RaftPeerPB.Role role,
- WireProtocol.ServerRegistrationPB serverRegistration,
- WireProtocol.NodeInstancePB instanceId) {
- super(elapsedMillis, tsUUID);
- this.role = role;
- this.serverRegistration = serverRegistration;
- this.instanceId = instanceId;
- }
-
- /**
- * Returns this master's role in the config.
- *
- * @see Metadata.RaftPeerPB.Role
- * @return Node's role in the cluster, or FOLLOWER if the node is not initialized.
- */
- public Metadata.RaftPeerPB.Role getRole() {
- return role;
- }
-
- /**
- * Returns the server registration (list of RPC and HTTP ports) for this master.
- *
- * @return The {@link WireProtocol.ServerRegistrationPB} object for this master.
- */
- public WireProtocol.ServerRegistrationPB getServerRegistration() {
- return serverRegistration;
- }
-
- /**
- * The node instance (initial sequence number and permanent uuid) for this master.
- *
- * @return The {@link WireProtocol.NodeInstancePB} object for this master.
- */
- public WireProtocol.NodeInstancePB getInstanceId() {
- return instanceId;
- }
-
- @Override
- public String toString() {
- return "GetMasterRegistrationResponse{" +
- "role=" + role +
- ", serverRegistration=" + serverRegistration +
- ", instanceId=" + instanceId +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index b107fbb..af4e2b3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -536,7 +536,7 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
if (error.getCode() == Master.MasterErrorPB.Code.NOT_THE_LEADER) {
kuduClient.handleNotLeader(rpc, new RecoverableException(status), this);
} else if (code == WireProtocol.AppStatusPB.ErrorCode.SERVICE_UNAVAILABLE) {
- if (rpc instanceof GetMasterRegistrationRequest) {
+ if (rpc instanceof ConnectToMasterRequest) {
// Special case:
// We never want to retry this RPC, we only use it to poke masters to learn where the leader
// is. If the error is truly non recoverable, it'll be handled later.
http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
new file mode 100644
index 0000000..eb10443
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestConnectToCluster.java
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import static org.apache.kudu.consensus.Metadata.RaftPeerPB.Role.FOLLOWER;
+import static org.apache.kudu.consensus.Metadata.RaftPeerPB.Role.LEADER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.net.HostAndPort;
+import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
+import org.junit.Test;
+
+import org.apache.kudu.WireProtocol;
+import org.apache.kudu.consensus.Metadata;
+import org.apache.kudu.master.Master;
+
+public class TestConnectToCluster {
+
+ private static final List<HostAndPort> MASTERS = ImmutableList.of(
+ HostAndPort.fromParts("0", 9000),
+ HostAndPort.fromParts("1", 9000),
+ HostAndPort.fromParts("2", 9000));
+
+ @Test(timeout = 10000)
+ public void test() throws Exception {
+ NonRecoverableException reusableNRE = new NonRecoverableException(
+ Status.RuntimeError(""));
+ RecoverableException reusableRE = new RecoverableException(
+ Status.RuntimeError(""));
+ NoLeaderFoundException retryResponse =
+ new NoLeaderFoundException(Status.RuntimeError(""));
+ // We don't test for a particular good response, so as long as we pass something that's not an
+ // exception to runTest() we're good.
+ Object successResponse = new Object();
+
+ // Success cases.
+
+ // Normal case.
+ runTest(
+ makeGMRR(LEADER),
+ makeGMRR(FOLLOWER),
+ makeGMRR(FOLLOWER),
+ successResponse);
+
+ // Permutation works too.
+ runTest(
+ makeGMRR(FOLLOWER),
+ makeGMRR(LEADER),
+ makeGMRR(FOLLOWER),
+ successResponse);
+
+ // Multiple leaders, that's fine since it might be a TOCTOU situation, or one master
+ // is confused. Raft handles this if the client then tries to do something that requires a
+ // replication on the master-side.
+ runTest(
+ makeGMRR(LEADER),
+ makeGMRR(LEADER),
+ makeGMRR(FOLLOWER),
+ successResponse);
+
+ // Mixed bag, still works because there's a leader.
+ runTest(
+ reusableNRE,
+ makeGMRR(FOLLOWER),
+ makeGMRR(LEADER),
+ successResponse);
+
+ // All unreachable except one leader, still good.
+ runTest(
+ reusableNRE,
+ reusableNRE,
+ makeGMRR(LEADER),
+ successResponse);
+
+ // Permutation of the previous.
+ runTest(
+ reusableNRE,
+ makeGMRR(LEADER),
+ reusableNRE,
+ successResponse);
+
+ // Retry cases.
+
+ // Just followers means we retry.
+ runTest(
+ makeGMRR(FOLLOWER),
+ makeGMRR(FOLLOWER),
+ makeGMRR(FOLLOWER),
+ retryResponse);
+
+ // One NRE but we have responsive masters, retry.
+ runTest(
+ makeGMRR(FOLLOWER),
+ makeGMRR(FOLLOWER),
+ reusableNRE,
+ retryResponse);
+
+ // One good master but no leader, retry.
+ runTest(
+ reusableNRE,
+ makeGMRR(FOLLOWER),
+ reusableNRE,
+ retryResponse);
+
+ // Different case but same outcome.
+ runTest(
+ reusableRE,
+ reusableNRE,
+ makeGMRR(FOLLOWER),
+ retryResponse);
+
+ // All recoverable means retry.
+ runTest(
+ reusableRE,
+ reusableRE,
+ reusableRE,
+ retryResponse);
+
+ // Just one recoverable still means retry.
+ runTest(
+ reusableRE,
+ reusableNRE,
+ reusableNRE,
+ retryResponse);
+
+ // Failure case.
+
+ // Can't recover anything, give up.
+ runTest(
+ reusableNRE,
+ reusableNRE,
+ reusableNRE,
+ reusableNRE);
+ }
+
+ private void runTest(Object response0,
+ Object response1,
+ Object response2,
+ Object expectedResponse) throws Exception {
+
+ // Here we basically do what AsyncKuduClient would do, add all the callbacks and then we also
+ // add the responses. We then check for the right response.
+
+ Deferred<Master.GetTableLocationsResponsePB> d = new Deferred<>();
+
+ ConnectToCluster grrm = new ConnectToCluster(MASTERS, d);
+
+ Callback<Void, ConnectToClusterResponse> cb0 = grrm.callbackForNode(MASTERS.get(0));
+ Callback<Void, ConnectToClusterResponse> cb1 = grrm.callbackForNode(MASTERS.get(1));
+ Callback<Void, ConnectToClusterResponse> cb2 = grrm.callbackForNode(MASTERS.get(2));
+
+ Callback<Void, Exception> eb0 = grrm.errbackForNode(MASTERS.get(0));
+ Callback<Void, Exception> eb1 = grrm.errbackForNode(MASTERS.get(1));
+ Callback<Void, Exception> eb2 = grrm.errbackForNode(MASTERS.get(2));
+
+ callTheRightCallback(cb0, eb0, response0);
+ callTheRightCallback(cb1, eb1, response1);
+ callTheRightCallback(cb2, eb2, response2);
+
+ try {
+ d.join(); // Don't care about the response.
+ if ((expectedResponse instanceof Exception)) {
+ fail("Should not work " + expectedResponse.getClass());
+ } else {
+ // ok
+ }
+ } catch (Exception ex) {
+ assertEquals(expectedResponse.getClass(), ex.getClass());
+ }
+ }
+
+ // Helper method that determines if the callback or errback should be called.
+ private static void callTheRightCallback(
+ Callback<Void, ConnectToClusterResponse> cb,
+ Callback<Void, Exception> eb,
+ Object response) throws Exception {
+ if (response instanceof Exception) {
+ eb.call((Exception) response);
+ } else {
+ cb.call((ConnectToClusterResponse) response);
+ }
+ }
+
+ private static ConnectToClusterResponse makeGMRR(Metadata.RaftPeerPB.Role role) {
+ return new ConnectToClusterResponse(0, "", role, null,
+ WireProtocol.NodeInstancePB.getDefaultInstance());
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/7027a168/java/kudu-client/src/test/java/org/apache/kudu/client/TestGetMasterRegistrationReceived.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestGetMasterRegistrationReceived.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestGetMasterRegistrationReceived.java
deleted file mode 100644
index b57a18c..0000000
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestGetMasterRegistrationReceived.java
+++ /dev/null
@@ -1,208 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.kudu.client;
-
-import static org.apache.kudu.consensus.Metadata.RaftPeerPB.Role.FOLLOWER;
-import static org.apache.kudu.consensus.Metadata.RaftPeerPB.Role.LEADER;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.List;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.net.HostAndPort;
-import com.stumbleupon.async.Callback;
-import com.stumbleupon.async.Deferred;
-import org.junit.Test;
-
-import org.apache.kudu.WireProtocol;
-import org.apache.kudu.consensus.Metadata;
-import org.apache.kudu.master.Master;
-
-public class TestGetMasterRegistrationReceived {
-
- private static final List<HostAndPort> MASTERS = ImmutableList.of(
- HostAndPort.fromParts("0", 9000),
- HostAndPort.fromParts("1", 9000),
- HostAndPort.fromParts("2", 9000));
-
- @Test(timeout = 10000)
- public void test() throws Exception {
- NonRecoverableException reusableNRE = new NonRecoverableException(
- Status.RuntimeError(""));
- RecoverableException reusableRE = new RecoverableException(
- Status.RuntimeError(""));
- NoLeaderFoundException retryResponse =
- new NoLeaderFoundException(Status.RuntimeError(""));
- // We don't test for a particular good response, so as long as we pass something that's not an
- // exception to runTest() we're good.
- Object successResponse = new Object();
-
- // Success cases.
-
- // Normal case.
- runTest(
- makeGMRR(LEADER),
- makeGMRR(FOLLOWER),
- makeGMRR(FOLLOWER),
- successResponse);
-
- // Permutation works too.
- runTest(
- makeGMRR(FOLLOWER),
- makeGMRR(LEADER),
- makeGMRR(FOLLOWER),
- successResponse);
-
- // Multiple leaders, that's fine since it might be a TOCTOU situation, or one master
- // is confused. Raft handles this if the client then tries to do something that requires a
- // replication on the master-side.
- runTest(
- makeGMRR(LEADER),
- makeGMRR(LEADER),
- makeGMRR(FOLLOWER),
- successResponse);
-
- // Mixed bag, still works because there's a leader.
- runTest(
- reusableNRE,
- makeGMRR(FOLLOWER),
- makeGMRR(LEADER),
- successResponse);
-
- // All unreachable except one leader, still good.
- runTest(
- reusableNRE,
- reusableNRE,
- makeGMRR(LEADER),
- successResponse);
-
- // Permutation of the previous.
- runTest(
- reusableNRE,
- makeGMRR(LEADER),
- reusableNRE,
- successResponse);
-
- // Retry cases.
-
- // Just followers means we retry.
- runTest(
- makeGMRR(FOLLOWER),
- makeGMRR(FOLLOWER),
- makeGMRR(FOLLOWER),
- retryResponse);
-
- // One NRE but we have responsive masters, retry.
- runTest(
- makeGMRR(FOLLOWER),
- makeGMRR(FOLLOWER),
- reusableNRE,
- retryResponse);
-
- // One good master but no leader, retry.
- runTest(
- reusableNRE,
- makeGMRR(FOLLOWER),
- reusableNRE,
- retryResponse);
-
- // Different case but same outcome.
- runTest(
- reusableRE,
- reusableNRE,
- makeGMRR(FOLLOWER),
- retryResponse);
-
- // All recoverable means retry.
- runTest(
- reusableRE,
- reusableRE,
- reusableRE,
- retryResponse);
-
- // Just one recoverable still means retry.
- runTest(
- reusableRE,
- reusableNRE,
- reusableNRE,
- retryResponse);
-
- // Failure case.
-
- // Can't recover anything, give up.
- runTest(
- reusableNRE,
- reusableNRE,
- reusableNRE,
- reusableNRE);
- }
-
- private void runTest(Object response0,
- Object response1,
- Object response2,
- Object expectedResponse) throws Exception {
-
- // Here we basically do what AsyncKuduClient would do, add all the callbacks and then we also
- // add the responses. We then check for the right response.
-
- Deferred<Master.GetTableLocationsResponsePB> d = new Deferred<>();
-
- GetMasterRegistrationReceived grrm = new GetMasterRegistrationReceived(MASTERS, d);
-
- Callback<Void, GetMasterRegistrationResponse> cb0 = grrm.callbackForNode(MASTERS.get(0));
- Callback<Void, GetMasterRegistrationResponse> cb1 = grrm.callbackForNode(MASTERS.get(1));
- Callback<Void, GetMasterRegistrationResponse> cb2 = grrm.callbackForNode(MASTERS.get(2));
-
- Callback<Void, Exception> eb0 = grrm.errbackForNode(MASTERS.get(0));
- Callback<Void, Exception> eb1 = grrm.errbackForNode(MASTERS.get(1));
- Callback<Void, Exception> eb2 = grrm.errbackForNode(MASTERS.get(2));
-
- callTheRightCallback(cb0, eb0, response0);
- callTheRightCallback(cb1, eb1, response1);
- callTheRightCallback(cb2, eb2, response2);
-
- try {
- d.join(); // Don't care about the response.
- if ((expectedResponse instanceof Exception)) {
- fail("Should not work " + expectedResponse.getClass());
- } else {
- // ok
- }
- } catch (Exception ex) {
- assertEquals(expectedResponse.getClass(), ex.getClass());
- }
- }
-
- // Helper method that determines if the callback or errback should be called.
- private static void callTheRightCallback(
- Callback<Void, GetMasterRegistrationResponse> cb,
- Callback<Void, Exception> eb,
- Object response) throws Exception {
- if (response instanceof Exception) {
- eb.call((Exception) response);
- } else {
- cb.call((GetMasterRegistrationResponse) response);
- }
- }
-
- private static GetMasterRegistrationResponse makeGMRR(Metadata.RaftPeerPB.Role role) {
- return new GetMasterRegistrationResponse(0, "", role, null,
- WireProtocol.NodeInstancePB.getDefaultInstance());
- }
-}