You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2022/09/12 07:57:31 UTC

[GitHub] [bookkeeper] Shoothzj commented on a diff in pull request #3457: BP-56: Support data migration

Shoothzj commented on code in PR #3457:
URL: https://github.com/apache/bookkeeper/pull/3457#discussion_r968084246


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataClientDriver.java:
##########
@@ -106,6 +110,23 @@ interface SessionStateListener {
      */
     void setSessionStateListener(SessionStateListener sessionStateListener);
 
+    default List<String> listLedgersOfMigrationReplicas(String migrationReplicasPath)
+            throws InterruptedException, KeeperException {
+        return new ArrayList<>();
+    }
+
+    default void lockMigrationReplicas(String lockPath, String advertisedAddress)

Review Comment:
   method in interface needs javadoc



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataBookieDriver.java:
##########
@@ -88,6 +89,13 @@ default CompletableFuture<Void> disableHealthCheck() {
         return result;
     }
 
+    default CompletableFuture<Void> submitToMigrateReplicas(long ledgerId, String bookieIds)

Review Comment:
   This method needs javadoc



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicasMigrationWorker.java:
##########
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.replication;
+
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_UNABLE_TO_READ_FOR_MIGRATION;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import org.apache.bookkeeper.bookie.BookieThread;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.LedgerChecker;
+import org.apache.bookkeeper.client.LedgerFragment;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
+import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Migrate replica data to other bookie nodes.
+ */
+public class ReplicasMigrationWorker implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicasMigrationWorker.class);
+    private final BookKeeperAdmin admin;
+    private final BiConsumer<Long, Long> onReadEntryFailureCallback;
+    private final MetadataClientDriver metadataClientDriver;
+    private final String rootPath;
+    private final String replicasMigrationPath;
+    private final String lock = "locked";
+    private final LedgerChecker ledgerChecker;
+    private final Thread workerThread;
+    private final long backoffMs = 100;
+    private volatile boolean workerRunning = false;
+    private final String seperator = ",";
+    private final String advertisedAddress;
+
+    @StatsDoc(
+            name = NUM_ENTRIES_UNABLE_TO_READ_FOR_MIGRATION,
+            help = "the number of entries ReplicasMigrationWorker unable to read"
+    )
+    private final Counter numEntriesUnableToReadForMigration;
+
+    public ReplicasMigrationWorker(final ServerConfiguration conf, BookKeeper bkc, StatsLogger statsLogger) {
+        this.advertisedAddress = conf.getAdvertisedAddress();
+        this.rootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
+        this.replicasMigrationPath = getReplicasMigrationPath(rootPath);
+        this.ledgerChecker = new LedgerChecker(bkc);
+        this.admin = new BookKeeperAdmin(bkc, statsLogger, new ClientConfiguration(conf));
+        this.metadataClientDriver = bkc.getMetadataClientDriver();
+        this.numEntriesUnableToReadForMigration = statsLogger
+                .getCounter(NUM_ENTRIES_UNABLE_TO_READ_FOR_MIGRATION);
+        this.onReadEntryFailureCallback = (ledgerid, entryid) -> {
+            numEntriesUnableToReadForMigration.inc();
+        };
+        this.workerThread = new BookieThread(this, "ReplicasMigrationWorker");
+    }
+
+    public static String getReplicasMigrationPath(String rootPath) {
+        return String.format("%s/%s", rootPath, BookKeeperConstants.MIGRATION_REPLICAS);
+    }
+
+    public static String getLockForMigrationReplicasPath(String replicasMigrationPath, long ledgerId, String lock) {
+        return String.format("%s/%s/%s", replicasMigrationPath, ledgerId, lock);
+    }
+
+    public static String getLedgerForMigrationReplicasPath(String replicasMigrationPath, long ledgerId) {
+        return String.format("%s/%s", replicasMigrationPath, ledgerId);
+    }
+
+    static class Replicas {
+        long ledgerId;
+        Set<BookieId> bookieIds;
+
+        public Replicas(long ledgerId, Set<BookieId> bookieIds) {
+            this.ledgerId = ledgerId;
+            this.bookieIds = bookieIds;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("<ledgerId=%s,bookies=<%s>", ledgerId, bookieIds);
+        }
+    }
+
+    private Replicas getReplicasToMigrate() throws InterruptedException, KeeperException {
+        List<String> ledgerIds = metadataClientDriver.listLedgersOfMigrationReplicas(replicasMigrationPath);
+        for (String ledgerId : ledgerIds) {
+            try {
+                String lockForMigrationReplicasPath = getLockForMigrationReplicasPath(
+                        replicasMigrationPath, Long.parseLong(ledgerId), lock);
+                String ledgerForMigrationReplicasPath = getLedgerForMigrationReplicasPath(
+                        replicasMigrationPath, Long.parseLong(ledgerId));
+                metadataClientDriver.lockMigrationReplicas(lockForMigrationReplicasPath, advertisedAddress);
+                String bookieIdStr = metadataClientDriver.getOwnerBookiesMigrationReplicas(
+                        ledgerForMigrationReplicasPath);
+                String[] migrationBookieIds = bookieIdStr.split(seperator);
+                Set<BookieId> bookieIds = new HashSet<>();
+                for (int i = 0; i < migrationBookieIds.length; i++) {
+                    bookieIds.add(BookieId.parse(migrationBookieIds[i]));
+                }
+                return new Replicas(Long.parseLong(ledgerId), bookieIds);
+            } catch (KeeperException.NodeExistsException nee) {
+                // do nothing, someone each could have created it
+            } catch (UnsupportedEncodingException e) {
+                LOG.error("getReplicasToMigrate failed!", e);
+            }
+        }
+        return null;
+    }
+
+    private Set<LedgerFragment> getFragmentsOnMigrationBookies(LedgerHandle lh, Set<BookieId> migrationBookieIds) {
+        return ledgerChecker.getFragmentsOnMigrationBookies(lh, migrationBookieIds);
+    }
+
+    private static void waitBackOffTime(long backoffMs) {
+        try {
+            Thread.sleep(backoffMs);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private void releaseLock(Replicas replicas) {
+        try {
+            metadataClientDriver.deleteZkPath(getLockForMigrationReplicasPath(replicasMigrationPath,
+                    replicas.ledgerId, lock));
+            LOG.info(String.format("Release lock for ledgerId %s success!", replicas.ledgerId));
+        } catch (KeeperException.NoNodeException e) {
+            // do nothing,already release lock
+        } catch (Exception e) {
+            LOG.error(String.format("Release lock for ledgerId %s failed!", replicas.ledgerId));
+        }
+    }
+
+    /**
+     * Start the migration replicas worker.
+     */
+    public void start() {
+        this.workerThread.start();
+    }
+
+    @Override
+    public void run() {
+        workerRunning = true;
+        while (workerRunning) {
+            //1. Get a replica ledger to replicate
+            Replicas replicas = null;
+            try {
+                replicas = getReplicasToMigrate();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } catch (KeeperException e) {
+                LOG.error("getReplicasToMigrate throw exception!", e);
+            }
+            if (null == replicas) {
+                waitBackOffTime(backoffMs);
+                continue;
+            }
+
+            LOG.info(String.format("Start migrate replicas(%s)!", replicas));
+            //2. Get the fragment containing the migration bookie
+            try (LedgerHandle lh = admin.openLedgerNoRecovery(replicas.ledgerId)) {
+                Set<LedgerFragment> fragments = getFragmentsOnMigrationBookies(lh, replicas.bookieIds);
+                if (fragments.size() < 1) {
+                    //3.The replication has been completed, delete the ledgerId directory and release lock
+                    releaseLock(replicas);
+                    metadataClientDriver.deleteZkPath(getLedgerForMigrationReplicasPath(replicasMigrationPath,
+                            replicas.ledgerId));
+                    LOG.info(String.format("Finish ledgerId %s migration!", replicas.ledgerId));
+                }
+
+                //4. Start replicate

Review Comment:
   ```suggestion
                   // 4. Start replicate
   ```



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicasMigrationWorker.java:
##########
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.replication;
+
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_UNABLE_TO_READ_FOR_MIGRATION;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import org.apache.bookkeeper.bookie.BookieThread;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.LedgerChecker;
+import org.apache.bookkeeper.client.LedgerFragment;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
+import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Migrate replica data to other bookie nodes.
+ */
+public class ReplicasMigrationWorker implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicasMigrationWorker.class);
+    private final BookKeeperAdmin admin;
+    private final BiConsumer<Long, Long> onReadEntryFailureCallback;
+    private final MetadataClientDriver metadataClientDriver;
+    private final String rootPath;
+    private final String replicasMigrationPath;
+    private final String lock = "locked";
+    private final LedgerChecker ledgerChecker;
+    private final Thread workerThread;
+    private final long backoffMs = 100;
+    private volatile boolean workerRunning = false;
+    private final String seperator = ",";
+    private final String advertisedAddress;
+
+    @StatsDoc(
+            name = NUM_ENTRIES_UNABLE_TO_READ_FOR_MIGRATION,
+            help = "the number of entries ReplicasMigrationWorker unable to read"
+    )
+    private final Counter numEntriesUnableToReadForMigration;
+
+    public ReplicasMigrationWorker(final ServerConfiguration conf, BookKeeper bkc, StatsLogger statsLogger) {
+        this.advertisedAddress = conf.getAdvertisedAddress();
+        this.rootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
+        this.replicasMigrationPath = getReplicasMigrationPath(rootPath);
+        this.ledgerChecker = new LedgerChecker(bkc);
+        this.admin = new BookKeeperAdmin(bkc, statsLogger, new ClientConfiguration(conf));
+        this.metadataClientDriver = bkc.getMetadataClientDriver();
+        this.numEntriesUnableToReadForMigration = statsLogger
+                .getCounter(NUM_ENTRIES_UNABLE_TO_READ_FOR_MIGRATION);
+        this.onReadEntryFailureCallback = (ledgerid, entryid) -> {
+            numEntriesUnableToReadForMigration.inc();
+        };
+        this.workerThread = new BookieThread(this, "ReplicasMigrationWorker");
+    }
+
+    public static String getReplicasMigrationPath(String rootPath) {
+        return String.format("%s/%s", rootPath, BookKeeperConstants.MIGRATION_REPLICAS);
+    }
+
+    public static String getLockForMigrationReplicasPath(String replicasMigrationPath, long ledgerId, String lock) {
+        return String.format("%s/%s/%s", replicasMigrationPath, ledgerId, lock);
+    }
+
+    public static String getLedgerForMigrationReplicasPath(String replicasMigrationPath, long ledgerId) {
+        return String.format("%s/%s", replicasMigrationPath, ledgerId);
+    }
+
+    static class Replicas {
+        long ledgerId;
+        Set<BookieId> bookieIds;
+
+        public Replicas(long ledgerId, Set<BookieId> bookieIds) {
+            this.ledgerId = ledgerId;
+            this.bookieIds = bookieIds;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("<ledgerId=%s,bookies=<%s>", ledgerId, bookieIds);
+        }
+    }
+
+    private Replicas getReplicasToMigrate() throws InterruptedException, KeeperException {
+        List<String> ledgerIds = metadataClientDriver.listLedgersOfMigrationReplicas(replicasMigrationPath);
+        for (String ledgerId : ledgerIds) {
+            try {
+                String lockForMigrationReplicasPath = getLockForMigrationReplicasPath(
+                        replicasMigrationPath, Long.parseLong(ledgerId), lock);
+                String ledgerForMigrationReplicasPath = getLedgerForMigrationReplicasPath(
+                        replicasMigrationPath, Long.parseLong(ledgerId));
+                metadataClientDriver.lockMigrationReplicas(lockForMigrationReplicasPath, advertisedAddress);
+                String bookieIdStr = metadataClientDriver.getOwnerBookiesMigrationReplicas(
+                        ledgerForMigrationReplicasPath);
+                String[] migrationBookieIds = bookieIdStr.split(seperator);
+                Set<BookieId> bookieIds = new HashSet<>();
+                for (int i = 0; i < migrationBookieIds.length; i++) {
+                    bookieIds.add(BookieId.parse(migrationBookieIds[i]));
+                }
+                return new Replicas(Long.parseLong(ledgerId), bookieIds);
+            } catch (KeeperException.NodeExistsException nee) {
+                // do nothing, someone each could have created it
+            } catch (UnsupportedEncodingException e) {
+                LOG.error("getReplicasToMigrate failed!", e);
+            }
+        }
+        return null;
+    }
+
+    private Set<LedgerFragment> getFragmentsOnMigrationBookies(LedgerHandle lh, Set<BookieId> migrationBookieIds) {
+        return ledgerChecker.getFragmentsOnMigrationBookies(lh, migrationBookieIds);
+    }
+
+    private static void waitBackOffTime(long backoffMs) {
+        try {
+            Thread.sleep(backoffMs);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private void releaseLock(Replicas replicas) {
+        try {
+            metadataClientDriver.deleteZkPath(getLockForMigrationReplicasPath(replicasMigrationPath,
+                    replicas.ledgerId, lock));
+            LOG.info(String.format("Release lock for ledgerId %s success!", replicas.ledgerId));
+        } catch (KeeperException.NoNodeException e) {
+            // do nothing,already release lock
+        } catch (Exception e) {
+            LOG.error(String.format("Release lock for ledgerId %s failed!", replicas.ledgerId));
+        }
+    }
+
+    /**
+     * Start the migration replicas worker.
+     */
+    public void start() {
+        this.workerThread.start();
+    }
+
+    @Override
+    public void run() {
+        workerRunning = true;
+        while (workerRunning) {
+            //1. Get a replica ledger to replicate
+            Replicas replicas = null;
+            try {
+                replicas = getReplicasToMigrate();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } catch (KeeperException e) {
+                LOG.error("getReplicasToMigrate throw exception!", e);
+            }
+            if (null == replicas) {
+                waitBackOffTime(backoffMs);
+                continue;
+            }
+
+            LOG.info(String.format("Start migrate replicas(%s)!", replicas));
+            //2. Get the fragment containing the migration bookie

Review Comment:
   ```suggestion
               // 2. Get the fragment containing the migration bookie
   ```



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MetadataBookieDriver.java:
##########
@@ -88,6 +89,13 @@ default CompletableFuture<Void> disableHealthCheck() {
         return result;
     }
 
+    default CompletableFuture<Void> submitToMigrateReplicas(long ledgerId, String bookieIds)
+            throws UnsupportedEncodingException {

Review Comment:
   why we need this exception? If we use **UTF-8**, I think we can remove this throw



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicasMigrationWorker.java:
##########
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.replication;
+
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_UNABLE_TO_READ_FOR_MIGRATION;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import org.apache.bookkeeper.bookie.BookieThread;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.LedgerChecker;
+import org.apache.bookkeeper.client.LedgerFragment;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
+import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Migrate replica data to other bookie nodes.
+ */
+public class ReplicasMigrationWorker implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicasMigrationWorker.class);
+    private final BookKeeperAdmin admin;
+    private final BiConsumer<Long, Long> onReadEntryFailureCallback;
+    private final MetadataClientDriver metadataClientDriver;
+    private final String rootPath;
+    private final String replicasMigrationPath;
+    private final String lock = "locked";
+    private final LedgerChecker ledgerChecker;
+    private final Thread workerThread;
+    private final long backoffMs = 100;
+    private volatile boolean workerRunning = false;
+    private final String seperator = ",";
+    private final String advertisedAddress;
+
+    @StatsDoc(
+            name = NUM_ENTRIES_UNABLE_TO_READ_FOR_MIGRATION,
+            help = "the number of entries ReplicasMigrationWorker unable to read"
+    )
+    private final Counter numEntriesUnableToReadForMigration;
+
+    public ReplicasMigrationWorker(final ServerConfiguration conf, BookKeeper bkc, StatsLogger statsLogger) {
+        this.advertisedAddress = conf.getAdvertisedAddress();
+        this.rootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
+        this.replicasMigrationPath = getReplicasMigrationPath(rootPath);
+        this.ledgerChecker = new LedgerChecker(bkc);
+        this.admin = new BookKeeperAdmin(bkc, statsLogger, new ClientConfiguration(conf));
+        this.metadataClientDriver = bkc.getMetadataClientDriver();
+        this.numEntriesUnableToReadForMigration = statsLogger
+                .getCounter(NUM_ENTRIES_UNABLE_TO_READ_FOR_MIGRATION);
+        this.onReadEntryFailureCallback = (ledgerid, entryid) -> {
+            numEntriesUnableToReadForMigration.inc();
+        };
+        this.workerThread = new BookieThread(this, "ReplicasMigrationWorker");
+    }
+
+    public static String getReplicasMigrationPath(String rootPath) {
+        return String.format("%s/%s", rootPath, BookKeeperConstants.MIGRATION_REPLICAS);
+    }
+
+    public static String getLockForMigrationReplicasPath(String replicasMigrationPath, long ledgerId, String lock) {
+        return String.format("%s/%s/%s", replicasMigrationPath, ledgerId, lock);
+    }
+
+    public static String getLedgerForMigrationReplicasPath(String replicasMigrationPath, long ledgerId) {
+        return String.format("%s/%s", replicasMigrationPath, ledgerId);
+    }
+
+    static class Replicas {
+        long ledgerId;
+        Set<BookieId> bookieIds;
+
+        public Replicas(long ledgerId, Set<BookieId> bookieIds) {
+            this.ledgerId = ledgerId;
+            this.bookieIds = bookieIds;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("<ledgerId=%s,bookies=<%s>", ledgerId, bookieIds);
+        }
+    }
+
+    private Replicas getReplicasToMigrate() throws InterruptedException, KeeperException {
+        List<String> ledgerIds = metadataClientDriver.listLedgersOfMigrationReplicas(replicasMigrationPath);
+        for (String ledgerId : ledgerIds) {
+            try {
+                String lockForMigrationReplicasPath = getLockForMigrationReplicasPath(
+                        replicasMigrationPath, Long.parseLong(ledgerId), lock);
+                String ledgerForMigrationReplicasPath = getLedgerForMigrationReplicasPath(
+                        replicasMigrationPath, Long.parseLong(ledgerId));
+                metadataClientDriver.lockMigrationReplicas(lockForMigrationReplicasPath, advertisedAddress);
+                String bookieIdStr = metadataClientDriver.getOwnerBookiesMigrationReplicas(
+                        ledgerForMigrationReplicasPath);
+                String[] migrationBookieIds = bookieIdStr.split(seperator);
+                Set<BookieId> bookieIds = new HashSet<>();
+                for (int i = 0; i < migrationBookieIds.length; i++) {
+                    bookieIds.add(BookieId.parse(migrationBookieIds[i]));
+                }
+                return new Replicas(Long.parseLong(ledgerId), bookieIds);
+            } catch (KeeperException.NodeExistsException nee) {
+                // do nothing, someone each could have created it
+            } catch (UnsupportedEncodingException e) {
+                LOG.error("getReplicasToMigrate failed!", e);
+            }
+        }
+        return null;
+    }
+
+    private Set<LedgerFragment> getFragmentsOnMigrationBookies(LedgerHandle lh, Set<BookieId> migrationBookieIds) {
+        return ledgerChecker.getFragmentsOnMigrationBookies(lh, migrationBookieIds);
+    }
+
+    private static void waitBackOffTime(long backoffMs) {
+        try {
+            Thread.sleep(backoffMs);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private void releaseLock(Replicas replicas) {
+        try {
+            metadataClientDriver.deleteZkPath(getLockForMigrationReplicasPath(replicasMigrationPath,
+                    replicas.ledgerId, lock));
+            LOG.info(String.format("Release lock for ledgerId %s success!", replicas.ledgerId));
+        } catch (KeeperException.NoNodeException e) {
+            // do nothing,already release lock
+        } catch (Exception e) {
+            LOG.error(String.format("Release lock for ledgerId %s failed!", replicas.ledgerId));
+        }
+    }
+
+    /**
+     * Start the migration replicas worker.
+     */
+    public void start() {
+        this.workerThread.start();
+    }
+
+    @Override
+    public void run() {
+        workerRunning = true;
+        while (workerRunning) {
+            //1. Get a replica ledger to replicate

Review Comment:
   ```suggestion
               // 1. Get a replica ledger to replicate
   ```



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookies/ReplicasMigrationCommand.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.bookies;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.HttpServer;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.util.HttpClientUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Migrate the data of the specified replica to other bookie nodes.
+ * */
+public class ReplicasMigrationCommand extends BookieCommand<ReplicasMigrationCommand.ReplicasMigrationFlags> {
+    static final Logger LOG = LoggerFactory.getLogger(ReplicasMigrationCommand.class);
+    private static final String NAME = "replicasMigration";
+    private static final String DESC = "Migrate the data of the specified replica to other bookie nodes";
+    private static final String ALL = "ALL";
+    private final String seperator = ",";
+    public ReplicasMigrationCommand() {
+        this(new ReplicasMigrationFlags());
+    }
+    private ReplicasMigrationCommand(ReplicasMigrationFlags flags) {
+        super(CliSpec.<ReplicasMigrationFlags>newBuilder()
+                .withName(NAME)
+                .withDescription(DESC)
+                .withFlags(flags)
+                .build());
+    }
+
+    /**
+     * Flags for replicasMigration command.
+     */
+    @Accessors(fluent = true)
+    @Setter
+    public static class ReplicasMigrationFlags extends CliFlags {
+
+        @Parameter(names = { "-b", "--bookieIds" }, description = "Bookies corresponding to the migrated replica, "
+                + "eg: bookieId1,bookieId2,bookieId3")
+        private String bookieIdsOfMigratedReplica;
+
+        @Parameter(names = { "-l", "--ledgerIds" }, description = "The ledgerIds corresponding to the migrated replica,"
+                + "eg: ledgerId1,ledgerId2,ledgerId3. The default is ALL, which means that all replica data on these "
+                + "bookie nodes will be migrated to other bookie nodes")
+        private String ledgerIdsOfMigratedReplica;
+
+        @Parameter(names = { "-r", "--readOnly" }, description = "Whether to set the bookie nodes of the migrated data"
+                + " to readOnly, the default is true")
+        private boolean readOnly;
+
+        @Parameter(names = {"-p", "--httpPort"}, description = "http port,default is 8080")
+        private String port;
+    }
+
+    @Override
+    public boolean apply(ServerConfiguration conf, ReplicasMigrationFlags cmdFlags) {
+        try {
+            return switchToReadonly(conf, cmdFlags) && replicasMigration(conf, cmdFlags);
+        } catch (Exception e) {
+            throw new UncheckedExecutionException(e.getMessage(), e);
+        }
+    }
+
+    public Set<BookieId> parseBookieIds(String bookieIdsOfMigratedReplica) {
+        Set<BookieId> bookieIdsSet = new HashSet<>();
+        String[] bookieIds = bookieIdsOfMigratedReplica.split(seperator);
+        for (String bookieId : bookieIds) {
+            bookieIdsSet.add(BookieId.parse(bookieId));
+        }
+        return bookieIdsSet;
+    }
+
+    public Set<Long> parseLedgerIds(String ledgerIdsOfMigratedReplica) {
+        Set<Long> ledgerIdSet = new HashSet<>();
+        String[] ledgerIds = ledgerIdsOfMigratedReplica.split(seperator);
+        for (String ledgerId : ledgerIds) {
+            ledgerIdSet.add(Long.parseLong(ledgerId));
+        }
+        return ledgerIdSet;
+    }
+
+    private boolean switchToReadonly(ServerConfiguration conf, ReplicasMigrationFlags flags) {
+        if (flags.readOnly && !conf.isHttpServerEnabled()) {
+            throw new RuntimeException("Please enable http service first, config httpServerEnabled is false!");
+        }
+
+        Set<BookieId> bookieIdSet = parseBookieIds(flags.bookieIdsOfMigratedReplica);
+        for (BookieId bookieId : bookieIdSet) {
+            String bookieAddress = bookieId.getId().split(":")[0];
+            String httpAddress = bookieAddress + ":" + flags.port;
+            String url = String.format("http://%s/api/v1/bookie/state/readonly", httpAddress);
+            String readOnly = flags.readOnly ? "true" : "false";
+            CloseableHttpResponse response = HttpClientUtils.doPutRequest(url, readOnly, "UTF-8");
+            int responseCode = response.getStatusLine().getStatusCode();
+            if (responseCode != HttpServer.StatusCode.OK.getValue()) {
+                LOG.error(String.format("Set readonly to %s failed! bookieId:%s, responseCode:%s ",
+                        readOnly, bookieId, responseCode));
+                return false;
+            }
+            LOG.info(String.format("Set readonly to %s success! bookieId:%s, responseCode:%s",
+                    readOnly, bookieId, responseCode));
+        }
+        return true;
+    }
+
+    private boolean replicasMigration(ServerConfiguration conf, ReplicasMigrationFlags flags)
+            throws BKException, InterruptedException, IOException {
+        ClientConfiguration adminConf = new ClientConfiguration(conf);
+        BookKeeperAdmin admin = new BookKeeperAdmin(adminConf);
+        try {
+            Map<Long, Set<BookieId>> toMigratedLedgerAndBookieMap = new HashMap<>();
+            Set<BookieId> bookieIds = parseBookieIds(flags.bookieIdsOfMigratedReplica);
+            Set<Long> ledgerIdsToMigrate = null;
+            if (!flags.ledgerIdsOfMigratedReplica.equals(ALL)) {
+                ledgerIdsToMigrate = parseLedgerIds(flags.ledgerIdsOfMigratedReplica);
+            }
+            Map<BookieId, Set<Long>> bookieLedgerMaps = admin.listLedgers(bookieIds);
+            for (Map.Entry<BookieId, Set<Long>> bookieIdSetEntry : bookieLedgerMaps.entrySet()) {
+                Set<Long> ledgersOnBookieId = bookieIdSetEntry.getValue();
+                if (!flags.ledgerIdsOfMigratedReplica.equals(ALL)) {
+                    ledgersOnBookieId.retainAll(ledgerIdsToMigrate);
+                }
+                for (Long ledgerId : ledgersOnBookieId) {
+                    Set<BookieId> bookieIdSet = toMigratedLedgerAndBookieMap.getOrDefault(ledgerId, new HashSet<>());
+                    bookieIdSet.add(bookieIdSetEntry.getKey());
+                    toMigratedLedgerAndBookieMap.put(ledgerId, bookieIdSet);
+                }
+            }
+
+            MetadataDrivers.runFunctionWithMetadataBookieDriver(conf, driver -> {
+                for (Map.Entry<Long, Set<BookieId>> entry : toMigratedLedgerAndBookieMap.entrySet()) {
+                    Long ledgerId = entry.getKey();
+                    String bookies = StringUtils.join(entry.getValue(), seperator);
+                    try {
+                        driver.submitToMigrateReplicas(ledgerId, bookies).get();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    } catch (Throwable e) {
+                        LOG.warn(String.format("The migration task of Replicas(ledgerId=%s,bookieIds=%s) "
+                                + "submit failed!", ledgerId, bookies), e);
+                    }
+                }
+                return true;
+            });
+
+            return true;
+        } catch (Exception e) {
+            LOG.error("Received exception in ReplicasMigrationCommand ", e);
+            return false;
+        } finally {
+            if (admin != null) {

Review Comment:
   admin can't be null here. Am I right?



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookies/ReplicasMigrationCommand.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.bookies;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.HttpServer;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.util.HttpClientUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Migrate the data of the specified replica to other bookie nodes.
+ * */
+public class ReplicasMigrationCommand extends BookieCommand<ReplicasMigrationCommand.ReplicasMigrationFlags> {
+    static final Logger LOG = LoggerFactory.getLogger(ReplicasMigrationCommand.class);
+    private static final String NAME = "replicasMigration";
+    private static final String DESC = "Migrate the data of the specified replica to other bookie nodes";
+    private static final String ALL = "ALL";
+    private final String seperator = ",";
+    public ReplicasMigrationCommand() {
+        this(new ReplicasMigrationFlags());
+    }
+    private ReplicasMigrationCommand(ReplicasMigrationFlags flags) {
+        super(CliSpec.<ReplicasMigrationFlags>newBuilder()
+                .withName(NAME)
+                .withDescription(DESC)
+                .withFlags(flags)
+                .build());
+    }
+
+    /**
+     * Flags for replicasMigration command.
+     */
+    @Accessors(fluent = true)
+    @Setter
+    public static class ReplicasMigrationFlags extends CliFlags {
+
+        @Parameter(names = { "-b", "--bookieIds" }, description = "Bookies corresponding to the migrated replica, "
+                + "eg: bookieId1,bookieId2,bookieId3")
+        private String bookieIdsOfMigratedReplica;
+
+        @Parameter(names = { "-l", "--ledgerIds" }, description = "The ledgerIds corresponding to the migrated replica,"
+                + "eg: ledgerId1,ledgerId2,ledgerId3. The default is ALL, which means that all replica data on these "
+                + "bookie nodes will be migrated to other bookie nodes")
+        private String ledgerIdsOfMigratedReplica;
+
+        @Parameter(names = { "-r", "--readOnly" }, description = "Whether to set the bookie nodes of the migrated data"
+                + " to readOnly, the default is true")
+        private boolean readOnly;
+
+        @Parameter(names = {"-p", "--httpPort"}, description = "http port,default is 8080")
+        private String port;
+    }
+
+    @Override
+    public boolean apply(ServerConfiguration conf, ReplicasMigrationFlags cmdFlags) {
+        try {
+            return switchToReadonly(conf, cmdFlags) && replicasMigration(conf, cmdFlags);
+        } catch (Exception e) {
+            throw new UncheckedExecutionException(e.getMessage(), e);
+        }
+    }
+
+    public Set<BookieId> parseBookieIds(String bookieIdsOfMigratedReplica) {
+        Set<BookieId> bookieIdsSet = new HashSet<>();
+        String[] bookieIds = bookieIdsOfMigratedReplica.split(seperator);
+        for (String bookieId : bookieIds) {
+            bookieIdsSet.add(BookieId.parse(bookieId));
+        }
+        return bookieIdsSet;
+    }
+
+    public Set<Long> parseLedgerIds(String ledgerIdsOfMigratedReplica) {
+        Set<Long> ledgerIdSet = new HashSet<>();
+        String[] ledgerIds = ledgerIdsOfMigratedReplica.split(seperator);
+        for (String ledgerId : ledgerIds) {
+            ledgerIdSet.add(Long.parseLong(ledgerId));
+        }
+        return ledgerIdSet;
+    }
+
+    private boolean switchToReadonly(ServerConfiguration conf, ReplicasMigrationFlags flags) {
+        if (flags.readOnly && !conf.isHttpServerEnabled()) {
+            throw new RuntimeException("Please enable http service first, config httpServerEnabled is false!");
+        }
+
+        Set<BookieId> bookieIdSet = parseBookieIds(flags.bookieIdsOfMigratedReplica);
+        for (BookieId bookieId : bookieIdSet) {
+            String bookieAddress = bookieId.getId().split(":")[0];
+            String httpAddress = bookieAddress + ":" + flags.port;
+            String url = String.format("http://%s/api/v1/bookie/state/readonly", httpAddress);

Review Comment:
   we can use the constant in `HttpRouter.BOOKIE_STATE_READONLY`



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicasMigrationWorker.java:
##########
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.replication;
+
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_UNABLE_TO_READ_FOR_MIGRATION;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import org.apache.bookkeeper.bookie.BookieThread;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.LedgerChecker;
+import org.apache.bookkeeper.client.LedgerFragment;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.MetadataClientDriver;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
+import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Migrate replica data to other bookie nodes.
+ */
+public class ReplicasMigrationWorker implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicasMigrationWorker.class);
+    private final BookKeeperAdmin admin;
+    private final BiConsumer<Long, Long> onReadEntryFailureCallback;
+    private final MetadataClientDriver metadataClientDriver;
+    private final String rootPath;
+    private final String replicasMigrationPath;
+    private final String lock = "locked";
+    private final LedgerChecker ledgerChecker;
+    private final Thread workerThread;
+    private final long backoffMs = 100;
+    private volatile boolean workerRunning = false;
+    private final String seperator = ",";
+    private final String advertisedAddress;
+
+    @StatsDoc(
+            name = NUM_ENTRIES_UNABLE_TO_READ_FOR_MIGRATION,
+            help = "the number of entries ReplicasMigrationWorker unable to read"
+    )
+    private final Counter numEntriesUnableToReadForMigration;
+
+    public ReplicasMigrationWorker(final ServerConfiguration conf, BookKeeper bkc, StatsLogger statsLogger) {
+        this.advertisedAddress = conf.getAdvertisedAddress();
+        this.rootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
+        this.replicasMigrationPath = getReplicasMigrationPath(rootPath);
+        this.ledgerChecker = new LedgerChecker(bkc);
+        this.admin = new BookKeeperAdmin(bkc, statsLogger, new ClientConfiguration(conf));
+        this.metadataClientDriver = bkc.getMetadataClientDriver();
+        this.numEntriesUnableToReadForMigration = statsLogger
+                .getCounter(NUM_ENTRIES_UNABLE_TO_READ_FOR_MIGRATION);
+        this.onReadEntryFailureCallback = (ledgerid, entryid) -> {
+            numEntriesUnableToReadForMigration.inc();
+        };
+        this.workerThread = new BookieThread(this, "ReplicasMigrationWorker");
+    }
+
+    public static String getReplicasMigrationPath(String rootPath) {
+        return String.format("%s/%s", rootPath, BookKeeperConstants.MIGRATION_REPLICAS);
+    }
+
+    public static String getLockForMigrationReplicasPath(String replicasMigrationPath, long ledgerId, String lock) {
+        return String.format("%s/%s/%s", replicasMigrationPath, ledgerId, lock);
+    }
+
+    public static String getLedgerForMigrationReplicasPath(String replicasMigrationPath, long ledgerId) {
+        return String.format("%s/%s", replicasMigrationPath, ledgerId);
+    }
+
+    static class Replicas {
+        long ledgerId;
+        Set<BookieId> bookieIds;
+
+        public Replicas(long ledgerId, Set<BookieId> bookieIds) {
+            this.ledgerId = ledgerId;
+            this.bookieIds = bookieIds;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("<ledgerId=%s,bookies=<%s>", ledgerId, bookieIds);
+        }
+    }
+
+    private Replicas getReplicasToMigrate() throws InterruptedException, KeeperException {
+        List<String> ledgerIds = metadataClientDriver.listLedgersOfMigrationReplicas(replicasMigrationPath);
+        for (String ledgerId : ledgerIds) {
+            try {
+                String lockForMigrationReplicasPath = getLockForMigrationReplicasPath(
+                        replicasMigrationPath, Long.parseLong(ledgerId), lock);
+                String ledgerForMigrationReplicasPath = getLedgerForMigrationReplicasPath(
+                        replicasMigrationPath, Long.parseLong(ledgerId));
+                metadataClientDriver.lockMigrationReplicas(lockForMigrationReplicasPath, advertisedAddress);
+                String bookieIdStr = metadataClientDriver.getOwnerBookiesMigrationReplicas(
+                        ledgerForMigrationReplicasPath);
+                String[] migrationBookieIds = bookieIdStr.split(seperator);
+                Set<BookieId> bookieIds = new HashSet<>();
+                for (int i = 0; i < migrationBookieIds.length; i++) {
+                    bookieIds.add(BookieId.parse(migrationBookieIds[i]));
+                }
+                return new Replicas(Long.parseLong(ledgerId), bookieIds);
+            } catch (KeeperException.NodeExistsException nee) {
+                // do nothing, someone each could have created it
+            } catch (UnsupportedEncodingException e) {
+                LOG.error("getReplicasToMigrate failed!", e);
+            }
+        }
+        return null;
+    }
+
+    private Set<LedgerFragment> getFragmentsOnMigrationBookies(LedgerHandle lh, Set<BookieId> migrationBookieIds) {
+        return ledgerChecker.getFragmentsOnMigrationBookies(lh, migrationBookieIds);
+    }
+
+    private static void waitBackOffTime(long backoffMs) {
+        try {
+            Thread.sleep(backoffMs);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private void releaseLock(Replicas replicas) {
+        try {
+            metadataClientDriver.deleteZkPath(getLockForMigrationReplicasPath(replicasMigrationPath,
+                    replicas.ledgerId, lock));
+            LOG.info(String.format("Release lock for ledgerId %s success!", replicas.ledgerId));
+        } catch (KeeperException.NoNodeException e) {
+            // do nothing,already release lock
+        } catch (Exception e) {
+            LOG.error(String.format("Release lock for ledgerId %s failed!", replicas.ledgerId));
+        }
+    }
+
+    /**
+     * Start the migration replicas worker.
+     */
+    public void start() {
+        this.workerThread.start();
+    }
+
+    @Override
+    public void run() {
+        workerRunning = true;
+        while (workerRunning) {
+            //1. Get a replica ledger to replicate
+            Replicas replicas = null;
+            try {
+                replicas = getReplicasToMigrate();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } catch (KeeperException e) {
+                LOG.error("getReplicasToMigrate throw exception!", e);
+            }
+            if (null == replicas) {
+                waitBackOffTime(backoffMs);
+                continue;
+            }
+
+            LOG.info(String.format("Start migrate replicas(%s)!", replicas));
+            //2. Get the fragment containing the migration bookie
+            try (LedgerHandle lh = admin.openLedgerNoRecovery(replicas.ledgerId)) {
+                Set<LedgerFragment> fragments = getFragmentsOnMigrationBookies(lh, replicas.bookieIds);
+                if (fragments.size() < 1) {
+                    //3.The replication has been completed, delete the ledgerId directory and release lock

Review Comment:
   ```suggestion
                       // 3.The replication has been completed, delete the ledgerId directory and release lock
   ```



##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookies/ReplicasMigrationCommand.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.bookies;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.http.HttpServer;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.util.HttpClientUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Migrate the data of the specified replica to other bookie nodes.
+ * */
+public class ReplicasMigrationCommand extends BookieCommand<ReplicasMigrationCommand.ReplicasMigrationFlags> {
+    static final Logger LOG = LoggerFactory.getLogger(ReplicasMigrationCommand.class);
+    private static final String NAME = "replicasMigration";
+    private static final String DESC = "Migrate the data of the specified replica to other bookie nodes";
+    private static final String ALL = "ALL";
+    private final String seperator = ",";
+    public ReplicasMigrationCommand() {
+        this(new ReplicasMigrationFlags());
+    }
+    private ReplicasMigrationCommand(ReplicasMigrationFlags flags) {
+        super(CliSpec.<ReplicasMigrationFlags>newBuilder()
+                .withName(NAME)
+                .withDescription(DESC)
+                .withFlags(flags)
+                .build());
+    }
+
+    /**
+     * Flags for replicasMigration command.
+     */
+    @Accessors(fluent = true)
+    @Setter
+    public static class ReplicasMigrationFlags extends CliFlags {
+
+        @Parameter(names = { "-b", "--bookieIds" }, description = "Bookies corresponding to the migrated replica, "
+                + "eg: bookieId1,bookieId2,bookieId3")
+        private String bookieIdsOfMigratedReplica;
+
+        @Parameter(names = { "-l", "--ledgerIds" }, description = "The ledgerIds corresponding to the migrated replica,"
+                + "eg: ledgerId1,ledgerId2,ledgerId3. The default is ALL, which means that all replica data on these "
+                + "bookie nodes will be migrated to other bookie nodes")
+        private String ledgerIdsOfMigratedReplica;
+
+        @Parameter(names = { "-r", "--readOnly" }, description = "Whether to set the bookie nodes of the migrated data"
+                + " to readOnly, the default is true")
+        private boolean readOnly;
+
+        @Parameter(names = {"-p", "--httpPort"}, description = "http port,default is 8080")
+        private String port;
+    }
+
+    @Override
+    public boolean apply(ServerConfiguration conf, ReplicasMigrationFlags cmdFlags) {
+        try {
+            return switchToReadonly(conf, cmdFlags) && replicasMigration(conf, cmdFlags);
+        } catch (Exception e) {
+            throw new UncheckedExecutionException(e.getMessage(), e);
+        }
+    }
+
+    public Set<BookieId> parseBookieIds(String bookieIdsOfMigratedReplica) {
+        Set<BookieId> bookieIdsSet = new HashSet<>();
+        String[] bookieIds = bookieIdsOfMigratedReplica.split(seperator);
+        for (String bookieId : bookieIds) {
+            bookieIdsSet.add(BookieId.parse(bookieId));
+        }
+        return bookieIdsSet;
+    }
+
+    public Set<Long> parseLedgerIds(String ledgerIdsOfMigratedReplica) {
+        Set<Long> ledgerIdSet = new HashSet<>();
+        String[] ledgerIds = ledgerIdsOfMigratedReplica.split(seperator);
+        for (String ledgerId : ledgerIds) {
+            ledgerIdSet.add(Long.parseLong(ledgerId));
+        }
+        return ledgerIdSet;
+    }
+
+    private boolean switchToReadonly(ServerConfiguration conf, ReplicasMigrationFlags flags) {
+        if (flags.readOnly && !conf.isHttpServerEnabled()) {
+            throw new RuntimeException("Please enable http service first, config httpServerEnabled is false!");
+        }
+
+        Set<BookieId> bookieIdSet = parseBookieIds(flags.bookieIdsOfMigratedReplica);
+        for (BookieId bookieId : bookieIdSet) {
+            String bookieAddress = bookieId.getId().split(":")[0];
+            String httpAddress = bookieAddress + ":" + flags.port;
+            String url = String.format("http://%s/api/v1/bookie/state/readonly", httpAddress);
+            String readOnly = flags.readOnly ? "true" : "false";
+            CloseableHttpResponse response = HttpClientUtils.doPutRequest(url, readOnly, "UTF-8");

Review Comment:
   can we reuse some code in `listbookies` command?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@bookkeeper.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org