You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text version).
Posted to dev@zookeeper.apache.org by JonathanO <gi...@git.apache.org> on 2018/03/07 10:44:28 UTC
[GitHub] zookeeper pull request #456: ZOOKEEPER-2930: Leader cannot be elected due to...
Github user JonathanO commented on a diff in the pull request:
https://github.com/apache/zookeeper/pull/456#discussion_r172802731
--- Diff: src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java ---
@@ -318,76 +318,167 @@ public Thread newThread(Runnable r) {
*/
public void testInitiateConnection(long sid) throws Exception {
LOG.debug("Opening channel to server " + sid);
- Socket sock = new Socket();
- setSockOpts(sock);
- sock.connect(self.getVotingView().get(sid).electionAddr, cnxTO);
- initiateConnection(sock, sid);
+ initiateConnection(sid, self.getVotingView().get(sid).electionAddr);
+ }
+
+ private Socket openChannel(long sid, InetSocketAddress electionAddr) {
+ LOG.debug("Opening channel to server " + sid);
+ try {
+ final Socket sock = new Socket();
+ setSockOpts(sock);
+ sock.connect(electionAddr, cnxTO);
+ LOG.debug("Connected to server " + sid);
+ return sock;
+ } catch (UnresolvedAddressException e) {
+ // Sun doesn't include the address that causes this
+ // exception to be thrown, also UAE cannot be wrapped cleanly
+ // so we log the exception in order to capture this critical
+ // detail.
+ LOG.warn("Cannot open channel to " + sid
+ + " at election address " + electionAddr, e);
+ throw e;
+ } catch (IOException e) {
+ LOG.warn("Cannot open channel to " + sid
+ + " at election address " + electionAddr,
+ e);
+ return null;
+ }
}
/**
* If this server has initiated the connection, then it gives up on the
* connection if it loses challenge. Otherwise, it keeps the connection.
*/
- public void initiateConnection(final Socket sock, final Long sid) {
+ public boolean initiateConnection(final Long sid, InetSocketAddress electionAddr) {
try {
- startConnection(sock, sid);
- } catch (IOException e) {
- LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",
- new Object[] { sid, sock.getRemoteSocketAddress() }, e);
- closeSocket(sock);
- return;
+ Socket sock = openChannel(sid, electionAddr);
+ if (sock != null) {
+ try {
+ startConnection(sock, sid);
+ } catch (IOException e) {
+ LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",
+ new Object[]{sid, sock.getRemoteSocketAddress()}, e);
+ closeSocket(sock);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ } finally {
+ inprogressConnections.remove(sid);
}
}
- /**
- * Server will initiate the connection request to its peer server
- * asynchronously via separate connection thread.
- */
- public void initiateConnectionAsync(final Socket sock, final Long sid) {
+ synchronized private void connectOneAsync(final Long sid, final ZooKeeperThread connectorThread) {
+ if (senderWorkerMap.get(sid) != null) {
+ LOG.debug("There is a connection already for server " + sid);
+ return;
+ }
if(!inprogressConnections.add(sid)){
// simply return as there is a connection request to
// server 'sid' already in progress.
LOG.debug("Connection request to server id: {} is already in progress, so skipping this request",
sid);
- closeSocket(sock);
return;
}
try {
- connectionExecutor.execute(
- new QuorumConnectionReqThread(sock, sid));
+ connectionExecutor.execute(connectorThread);
connectionThreadCnt.incrementAndGet();
} catch (Throwable e) {
// Imp: Safer side catching all type of exceptions and remove 'sid'
// from inprogress connections. This is to avoid blocking further
// connection requests from this 'sid' in case of errors.
inprogressConnections.remove(sid);
LOG.error("Exception while submitting quorum connection request", e);
- closeSocket(sock);
}
}
+ /**
+ * Try to establish a connection to server with id sid using its electionAddr.
+ *
+ * Server will initiate the connection request to its peer server
+ * asynchronously via separate connection thread.
+ *
+ * @param sid server id
+ * @param electionAddr election address
+ */
+ private void connectOne(final Long sid, InetSocketAddress electionAddr) {
+ connectOneAsync(sid, new QuorumConnectionReqThread(sid, electionAddr));
+ }
+
+ /**
+ * Try to establish a connection to server with id sid.
+ *
+ * Server will initiate the connection request to its peer server
+ * asynchronously via separate connection thread.
+ *
+ * @param sid server id
+ */
+ public void connectOne(final Long sid) {
+ connectOneAsync(sid, new QuorumConnectionReqBySidThread(sid));
+ }
+
/**
* Thread to send connection request to peer server.
*/
- private class QuorumConnectionReqThread extends ZooKeeperThread {
- final Socket sock;
+ private class QuorumConnectionReqBySidThread extends ZooKeeperThread {
final Long sid;
- QuorumConnectionReqThread(final Socket sock, final Long sid) {
+
+ QuorumConnectionReqBySidThread(final Long sid) {
super("QuorumConnectionReqThread-" + sid);
- this.sock = sock;
this.sid = sid;
}
@Override
public void run() {
- try{
- initiateConnection(sock, sid);
- } finally {
- inprogressConnections.remove(sid);
+ synchronized (self.QV_LOCK) {
+ boolean knownId = false;
+ // Resolve hostname for the remote server before attempting to
+ // connect in case the underlying ip address has changed.
+ self.recreateSocketAddresses(sid);
+ Map<Long, QuorumPeer.QuorumServer> lastCommittedView = self.getView();
+ QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
+ Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
+ if (lastCommittedView.containsKey(sid)) {
+ knownId = true;
+ if (initiateConnection(sid, lastCommittedView.get(sid).electionAddr)) {
+ return;
+ }
+ }
+ if (lastSeenQV != null && lastProposedView.containsKey(sid)
+ && (!knownId || (lastProposedView.get(sid).electionAddr !=
+ lastCommittedView.get(sid).electionAddr))) {
+ knownId = true;
+ if (initiateConnection(sid, lastProposedView.get(sid).electionAddr)) {
+ return;
+ }
+ }
+ if (!knownId) {
+ LOG.warn("Invalid server id: " + sid);
+ return;
+ }
}
}
}
+ /**
+ * Thread to send connection request to peer server.
+ */
+ private class QuorumConnectionReqThread extends ZooKeeperThread {
--- End diff --
Are you OK with this approach, or would you prefer me to change it?
---