You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ti...@apache.org on 2019/01/07 07:58:41 UTC
[2/2] hbase git commit: HBASE-21588 Procedure v2 wal splitting implementation
HBASE-21588 Procedure v2 wal splitting implementation
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/281d6429
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/281d6429
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/281d6429
Branch: refs/heads/master
Commit: 281d6429e55149cc4c05430dcc1d1dc136d8b245
Parents: 77db1fa
Author: tianjingyun <ti...@xiaomi.com>
Authored: Sun Jan 6 01:31:59 2019 +0800
Committer: Jingyun Tian <ti...@apache.org>
Committed: Mon Jan 7 15:58:15 2019 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/HConstants.java | 8 +
.../src/main/protobuf/MasterProcedure.proto | 25 ++
.../SplitLogWorkerCoordination.java | 3 -
.../ZkSplitLogWorkerCoordination.java | 6 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 18 +
.../hadoop/hbase/master/MasterServices.java | 7 +
.../hadoop/hbase/master/MasterWalManager.java | 6 +-
.../hadoop/hbase/master/SplitWALManager.java | 239 ++++++++++++
.../master/procedure/ServerCrashProcedure.java | 76 +++-
.../procedure/ServerProcedureInterface.java | 13 +-
.../hbase/master/procedure/ServerQueue.java | 2 +
.../master/procedure/SplitWALProcedure.java | 199 ++++++++++
.../procedure/SplitWALRemoteProcedure.java | 195 ++++++++++
.../hbase/regionserver/HRegionServer.java | 16 +-
.../hbase/regionserver/SplitLogWorker.java | 10 +-
.../hbase/regionserver/SplitWALCallable.java | 109 ++++++
.../hadoop/hbase/master/AbstractTestDLS.java | 3 +-
.../hadoop/hbase/master/TestRestartCluster.java | 21 +
.../hadoop/hbase/master/TestRollingRestart.java | 18 +-
.../hbase/master/TestSplitWALManager.java | 383 +++++++++++++++++++
.../procedure/TestServerCrashProcedure.java | 21 +-
.../master/procedure/TestSplitWALProcedure.java | 133 +++++++
.../hbase/regionserver/TestSplitLogWorker.java | 5 +-
23 files changed, 1491 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index fdc3d82..75ee687 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1311,6 +1311,14 @@ public final class HConstants {
public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL =
"hbase.client.fast.fail.interceptor.impl";
+ public static final String HBASE_SPLIT_WAL_COORDINATED_BY_ZK = "hbase.split.wal.zk.coordinated";
+
+ public static final boolean DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK = true;
+
+ public static final String HBASE_SPLIT_WAL_MAX_SPLITTER = "hbase.regionserver.wal.max.splitters";
+
+ public static final int DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER = 2;
+
/** Config key for if the server should send backpressure and if the client should listen to
* that backpressure from the server */
public static final String ENABLE_CLIENT_BACKPRESSURE = "hbase.client.backpressure.enabled";
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index b365373..59af722 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -308,6 +308,8 @@ enum ServerCrashState {
SERVER_CRASH_WAIT_ON_ASSIGN = 9;
SERVER_CRASH_SPLIT_META_LOGS = 10;
SERVER_CRASH_ASSIGN_META = 11;
+ SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR=12;
+ SERVER_CRASH_DELETE_SPLIT_WALS_DIR=13;
SERVER_CRASH_HANDLE_RIT2 = 20[deprecated=true];
SERVER_CRASH_FINISH = 100;
}
@@ -565,4 +567,27 @@ message SwitchRpcThrottleStateData {
message SwitchRpcThrottleRemoteStateData {
required ServerName target_server = 1;
required bool rpc_throttle_enabled = 2;
+}
+
+message SplitWALParameter {
+ required string wal_path = 1;
+}
+
+
+message SplitWALData{
+ required string wal_path = 1;
+ required ServerName crashed_server=2;
+ optional ServerName worker = 3;
+}
+
+message SplitWALRemoteData{
+ required string wal_path = 1;
+ required ServerName crashed_server=2;
+ required ServerName worker = 3;
+}
+
+enum SplitWALState{
+ ACQUIRE_SPLIT_WAL_WORKER = 1;
+ DISPATCH_WAL_TO_WORKER = 2;
+ RELEASE_SPLIT_WORKER = 3;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
index ab04f60..ad74015 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
@@ -48,9 +48,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
@InterfaceAudience.Private
public interface SplitLogWorkerCoordination {
-/* SplitLogWorker part */
- int DEFAULT_MAX_SPLITTERS = 2;
-
/**
* Initialize internal values. This method should be used when corresponding SplitLogWorker
* instance is created
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
index ff555f2..7ceaaec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
@@ -19,6 +19,9 @@
package org.apache.hadoop.hbase.coordination;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
+import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
+
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -135,7 +138,8 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements
this.server = server;
this.worker = worker;
this.splitTaskExecutor = splitExecutor;
- maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS);
+ maxConcurrentTasks =
+ conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
reportPeriod =
conf.getInt("hbase.splitlog.report.period",
conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 0bcef59..8d47db4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hbase.master;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
+import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Service;
@@ -338,6 +340,13 @@ public class HMaster extends HRegionServer implements MasterServices {
private MasterFileSystem fileSystemManager;
private MasterWalManager walManager;
+ // Manager for procedure-based WAL splitting; it is null when zk-based WAL
+ // splitting is in use. SplitWALManager will replace SplitLogManager and
+ // MasterWalManager, which means the zk-based WAL splitting code becomes
+ // obsolete once we switch to the procedure-based one. Our eventual goal
+ // is to remove all the zk-based WAL splitting code.
+ private SplitWALManager splitWALManager;
+
// server manager to deal with region server info
private volatile ServerManager serverManager;
@@ -945,6 +954,10 @@ public class HMaster extends HRegionServer implements MasterServices {
// loading.
this.serverManager = createServerManager(this);
this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this);
+ if (!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
+ DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
+ this.splitWALManager = new SplitWALManager(this);
+ }
createProcedureExecutor();
@SuppressWarnings("rawtypes")
Map<Class<? extends Procedure>, List<Procedure<MasterProcedureEnv>>> procsByType =
@@ -1401,6 +1414,11 @@ public class HMaster extends HRegionServer implements MasterServices {
}
@Override
+ public SplitWALManager getSplitWALManager() {
+ return splitWALManager;
+ }
+
+ @Override
public TableStateManager getTableStateManager() {
return tableStateManager;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 27ef5f8..12c78ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -510,4 +510,11 @@ public interface MasterServices extends Server {
* @return True if cluster is up; false if cluster is not up (we are shutting down).
*/
boolean isClusterUp();
+
+ /**
+ * @return the SplitWALManager, or null when zk-based WAL splitting is in use
+ */
+ default SplitWALManager getSplitWALManager(){
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
index 5ab1c28..fbf4594 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
@@ -60,7 +60,8 @@ public class MasterWalManager {
}
};
- final static PathFilter NON_META_FILTER = new PathFilter() {
+ @VisibleForTesting
+ public final static PathFilter NON_META_FILTER = new PathFilter() {
@Override
public boolean accept(Path p) {
return !AbstractFSWALProvider.isMetaFile(p);
@@ -167,7 +168,6 @@ public class MasterWalManager {
/**
* @return listing of ServerNames found by parsing WAL directory paths in FS.
- *
*/
public Set<ServerName> getServerNamesFromWALDirPath(final PathFilter filter) throws IOException {
FileStatus[] walDirForServerNames = getWALDirPaths(filter);
@@ -290,7 +290,7 @@ public class MasterWalManager {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", justification=
"We only release this lock when we set it. Updates to code that uses it should verify use " +
"of the guard boolean.")
- private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
+ List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
List<Path> logDirs = new ArrayList<>();
boolean needReleaseLock = false;
if (!this.services.isInitialized()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
new file mode 100644
index 0000000..fc50840
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
+import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
+import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER;
+import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
+import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+/**
+ * Creates a {@link SplitWALProcedure} for each WAL that needs to be split and manages the workers
+ * for these procedures.
+ * Total number of workers is (number of online servers) * (HBASE_SPLIT_WAL_MAX_SPLITTER).
+ * Helps assign and release workers for split tasks.
+ * Provides helper methods to delete a split WAL file and the splitting directory.
+ *
+ * The user can get the SplitWALProcedures via splitWALs(crashedServer, splitMeta),
+ * can get the files that need to be split via getWALsToSplit(crashedServer, splitMeta),
+ * can delete a split WAL and the splitting directory via deleteSplitWAL(wal)
+ * and deleteWALDir(crashedServer),
+ * can check whether splitting of a WAL has finished via isSplitWALFinished(walPath),
+ * and can acquire and release a worker for splitting a WAL via acquireSplitWALWorker(procedure)
+ * and releaseSplitWALWorker(worker, scheduler).
+ *
+ * This class is to replace the zk-based WAL splitting related code: {@link MasterWalManager},
+ * {@link SplitLogManager}, {@link org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and
+ * {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} can be removed
+ * after we switch to procedure-based WAL splitting.
+ */
+@InterfaceAudience.Private
+public class SplitWALManager {
+ private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class);
+
+ private final MasterServices master;
+ private final SplitWorkerAssigner splitWorkerAssigner;
+ private final Path rootDir;
+ private final FileSystem fs;
+ private final Configuration conf;
+
+ public SplitWALManager(MasterServices master) {
+ this.master = master;
+ this.conf = master.getConfiguration();
+ this.splitWorkerAssigner = new SplitWorkerAssigner(this.master,
+ conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
+ this.rootDir = master.getMasterFileSystem().getWALRootDir();
+ this.fs = master.getMasterFileSystem().getFileSystem();
+
+ }
+
+ public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta)
+ throws IOException {
+ try {
+ // 1. list all splitting files
+ List<FileStatus> splittingFiles = getWALsToSplit(crashedServer, splitMeta);
+ // 2. create corresponding procedures
+ return createSplitWALProcedures(splittingFiles, crashedServer);
+ } catch (IOException e) {
+ LOG.error("failed to create procedures for splitting logs of {}", crashedServer, e);
+ throw e;
+ }
+ }
+
+ public List<FileStatus> getWALsToSplit(ServerName serverName, boolean splitMeta)
+ throws IOException {
+ List<Path> logDirs = master.getMasterWalManager().getLogDirs(Collections.singleton(serverName));
+ FileStatus[] fileStatuses =
+ SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER);
+ LOG.info("size of WALs of {} is {}, isMeta: {}", serverName, fileStatuses.length, splitMeta);
+ return Lists.newArrayList(fileStatuses);
+ }
+
+ private Path getWALSplitDir(ServerName serverName) {
+ Path logDir =
+ new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
+ return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
+ }
+
+ public void deleteSplitWAL(String wal) throws IOException {
+ fs.delete(new Path(wal), false);
+ }
+
+ public void deleteWALDir(ServerName serverName) throws IOException {
+ Path splitDir = getWALSplitDir(serverName);
+ fs.delete(splitDir, false);
+ }
+
+ public boolean isSplitWALFinished(String walPath) throws IOException {
+ return !fs.exists(new Path(rootDir, walPath));
+ }
+
+ @VisibleForTesting
+ List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs,
+ ServerName crashedServer) {
+ return splittingWALs.stream()
+ .map(wal -> new SplitWALProcedure(wal.getPath().toString(), crashedServer))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Try to acquire a worker from the online servers that still has capacity for a split task
+ * @param procedure split WAL task
+ * @return an available region server which could execute this task
+ * @throws ProcedureSuspendedException if there is no available worker,
+ * it will throw this exception to let the procedure wait
+ */
+ public ServerName acquireSplitWALWorker(Procedure<?> procedure)
+ throws ProcedureSuspendedException {
+ Optional<ServerName> worker = splitWorkerAssigner.acquire();
+ LOG.debug("acquired a worker {} to split a WAL", worker);
+ if (worker.isPresent()) {
+ return worker.get();
+ }
+ splitWorkerAssigner.suspend(procedure);
+ throw new ProcedureSuspendedException();
+ }
+
+ /**
+ * After the worker has finished the split WAL task, release the worker and wake up all the
+ * suspended procedures in the ProcedureEvent
+ * @param worker worker which is about to be released
+ * @param scheduler scheduler which is to wake up the procedure event
+ */
+ public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) {
+ LOG.debug("release a worker {} to split a WAL", worker);
+ splitWorkerAssigner.release(worker);
+ splitWorkerAssigner.wake(scheduler);
+ }
+
+ /**
+ * When the master restarts, there will be a new splitWorkerAssigner. But if there are splitting
+ * WAL tasks running on the region server side, they will not be counted by the new
+ * splitWorkerAssigner. Thus we should add the workers of running tasks to the assigner when we
+ * load the procedures from MasterProcWALs.
+ * @param worker region server which is executing a split WAL task
+ */
+ public void addUsedSplitWALWorker(ServerName worker){
+ splitWorkerAssigner.addUsedWorker(worker);
+ }
+
+ /**
+ * Helps assign and release a worker for each WAL splitting task.
+ * For each worker, the number of concurrently running splitting tasks is at most maxSplitTasks.
+ * If a task fails to acquire a worker, it is suspended and waits for a worker to become available.
+ *
+ */
+ private static final class SplitWorkerAssigner implements ServerListener {
+ private int maxSplitTasks;
+ private final ProcedureEvent<?> event;
+ private Map<ServerName, Integer> currentWorkers = new HashMap<>();
+ private MasterServices master;
+
+ public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) {
+ this.maxSplitTasks = maxSplitTasks;
+ this.master = master;
+ this.event = new ProcedureEvent<>("split-WAL-worker-assigning");
+ this.master.getServerManager().registerListener(this);
+ }
+
+ public synchronized Optional<ServerName> acquire() {
+ List<ServerName> serverList = master.getServerManager().getOnlineServersList();
+ Collections.shuffle(serverList);
+ Optional<ServerName> worker = serverList.stream().filter(
+ serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0)
+ .findAny();
+ if (worker.isPresent()) {
+ currentWorkers.compute(worker.get(), (serverName,
+ availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
+ }
+ return worker;
+ }
+
+ public synchronized void release(ServerName serverName) {
+ currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
+ }
+
+ public void suspend(Procedure<?> proc) {
+ event.suspend();
+ event.suspendIfNotReady(proc);
+ }
+
+ public void wake(MasterProcedureScheduler scheduler) {
+ if (!event.isReady()) {
+ event.wake(scheduler);
+ }
+ }
+
+ @Override
+ public void serverAdded(ServerName worker) {
+ this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
+ }
+
+ public synchronized void addUsedWorker(ServerName worker) {
+ // load used worker when master restart
+ currentWorkers.compute(worker, (serverName,
+ availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index 05bcd28..2072727 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.master.procedure;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
+import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -28,9 +31,11 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MasterWalManager;
+import org.apache.hadoop.hbase.master.SplitWALManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
@@ -107,6 +112,7 @@ public class ServerCrashProcedure
protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state)
throws ProcedureSuspendedException, ProcedureYieldException {
final MasterServices services = env.getMasterServices();
+ final AssignmentManager am = env.getAssignmentManager();
// HBASE-14802
// If we have not yet notified that we are processing a dead server, we should do now.
if (!notifiedDeadServer) {
@@ -117,6 +123,7 @@ public class ServerCrashProcedure
switch (state) {
case SERVER_CRASH_START:
case SERVER_CRASH_SPLIT_META_LOGS:
+ case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR:
case SERVER_CRASH_ASSIGN_META:
break;
default:
@@ -137,8 +144,24 @@ public class ServerCrashProcedure
}
break;
case SERVER_CRASH_SPLIT_META_LOGS:
- splitMetaLogs(env);
- setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
+ if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
+ DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
+ splitMetaLogs(env);
+ setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
+ } else {
+ am.getRegionStates().metaLogSplitting(serverName);
+ addChildProcedure(createSplittingWalProcedures(env, true));
+ setNextState(ServerCrashState.SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR);
+ }
+ break;
+ case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR:
+ if(isSplittingDone(env, true)){
+ cleanupSplitDir(env);
+ setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
+ am.getRegionStates().metaLogSplit(serverName);
+ } else {
+ setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS);
+ }
break;
case SERVER_CRASH_ASSIGN_META:
assignRegions(env, Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO));
@@ -156,8 +179,24 @@ public class ServerCrashProcedure
}
break;
case SERVER_CRASH_SPLIT_LOGS:
- splitLogs(env);
- setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
+ if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
+ DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
+ splitLogs(env);
+ setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
+ } else {
+ am.getRegionStates().logSplitting(this.serverName);
+ addChildProcedure(createSplittingWalProcedures(env, false));
+ setNextState(ServerCrashState.SERVER_CRASH_DELETE_SPLIT_WALS_DIR);
+ }
+ break;
+ case SERVER_CRASH_DELETE_SPLIT_WALS_DIR:
+ if (isSplittingDone(env, false)) {
+ cleanupSplitDir(env);
+ setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
+ am.getRegionStates().logSplit(this.serverName);
+ } else {
+ setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
+ }
break;
case SERVER_CRASH_ASSIGN:
// If no regions to assign, skip assign and skip to the finish.
@@ -179,6 +218,7 @@ public class ServerCrashProcedure
setNextState(ServerCrashState.SERVER_CRASH_FINISH);
break;
case SERVER_CRASH_FINISH:
+ LOG.info("removed crashed server {} after splitting done", serverName);
services.getAssignmentManager().getRegionStates().removeServer(serverName);
services.getServerManager().getDeadServers().finish(serverName);
return Flow.NO_MORE_STATE;
@@ -191,6 +231,34 @@ public class ServerCrashProcedure
return Flow.HAS_MORE_STATE;
}
+ private void cleanupSplitDir(MasterProcedureEnv env) {
+ SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
+ try {
+ splitWALManager.deleteWALDir(serverName);
+ } catch (IOException e) {
+ LOG.warn("remove WAL directory of server {} failed, ignore...", serverName, e);
+ }
+ }
+
+ private boolean isSplittingDone(MasterProcedureEnv env, boolean splitMeta) {
+ LOG.debug("check if splitting WALs of {} done? isMeta: {}", serverName, splitMeta);
+ SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
+ try {
+ return splitWALManager.getWALsToSplit(serverName, splitMeta).size() == 0;
+ } catch (IOException e) {
+ LOG.warn("get filelist of serverName {} failed, retry...", serverName, e);
+ return false;
+ }
+ }
+
+ private Procedure[] createSplittingWalProcedures(MasterProcedureEnv env, boolean splitMeta)
+ throws IOException {
+ LOG.info("Splitting WALs {}, isMeta: {}", this, splitMeta);
+ SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
+ List<Procedure> procedures = splitWALManager.splitWALs(serverName, splitMeta);
+ return procedures.toArray(new Procedure[procedures.size()]);
+ }
+
private boolean filterDefaultMetaRegions() {
if (regionsOnCrashedServer == null) {
return false;
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
index 7549b13..eb0583b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
@@ -27,7 +27,18 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public interface ServerProcedureInterface {
public enum ServerOperationType {
- CRASH_HANDLER, SWITCH_RPC_THROTTLE
+ CRASH_HANDLER, SWITCH_RPC_THROTTLE,
+ /**
+ * Helps find an available region server as worker and releases the worker after the task is
+ * done. Invokes the SPLIT_WAL_REMOTE operation to send the real WAL splitting request to the
+ * worker. Manages the split WAL task flow and will retry if SPLIT_WAL_REMOTE failed.
+ */
+ SPLIT_WAL,
+
+ /**
+ * send the split WAL request to region server and handle the response
+ */
+ SPLIT_WAL_REMOTE
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
index 9e3b311..1659ab5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
@@ -36,6 +36,8 @@ class ServerQueue extends Queue<ServerName> {
case CRASH_HANDLER:
return true;
case SWITCH_RPC_THROTTLE:
+ case SPLIT_WAL:
+ case SPLIT_WAL_REMOTE:
return false;
default:
break;
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
new file mode 100644
index 0000000..3b2d0d5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java
@@ -0,0 +1,199 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.SplitWALManager;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
+/**
+ * Procedure that splits a single WAL. It acquires an available region server as the worker
+ * and schedules a {@link SplitWALRemoteProcedure} to actually send the split request to that
+ * region server.
+ * It also checks whether the split task really succeeded. If the WAL still exists, it
+ * releases the worker and acquires another region server to split this WAL.
+ */
+@InterfaceAudience.Private
+public class SplitWALProcedure
+    extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState>
+    implements ServerProcedureInterface {
+  private static final Logger LOG = LoggerFactory.getLogger(SplitWALProcedure.class);
+  // Path of the WAL to split.
+  private String walPath;
+  // Region server currently chosen to split the WAL; null until one has been acquired.
+  private ServerName worker;
+  // The crashed server; reported as this procedure's server name.
+  private ServerName crashedServer;
+  // Number of times this procedure has been suspended; drives the retry backoff.
+  private int attempts = 0;
+
+  /** No-arg constructor; the fields are filled in by {@link #deserializeStateData}. */
+  public SplitWALProcedure() {
+  }
+
+  /**
+   * @param walPath path of the WAL to split
+   * @param crashedServer the crashed server this split is performed for
+   */
+  public SplitWALProcedure(String walPath, ServerName crashedServer) {
+    this.walPath = walPath;
+    this.crashedServer = crashedServer;
+  }
+
+  /**
+   * Runs one step of the state machine:
+   * ACQUIRE_SPLIT_WAL_WORKER -> DISPATCH_WAL_TO_WORKER -> RELEASE_SPLIT_WORKER, going back to
+   * ACQUIRE_SPLIT_WAL_WORKER when the WAL turns out not to be split.
+   */
+  @Override
+  protected Flow executeFromState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state)
+      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+    SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
+    switch (state) {
+      case ACQUIRE_SPLIT_WAL_WORKER:
+        worker = splitWALManager.acquireSplitWALWorker(this);
+        setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER);
+        return Flow.HAS_MORE_STATE;
+      case DISPATCH_WAL_TO_WORKER:
+        assert worker != null;
+        addChildProcedure(new SplitWALRemoteProcedure(worker, crashedServer, walPath));
+        setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
+        return Flow.HAS_MORE_STATE;
+      case RELEASE_SPLIT_WORKER:
+        boolean finished;
+        try {
+          finished = splitWALManager.isSplitWALFinished(walPath);
+        } catch (IOException ioe) {
+          // suspend() already increments attempts; do not increment here as well, otherwise
+          // the backoff exponent advances two steps per retry.
+          long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
+          LOG.warn(
+            "Failed to check whether splitting wal {} success, wait {} seconds to retry",
+            walPath, backoff / 1000, ioe);
+          throw suspend(backoff);
+        }
+        // The worker is released whether or not the split finished; a retry acquires a new one.
+        splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
+        if (!finished) {
+          LOG.warn("Failed to split wal {} by server {}, retry...", walPath, worker);
+          setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER);
+          return Flow.HAS_MORE_STATE;
+        }
+        return Flow.NO_MORE_STATE;
+      default:
+        throw new UnsupportedOperationException("unhandled state=" + state);
+    }
+  }
+
+  /**
+   * Rollback is only a no-op for the initial state; any other state is not supported.
+   */
+  @Override
+  protected void rollbackState(MasterProcedureEnv env,
+      MasterProcedureProtos.SplitWALState splitOneWalState)
+      throws IOException, InterruptedException {
+    if (splitOneWalState == getInitialState()) {
+      return;
+    }
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected MasterProcedureProtos.SplitWALState getState(int stateId) {
+    return MasterProcedureProtos.SplitWALState.forNumber(stateId);
+  }
+
+  @Override
+  protected int getStateId(MasterProcedureProtos.SplitWALState state) {
+    return state.getNumber();
+  }
+
+  @Override
+  protected MasterProcedureProtos.SplitWALState getInitialState() {
+    return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER;
+  }
+
+  /** Persists the WAL path, the crashed server and, when one is assigned, the worker. */
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.serializeStateData(serializer);
+    MasterProcedureProtos.SplitWALData.Builder builder =
+        MasterProcedureProtos.SplitWALData.newBuilder();
+    builder.setWalPath(walPath).setCrashedServer(ProtobufUtil.toServerName(crashedServer));
+    if (worker != null) {
+      builder.setWorker(ProtobufUtil.toServerName(worker));
+    }
+    serializer.serialize(builder.build());
+  }
+
+  /** Restores the fields written by {@link #serializeStateData}. */
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    super.deserializeStateData(serializer);
+    MasterProcedureProtos.SplitWALData data =
+        serializer.deserialize(MasterProcedureProtos.SplitWALData.class);
+    walPath = data.getWalPath();
+    crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
+    if (data.hasWorker()) {
+      worker = ProtobufUtil.toServerName(data.getWorker());
+    }
+  }
+
+  /**
+   * Invoked when the timeout set by {@link #suspend(long)} expires: marks the procedure
+   * RUNNABLE again and puts it at the front of the scheduler queue so the check is retried.
+   */
+  @Override
+  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+    setState(ProcedureProtos.ProcedureState.RUNNABLE);
+    env.getProcedureScheduler().addFront(this);
+    return false;
+  }
+
+  /**
+   * Puts the procedure into WAITING_TIMEOUT for {@code backoff} milliseconds and counts the
+   * attempt. This method never returns normally; the declared return type only lets callers
+   * write {@code throw suspend(backoff)}.
+   * @param backoff time to wait in milliseconds; must fit in an int
+   * @throws ProcedureSuspendedException always
+   */
+  protected final ProcedureSuspendedException suspend(long backoff)
+      throws ProcedureSuspendedException {
+    attempts++;
+    setTimeout(Math.toIntExact(backoff));
+    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+    skipPersistence();
+    throw new ProcedureSuspendedException();
+  }
+
+  /** @return path of the WAL this procedure splits */
+  public String getWAL() {
+    return walPath;
+  }
+
+  /** @return the worker currently assigned, or null if none has been acquired */
+  @VisibleForTesting
+  public ServerName getWorker() {
+    return worker;
+  }
+
+  @Override
+  public ServerName getServerName() {
+    return this.crashedServer;
+  }
+
+  @Override
+  public boolean hasMetaTableRegion() {
+    return AbstractFSWALProvider.isMetaFile(new Path(walPath));
+  }
+
+  @Override
+  public ServerOperationType getServerOperationType() {
+    return ServerOperationType.SPLIT_WAL;
+  }
+
+  /**
+   * After replay, re-registers an already assigned worker with the {@link SplitWALManager}
+   * as being in use.
+   */
+  @Override
+  protected void afterReplay(MasterProcedureEnv env) {
+    if (worker != null) {
+      env.getMasterServices().getSplitWALManager().addUsedSplitWALWorker(worker);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
new file mode 100644
index 0000000..fb2dbd7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
@@ -0,0 +1,195 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.procedure2.NoNodeDispatchException;
+import org.apache.hadoop.hbase.procedure2.NoServerDispatchException;
+import org.apache.hadoop.hbase.procedure2.NullTargetServerDispatchException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.hadoop.hbase.regionserver.SplitWALCallable;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+/**
+ * A remote procedure which is used to send a split WAL request to a region server.
+ * The procedure finishes (returns null from {@link #execute}) when the task succeeded, when
+ * the request could not be dispatched to the worker, or when the worker answered with a
+ * {@link DoNotRetryIOException}; in the latter two cases the parent
+ * {@link SplitWALProcedure} handles the situation. For any other failure the request is
+ * sent again until it succeeds.
+ */
+@InterfaceAudience.Private
+public class SplitWALRemoteProcedure extends Procedure<MasterProcedureEnv>
+    implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName>,
+    ServerProcedureInterface {
+  private static final Logger LOG = LoggerFactory.getLogger(SplitWALRemoteProcedure.class);
+  // Path of the WAL to split.
+  private String walPath;
+  // Region server the split request is sent to.
+  private ServerName worker;
+  // The crashed server; reported as this procedure's server name.
+  private ServerName crashedServer;
+  // The fields below are not persisted. They are shared between the procedure executor
+  // thread (execute) and the remote callback thread (complete) and are therefore only
+  // accessed while holding this object's monitor.
+  private boolean dispatched;
+  private ProcedureEvent<?> event;
+  private boolean success = false;
+
+  /** No-arg constructor; the fields are filled in by {@link #deserializeStateData}. */
+  public SplitWALRemoteProcedure() {
+  }
+
+  /**
+   * @param worker region server that should split the WAL
+   * @param crashedServer the crashed server this split is performed for
+   * @param wal path of the WAL to split
+   */
+  public SplitWALRemoteProcedure(ServerName worker, ServerName crashedServer, String wal) {
+    this.worker = worker;
+    this.crashedServer = crashedServer;
+    this.walPath = wal;
+  }
+
+  /**
+   * Dispatches the split request to the worker and suspends on an event until the remote
+   * call reports back. When executed again after being woken, the procedure finishes if the
+   * call was marked successful, otherwise the request is dispatched again.
+   * <p>
+   * Synchronized together with {@link #complete} so that a callback arriving right after
+   * the dispatch cannot observe a not-yet-assigned event and skip the wake-up.
+   */
+  @Override
+  protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+    if (dispatched) {
+      if (success) {
+        return null;
+      }
+      dispatched = false;
+    }
+    try {
+      env.getRemoteDispatcher().addOperationToNode(worker, this);
+    } catch (NoNodeDispatchException | NullTargetServerDispatchException
+        | NoServerDispatchException e) {
+      // When send to a wrong target server, it need construct a new SplitWALRemoteProcedure.
+      // Thus return null for this procedure and let SplitWALProcedure to handle this.
+      LOG.warn("dispatch WAL {} to {} failed, will retry on another server", walPath, worker, e);
+      return null;
+    }
+    dispatched = true;
+    event = new ProcedureEvent<>(this);
+    event.suspendIfNotReady(this);
+    throw new ProcedureSuspendedException();
+  }
+
+  /** Rollback is not supported. */
+  @Override
+  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Abort is not supported; always returns false. */
+  @Override
+  protected boolean abort(MasterProcedureEnv env) {
+    return false;
+  }
+
+  /** Persists the WAL path, the worker and the crashed server. */
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    MasterProcedureProtos.SplitWALRemoteData.Builder builder =
+        MasterProcedureProtos.SplitWALRemoteData.newBuilder();
+    builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(worker))
+        .setCrashedServer(ProtobufUtil.toServerName(crashedServer));
+    serializer.serialize(builder.build());
+  }
+
+  /** Restores the fields written by {@link #serializeStateData}. */
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    MasterProcedureProtos.SplitWALRemoteData data =
+        serializer.deserialize(MasterProcedureProtos.SplitWALRemoteData.class);
+    walPath = data.getWalPath();
+    worker = ProtobufUtil.toServerName(data.getWorker());
+    crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
+  }
+
+  /**
+   * Builds the operation sent to the region server: a {@link SplitWALCallable} whose
+   * parameter is the serialized WAL path.
+   */
+  @Override
+  public RemoteProcedureDispatcher.RemoteOperation remoteCallBuild(MasterProcedureEnv env,
+      ServerName serverName) {
+    return new RSProcedureDispatcher.ServerOperation(this, getProcId(), SplitWALCallable.class,
+        MasterProcedureProtos.SplitWALParameter.newBuilder().setWalPath(walPath).build()
+            .toByteArray());
+  }
+
+  @Override
+  public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
+      IOException exception) {
+    complete(env, exception);
+  }
+
+  @Override
+  public void remoteOperationCompleted(MasterProcedureEnv env) {
+    complete(env, null);
+  }
+
+  /**
+   * Records the outcome of the remote call and wakes the suspended procedure.
+   * @param env master procedure environment
+   * @param error null when the remote operation succeeded, otherwise the failure
+   */
+  private synchronized void complete(MasterProcedureEnv env, Throwable error) {
+    if (event == null) {
+      LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
+        getProcId());
+      return;
+    }
+    if (error == null) {
+      LOG.info("split WAL {} on {} succeeded", walPath, worker);
+      try {
+        env.getMasterServices().getSplitWALManager().deleteSplitWAL(walPath);
+      } catch (IOException e) {
+        // Best effort: a failure to delete does not fail the procedure.
+        LOG.warn("remove WAL {} failed, ignore...", walPath, e);
+      }
+      success = true;
+    } else {
+      if (error instanceof DoNotRetryIOException) {
+        // Finish this procedure instead of re-sending to the same worker; the parent
+        // SplitWALProcedure is expected to retry on another server.
+        LOG.warn("WAL split task of {} send to a wrong server {}, will retry on another server",
+          walPath, worker, error);
+        success = true;
+      } else {
+        LOG.warn("split WAL {} failed, retry...", walPath, error);
+        success = false;
+      }
+    }
+    event.wake(env.getProcedureScheduler());
+    event = null;
+  }
+
+  @Override
+  public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
+    complete(env, error);
+  }
+
+  /** @return path of the WAL this procedure asks the worker to split */
+  public String getWAL() {
+    return this.walPath;
+  }
+
+  @Override
+  public ServerName getServerName() {
+    // return the crashed server is to use the queue of root ServerCrashProcedure
+    return this.crashedServer;
+  }
+
+  @Override
+  public boolean hasMetaTableRegion() {
+    return AbstractFSWALProvider.isMetaFile(new Path(walPath));
+  }
+
+  @Override
+  public ServerOperationType getServerOperationType() {
+    return ServerOperationType.SPLIT_WAL_REMOTE;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 6e8af18..a99ded3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -17,6 +17,11 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
+import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
+import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
+
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.MemoryType;
@@ -88,7 +93,6 @@ import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
-import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
@@ -619,7 +623,10 @@ public class HRegionServer extends HasThread implements
rpcServices.isa.getPort(), this, canCreateBaseZNode());
// If no master in cluster, skip trying to track one or look for a cluster status.
if (!this.masterless) {
- this.csm = new ZkCoordinatedStateManager(this);
+ if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
+ DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
+ this.csm = new ZkCoordinatedStateManager(this);
+ }
masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
masterAddressTracker.start();
@@ -1950,7 +1957,7 @@ public class HRegionServer extends HasThread implements
conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
}
this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
- "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
+ HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
// Start the threads for compacted files discharger
this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
@@ -1995,7 +2002,8 @@ public class HRegionServer extends HasThread implements
sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds
sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
- if (this.csm != null) {
+ if (this.csm != null && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
+ DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
// SplitLogWorker needs csm. If none, don't start this.
this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
splitLogWorker.start();
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index 4a9712c..13804ed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -152,7 +152,7 @@ public class SplitLogWorker implements Runnable {
return true;
}
- private static Status splitLog(String name, CancelableProgressable p, Configuration conf,
+ static Status splitLog(String name, CancelableProgressable p, Configuration conf,
RegionServerServices server, LastSequenceId sequenceIdChecker, WALFactory factory) {
Path walDir;
FileSystem fs;
@@ -175,9 +175,11 @@ public class SplitLogWorker implements Runnable {
// interrupted or has encountered a transient error and when it has
// encountered a bad non-retry-able persistent error.
try {
- if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, name)), fs, conf,
- p, sequenceIdChecker, server.getCoordinatedStateManager().getSplitLogWorkerCoordination(),
- factory)) {
+ SplitLogWorkerCoordination splitLogWorkerCoordination =
+ server.getCoordinatedStateManager() == null ? null
+ : server.getCoordinatedStateManager().getSplitLogWorkerCoordination();
+ if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, name)), fs, conf, p,
+ sequenceIdChecker, splitLogWorkerCoordination, factory)) {
return Status.PREEMPTED;
}
} catch (InterruptedIOException iioe) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
new file mode 100644
index 0000000..b94df22
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java
@@ -0,0 +1,109 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
+import org.apache.hadoop.hbase.util.KeyLocker;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+
+/**
+ * This callable is used to do the real split WAL task. It is called by
+ * {@link org.apache.hadoop.hbase.master.procedure.SplitWALRemoteProcedure} from master and executed
+ * by the executor service which is in charge of executing the events of EventType.RS_LOG_REPLAY.
+ *
+ * When this callable is executed, it calls SplitLogWorker.splitLog() to split the WAL.
+ * If the return value is SplitLogWorker.TaskExecutor.Status.DONE, the task was successful
+ * and the callable returns null to end the call. Otherwise it throws an exception and lets
+ * {@link org.apache.hadoop.hbase.master.procedure.SplitWALRemoteProcedure} handle the problem.
+ *
+ * This class is to replace the zk-based WAL splitting related code. {@link SplitLogWorker},
+ * {@link org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination} and
+ * {@link org.apache.hadoop.hbase.coordination.ZkSplitLogWorkerCoordination} can be removed after
+ * we switch to procedure-based WAL splitting.
+ */
+@InterfaceAudience.Private
+public class SplitWALCallable implements RSProcedureCallable {
+  private static final Logger LOG = LoggerFactory.getLogger(SplitWALCallable.class);
+
+  // Path of the WAL to split, parsed from the request parameter in init().
+  private String walPath;
+  // Error raised while parsing the request; rethrown by call().
+  private Exception initError;
+  private HRegionServer rs;
+  // NOTE(review): this KeyLocker is an instance field, so it only excludes callers that share
+  // this very callable instance. If the intent is to serialize splits of the same WAL across
+  // callables on one region server, the locker would have to be shared -- confirm.
+  private final KeyLocker<String> splitWALLocks = new KeyLocker<>();
+  private volatile Lock splitWALLock = null;
+
+  /**
+   * Parses the serialized {@code SplitWALParameter}. A parse failure is remembered and
+   * rethrown from {@link #call()} instead of being thrown here.
+   * @param parameter serialized {@code MasterProcedureProtos.SplitWALParameter}
+   * @param rs the region server executing this callable
+   */
+  @Override
+  public void init(byte[] parameter, HRegionServer rs) {
+    try {
+      this.rs = rs;
+      MasterProcedureProtos.SplitWALParameter param =
+          MasterProcedureProtos.SplitWALParameter.parseFrom(parameter);
+      this.walPath = param.getWalPath();
+    } catch (InvalidProtocolBufferException e) {
+      LOG.error("parse proto buffer of split WAL request failed ", e);
+      initError = e;
+    }
+  }
+
+  @Override
+  public EventType getEventType() {
+    return EventType.RS_LOG_REPLAY;
+  }
+
+  /**
+   * Splits the WAL while holding the lock keyed by its path.
+   * @return always null
+   * @throws Exception the error recorded by {@link #init}, or an IOException when the split
+   *           did not complete
+   */
+  @Override
+  public Void call() throws Exception {
+    if (initError != null) {
+      throw initError;
+    }
+    //grab a lock
+    splitWALLock = splitWALLocks.acquireLock(walPath);
+    try {
+      splitWal();
+      LOG.info("split WAL {} succeed.", walPath);
+    } catch (IOException e) {
+      LOG.warn("failed to split WAL {}.", walPath, e);
+      throw e;
+    } finally {
+      splitWALLock.unlock();
+    }
+    return null;
+  }
+
+  /** @return path of the WAL this callable splits */
+  public String getWalPath() {
+    return this.walPath;
+  }
+
+  /**
+   * Runs SplitLogWorker.splitLog() for the WAL and turns any status other than DONE into an
+   * IOException.
+   */
+  private void splitWal() throws IOException {
+    SplitLogWorker.TaskExecutor.Status status =
+        SplitLogWorker.splitLog(walPath, null, rs.getConfiguration(), rs, rs, rs.walFactory);
+    if (status != SplitLogWorker.TaskExecutor.Status.DONE) {
+      // The original message ended with "failed at server " and never named the server.
+      throw new IOException(
+        "Split WAL " + walPath + " failed at server " + rs.getServerName());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
index 388c53d..d72e756 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.master;
+import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
@@ -143,7 +144,7 @@ public abstract class AbstractTestDLS {
conf.setInt("zookeeper.recovery.retry", 0);
conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
- conf.setInt("hbase.regionserver.wal.max.splitters", 3);
+ conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3);
conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
conf.set("hbase.wal.provider", getWalProvider());
StartMiniClusterOption option = StartMiniClusterOption.builder()
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
index e55e375..242455c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -47,12 +49,16 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@RunWith(Parameterized.class)
@Category({ MasterTests.class, LargeTests.class })
public class TestRestartCluster {
@@ -63,6 +69,9 @@ public class TestRestartCluster {
private static final Logger LOG = LoggerFactory.getLogger(TestRestartCluster.class);
private HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ @Parameterized.Parameter
+ public boolean splitWALCoordinatedByZK;
+
private static final TableName[] TABLES = {
TableName.valueOf("restartTableOne"),
TableName.valueOf("restartTableTwo"),
@@ -70,6 +79,13 @@ public class TestRestartCluster {
};
private static final byte[] FAMILY = Bytes.toBytes("family");
+ @Before
+ public void setup() throws Exception {
+ LOG.info("WAL splitting coordinated by zk? {}", splitWALCoordinatedByZK);
+ UTIL.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
+ splitWALCoordinatedByZK);
+ }
+
@After public void tearDown() throws Exception {
UTIL.shutdownMiniCluster();
}
@@ -301,4 +317,9 @@ public class TestRestartCluster {
Thread.sleep(100);
}
}
+
+  /**
+   * Parameters for the test runs: whether WAL splitting is coordinated by ZooKeeper.
+   */
+  @Parameterized.Parameters
+  public static Collection<Boolean> coordinatedByZK() {
+    return Arrays.asList(false, true);
+  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java
index 8a6f708..0aba487 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
@@ -28,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
@@ -45,6 +48,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +58,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
* Tests the restarting of everything as done during rolling restarts.
*/
+@RunWith(Parameterized.class)
@Category({MasterTests.class, LargeTests.class})
public class TestRollingRestart {
@@ -65,6 +71,9 @@ public class TestRollingRestart {
@Rule
public TestName name = new TestName();
+ @Parameterized.Parameter
+ public boolean splitWALCoordinatedByZK;
+
@Test
public void testBasicRollingRestart() throws Exception {
@@ -78,6 +87,8 @@ public class TestRollingRestart {
// Start the cluster
log("Starting cluster");
Configuration conf = HBaseConfiguration.create();
+ conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
+ splitWALCoordinatedByZK);
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
StartMiniClusterOption option = StartMiniClusterOption.builder()
.numMasters(NUM_MASTERS).numRegionServers(NUM_RS).numDataNodes(NUM_RS).build();
@@ -87,7 +98,8 @@ public class TestRollingRestart {
cluster.waitForActiveAndReadyMaster();
// Create a table with regions
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName =
+ TableName.valueOf(name.getMethodName().replaceAll("[\\[|\\]]", "-"));
byte [] family = Bytes.toBytes("family");
log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions");
Table ht = TEST_UTIL.createMultiRegionTable(tableName, family, NUM_REGIONS_TO_CREATE);
@@ -284,5 +296,9 @@ public class TestRollingRestart {
}
+  /**
+   * Parameters for the test runs: whether WAL splitting is coordinated by ZooKeeper.
+   */
+  @Parameterized.Parameters
+  public static Collection<Boolean> coordinatedByZK() {
+    return Arrays.asList(false, true);
+  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
new file mode 100644
index 0000000..9e127c0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
+import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
+import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+
+@Category({ MasterTests.class, MediumTests.class })
+
+public class TestSplitWALManager {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSplitWALManager.class);
+
+ private static HBaseTestingUtility TEST_UTIL;
+ private HMaster master;
+ private SplitWALManager splitWALManager;
+ private TableName TABLE_NAME;
+ private byte[] FAMILY;
+
+ /**
+  * Prepares a fresh mini cluster before every test: ZK-coordinated WAL splitting is switched
+  * off, {@code HBASE_SPLIT_WAL_MAX_SPLITTER} is set to 1 and the cluster is started through
+  * {@code startMiniCluster(3)}. The master and its {@link SplitWALManager} are then cached in
+  * fields, together with the table name and column family the tests use.
+  */
+ @Before
+ public void setup() throws Exception {
+   // The table name and family do not depend on the cluster, so set them up first.
+   FAMILY = Bytes.toBytes("test");
+   TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALManager"));
+
+   TEST_UTIL = new HBaseTestingUtility();
+   TEST_UTIL.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1);
+   TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
+   TEST_UTIL.startMiniCluster(3);
+
+   this.master = TEST_UTIL.getHBaseCluster().getMaster();
+   this.splitWALManager = this.master.getSplitWALManager();
+ }
+
+ /**
+  * Shuts down the mini cluster held in {@code TEST_UTIL} after every test.
+  */
+ @After
+ public void teardown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+  * Acquires a split-WAL worker for three procedures, expects the fourth acquisition to throw
+  * {@link ProcedureSuspendedException}, and verifies that the fourth acquisition succeeds once
+  * the first worker has been released.
+  */
+ @Test
+ public void testAcquireAndRelease() throws Exception {
+   List<FakeServerProcedure> procs = new ArrayList<>();
+   while (procs.size() < 4) {
+     procs.add(new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
+   }
+
+   ServerName firstWorker = splitWALManager.acquireSplitWALWorker(procs.get(0));
+   Assert.assertNotNull(firstWorker);
+   for (int idx = 1; idx <= 2; idx++) {
+     Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(procs.get(idx)));
+   }
+
+   // The fourth request is expected to be refused by suspending the procedure.
+   Exception caught = null;
+   try {
+     splitWALManager.acquireSplitWALWorker(procs.get(3));
+   } catch (ProcedureSuspendedException suspended) {
+     caught = suspended;
+   }
+   Assert.assertNotNull(caught);
+   Assert.assertTrue(caught instanceof ProcedureSuspendedException);
+
+   // Hand the first worker back, then retry the request that was refused above.
+   MasterProcedureEnv env =
+     TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor().getEnvironment();
+   splitWALManager.releaseSplitWALWorker(firstWorker, env.getProcedureScheduler());
+   Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(procs.get(3)));
+ }
+
+ /**
+  * Takes three split-WAL workers, expects the fourth request to throw
+  * {@link ProcedureSuspendedException}, then brings one more region server online and verifies
+  * that the fourth request is served although no worker was released.
+  */
+ @Test
+ public void testAddNewServer() throws Exception {
+   List<FakeServerProcedure> procs = new ArrayList<>();
+   while (procs.size() < 4) {
+     procs.add(new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()));
+   }
+
+   ServerName firstWorker = splitWALManager.acquireSplitWALWorker(procs.get(0));
+   Assert.assertNotNull(firstWorker);
+   for (int idx = 1; idx <= 2; idx++) {
+     Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(procs.get(idx)));
+   }
+
+   // The fourth request is expected to be refused by suspending the procedure.
+   Exception caught = null;
+   try {
+     splitWALManager.acquireSplitWALWorker(procs.get(3));
+   } catch (ProcedureSuspendedException suspended) {
+     caught = suspended;
+   }
+   Assert.assertNotNull(caught);
+   Assert.assertTrue(caught instanceof ProcedureSuspendedException);
+
+   // Start an extra region server and wait until it is online before retrying.
+   JVMClusterUtil.RegionServerThread extraServer =
+     TEST_UTIL.getHBaseCluster().startRegionServer();
+   extraServer.waitForServerOnline();
+   Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(procs.get(3)));
+ }
+
+ /**
+  * Splits first the meta WAL and then the non-meta WAL found in the WAL directory of the
+  * server holding meta. Each WAL is split by a procedure obtained from
+  * {@link SplitWALManager#createSplitWALProcedures}, and after the procedure has been waited
+  * for the test checks that the WAL file no longer exists.
+  */
+ @Test
+ public void testCreateSplitWALProcedures() throws Exception {
+   // Create and load the table; try-with-resources closes the Table handle afterwards
+   // instead of leaking it.
+   try (Table table =
+       TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE)) {
+     TEST_UTIL.loadTable(table, FAMILY);
+   }
+   ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
+   ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
+   // WAL directory of the server holding meta; it is listed for meta and non-meta WALs below.
+   Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
+     AbstractFSWALProvider.getWALDirectoryName(metaServer.toString()));
+
+   // Test splitting meta wal
+   FileStatus[] wals =
+     TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER);
+   Assert.assertEquals(1, wals.length);
+   List<Procedure> testProcedures =
+     splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
+   Assert.assertEquals(1, testProcedures.size());
+   ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
+   Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
+
+   // Test splitting wal
+   wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER);
+   Assert.assertEquals(1, wals.length);
+   testProcedures =
+     splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer);
+   Assert.assertEquals(1, testProcedures.size());
+   ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0));
+   Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath()));
+ }
+
+ /**
+  * Submits three {@link FakeServerProcedure}s that each take a split-WAL worker and then park
+  * on their latch, submits a fourth one and checks that it tried to acquire a worker but did
+  * not get one, then lets the first procedure finish and verifies that the fourth obtains a
+  * worker and that the first completes successfully.
+  */
+ @Test
+ public void testAcquireAndReleaseSplitWALWorker() throws Exception {
+   ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
+   List<FakeServerProcedure> testProcedures = new ArrayList<>();
+   try {
+     for (int i = 0; i < 3; i++) {
+       FakeServerProcedure procedure =
+         new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
+       testProcedures.add(procedure);
+       ProcedureTestingUtility.submitProcedure(masterPE, procedure, HConstants.NO_NONCE,
+         HConstants.NO_NONCE);
+     }
+     TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired());
+     FakeServerProcedure failedProcedure =
+       new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
+     // Track it with the others so that the finally block releases its latch as well.
+     testProcedures.add(failedProcedure);
+     ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE,
+       HConstants.NO_NONCE);
+     TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire());
+     Assert.assertFalse(failedProcedure.isWorkerAcquired());
+     // let one procedure finish and release worker
+     testProcedures.get(0).countDown();
+     TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired());
+     // The worker is released before the first procedure is marked finished, so wait for the
+     // final state instead of asserting it immediately.
+     TEST_UTIL.waitFor(10000, () -> testProcedures.get(0).isSuccess());
+     Assert.assertTrue(testProcedures.get(0).isSuccess());
+   } finally {
+     // Release every latch so no procedure stays parked in barrier.await() when the cluster
+     // is shut down; counting down an already released latch has no effect.
+     testProcedures.forEach(FakeServerProcedure::countDown);
+   }
+ }
+
+ /**
+  * Checks {@link SplitWALManager#getWALsToSplit} after a table has been created and loaded:
+  * the server holding meta is expected to report one meta WAL and one non-meta WAL, and
+  * another region server is expected to report no meta WAL.
+  */
+ @Test
+ public void testGetWALsToSplit() throws Exception {
+   // Create and load the table; try-with-resources closes the Table handle afterwards
+   // instead of leaking it.
+   try (Table table =
+       TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE)) {
+     TEST_UTIL.loadTable(table, FAMILY);
+   }
+   ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
+   List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer, true);
+   Assert.assertEquals(1, metaWals.size());
+   List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false);
+   Assert.assertEquals(1, wals.size());
+   // Compare by value: a reference check would also let through a distinct ServerName
+   // instance that denotes the meta server.
+   ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+     .map(rs -> rs.getRegionServer().getServerName())
+     .filter(rs -> !rs.equals(metaServer))
+     .findAny()
+     .get();
+   metaWals = splitWALManager.getWALsToSplit(testServer, true);
+   Assert.assertEquals(0, metaWals.size());
+ }
+
+ /**
+  * Runs the procedures returned by {@link SplitWALManager#splitWALs} for the non-meta WAL of
+  * a region server that does not hold meta and for the meta WAL of the server that does.
+  * After each run the matching {@code getWALsToSplit} result is expected to be empty, while
+  * the non-meta WAL of the meta server is expected to be still present.
+  */
+ @Test
+ public void testSplitLogs() throws Exception {
+   // Create and load the table; try-with-resources closes the Table handle afterwards
+   // instead of leaking it.
+   try (Table table =
+       TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE)) {
+     TEST_UTIL.loadTable(table, FAMILY);
+   }
+   ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
+   ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
+   // Compare by value: a reference check would also let through a distinct ServerName
+   // instance that denotes the meta server.
+   ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
+     .map(rs -> rs.getRegionServer().getServerName())
+     .filter(rs -> !rs.equals(metaServer))
+     .findAny()
+     .get();
+   List<Procedure> procedures = splitWALManager.splitWALs(testServer, false);
+   Assert.assertEquals(1, procedures.size());
+   ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
+   Assert.assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size());
+
+   procedures = splitWALManager.splitWALs(metaServer, true);
+   Assert.assertEquals(1, procedures.size());
+   ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0));
+   Assert.assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size());
+   Assert.assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size());
+ }
+
+ /**
+  * Submits three {@link FakeServerProcedure}s, waits until the third holds a worker, then
+  * kills the master and starts a new one. A fourth procedure submitted to the new master is
+  * expected to try to acquire a worker without getting one. All latches are released at the
+  * end.
+  */
+ @Test
+ public void testWorkerReloadWhenMasterRestart() throws Exception {
+   List<FakeServerProcedure> running = new ArrayList<>();
+   for (int idx = 0; idx < 3; idx++) {
+     ServerName target = TEST_UTIL.getHBaseCluster().getRegionServer(idx).getServerName();
+     FakeServerProcedure proc = new FakeServerProcedure(target);
+     running.add(proc);
+     ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), proc,
+       HConstants.NO_NONCE, HConstants.NO_NONCE);
+   }
+   TEST_UTIL.waitFor(10000, () -> running.get(2).isWorkerAcquired());
+
+   // Kill master
+   TEST_UTIL.getHBaseCluster().killMaster(master.getServerName());
+   TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000);
+
+   // restart master
+   TEST_UTIL.getHBaseCluster().startMaster();
+   TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
+   this.master = TEST_UTIL.getHBaseCluster().getMaster();
+
+   // Submit one more procedure to the master that is now in charge.
+   FakeServerProcedure starved =
+     new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
+   ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), starved,
+     HConstants.NO_NONCE, HConstants.NO_NONCE);
+   TEST_UTIL.waitFor(20000, () -> starved.isTriedToAcquire());
+   Assert.assertFalse(starved.isWorkerAcquired());
+
+   // Release every latch, the three original procedures first.
+   for (FakeServerProcedure proc : running) {
+     proc.countDown();
+   }
+   starved.countDown();
+ }
+
+ /**
+  * Test-only server procedure that walks through three {@code SplitWALState}s: it acquires a
+  * split-WAL worker from the master's {@link SplitWALManager}, parks on a latch until the test
+  * calls {@link #countDown()}, and finally releases the worker again.
+  * <p>
+  * {@code worker} and {@code triedToAcquire} are written inside
+  * {@link #executeFromState} and polled by the tests through {@link #isWorkerAcquired()} and
+  * {@link #isTriedToAcquire()}, so both fields are {@code volatile} to make the writes visible
+  * to the polling thread.
+  */
+ public static final class FakeServerProcedure
+   extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState>
+   implements ServerProcedureInterface {
+
+   private ServerName serverName;
+   // Written while the procedure executes, read by the test thread: must be volatile.
+   private volatile ServerName worker;
+   private final CountDownLatch barrier = new CountDownLatch(1);
+   // Written while the procedure executes, read by the test thread: must be volatile.
+   private volatile boolean triedToAcquire = false;
+
+   /**
+    * No-argument constructor; {@code serverName} is then filled in by
+    * {@link #deserializeStateData}. NOTE(review): presumably required so the procedure
+    * framework can re-create the procedure when reloading it -- confirm.
+    */
+   public FakeServerProcedure() {
+   }
+
+   /**
+    * @param serverName the server this procedure reports from {@link #getServerName()}
+    */
+   public FakeServerProcedure(ServerName serverName) {
+     this.serverName = serverName;
+   }
+
+   @Override
+   public ServerName getServerName() {
+     return serverName;
+   }
+
+   @Override
+   public boolean hasMetaTableRegion() {
+     return false;
+   }
+
+   @Override
+   public ServerOperationType getServerOperationType() {
+     return SPLIT_WAL;
+   }
+
+   /**
+    * Executes one step of the acquire / wait / release sequence.
+    *
+    * @throws ProcedureSuspendedException propagated from
+    *         {@code SplitWALManager#acquireSplitWALWorker}
+    * @throws InterruptedException if interrupted while waiting on the latch
+    */
+   @Override
+   protected Flow executeFromState(MasterProcedureEnv env,
+     MasterProcedureProtos.SplitWALState state)
+     throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+     SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
+     switch (state) {
+       case ACQUIRE_SPLIT_WAL_WORKER:
+         // Record the attempt before acquiring, so it stays visible even when the acquire
+         // call throws.
+         triedToAcquire = true;
+         worker = splitWALManager.acquireSplitWALWorker(this);
+         setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER);
+         return Flow.HAS_MORE_STATE;
+       case DISPATCH_WAL_TO_WORKER:
+         // Stand-in for the real work: hold the worker until the test releases the latch.
+         barrier.await();
+         setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
+         return Flow.HAS_MORE_STATE;
+       case RELEASE_SPLIT_WORKER:
+         splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
+         return Flow.NO_MORE_STATE;
+       default:
+         throw new UnsupportedOperationException("unhandled state=" + state);
+     }
+   }
+
+   /** @return {@code true} once a worker has been assigned to this procedure */
+   public boolean isWorkerAcquired() {
+     return worker != null;
+   }
+
+   /** @return {@code true} once the acquire step has started at least once */
+   public boolean isTriedToAcquire() {
+     return triedToAcquire;
+   }
+
+   /** Releases the latch the procedure waits on in {@code DISPATCH_WAL_TO_WORKER}. */
+   public void countDown() {
+     this.barrier.countDown();
+   }
+
+   @Override
+   protected void rollbackState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state)
+     throws IOException, InterruptedException {
+     // Nothing to roll back in this fake procedure.
+   }
+
+   @Override
+   protected MasterProcedureProtos.SplitWALState getState(int stateId) {
+     return MasterProcedureProtos.SplitWALState.forNumber(stateId);
+   }
+
+   @Override
+   protected int getStateId(MasterProcedureProtos.SplitWALState state) {
+     return state.getNumber();
+   }
+
+   @Override
+   protected MasterProcedureProtos.SplitWALState getInitialState() {
+     return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER;
+   }
+
+   @Override
+   protected boolean holdLock(MasterProcedureEnv env) {
+     return true;
+   }
+
+   @Override
+   protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
+     // Nothing to roll back in this fake procedure.
+   }
+
+   @Override
+   protected boolean abort(MasterProcedureEnv env) {
+     return false;
+   }
+
+   /**
+    * Serializes the server name as the crashed server of a {@code SplitWALData} message; the
+    * WAL path is the fixed placeholder {@code "test"}.
+    */
+   @Override
+   protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+     MasterProcedureProtos.SplitWALData.Builder builder =
+       MasterProcedureProtos.SplitWALData.newBuilder();
+     builder.setWalPath("test").setCrashedServer(ProtobufUtil.toServerName(serverName));
+     serializer.serialize(builder.build());
+   }
+
+   /** Restores {@code serverName} from the crashed-server field of the serialized data. */
+   @Override
+   protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+     MasterProcedureProtos.SplitWALData data =
+       serializer.deserialize(MasterProcedureProtos.SplitWALData.class);
+     serverName = ProtobufUtil.toServerName(data.getCrashedServer());
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/281d6429/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
index af2076e..6751eaf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -44,9 +46,13 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@RunWith(Parameterized.class)
@Category({MasterTests.class, LargeTests.class})
public class TestServerCrashProcedure {
@@ -58,6 +64,9 @@ public class TestServerCrashProcedure {
protected HBaseTestingUtility util;
+ @Parameter
+ public boolean splitWALCoordinatedByZK;
+
private ProcedureMetrics serverCrashProcMetrics;
private long serverCrashSubmittedCount = 0;
private long serverCrashFailedCount = 0;
@@ -67,6 +76,10 @@ public class TestServerCrashProcedure {
conf.set("hbase.balancer.tablesOnMaster", "none");
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 3);
+ conf.setBoolean("hbase.split.writer.creation.bounded", true);
+ conf.setInt("hbase.regionserver.hlog.splitlog.writer.threads", 8);
+ LOG.info("WAL splitting coordinated by zk? {}", splitWALCoordinatedByZK);
+ conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, splitWALCoordinatedByZK);
}
@Before
@@ -173,7 +186,8 @@ public class TestServerCrashProcedure {
@Test
public void testConcurrentSCPForSameServer() throws Exception {
- final TableName tableName = TableName.valueOf("testConcurrentSCPForSameServer");
+ final TableName tableName =
+ TableName.valueOf("testConcurrentSCPForSameServer-" + splitWALCoordinatedByZK);
try (Table t = createTable(tableName)) {
// Load the table with a bit of data so some logs to split and some edits in each region.
this.util.loadTable(t, HBaseTestingUtility.COLUMNS[0]);
@@ -222,4 +236,9 @@ public class TestServerCrashProcedure {
serverCrashSubmittedCount = serverCrashProcMetrics.getSubmittedCounter().getCount();
serverCrashFailedCount = serverCrashProcMetrics.getFailedCounter().getCount();
}
+
+ /**
+  * Supplies the values injected into {@link #splitWALCoordinatedByZK}, so that every test in
+  * this class runs once with the flag set to {@code false} and once with it set to
+  * {@code true}.
+  *
+  * @return the two parameter values, {@code false} first
+  */
+ @Parameterized.Parameters
+ public static Collection<Boolean> coordinatedByZK() {
+   return Arrays.asList(false, true);
+ }
}