You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/04 12:40:30 UTC
svn commit: r1393983 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/
bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ bookkee...
Author: ivank
Date: Thu Oct 4 10:40:29 2012
New Revision: 1393983
URL: http://svn.apache.org/viewvc?rev=1393983&view=rev
Log:
BOOKKEEPER-278: Ability to disable auto recovery temporarily (rakeshr via ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationEnableCb.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1393983&r1=1393982&r2=1393983&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Oct 4 10:40:29 2012
@@ -146,6 +146,8 @@ Trunk (unreleased changes)
BOOKKEEPER-388: Document bookie format command (kiran_bc via ivank)
+ BOOKKEEPER-278: Ability to disable auto recovery temporarily (rakeshr via ivank)
+
hedwig-server:
BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java?rev=1393983&r1=1393982&r2=1393983&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java Thu Oct 4 10:40:29 2012
@@ -17,6 +17,7 @@
*/
package org.apache.bookkeeper.meta;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.replication.ReplicationException;
/**
@@ -61,4 +62,39 @@ public interface LedgerUnderreplicationM
*/
void close()
throws ReplicationException.UnavailableException;
+
+ /**
+ * Stop ledger replication. Rereplication tasks that are already running
+ * are allowed to continue; no new task is started while replication is
+ * disabled. This blocks the
+ * {@link org.apache.bookkeeper.replication.Auditor} and
+ * {@link #getLedgerToRereplicate()} tasks.
+ *
+ * @throws ReplicationException.UnavailableException
+ * if the replication state could not be updated
+ */
+ void disableLedgerReplication()
+ throws ReplicationException.UnavailableException;
+
+ /**
+ * Resume ledger replication. This allows the
+ * {@link org.apache.bookkeeper.replication.Auditor} and
+ * {@link #getLedgerToRereplicate()} tasks to continue.
+ *
+ * @throws ReplicationException.UnavailableException
+ * if the replication state could not be updated
+ */
+ void enableLedgerReplication()
+ throws ReplicationException.UnavailableException;
+
+ /**
+ * Check whether ledger replication is currently enabled.
+ *
+ * @return true if ledger replication is enabled, false otherwise
+ * @throws ReplicationException.UnavailableException
+ * if the replication state could not be read
+ */
+ boolean isLedgerReplicationEnabled()
+ throws ReplicationException.UnavailableException;
+
+ /**
+ * Register a callback that is notified asynchronously when the ledger
+ * replication process is enabled.
+ *
+ * @param cb
+ * - callback implementation to receive the notification
+ * @throws ReplicationException.UnavailableException
+ * if the replication state could not be checked
+ */
+ void notifyLedgerReplicationEnabled(GenericCallback<Void> cb)
+ throws ReplicationException.UnavailableException;
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java?rev=1393983&r1=1393982&r2=1393983&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java Thu Oct 4 10:40:29 2012
@@ -18,7 +18,11 @@
package org.apache.bookkeeper.meta;
+import org.apache.bookkeeper.replication.ReplicationEnableCb;
import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat;
import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
import org.apache.bookkeeper.conf.AbstractConfiguration;
@@ -65,6 +69,7 @@ public class ZkLedgerUnderreplicationMan
static final Logger LOG = LoggerFactory.getLogger(ZkLedgerUnderreplicationManager.class);
static final Charset UTF8 = Charset.forName("UTF-8");
public static final String UNDER_REPLICATION_NODE = "underreplication";
+ static final String DISABLE_NODE = "disable";
static final String LAYOUT="BASIC";
static final int LAYOUT_VERSION=1;
@@ -104,22 +109,6 @@ public class ZkLedgerUnderreplicationMan
checkLayout();
}
- private void createOptimistic(String path, byte[] data) throws KeeperException, InterruptedException {
- try {
- zkc.create(path, data,
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- } catch (KeeperException.NoNodeException nne) {
- int lastSlash = path.lastIndexOf('/');
- if (lastSlash <= 0) {
- throw nne;
- }
- String parent = path.substring(0, lastSlash);
- createOptimistic(parent, new byte[0]);
- zkc.create(path, data,
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- }
-
private void checkLayout()
throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
if (zkc.exists(basePath, false) == null) {
@@ -212,8 +201,9 @@ public class ZkLedgerUnderreplicationMan
UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
try {
builder.addReplica(missingReplica);
- createOptimistic(znode,
- TextFormat.printToString(builder.build()).getBytes(UTF8));
+ ZkUtils.createFullPathOptimistic(zkc, znode, TextFormat
+ .printToString(builder.build()).getBytes(UTF8),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException nee) {
Stat s = zkc.exists(znode, false);
if (s == null) {
@@ -331,6 +321,7 @@ public class ZkLedgerUnderreplicationMan
LOG.debug("getLedgerToRereplicate()");
try {
while (true) {
+ waitIfLedgerReplicationDisabled();
final CountDownLatch changedLatch = new CountDownLatch(1);
Watcher w = new Watcher() {
public void process(WatchedEvent e) {
@@ -352,10 +343,19 @@ public class ZkLedgerUnderreplicationMan
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
- throw new ReplicationException.UnavailableException("Interrupted while conecting zookeeper", ie);
+ throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
}
}
+    /**
+     * Block the calling thread for as long as ledger replication is
+     * disabled. Returns immediately when replication is already enabled.
+     *
+     * @throws UnavailableException
+     *             if the replication state cannot be read or watched
+     * @throws InterruptedException
+     *             if interrupted while waiting for replication to be enabled
+     */
+    private void waitIfLedgerReplicationDisabled()
+            throws UnavailableException, InterruptedException {
+        if (isLedgerReplicationEnabled()) {
+            return;
+        }
+        // replication is disabled: park until the enable notification fires
+        final ReplicationEnableCb enabledCb = new ReplicationEnableCb();
+        notifyLedgerReplicationEnabled(enabledCb);
+        enabledCb.await();
+    }
+
@Override
public void releaseUnderreplicatedLedger(long ledgerId) throws ReplicationException.UnavailableException {
LOG.debug("releaseLedger(ledgerId={})", ledgerId);
@@ -371,7 +371,7 @@ public class ZkLedgerUnderreplicationMan
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
- throw new ReplicationException.UnavailableException("Interrupted while conecting zookeeper", ie);
+ throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
}
}
@@ -389,7 +389,95 @@ public class ZkLedgerUnderreplicationMan
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
- throw new ReplicationException.UnavailableException("Interrupted while conecting zookeeper", ie);
+ throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
+ }
+ }
+
+    /**
+     * Disable ledger re-replication by creating the {@code disable} marker
+     * znode below the under-replication base path.
+     *
+     * @throws ReplicationException.UnavailableException
+     *             if re-replication is already disabled, if zookeeper
+     *             reports an error, or if the calling thread is interrupted
+     */
+    @Override
+    public void disableLedgerReplication()
+            throws ReplicationException.UnavailableException {
+        LOG.debug("disableLedgerReplication()");
+        try {
+            // an explicit empty array avoids the platform-charset dependent
+            // String#getBytes() for what is only an empty payload
+            ZkUtils.createFullPathOptimistic(zkc, basePath + '/' + DISABLE_NODE,
+                    new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            LOG.info("Auto ledger re-replication is disabled!");
+        } catch (KeeperException.NodeExistsException nee) {
+            // the marker already exists: say so instead of reporting a
+            // generic zookeeper connectivity problem
+            LOG.warn("Auto ledger re-replication is already disabled!", nee);
+            throw new ReplicationException.UnavailableException(
+                    "Auto ledger re-replication is already disabled!", nee);
+        } catch (KeeperException ke) {
+            LOG.error("Exception while stopping replication", ke);
+            throw new ReplicationException.UnavailableException(
+                    "Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            // keep the interrupt visible to callers further up the stack
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException(
+                    "Interrupted while connecting zookeeper", ie);
+        }
+    }
+
+    /**
+     * Enable ledger re-replication by deleting the {@code disable} marker
+     * znode below the under-replication base path.
+     *
+     * @throws ReplicationException.UnavailableException
+     *             if re-replication is already enabled, if zookeeper
+     *             reports an error, or if the calling thread is interrupted
+     */
+    @Override
+    public void enableLedgerReplication()
+            throws ReplicationException.UnavailableException {
+        LOG.debug("enableLedgerReplication()");
+        try {
+            // version -1 deletes the marker regardless of its znode version
+            zkc.delete(basePath + '/' + DISABLE_NODE, -1);
+            LOG.info("Resuming automatic ledger re-replication");
+        } catch (KeeperException.NoNodeException nne) {
+            // no marker to delete: say so instead of reporting a generic
+            // zookeeper connectivity problem
+            LOG.warn("Auto ledger re-replication is already enabled!", nne);
+            throw new ReplicationException.UnavailableException(
+                    "Auto ledger re-replication is already enabled!", nne);
+        } catch (KeeperException ke) {
+            LOG.error("Exception while resuming ledger replication", ke);
+            throw new ReplicationException.UnavailableException(
+                    "Error contacting zookeeper", ke);
+        } catch (InterruptedException ie) {
+            // keep the interrupt visible to callers further up the stack
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException(
+                    "Interrupted while connecting zookeeper", ie);
+        }
+    }
+
+    /**
+     * Report whether ledger re-replication is enabled, i.e. whether the
+     * {@code disable} marker znode is absent.
+     *
+     * @return true if the marker znode does not exist, false otherwise
+     * @throws ReplicationException.UnavailableException
+     *             if zookeeper reports an error or the calling thread is
+     *             interrupted
+     */
+    @Override
+    public boolean isLedgerReplicationEnabled()
+            throws ReplicationException.UnavailableException {
+        LOG.debug("isLedgerReplicationEnabled()");
+        final String disablePath = basePath + '/' + DISABLE_NODE;
+        try {
+            // enabled exactly when the disable marker is absent
+            return zkc.exists(disablePath, false) == null;
+        } catch (KeeperException zkError) {
+            LOG.error("Error while checking the state of "
+                    + "ledger re-replication", zkError);
+            throw new ReplicationException.UnavailableException(
+                    "Error contacting zookeeper", zkError);
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException(
+                    "Interrupted while contacting zookeeper", interrupted);
+        }
+    }
+
+    /**
+     * Invoke {@code cb} once ledger re-replication is enabled. If the
+     * {@code disable} marker znode does not exist the callback is invoked
+     * right away on the calling thread; otherwise it is invoked from the
+     * watcher when the marker is deleted.
+     *
+     * NOTE(review): the watcher ignores every event other than NodeDeleted
+     * and is not registered again - confirm that no other event can consume
+     * the watch and leave the caller waiting.
+     *
+     * @param cb
+     *            callback completed with rc 0 and a null result
+     * @throws ReplicationException.UnavailableException
+     *             if zookeeper reports an error or the calling thread is
+     *             interrupted
+     */
+    @Override
+    public void notifyLedgerReplicationEnabled(final GenericCallback<Void> cb)
+            throws ReplicationException.UnavailableException {
+        LOG.debug("notifyLedgerReplicationEnabled()");
+        final String disablePath = basePath + '/' + DISABLE_NODE;
+        final Watcher markerWatcher = new Watcher() {
+            public void process(WatchedEvent event) {
+                // only removal of the marker means "enabled again"
+                if (Watcher.Event.EventType.NodeDeleted == event.getType()) {
+                    cb.operationComplete(0, null);
+                }
+            }
+        };
+        try {
+            final boolean markerAbsent =
+                    zkc.exists(disablePath, markerWatcher) == null;
+            if (markerAbsent) {
+                // already enabled, nothing to wait for
+                cb.operationComplete(0, null);
+            }
+        } catch (KeeperException zkError) {
+            LOG.error("Error while checking the state of "
+                    + "ledger re-replication", zkError);
+            throw new ReplicationException.UnavailableException(
+                    "Error contacting zookeeper", zkError);
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new ReplicationException.UnavailableException(
+                    "Interrupted while contacting zookeeper", interrupted);
+        }
+    }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java?rev=1393983&r1=1393982&r2=1393983&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java Thu Oct 4 10:40:29 2012
@@ -106,6 +106,10 @@ public class Auditor extends Thread impl
while (true) {
// wait for bookie join/failure notifications
bookieNotifications.take();
+
+ // check whether ledger replication is enabled
+ waitIfLedgerReplicationDisabled();
+
List<String> availableBookies = getAvailableBookies();
// casting to String, as knownBookies and availableBookies
@@ -132,11 +136,22 @@ public class Auditor extends Thread impl
LOG.error("Interrupted while watching available bookies ", ie);
} catch (BKAuditException bke) {
LOG.error("Exception while watching available bookies", bke);
+ } catch (UnavailableException ue) {
+ LOG.error("Exception while watching available bookies", ue);
}
shutdown();
}
+    /**
+     * Block the auditor thread for as long as ledger replication is
+     * disabled. Returns immediately when replication is already enabled.
+     *
+     * @throws UnavailableException
+     *             if the replication state cannot be read or watched
+     * @throws InterruptedException
+     *             if interrupted while waiting for replication to be enabled
+     */
+    private void waitIfLedgerReplicationDisabled()
+            throws UnavailableException, InterruptedException {
+        if (ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
+            return;
+        }
+        // replication is disabled: park until the enable notification fires
+        final ReplicationEnableCb enabledCb = new ReplicationEnableCb();
+        ledgerUnderreplicationManager.notifyLedgerReplicationEnabled(enabledCb);
+        enabledCb.await();
+    }
+
private List<String> getAvailableBookies() throws KeeperException,
InterruptedException {
return zkc.getChildren(conf.getZkAvailableBookiesPath(), this);
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationEnableCb.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationEnableCb.java?rev=1393983&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationEnableCb.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationEnableCb.java Thu Oct 4 10:40:29 2012
@@ -0,0 +1,56 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.replication;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link GenericCallback} that lets a thread block until it has been
+ * notified that the replication process is enabled.
+ */
+public class ReplicationEnableCb implements GenericCallback<Void> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ReplicationEnableCb.class);
+
+    /** Opened by the first {@link #operationComplete(int, Void)} call. */
+    private final CountDownLatch enabledSignal = new CountDownLatch(1);
+
+    /**
+     * Release every thread blocked in {@link #await()}. Both arguments are
+     * ignored.
+     *
+     * @param rc
+     *            return code of the notification, unused
+     * @param result
+     *            unused
+     */
+    @Override
+    public void operationComplete(int rc, Void result) {
+        enabledSignal.countDown();
+        LOG.debug("Automatic ledger re-replication is enabled");
+    }
+
+    /**
+     * Block the current thread until {@link #operationComplete(int, Void)}
+     * has been called.
+     *
+     * @throws InterruptedException
+     *             interrupted while waiting
+     */
+    public void await() throws InterruptedException {
+        LOG.debug("Automatic ledger re-replication is disabled. "
+                + "Hence waiting until its enabled!");
+        enabledSignal.await();
+    }
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java?rev=1393983&r1=1393982&r2=1393983&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java Thu Oct 4 10:40:29 2012
@@ -140,4 +140,38 @@ public class ZkUtils {
}
return newZk;
}
+
+    /**
+     * Utility to create the complete znode path synchronously. The target
+     * znode is created first; only if that fails because an ancestor is
+     * missing are the ancestors created (with empty data) and the target
+     * creation retried.
+     *
+     * @param zkc
+     *            - ZK instance
+     * @param path
+     *            - znode path
+     * @param data
+     *            - znode data of the target znode
+     * @param acl
+     *            - Acl of the zk path, also applied to created ancestors
+     * @param createMode
+     *            - Create mode of the target znode; ancestors are always
+     *            created as persistent znodes
+     * @throws KeeperException
+     *             if the server returns a non-zero error code, or invalid ACL
+     * @throws InterruptedException
+     *             if the transaction is interrupted
+     */
+    public static void createFullPathOptimistic(ZooKeeper zkc, String path,
+            byte[] data, final List<ACL> acl, final CreateMode createMode)
+            throws KeeperException, InterruptedException {
+        try {
+            zkc.create(path, data, acl, createMode);
+        } catch (KeeperException.NoNodeException nne) {
+            int lastSlash = path.lastIndexOf('/');
+            if (lastSlash <= 0) {
+                // no parent left to create, the failure is genuine
+                throw nne;
+            }
+            String parent = path.substring(0, lastSlash);
+            try {
+                // ancestors must be persistent: ephemeral znodes cannot have
+                // children and sequential modes would rename the parent
+                createFullPathOptimistic(zkc, parent, new byte[0], acl,
+                        CreateMode.PERSISTENT);
+            } catch (KeeperException.NodeExistsException nee) {
+                // another client created the parent in the meantime, which
+                // is all that is needed to retry the target below
+            }
+            zkc.create(path, data, acl, createMode);
+        }
+    }
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java?rev=1393983&r1=1393982&r2=1393983&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java Thu Oct 4 10:40:29 2012
@@ -256,6 +256,31 @@ public class AuditorLedgerCheckerTest ex
}
}
+    /**
+     * Test toggling of ledger replication: while replication is disabled the
+     * under-replicated ledger latch must not open after bookies are shut
+     * down, and it must open once replication is enabled again.
+     *
+     * NOTE(review): the timeout on the annotation is commented out - confirm
+     * whether it should be restored.
+     */
+    @Test//(timeout = 30000)
+    public void testToggleLedgerReplication() throws Exception {
+        LedgerHandle lh1 = createAndAddEntriesToLedger();
+        ledgerList.add(lh1.getId());
+        LOG.debug("Created following ledgers : " + ledgerList);
+
+        CountDownLatch urReplicaLatch = registerUrLedgerWatcher(ledgerList
+                .size());
+
+        // disabling ledger replication
+        urLedgerMgr.disableLedgerReplication();
+
+        // failing two bookies while replication is disabled
+        shutdownBookie(bs.size() - 1);
+        shutdownBookie(bs.size() - 1);
+
+        assertFalse("Ledger replication is not disabled!", urReplicaLatch
+                .await(5, TimeUnit.SECONDS));
+
+        // enabling ledger replication
+        urLedgerMgr.enableLedgerReplication();
+        assertTrue("Ledger replication is not enabled!", urReplicaLatch.await(
+                5, TimeUnit.SECONDS));
+    }
+
private CountDownLatch registerUrLedgerWatcher(int count)
throws KeeperException, InterruptedException {
final CountDownLatch underReplicaLatch = new CountDownLatch(count);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java?rev=1393983&r1=1393982&r2=1393983&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java Thu Oct 4 10:40:29 2012
@@ -49,8 +49,12 @@ import org.apache.bookkeeper.replication
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.test.ZooKeeperUtil;
import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.EventType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -76,6 +80,7 @@ public class TestLedgerUnderreplicationM
String basePath;
String urLedgerPath;
+ boolean isLedgerReplicationDisabled = true;
@Before
public void setupZooKeeper() throws Exception {
@@ -466,6 +471,119 @@ public class TestLedgerUnderreplicationM
verifyMarkLedgerUnderreplicated(missingReplica);
}
+    /**
+     * Test disabling the ledger re-replication. After disabling, a call to
+     * getLedgerToRereplicate() must not hand out a ledger; it is expected to
+     * keep waiting until the re-replication process is enabled again, which
+     * this test observes as a timeout on the pending future.
+     */
+    @Test(timeout = 20000)
+    public void testDisableLedegerReplication() throws Exception {
+        final LedgerUnderreplicationManager urManager =
+                lmf1.newLedgerUnderreplicationManager();
+
+        // an under-replicated ledger that would normally be handed out
+        final Long urLedgerId = 0xfeadeefdacL;
+        final String failedBookie = "localhost:3181";
+
+        // switch replication off before anything is marked
+        urManager.disableLedgerReplication();
+        LOG.info("Disabled Ledeger Replication");
+
+        try {
+            urManager.markLedgerUnderreplicated(urLedgerId, failedBookie);
+        } catch (UnavailableException unavailable) {
+            LOG.debug("Unexpected exception while marking urLedger", unavailable);
+            fail("Unexpected exception while marking urLedger" + unavailable.getMessage());
+        }
+
+        final Future<Long> pendingLedger = getLedgerToReplicate(urManager);
+        try {
+            pendingLedger.get(5, TimeUnit.SECONDS);
+            fail("Shouldn't be able to find a ledger to replicate");
+        } catch (TimeoutException expected) {
+            // timing out is the expected outcome while replication is
+            // disabled; the flag is cleared to record that it was observed
+            isLedgerReplicationDisabled = false;
+        }
+
+        assertTrue("Ledger replication is not disabled!",
+                !isLedgerReplicationDisabled);
+    }
+
+    /**
+     * Test enabling the ledger re-replication. While replication is disabled
+     * a thread calling getLedgerToRereplicate() must stay blocked; after
+     * enableLedgerReplication() it should continue and pick up the ledger
+     * that was marked under-replicated.
+     *
+     * The latch is counted down twice: once by the watcher when the lock
+     * znode of the ledger is created, and once by the worker thread after
+     * getLedgerToRereplicate() has returned.
+     */
+    @Test(timeout = 20000)
+    public void testEnableLedegerReplication() throws Exception {
+        isLedgerReplicationDisabled = true;
+        final LedgerUnderreplicationManager replicaMgr = lmf1
+                .newLedgerUnderreplicationManager();
+
+        // simulate few urLedgers before disabling
+        final Long ledgerA = 0xfeadeefdacL;
+        final String missingReplica = "localhost:3181";
+        try {
+            replicaMgr.markLedgerUnderreplicated(ledgerA, missingReplica);
+        } catch (UnavailableException e) {
+            LOG.debug("Unexpected exception while marking urLedger", e);
+            fail("Unexpected exception while marking urLedger" + e.getMessage());
+        }
+
+        // disabling replication
+        replicaMgr.disableLedgerReplication();
+        LOG.debug("Disabled Ledger Replication");
+
+        String znodeA = getUrLedgerZnode(ledgerA);
+        final CountDownLatch znodeLatch = new CountDownLatch(2);
+        String urledgerA = StringUtils.substringAfterLast(znodeA, "/");
+        String urLockLedgerA = basePath + "/locks/" + urledgerA;
+        zkc1.exists(urLockLedgerA, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (event.getType() == EventType.NodeCreated) {
+                    znodeLatch.countDown();
+                    LOG.debug("Received node creation event for the zNodePath:"
+                            + event.getPath());
+                }
+            }
+        });
+        // getLedgerToRereplicate is waiting until enable rereplication
+        Thread thread1 = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    Long lA = replicaMgr.getLedgerToRereplicate();
+                    assertEquals("Should be the ledger I just marked", lA,
+                            ledgerA);
+                    isLedgerReplicationDisabled = false;
+                    znodeLatch.countDown();
+                } catch (UnavailableException e) {
+                    LOG.debug("Unexpected exception while getting urLedger to rereplicate", e);
+                    isLedgerReplicationDisabled = false;
+                }
+            }
+        };
+        thread1.start();
+
+        try {
+            // nothing may happen while replication is disabled, so this wait
+            // is expected to time out with the latch untouched
+            znodeLatch.await(5, TimeUnit.SECONDS);
+            assertTrue("Ledger replication is not disabled!",
+                    isLedgerReplicationDisabled);
+            assertEquals("Failed to disable ledger replication!", 2, znodeLatch
+                    .getCount());
+
+            replicaMgr.enableLedgerReplication();
+            znodeLatch.await(5, TimeUnit.SECONDS);
+            LOG.debug("Enabled Ledger Replication");
+            assertTrue("Ledger replication is not enabled!",
+                    !isLedgerReplicationDisabled);
+            assertEquals("Failed to enable ledger replication!", 0, znodeLatch
+                    .getCount());
+        } finally {
+            // never leave the worker blocked, whatever the outcome
+            thread1.interrupt();
+        }
+    }
+
private void verifyMarkLedgerUnderreplicated(Collection<String> missingReplica)
throws KeeperException, InterruptedException,
CompatibilityException, UnavailableException {