You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2021/10/23 04:21:34 UTC
[bookkeeper] branch master updated: Remove direct ZK access for Auditor (#2842)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 746f9f6 Remove direct ZK access for Auditor (#2842)
746f9f6 is described below
commit 746f9f6ff5f54203a6f1c25b7d0f02642455ea04
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Oct 22 21:21:27 2021 -0700
Remove direct ZK access for Auditor (#2842)
* Remove direct ZK access for Auditor
* Fixed unused imports
* Fixed checkstyle
* Fixed checkstyle in tests
---
.../org/apache/bookkeeper/client/BookKeeper.java | 10 +-
.../apache/bookkeeper/client/BookKeeperAdmin.java | 27 +-
.../bookkeeper/meta/FlatLedgerManagerFactory.java | 8 +
.../bookkeeper/meta/LedgerAuditorManager.java | 52 ++++
.../bookkeeper/meta/LedgerManagerFactory.java | 10 +
.../LegacyHierarchicalLedgerManagerFactory.java | 8 +
.../bookkeeper/meta/MSLedgerManagerFactory.java | 8 +
.../bookkeeper/meta/ZkLedgerAuditorManager.java | 279 +++++++++++++++++++++
.../bookkeeper/replication/AuditorElector.java | 265 ++++---------------
.../server/http/BKHttpServiceProvider.java | 17 +-
.../server/http/service/WhoIsAuditorService.java | 13 +-
.../commands/autorecovery/WhoIsAuditorCommand.java | 52 ++--
.../bookkeeper/client/BookKeeperTestClient.java | 12 +-
.../apache/bookkeeper/client/MockBookKeeper.java | 5 -
.../bookkeeper/client/TestBookieWatcher.java | 2 +-
.../replication/AuditorRollingRestartTest.java | 6 +-
.../replication/AutoRecoveryMainTest.java | 51 ++--
.../autorecovery/WhoIsAuditorCommandTest.java | 13 +-
18 files changed, 519 insertions(+), 319 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index c34defa..fdefd28 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -81,7 +81,6 @@ import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.MetadataClientDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
-import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.proto.BookieAddressResolver;
@@ -636,6 +635,11 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
}
@VisibleForTesting
+ public LedgerManagerFactory getLedgerManagerFactory() {
+ return ledgerManagerFactory;
+ }
+
+ @VisibleForTesting
LedgerManager getUnderlyingLedgerManager() {
return ((CleanupLedgerManager) ledgerManager).getUnderlying();
}
@@ -743,10 +747,6 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
}
}
- ZooKeeper getZkHandle() {
- return ((ZKMetadataClientDriver) metadataDriver).getZk();
- }
-
protected ClientConfiguration getConf() {
return conf;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 378df4b..91e2552 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -64,6 +64,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
+import org.apache.bookkeeper.meta.LedgerAuditorManager;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
@@ -75,7 +76,6 @@ import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
-import org.apache.bookkeeper.replication.AuditorElector;
import org.apache.bookkeeper.replication.BookieLedgerIndexer;
import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
@@ -113,6 +113,8 @@ public class BookKeeperAdmin implements AutoCloseable {
*/
private LedgerUnderreplicationManager underreplicationManager;
+ private LedgerAuditorManager ledgerAuditorManager;
+
/**
* Constructor that takes in a ZooKeeper servers connect string so we know
* how to connect to ZooKeeper to retrieve information about the BookKeeper
@@ -198,6 +200,14 @@ public class BookKeeperAdmin implements AutoCloseable {
if (ownsBK) {
bkc.close();
}
+
+ if (ledgerAuditorManager != null) {
+ try {
+ ledgerAuditorManager.close();
+ } catch (Exception e) {
+ throw new BKException.MetaStoreException(e);
+ }
+ }
}
/**
@@ -1401,6 +1411,14 @@ public class BookKeeperAdmin implements AutoCloseable {
return underreplicationManager;
}
+ private LedgerAuditorManager getLedgerAuditorManager()
+ throws IOException, InterruptedException {
+ if (ledgerAuditorManager == null) {
+ ledgerAuditorManager = mFactory.newLedgerAuditorManager();
+ }
+ return ledgerAuditorManager;
+ }
+
/**
* Setter for LostBookieRecoveryDelay value (in seconds) in Zookeeper.
*
@@ -1452,8 +1470,7 @@ public class BookKeeperAdmin implements AutoCloseable {
throw new UnavailableException("Autorecovery is disabled. So giving up!");
}
- BookieId auditorId =
- AuditorElector.getCurrentAuditor(new ServerConfiguration(bkc.getConf()), bkc.getZkHandle());
+ BookieId auditorId = getLedgerAuditorManager().getCurrentAuditor();
if (auditorId == null) {
LOG.error("No auditor elected, though Autorecovery is enabled. So giving up.");
throw new UnavailableException("No auditor elected, though Autorecovery is enabled. So giving up.");
@@ -1706,4 +1723,8 @@ public class BookKeeperAdmin implements AutoCloseable {
long ledgerId) {
return bkc.getBookieClient().getListOfEntriesOfLedger(address, ledgerId);
}
+
+ public BookieId getCurrentAuditor() throws IOException, InterruptedException {
+ return getLedgerAuditorManager().getCurrentAuditor();
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
index 19ac418..e613082 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
@@ -23,8 +23,10 @@ import java.io.IOException;
import java.util.List;
import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;
@@ -87,4 +89,10 @@ public class FlatLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
return new ZkLedgerUnderreplicationManager(conf, zk);
}
+
+ @Override
+ public LedgerAuditorManager newLedgerAuditorManager() {
+ ServerConfiguration serverConfiguration = new ServerConfiguration(conf);
+ return new ZkLedgerAuditorManager(zk, serverConfiguration, NullStatsLogger.INSTANCE);
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerAuditorManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerAuditorManager.java
new file mode 100644
index 0000000..b1b2fa0
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerAuditorManager.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+import org.apache.bookkeeper.net.BookieId;
+
+/**
+ * Interface to handle the ledger auditor election.
+ */
+public interface LedgerAuditorManager extends AutoCloseable {
+
+ /**
+ * Events that can be triggered by the LedgerAuditorManager.
+ */
+ enum AuditorEvent {
+ SessionLost,
+ VoteWasDeleted,
+ }
+
+ /**
+ * Try to become the auditor. If another auditor already exists, this call blocks until the
+ * current instance has become the auditor.
+ *
+ * @param bookieId the identifier for current bookie
+ * @param listener listener that will receive AuditorEvent notifications
+ * @throws IOException if the election cannot be completed
+ */
+ void tryToBecomeAuditor(String bookieId, Consumer<AuditorEvent> listener) throws IOException, InterruptedException;
+
+ /**
+ * Return the information regarding the current auditor.
+ * @return
+ */
+ BookieId getCurrentAuditor() throws IOException, InterruptedException;
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
index 80d3a65..d213235 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
@@ -86,6 +86,16 @@ public interface LedgerManagerFactory extends AutoCloseable {
LedgerUnderreplicationManager newLedgerUnderreplicationManager()
throws KeeperException, InterruptedException, ReplicationException.CompatibilityException;
+
+ /**
+ * Return a ledger auditor manager, which is used to
+ * coordinate the auto-recovery process.
+ *
+ * @return ledger auditor manager
+ * @see LedgerAuditorManager
+ */
+ LedgerAuditorManager newLedgerAuditorManager() throws IOException, InterruptedException;
+
/**
* Format the ledger metadata for LedgerManager.
*
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
index 9157973..a218ef3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
@@ -23,8 +23,10 @@ import java.io.IOException;
import java.util.List;
import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;
@@ -79,6 +81,12 @@ public class LegacyHierarchicalLedgerManagerFactory extends AbstractZkLedgerMana
}
@Override
+ public LedgerAuditorManager newLedgerAuditorManager() {
+ ServerConfiguration serverConfiguration = new ServerConfiguration(conf);
+ return new ZkLedgerAuditorManager(zk, serverConfiguration, NullStatsLogger.INSTANCE);
+ }
+
+ @Override
public LedgerManager newLedgerManager() {
return new LegacyHierarchicalLedgerManager(conf, zk);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
index e15e0a5..3ad303a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
@@ -42,6 +42,7 @@ import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.metastore.MSException;
import org.apache.bookkeeper.metastore.MSWatchedEvent;
@@ -60,6 +61,7 @@ import org.apache.bookkeeper.metastore.Value;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.bookkeeper.util.ZkUtils;
@@ -815,4 +817,10 @@ public class MSLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
zkServers, zkLedgersRootPath);
return true;
}
+
+ @Override
+ public LedgerAuditorManager newLedgerAuditorManager() {
+ ServerConfiguration serverConfiguration = new ServerConfiguration(conf);
+ return new ZkLedgerAuditorManager(zk, serverConfiguration, NullStatsLogger.INSTANCE);
+ }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerAuditorManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerAuditorManager.java
new file mode 100644
index 0000000..fa1fcd4
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerAuditorManager.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.bookkeeper.proto.DataFormats.AuditorVoteFormat;
+import static org.apache.bookkeeper.replication.ReplicationStats.ELECTION_ATTEMPTS;
+import com.google.protobuf.TextFormat;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
+import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * ZK based implementation of LedgerAuditorManager.
+ */
+@Slf4j
+public class ZkLedgerAuditorManager implements LedgerAuditorManager {
+
+ private final ZooKeeper zkc;
+ private final ServerConfiguration conf;
+ private final String basePath;
+ private final String electionPath;
+
+ private String myVote;
+
+ private static final String ELECTION_ZNODE = "auditorelection";
+
+ // Represents the index of the auditor node
+ private static final int AUDITOR_INDEX = 0;
+ // Represents vote prefix
+ private static final String VOTE_PREFIX = "V_";
+ // Represents path Separator
+ private static final String PATH_SEPARATOR = "/";
+
+ private volatile Consumer<AuditorEvent> listener;
+ private volatile boolean isClosed = false;
+
+ // Expose Stats
+ @StatsDoc(
+ name = ELECTION_ATTEMPTS,
+ help = "The number of auditor election attempts"
+ )
+ private final Counter electionAttempts;
+
+ public ZkLedgerAuditorManager(ZooKeeper zkc, ServerConfiguration conf, StatsLogger statsLogger) {
+ this.zkc = zkc;
+ this.conf = conf;
+
+ this.basePath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf) + '/'
+ + BookKeeperConstants.UNDER_REPLICATION_NODE;
+ this.electionPath = basePath + '/' + ELECTION_ZNODE;
+ this.electionAttempts = statsLogger.getCounter(ELECTION_ATTEMPTS);
+ }
+
+ @Override
+ public void tryToBecomeAuditor(String bookieId, Consumer<AuditorEvent> listener)
+ throws IOException, InterruptedException {
+ this.listener = listener;
+ createElectorPath();
+
+ try {
+ while (!isClosed) {
+ createMyVote(bookieId);
+
+ List<String> children = zkc.getChildren(getVotePath(""), false);
+ if (0 >= children.size()) {
+ throw new IllegalArgumentException(
+ "At least one bookie server should present to elect the Auditor!");
+ }
+
+ // sorting in ascending order of sequential number
+ Collections.sort(children, new ElectionComparator());
+ String voteNode = StringUtils.substringAfterLast(myVote, PATH_SEPARATOR);
+
+ if (children.get(AUDITOR_INDEX).equals(voteNode)) {
+ // We have been elected as the auditor
+ // update the auditor bookie id in the election path. This is
+ // done for debugging purposes
+ AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder()
+ .setBookieId(bookieId);
+
+ zkc.setData(getVotePath(""),
+ builder.build().toString().getBytes(UTF_8), -1);
+ return;
+ } else {
+ // Not the auditor: watch the predecessor vote and wait for
+ // that previous node to be deleted.
+ int myIndex = children.indexOf(voteNode);
+ if (myIndex < 0) {
+ throw new IllegalArgumentException("My vote has disappeared");
+ }
+
+ int prevNodeIndex = myIndex - 1;
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ if (null == zkc.exists(getVotePath(PATH_SEPARATOR)
+ + children.get(prevNodeIndex), event -> latch.countDown())) {
+ // The previous znode no longer existed when the watch was
+ // being set, so run the election again.
+ continue;
+ }
+
+ // Wait for the previous auditor in line to be deleted
+ latch.await();
+ }
+
+ electionAttempts.inc();
+ }
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public BookieId getCurrentAuditor() throws IOException, InterruptedException {
+ String electionRoot = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf) + '/'
+ + BookKeeperConstants.UNDER_REPLICATION_NODE + '/' + ELECTION_ZNODE;
+
+ try {
+ List<String> children = zkc.getChildren(electionRoot, false);
+ Collections.sort(children, new ElectionComparator());
+ if (children.size() < 1) {
+ return null;
+ }
+ String ledger = electionRoot + "/" + children.get(AUDITOR_INDEX);
+ byte[] data = zkc.getData(ledger, false, null);
+
+ AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder();
+ TextFormat.merge(new String(data, UTF_8), builder);
+ AuditorVoteFormat v = builder.build();
+ return BookieId.parse(v.getBookieId());
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ log.info("Shutting down AuditorElector");
+ isClosed = true;
+ if (myVote != null) {
+ try {
+ zkc.delete(myVote, -1);
+ } catch (KeeperException.NoNodeException nne) {
+ // Ok
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ log.warn("InterruptedException while deleting myVote: " + myVote,
+ ie);
+ } catch (KeeperException ke) {
+ log.error("Exception while deleting myVote:" + myVote, ke);
+ }
+ }
+ }
+
+ private void createMyVote(String bookieId) throws IOException, InterruptedException {
+ List<ACL> zkAcls = ZkUtils.getACLs(conf);
+ AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder()
+ .setBookieId(bookieId);
+
+ try {
+ if (null == myVote || null == zkc.exists(myVote, false)) {
+ myVote = zkc.create(getVotePath(PATH_SEPARATOR + VOTE_PREFIX),
+ builder.build().toString().getBytes(UTF_8), zkAcls,
+ CreateMode.EPHEMERAL_SEQUENTIAL);
+ }
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void createElectorPath() throws IOException {
+ try {
+ List<ACL> zkAcls = ZkUtils.getACLs(conf);
+ if (zkc.exists(basePath, false) == null) {
+ try {
+ zkc.create(basePath, new byte[0], zkAcls,
+ CreateMode.PERSISTENT);
+ } catch (KeeperException.NodeExistsException nee) {
+ // do nothing, someone else could have created it
+ }
+ }
+ if (zkc.exists(getVotePath(""), false) == null) {
+ try {
+ zkc.create(getVotePath(""), new byte[0],
+ zkAcls, CreateMode.PERSISTENT);
+ } catch (KeeperException.NodeExistsException nee) {
+ // do nothing, someone else could have created it
+ }
+ }
+ } catch (KeeperException ke) {
+ throw new IOException("Failed to initialize Auditor Elector", ke);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Failed to initialize Auditor Elector", ie);
+ }
+ }
+
+ private String getVotePath(String vote) {
+ return electionPath + vote;
+ }
+
+ private void handleZkWatch(WatchedEvent event) {
+ if (isClosed) {
+ return;
+ }
+
+ if (event.getState() == Watcher.Event.KeeperState.Expired) {
+ log.error("Lost ZK connection, shutting down");
+
+ listener.accept(AuditorEvent.SessionLost);
+ } else if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
+ listener.accept(AuditorEvent.VoteWasDeleted);
+ }
+ }
+
+ /**
+ * Compare the votes in the ascending order of the sequence number. Vote
+ * format is 'V_sequencenumber', comparator will do sorting based on the
+ * numeric sequence value.
+ */
+ private static class ElectionComparator
+ implements Comparator<String>, Serializable {
+ /**
+ * Return -1 if the first vote is less than second. Return 1 if the
+ * first vote is greater than second. Return 0 if the votes are equal.
+ */
+ @Override
+ public int compare(String vote1, String vote2) {
+ long voteSeqId1 = getVoteSequenceId(vote1);
+ long voteSeqId2 = getVoteSequenceId(vote2);
+ int result = voteSeqId1 < voteSeqId2 ? -1
+ : (voteSeqId1 > voteSeqId2 ? 1 : 0);
+ return result;
+ }
+
+ private long getVoteSequenceId(String vote) {
+ String voteId = StringUtils.substringAfter(vote, VOTE_PREFIX);
+ return Long.parseLong(voteId);
+ }
+ }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
index 7028466..eaab84c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
@@ -20,47 +20,25 @@
*/
package org.apache.bookkeeper.replication;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
-import static org.apache.bookkeeper.replication.ReplicationStats.ELECTION_ATTEMPTS;
-
import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.TextFormat;
-
import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.ZkLayoutManager;
-import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.meta.LedgerAuditorManager;
import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.proto.DataFormats.AuditorVoteFormat;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
-import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
-import org.apache.bookkeeper.util.BookKeeperConstants;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,35 +58,18 @@ import org.slf4j.LoggerFactory;
public class AuditorElector {
private static final Logger LOG = LoggerFactory
.getLogger(AuditorElector.class);
- // Represents the index of the auditor node
- private static final int AUDITOR_INDEX = 0;
- // Represents vote prefix
- private static final String VOTE_PREFIX = "V_";
- // Represents path Separator
- private static final String PATH_SEPARATOR = "/";
- private static final String ELECTION_ZNODE = "auditorelection";
- // Represents urLedger path in zk
- private final String basePath;
- // Represents auditor election path in zk
- private final String electionPath;
private final String bookieId;
private final ServerConfiguration conf;
private final BookKeeper bkc;
- private final ZooKeeper zkc;
private final boolean ownBkc;
private final ExecutorService executor;
+ private final LedgerAuditorManager ledgerAuditorManager;
- private String myVote;
Auditor auditor;
private AtomicBoolean running = new AtomicBoolean(false);
- // Expose Stats
- @StatsDoc(
- name = ELECTION_ATTEMPTS,
- help = "The number of auditor election attempts"
- )
- private final Counter electionAttempts;
+
private final StatsLogger statsLogger;
@@ -163,13 +124,12 @@ public class AuditorElector {
this.conf = conf;
this.bkc = bkc;
this.ownBkc = ownBkc;
- this.zkc = ((ZkLayoutManager) bkc.getMetadataClientDriver().getLayoutManager()).getZk();
this.statsLogger = statsLogger;
- this.electionAttempts = statsLogger.getCounter(ELECTION_ATTEMPTS);
- basePath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf) + '/'
- + BookKeeperConstants.UNDER_REPLICATION_NODE;
- electionPath = basePath + '/' + ELECTION_ZNODE;
- createElectorPath();
+ try {
+ this.ledgerAuditorManager = bkc.getLedgerManagerFactory().newLedgerAuditorManager();
+ } catch (Exception e) {
+ throw new UnavailableException("Failed to instantiate the ledger auditor manager", e);
+ }
executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@@ -178,70 +138,6 @@ public class AuditorElector {
});
}
- private void createMyVote() throws KeeperException, InterruptedException {
- if (null == myVote || null == zkc.exists(myVote, false)) {
- List<ACL> zkAcls = ZkUtils.getACLs(conf);
- AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder()
- .setBookieId(bookieId);
- myVote = zkc.create(getVotePath(PATH_SEPARATOR + VOTE_PREFIX),
- builder.build().toString().getBytes(UTF_8), zkAcls,
- CreateMode.EPHEMERAL_SEQUENTIAL);
- }
- }
-
- String getMyVote() {
- return myVote;
- }
-
- private String getVotePath(String vote) {
- return electionPath + vote;
- }
-
- private void createElectorPath() throws UnavailableException {
- try {
- List<ACL> zkAcls = ZkUtils.getACLs(conf);
- if (zkc.exists(basePath, false) == null) {
- try {
- zkc.create(basePath, new byte[0], zkAcls,
- CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException nee) {
- // do nothing, someone else could have created it
- }
- }
- if (zkc.exists(getVotePath(""), false) == null) {
- try {
- zkc.create(getVotePath(""), new byte[0],
- zkAcls, CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException nee) {
- // do nothing, someone else could have created it
- }
- }
- } catch (KeeperException ke) {
- throw new UnavailableException(
- "Failed to initialize Auditor Elector", ke);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new UnavailableException(
- "Failed to initialize Auditor Elector", ie);
- }
- }
-
- /**
- * Watching the predecessor bookies and will do election on predecessor node
- * deletion or expiration.
- */
- private class ElectionWatcher implements Watcher {
- @Override
- public void process(WatchedEvent event) {
- if (event.getState() == KeeperState.Expired) {
- LOG.error("Lost ZK connection, shutting down");
- submitShutdownTask();
- } else if (event.getType() == EventType.NodeDeleted) {
- submitElectionTask();
- }
- }
- }
-
public Future<?> start() {
running.set(true);
return submitElectionTask();
@@ -257,17 +153,14 @@ public class AuditorElector {
if (!running.compareAndSet(true, false)) {
return;
}
- LOG.info("Shutting down AuditorElector");
- if (myVote != null) {
- try {
- zkc.delete(myVote, -1);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- LOG.warn("InterruptedException while deleting myVote: " + myVote,
- ie);
- } catch (KeeperException ke) {
- LOG.error("Exception while deleting myVote:" + myVote, ke);
- }
+
+ try {
+ ledgerAuditorManager.close();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ LOG.warn("InterruptedException while closing ledger auditor manager", ie);
+ } catch (Exception ke) {
+ LOG.error("Exception while closing ledger auditor manager", ke);
}
}
});
@@ -288,59 +181,39 @@ public class AuditorElector {
return;
}
try {
- // creating my vote in zk. Vote format is 'V_numeric'
- createMyVote();
- List<String> children = zkc.getChildren(getVotePath(""), false);
-
- if (0 >= children.size()) {
- throw new IllegalArgumentException(
- "Atleast one bookie server should present to elect the Auditor!");
- }
-
- // sorting in ascending order of sequential number
- Collections.sort(children, new ElectionComparator());
- String voteNode = StringUtils.substringAfterLast(myVote,
- PATH_SEPARATOR);
+ ledgerAuditorManager.tryToBecomeAuditor(bookieId, e -> handleAuditorEvent(e));
- // starting Auditing service
- if (children.get(AUDITOR_INDEX).equals(voteNode)) {
- // update the auditor bookie id in the election path. This is
- // done for debugging purpose
- AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder()
- .setBookieId(bookieId);
-
- zkc.setData(getVotePath(""),
- builder.build().toString().getBytes(UTF_8), -1);
- auditor = new Auditor(bookieId, conf, bkc, false, statsLogger);
- auditor.start();
- } else {
- // If not an auditor, will be watching to my predecessor and
- // looking the previous node deletion.
- Watcher electionWatcher = new ElectionWatcher();
- int myIndex = children.indexOf(voteNode);
- int prevNodeIndex = myIndex - 1;
- if (null == zkc.exists(getVotePath(PATH_SEPARATOR)
- + children.get(prevNodeIndex), electionWatcher)) {
- // While adding, the previous znode doesn't exists.
- // Again going to election.
- submitElectionTask();
- }
- electionAttempts.inc();
- }
- } catch (KeeperException e) {
- LOG.error("Exception while performing auditor election", e);
- submitShutdownTask();
+ auditor = new Auditor(bookieId, conf, bkc, false, statsLogger);
+ auditor.start();
} catch (InterruptedException e) {
LOG.error("Interrupted while performing auditor election", e);
Thread.currentThread().interrupt();
submitShutdownTask();
- } catch (UnavailableException e) {
- LOG.error("Ledger underreplication manager unavailable during election", e);
+ } catch (Exception e) {
+ LOG.error("Exception while performing auditor election", e);
submitShutdownTask();
}
}
};
- return executor.submit(r);
+ try {
+ return executor.submit(r);
+ } catch (RejectedExecutionException e) {
+ LOG.debug("Executor was already closed");
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+ private void handleAuditorEvent(LedgerAuditorManager.AuditorEvent e) {
+ switch (e) {
+ case SessionLost:
+ LOG.error("Lost ZK connection, shutting down");
+ submitShutdownTask();
+ break;
+
+ case VoteWasDeleted:
+ submitElectionTask();
+ break;
+ }
}
@VisibleForTesting
@@ -348,33 +221,21 @@ public class AuditorElector {
return auditor;
}
- /**
- * Query zookeeper for the currently elected auditor.
- * @return the bookie id of the current auditor
- */
- public static BookieId getCurrentAuditor(ServerConfiguration conf, ZooKeeper zk)
- throws KeeperException, InterruptedException, IOException {
- String electionRoot = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf) + '/'
- + BookKeeperConstants.UNDER_REPLICATION_NODE + '/' + ELECTION_ZNODE;
-
- List<String> children = zk.getChildren(electionRoot, false);
- Collections.sort(children, new AuditorElector.ElectionComparator());
- if (children.size() < 1) {
- return null;
- }
- String ledger = electionRoot + "/" + children.get(AUDITOR_INDEX);
- byte[] data = zk.getData(ledger, false, null);
- AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder();
- TextFormat.merge(new String(data, UTF_8), builder);
- AuditorVoteFormat v = builder.build();
- return BookieId.parse(v.getBookieId());
+ public BookieId getCurrentAuditor() throws IOException, InterruptedException {
+ return ledgerAuditorManager.getCurrentAuditor();
}
/**
* Shutting down AuditorElector.
*/
public void shutdown() throws InterruptedException {
+ try {
+ ledgerAuditorManager.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
synchronized (this) {
if (executor.isShutdown()) {
return;
@@ -413,30 +274,4 @@ public class AuditorElector {
public String toString() {
return "AuditorElector for " + bookieId;
}
-
- /**
- * Compare the votes in the ascending order of the sequence number. Vote
- * format is 'V_sequencenumber', comparator will do sorting based on the
- * numeric sequence value.
- */
- private static class ElectionComparator
- implements Comparator<String>, Serializable {
- /**
- * Return -1 if the first vote is less than second. Return 1 if the
- * first vote is greater than second. Return 0 if the votes are equal.
- */
- @Override
- public int compare(String vote1, String vote2) {
- long voteSeqId1 = getVoteSequenceId(vote1);
- long voteSeqId2 = getVoteSequenceId(vote2);
- int result = voteSeqId1 < voteSeqId2 ? -1
- : (voteSeqId1 > voteSeqId2 ? 1 : 0);
- return result;
- }
-
- private long getVoteSequenceId(String vote) {
- String voteId = StringUtils.substringAfter(vote, VOTE_PREFIX);
- return Long.parseLong(voteId);
- }
- }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/BKHttpServiceProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/BKHttpServiceProvider.java
index e9ac414..76705b9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/BKHttpServiceProvider.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/BKHttpServiceProvider.java
@@ -36,7 +36,6 @@ import org.apache.bookkeeper.http.HttpServiceProvider;
import org.apache.bookkeeper.http.service.ErrorHttpService;
import org.apache.bookkeeper.http.service.HeartbeatService;
import org.apache.bookkeeper.http.service.HttpEndpointService;
-import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.Auditor;
import org.apache.bookkeeper.replication.AutoRecoveryMain;
@@ -64,16 +63,12 @@ import org.apache.bookkeeper.server.http.service.TriggerAuditService;
import org.apache.bookkeeper.server.http.service.TriggerGCService;
import org.apache.bookkeeper.server.http.service.WhoIsAuditorService;
import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
/**
* Bookkeeper based implementation of HttpServiceProvider,
* which provide bookkeeper services to handle http requests
* from different http endpoints.
- *
- * <p>TODO: eliminate the direct usage of zookeeper here {@link https://github.com/apache/bookkeeper/issues/1332}
*/
@Slf4j
public class BKHttpServiceProvider implements HttpServiceProvider {
@@ -82,7 +77,6 @@ public class BKHttpServiceProvider implements HttpServiceProvider {
private final BookieServer bookieServer;
private final AutoRecoveryMain autoRecovery;
private final ServerConfiguration serverConf;
- private final ZooKeeper zk;
private final BookKeeperAdmin bka;
private final ExecutorService executor;
@@ -95,12 +89,6 @@ public class BKHttpServiceProvider implements HttpServiceProvider {
this.autoRecovery = autoRecovery;
this.serverConf = serverConf;
this.statsProvider = statsProvider;
- String zkServers = ZKMetadataDriverBase.resolveZkServers(serverConf);
- this.zk = ZooKeeperClient.newBuilder()
- .connectString(zkServers)
- .sessionTimeoutMs(serverConf.getZkTimeout())
- .build();
-
ClientConfiguration clientConfiguration = new ClientConfiguration(serverConf);
this.bka = new BookKeeperAdmin(clientConfiguration);
@@ -115,9 +103,6 @@ public class BKHttpServiceProvider implements HttpServiceProvider {
if (bka != null) {
bka.close();
}
- if (zk != null) {
- zk.close();
- }
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("Interruption while closing BKHttpServiceProvider", ie);
@@ -236,7 +221,7 @@ public class BKHttpServiceProvider implements HttpServiceProvider {
case LIST_UNDER_REPLICATED_LEDGER:
return new ListUnderReplicatedLedgerService(configuration, bookieServer);
case WHO_IS_AUDITOR:
- return new WhoIsAuditorService(configuration, zk);
+ return new WhoIsAuditorService(configuration, bka);
case TRIGGER_AUDIT:
return new TriggerAuditService(configuration, bka);
case LOST_BOOKIE_RECOVERY_DELAY:
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/WhoIsAuditorService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/WhoIsAuditorService.java
index 8bb7824..bafa932 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/WhoIsAuditorService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/WhoIsAuditorService.java
@@ -20,14 +20,13 @@ package org.apache.bookkeeper.server.http.service;
import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.http.HttpServer;
import org.apache.bookkeeper.http.service.HttpEndpointService;
import org.apache.bookkeeper.http.service.HttpServiceRequest;
import org.apache.bookkeeper.http.service.HttpServiceResponse;
import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.replication.AuditorElector;
-import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,12 +40,12 @@ public class WhoIsAuditorService implements HttpEndpointService {
static final Logger LOG = LoggerFactory.getLogger(WhoIsAuditorService.class);
protected ServerConfiguration conf;
- protected ZooKeeper zk;
+ protected BookKeeperAdmin bka;
- public WhoIsAuditorService(ServerConfiguration conf, ZooKeeper zk) {
+ public WhoIsAuditorService(ServerConfiguration conf, BookKeeperAdmin bka) {
checkNotNull(conf);
this.conf = conf;
- this.zk = zk;
+ this.bka = bka;
}
/*
@@ -57,9 +56,9 @@ public class WhoIsAuditorService implements HttpEndpointService {
HttpServiceResponse response = new HttpServiceResponse();
if (HttpServer.Method.GET == request.getMethod()) {
- BookieId bookieId = null;
+ BookieId bookieId;
try {
- bookieId = AuditorElector.getCurrentAuditor(conf, zk);
+ bookieId = bka.getCurrentAuditor();
if (bookieId == null) {
response.setCode(HttpServer.StatusCode.NOT_FOUND);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommand.java
index 2aa1298..853fca0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommand.java
@@ -21,18 +21,15 @@ package org.apache.bookkeeper.tools.cli.commands.autorecovery;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.io.IOException;
-import java.net.URI;
+import lombok.Cleanup;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.replication.AuditorElector;
import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
import org.apache.bookkeeper.tools.framework.CliFlags;
import org.apache.bookkeeper.tools.framework.CliSpec;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,12 +43,19 @@ public class WhoIsAuditorCommand extends BookieCommand<CliFlags> {
private static final String NAME = "whoisauditor";
private static final String DESC = "Print the node which holds the auditor lock.";
+ private BookKeeperAdmin bka;
+
public WhoIsAuditorCommand() {
+ this(null);
+ }
+
+ public WhoIsAuditorCommand(BookKeeperAdmin bka) {
super(CliSpec.newBuilder()
.withName(NAME)
.withDescription(DESC)
.withFlags(new CliFlags())
.build());
+ this.bka = bka;
}
@Override
@@ -64,26 +68,22 @@ public class WhoIsAuditorCommand extends BookieCommand<CliFlags> {
}
private boolean getAuditor(ServerConfiguration conf)
- throws ConfigurationException, InterruptedException, IOException, KeeperException {
- ZooKeeper zk = null;
- try {
- String metadataServiceUri = conf.getMetadataServiceUri();
- String zkServers = ZKMetadataDriverBase.getZKServersFromServiceUri(URI.create(metadataServiceUri));
- zk = ZooKeeperClient.newBuilder()
- .connectString(zkServers)
- .sessionTimeoutMs(conf.getZkTimeout())
- .build();
- BookieId bookieId = AuditorElector.getCurrentAuditor(conf, zk);
- if (bookieId == null) {
- LOG.info("No auditor elected");
- return false;
- }
- LOG.info("Auditor: " + bookieId);
- } finally {
- if (zk != null) {
- zk.close();
- }
+ throws BKException, InterruptedException, IOException {
+ ClientConfiguration clientConfiguration = new ClientConfiguration(conf);
+
+ BookieId bookieId;
+ if (this.bka != null) {
+ bookieId = bka.getCurrentAuditor();
+ } else {
+ @Cleanup
+ BookKeeperAdmin bka = new BookKeeperAdmin(clientConfiguration);
+ bookieId = bka.getCurrentAuditor();
+ }
+ if (bookieId == null) {
+ LOG.info("No auditor elected");
+ return false;
}
+ LOG.info("Auditor: " + bookieId);
return true;
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
index e361294..c692e1e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
@@ -31,6 +31,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
+import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -53,13 +54,20 @@ public class BookKeeperTestClient extends BookKeeper {
this.statsProvider = statsProvider;
}
+ public BookKeeperTestClient(ClientConfiguration conf, ZooKeeper zkc)
+ throws IOException, InterruptedException, BKException {
+ super(conf, zkc, null, new UnpooledByteBufAllocator(false),
+ NullStatsLogger.INSTANCE, null, null, null);
+ this.statsProvider = statsProvider;
+ }
+
public BookKeeperTestClient(ClientConfiguration conf)
throws InterruptedException, BKException, IOException {
- this(conf, null);
+ this(conf, (TestStatsProvider) null);
}
public ZooKeeper getZkHandle() {
- return super.getZkHandle();
+ return ((ZKMetadataClientDriver) metadataDriver).getZk();
}
public ClientConfiguration getConf() {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
index d26b226..75f3d8e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
@@ -55,11 +55,6 @@ public class MockBookKeeper extends BookKeeper {
final ZooKeeper zkc;
@Override
- public ZooKeeper getZkHandle() {
- return zkc;
- }
-
- @Override
public ClientConfiguration getConf() {
return super.getConf();
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
index 9b5853a..ce115a6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
@@ -160,7 +160,7 @@ public class TestBookieWatcher extends BookKeeperClusterTestCase {
ClientConfiguration conf = new ClientConfiguration();
conf.setMetadataServiceUri(metadataServiceUri);
- try (BookKeeper bkc = new BookKeeper(conf, zk)) {
+ try (BookKeeperTestClient bkc = new BookKeeperTestClient(conf, zk)) {
LedgerHandle lh;
try {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
index b1210b7..13ee214 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
@@ -24,9 +24,11 @@ import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerMa
import static org.junit.Assert.assertEquals;
import com.google.common.util.concurrent.UncheckedExecutionException;
+import lombok.Cleanup;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerAuditorManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.net.BookieId;
@@ -77,7 +79,9 @@ public class AuditorRollingRestartTest extends BookKeeperClusterTestCase {
underReplicationManager.pollLedgerToRereplicate(), -1);
underReplicationManager.disableLedgerReplication();
- BookieId auditor = AuditorElector.getCurrentAuditor(baseConf, zkc);
+ @Cleanup
+ LedgerAuditorManager lam = mFactory.newLedgerAuditorManager();
+ BookieId auditor = lam.getCurrentAuditor();
ServerConfiguration conf = killBookie(auditor);
Thread.sleep(2000);
startBookie(conf);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
index 2ed92b2..0c2221a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
@@ -21,13 +21,13 @@
package org.apache.bookkeeper.replication;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver;
import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Test;
@@ -97,9 +97,22 @@ public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
*/
ZKMetadataClientDriver zkMetadataClientDriver1 = startAutoRecoveryMain(main1);
ZooKeeper zk1 = zkMetadataClientDriver1.getZk();
- Auditor auditor1 = main1.auditorElector.getAuditor();
- BookieId currentAuditor = AuditorElector.getCurrentAuditor(confByIndex(0), zk1);
+ // Wait until auditor gets elected
+ for (int i = 0; i < 10; i++) {
+ try {
+ if (main1.auditorElector.getCurrentAuditor() != null) {
+ break;
+ } else {
+ Thread.sleep(1000);
+ }
+ } catch (IOException e) {
+ Thread.sleep(1000);
+ }
+ }
+ BookieId currentAuditor = main1.auditorElector.getCurrentAuditor();
+ assertNotNull(currentAuditor);
+ Auditor auditor1 = main1.auditorElector.getAuditor();
assertTrue("Current Auditor should be AR1", currentAuditor.equals(BookieImpl.getBookieId(confByIndex(0))));
assertTrue("Auditor of AR1 should be running", auditor1.isRunning());
@@ -142,18 +155,9 @@ public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
}
/*
- * since zk1 and zk2 sessions are expired, the 'myVote' ephemeral nodes
- * of AR1 and AR2 should not be existing anymore.
- */
- assertTrue("AR1's vote node should not be existing",
- zk3.exists(main1.auditorElector.getMyVote(), false) == null);
- assertTrue("AR2's vote node should not be existing",
- zk3.exists(main2.auditorElector.getMyVote(), false) == null);
-
- /*
* the AR3 should be current auditor.
*/
- currentAuditor = AuditorElector.getCurrentAuditor(confByIndex(2), zk3);
+ currentAuditor = main3.auditorElector.getCurrentAuditor();
assertTrue("Current Auditor should be AR3", currentAuditor.equals(BookieImpl.getBookieId(confByIndex(2))));
auditor3 = main3.auditorElector.getAuditor();
assertTrue("Auditor of AR3 should be running", auditor3.isRunning());
@@ -180,29 +184,12 @@ public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
* start autoRecoveryMain and make sure all its components are running and
* myVote node is existing
*/
- ZKMetadataClientDriver startAutoRecoveryMain(AutoRecoveryMain autoRecoveryMain)
- throws InterruptedException, KeeperException, UnavailableException {
+ ZKMetadataClientDriver startAutoRecoveryMain(AutoRecoveryMain autoRecoveryMain) {
autoRecoveryMain.start();
ZKMetadataClientDriver metadataClientDriver = (ZKMetadataClientDriver) autoRecoveryMain.bkc
.getMetadataClientDriver();
- ZooKeeper zk = metadataClientDriver.getZk();
- String myVote;
- for (int i = 0; i < 10; i++) {
- if (autoRecoveryMain.auditorElector.isRunning() && autoRecoveryMain.replicationWorker.isRunning()
- && autoRecoveryMain.isAutoRecoveryRunning()) {
- myVote = autoRecoveryMain.auditorElector.getMyVote();
- if (myVote != null) {
- if (null != zk.exists(myVote, false)) {
- break;
- }
- }
- }
- Thread.sleep(100);
- }
assertTrue("autoRecoveryMain components should be running", autoRecoveryMain.auditorElector.isRunning()
&& autoRecoveryMain.replicationWorker.isRunning() && autoRecoveryMain.isAutoRecoveryRunning());
- assertTrue("autoRecoveryMain's vote node should be existing",
- zk.exists(autoRecoveryMain.auditorElector.getMyVote(), false) != null);
return metadataClientDriver;
}
}
diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommandTest.java
index d21856a..f0e74fc 100644
--- a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommandTest.java
+++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommandTest.java
@@ -27,6 +27,8 @@ import static org.powermock.api.mockito.PowerMockito.when;
import java.net.URI;
import java.util.UUID;
+import lombok.Cleanup;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieId;
@@ -75,10 +77,6 @@ public class WhoIsAuditorCommandTest extends BookieCommandTestBase {
BookieId bookieId = BookieId.parse(UUID.randomUUID().toString());
- PowerMockito.mockStatic(AuditorElector.class);
- PowerMockito.when(AuditorElector.getCurrentAuditor(eq(conf), eq(zk)))
- .thenReturn(bookieId);
-
PowerMockito.mockStatic(CommandHelpers.class);
PowerMockito.when(CommandHelpers
.getBookieSocketAddrStringRepresentation(
@@ -86,8 +84,11 @@ public class WhoIsAuditorCommandTest extends BookieCommandTestBase {
}
@Test
- public void testCommand() {
- WhoIsAuditorCommand cmd = new WhoIsAuditorCommand();
+ public void testCommand() throws Exception {
+ @Cleanup
+ BookKeeperAdmin bka = mock(BookKeeperAdmin.class);
+ when(bka.getCurrentAuditor()).thenReturn(BookieId.parse("127.0.0.1:3181"));
+ WhoIsAuditorCommand cmd = new WhoIsAuditorCommand(bka);
Assert.assertTrue(cmd.apply(bkFlags, new String[] { "" }));
}
}