You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2020/07/27 15:24:18 UTC
[hbase] branch branch-2 updated: HBASE-24632 Enable procedure-based
log splitting as default in hbase3 Add deprecation of 'classic' zk-based
WAL splitter.
This is an automated email from the ASF dual-hosted git repository.
stack pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new edb4cd5 HBASE-24632 Enable procedure-based log splitting as default in hbase3 Add deprecation of 'classic' zk-based WAL splitter.
edb4cd5 is described below
commit edb4cd534c6ad96745009b2bcc8587d52d1657a2
Author: stack <st...@apache.org>
AuthorDate: Tue Jun 30 20:53:36 2020 -0700
HBASE-24632 Enable procedure-based log splitting as default in hbase3 Add deprecation of 'classic' zk-based WAL splitter.
Also fix three bugs:
* We were trying to delete non-empty directory; weren't doing
accounting for meta WALs where meta had moved off the server
(successfully)
* We were deleting split WALs rather than archiving them.
* We were not handling corrupt files.
Deprecations and removal of tests of old system.
---
.../java/org/apache/hadoop/hbase/HConstants.java | 11 +-
.../org/apache/hadoop/hbase/SplitLogCounters.java | 5 +
.../java/org/apache/hadoop/hbase/SplitLogTask.java | 3 +
.../coordination/SplitLogManagerCoordination.java | 3 +
.../coordination/SplitLogWorkerCoordination.java | 10 +-
.../coordination/ZkCoordinatedStateManager.java | 3 +
.../hadoop/hbase/master/MasterRpcServices.java | 7 +
.../hadoop/hbase/master/MasterWalManager.java | 34 ++-
.../hbase/master/MetricsMasterWrapperImpl.java | 3 +
.../hadoop/hbase/master/SplitLogManager.java | 3 +
.../hadoop/hbase/master/SplitWALManager.java | 41 ++-
.../master/procedure/ServerCrashProcedure.java | 55 ++--
.../master/procedure/SplitWALRemoteProcedure.java | 2 +-
.../hadoop/hbase/regionserver/SplitLogWorker.java | 8 +-
.../regionserver/handler/WALSplitterHandler.java | 5 +-
.../org/apache/hadoop/hbase/wal/WALSplitUtil.java | 105 +++-----
.../org/apache/hadoop/hbase/wal/WALSplitter.java | 290 ++++++++++++---------
.../hadoop/hbase/master/AbstractTestDLS.java | 239 +----------------
.../master/assignment/MockMasterServices.java | 22 +-
.../hbase/regionserver/TestCleanupMetaWAL.java | 2 +-
.../hadoop/hbase/wal/TestWALReaderOnSecureWAL.java | 22 +-
.../org/apache/hadoop/hbase/wal/TestWALSplit.java | 56 ++--
.../hadoop/hbase/zookeeper/MetaTableLocator.java | 4 +-
.../apache/hadoop/hbase/zookeeper/ZKSplitLog.java | 3 +
24 files changed, 392 insertions(+), 544 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 1eea08b..16bee93 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1480,9 +1480,18 @@ public final class HConstants {
public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL =
"hbase.client.fast.fail.interceptor.impl";
+ /**
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+ * distributed WAL splitter; see SplitWALManager.
+ */
+ @Deprecated
public static final String HBASE_SPLIT_WAL_COORDINATED_BY_ZK = "hbase.split.wal.zk.coordinated";
- public static final boolean DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK = true;
+ /**
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0.
+ */
+ @Deprecated
+ public static final boolean DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK = false;
public static final String HBASE_SPLIT_WAL_MAX_SPLITTER = "hbase.regionserver.wal.max.splitters";
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java
index 6be1131..443c8d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java
@@ -25,9 +25,14 @@ import org.apache.yetus.audience.InterfaceAudience;
/**
* Counters kept by the distributed WAL split log process.
* Used by master and regionserver packages.
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+ * distributed WAL splitter, see SplitWALManager
*/
+@Deprecated
@InterfaceAudience.Private
public class SplitLogCounters {
+ private SplitLogCounters() {}
+
//Spnager counters
public final static LongAdder tot_mgr_log_split_batch_start = new LongAdder();
public final static LongAdder tot_mgr_log_split_batch_success = new LongAdder();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
index dd4eb93..ca07fcb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
@@ -31,7 +31,10 @@ import org.apache.hadoop.hbase.util.Bytes;
* Encapsulates protobuf serialization/deserialization so we don't leak generated pb outside of
* this class. Used by regionserver and master packages.
* <p>Immutable
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+ * distributed WAL splitter, see SplitWALManager
*/
+@Deprecated
@InterfaceAudience.Private
public class SplitLogTask {
private final ServerName originServer;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
index 8682c91..33d8f2c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogManagerCoordination.java
@@ -40,8 +40,11 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
* Methods required for task life circle: <BR>
* {@link #checkTaskStillAvailable(String)} Check that task is still there <BR>
* {@link #checkTasks()} check for unassigned tasks and resubmit them
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+ * distributed WAL splitter, see SplitWALManager
*/
@InterfaceAudience.Private
+@Deprecated
public interface SplitLogManagerCoordination {
/**
* Detail class that shares data between coordination and split log manager
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
index ad74015..a9fae46 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java
@@ -1,5 +1,4 @@
- /**
- *
+ /*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,16 +17,14 @@
*/
package org.apache.hadoop.hbase.coordination;
import java.util.concurrent.atomic.LongAdder;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.SplitLogTask;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
-
+import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
@@ -44,7 +41,10 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
* <p>
* Important methods for WALSplitterHandler: <BR>
* splitting task has completed.
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+ * distributed WAL splitter, see SplitWALManager
*/
+@Deprecated
@InterfaceAudience.Private
public interface SplitLogWorkerCoordination {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
index ba73d53..323e575 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
@@ -25,7 +25,10 @@ import org.apache.yetus.audience.InterfaceAudience;
/**
* ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}.
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+ * distributed WAL splitter (see SplitWALManager) which doesn't use this zk-based coordinator.
*/
+@Deprecated
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class ZkCoordinatedStateManager implements CoordinatedStateManager {
protected ZKWatcher watcher;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index adf8062..b46abc8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerFactory;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.locking.LockProcedure;
@@ -2429,6 +2430,12 @@ public class MasterRpcServices extends RSRpcServices implements
@Override
public ReportProcedureDoneResponse reportProcedureDone(RpcController controller,
ReportProcedureDoneRequest request) throws ServiceException {
+ // Check Masters is up and ready for duty before progressing. Remote side will keep trying.
+ try {
+ this.master.checkServiceStarted();
+ } catch (ServerNotRunningYetException snrye) {
+ throw new ServiceException(snrye);
+ }
request.getResultList().forEach(result -> {
if (result.getStatus() == RemoteProcedureResult.Status.SUCCESS) {
master.remoteProcedureCompleted(result.getProcId());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
index 3f6bd8a..fe9107f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -44,7 +43,6 @@ import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
@@ -55,6 +53,9 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
public class MasterWalManager {
private static final Logger LOG = LoggerFactory.getLogger(MasterWalManager.class);
+ /**
+ * Filter *in* WAL files that are for the hbase:meta Region.
+ */
final static PathFilter META_FILTER = new PathFilter() {
@Override
public boolean accept(Path p) {
@@ -62,6 +63,9 @@ public class MasterWalManager {
}
};
+ /**
+ * Filter *out* WAL files that are for the hbase:meta Region; i.e. return user-space WALs only.
+ */
@VisibleForTesting
public final static PathFilter NON_META_FILTER = new PathFilter() {
@Override
@@ -81,10 +85,19 @@ public class MasterWalManager {
// The Path to the old logs dir
private final Path oldLogDir;
+
private final Path rootDir;
// create the split log lock
private final Lock splitLogLock = new ReentrantLock();
+
+ /**
+ * Superseded by {@link SplitWALManager}; i.e. procedure-based WAL splitting rather than
+ * 'classic' zk-coordinated WAL splitting.
+ * @deprecated since 2.3.0 and 3.0.0 to be removed in 4.0.0; replaced by {@link SplitWALManager}.
+ * @see SplitWALManager
+ */
+ @Deprecated
private final SplitLogManager splitLogManager;
// Is the fileystem ok?
@@ -101,7 +114,6 @@ public class MasterWalManager {
this.rootDir = CommonFSUtils.getWALRootDir(conf);
this.services = services;
this.splitLogManager = new SplitLogManager(services, conf);
-
this.oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
}
@@ -203,7 +215,7 @@ public class MasterWalManager {
*/
@Deprecated
public Set<ServerName> getFailedServersFromLogFolders() throws IOException {
- boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors",
+ boolean retrySplitting = !conf.getBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY,
WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);
Set<ServerName> serverNames = new HashSet<>();
@@ -360,11 +372,13 @@ public class MasterWalManager {
}
/**
- * For meta region open and closed normally on a server, it may leave some meta
- * WAL in the server's wal dir. Since meta region is no long on this server,
- * The SCP won't split those meta wals, just leaving them there. So deleting
- * the wal dir will fail since the dir is not empty. Actually We can safely achive those
- * meta log and Archiving the meta log and delete the dir.
+ * The hbase:meta region may OPEN and CLOSE without issue on a server and then move elsewhere.
+ * On CLOSE, the WAL for the hbase:meta table may not be archived yet (the WAL is only needed if
+ * hbase:meta did not close cleanly). Since the meta region is no longer on this server,
+ * the ServerCrashProcedure won't split these leftover hbase:meta WALs, just leaving them in
+ * the WAL splitting dir. If we try to delete the WAL splitting dir for the server, it will fail
+ * since the dir is not totally empty. We can safely archive these hbase:meta logs; then the
+ * WAL dir can be deleted.
* @param serverName the server to archive meta log
*/
public void archiveMetaLog(final ServerName serverName) {
@@ -395,6 +409,4 @@ public class MasterWalManager {
LOG.warn("Failed archiving meta log for server " + serverName, ie);
}
}
-
-
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java
index 4803a49..3c8b971 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetricsMasterWrapperImpl.java
@@ -154,6 +154,9 @@ public class MetricsMasterWrapperImpl implements MetricsMasterWrapper {
@Override
public Map<String,Entry<Long,Long>> getTableSpaceUtilization() {
+ if (master == null) {
+ return Collections.emptyMap();
+ }
QuotaObserverChore quotaChore = master.getQuotaObserverChore();
if (quotaChore == null) {
return Collections.emptyMap();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 7ed0d9a..3e0c746 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -86,7 +86,10 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
* again. If a task is resubmitted then there is a risk that old "delete task"
* can delete the re-submission.
* @see SplitWALManager for an alternate implementation based on Procedures.
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+ * distributed WAL splitter, see SplitWALManager.
*/
+@Deprecated
@InterfaceAudience.Private
public class SplitLogManager {
private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
index 76407e0..9ff84dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,35 +16,36 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
-
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER;
import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER;
-
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -78,15 +79,17 @@ public class SplitWALManager {
private final Path rootDir;
private final FileSystem fs;
private final Configuration conf;
+ private final Path walArchiveDir;
- public SplitWALManager(MasterServices master) {
+ public SplitWALManager(MasterServices master) throws IOException {
this.master = master;
this.conf = master.getConfiguration();
this.splitWorkerAssigner = new SplitWorkerAssigner(this.master,
conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
this.rootDir = master.getMasterFileSystem().getWALRootDir();
+ // TODO: This should be the WAL FS, not the Master FS?
this.fs = master.getMasterFileSystem().getFileSystem();
-
+ this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
}
public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta)
@@ -117,14 +120,24 @@ public class SplitWALManager {
return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
}
- public void deleteSplitWAL(String wal) throws IOException {
- fs.delete(new Path(wal), false);
+ /**
+ * Archive processed WAL
+ */
+ public void archive(String wal) throws IOException {
+ WALSplitUtil.moveWAL(this.fs, new Path(wal), this.walArchiveDir);
}
public void deleteWALDir(ServerName serverName) throws IOException {
Path splitDir = getWALSplitDir(serverName);
- if (!fs.delete(splitDir, false)) {
- LOG.warn("Failed delete {}", splitDir);
+ try {
+ if (!fs.delete(splitDir, false)) {
+ LOG.warn("Failed delete {}, contains {}", splitDir, fs.listFiles(splitDir, true));
+ }
+ } catch (PathIsNotEmptyDirectoryException e) {
+ FileStatus [] files = CommonFSUtils.listStatus(fs, splitDir);
+ LOG.warn("PathIsNotEmptyDirectoryException {}",
+ Arrays.stream(files).map(f -> f.getPath()).collect(Collectors.toList()));
+ throw e;
}
}
@@ -197,7 +210,11 @@ public class SplitWALManager {
this.maxSplitTasks = maxSplitTasks;
this.master = master;
this.event = new ProcedureEvent<>("split-WAL-worker-assigning");
- this.master.getServerManager().registerListener(this);
+ // ServerManager might be null in a test context where we are mocking; allow for this
+ ServerManager sm = this.master.getServerManager();
+ if (sm != null) {
+ sm.registerListener(this);
+ }
}
public synchronized Optional<ServerName> acquire() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index a6ebbaa..17606c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,10 +16,8 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
-
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -47,7 +45,6 @@ import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState;
@@ -154,8 +151,8 @@ public class ServerCrashProcedure
break;
case SERVER_CRASH_SPLIT_META_LOGS:
if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
- DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
- splitMetaLogs(env);
+ DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
+ zkCoordinatedSplitMetaLogs(env);
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
} else {
am.getRegionStates().metaLogSplitting(serverName);
@@ -164,8 +161,7 @@ public class ServerCrashProcedure
}
break;
case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR:
- if(isSplittingDone(env, true)){
- cleanupSplitDir(env);
+ if (isSplittingDone(env, true)) {
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
am.getRegionStates().metaLogSplit(serverName);
} else {
@@ -195,7 +191,7 @@ public class ServerCrashProcedure
case SERVER_CRASH_SPLIT_LOGS:
if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
- splitLogs(env);
+ zkCoordinatedSplitLogs(env);
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
} else {
am.getRegionStates().logSplitting(this.serverName);
@@ -256,19 +252,27 @@ public class ServerCrashProcedure
private void cleanupSplitDir(MasterProcedureEnv env) {
SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
try {
+ if (!this.carryingMeta) {
+ // If we are NOT carrying hbase:meta, check if any left-over hbase:meta WAL files from an
+ // old hbase:meta tenancy on this server; clean these up if any before trying to remove the
+ // WAL directory of this server or we will fail. See archiveMetaLog comment for more details
+ // on this condition.
+ env.getMasterServices().getMasterWalManager().archiveMetaLog(this.serverName);
+ }
splitWALManager.deleteWALDir(serverName);
} catch (IOException e) {
- LOG.warn("Remove WAL directory of server {} failed, ignore...", serverName, e);
+ LOG.warn("Remove WAL directory for {} failed, ignore...{}", serverName, e.getMessage());
}
}
private boolean isSplittingDone(MasterProcedureEnv env, boolean splitMeta) {
- LOG.debug("check if splitting WALs of {} done? isMeta: {}", serverName, splitMeta);
SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager();
try {
- return splitWALManager.getWALsToSplit(serverName, splitMeta).size() == 0;
+ int wals = splitWALManager.getWALsToSplit(serverName, splitMeta).size();
+ LOG.debug("Check if {} WAL splitting is done? wals={}, meta={}", serverName, wals, splitMeta);
+ return wals == 0;
} catch (IOException e) {
- LOG.warn("get filelist of serverName {} failed, retry...", serverName, e);
+ LOG.warn("Get WALs of {} failed, retry...", serverName, e);
return false;
}
}
@@ -293,7 +297,12 @@ public class ServerCrashProcedure
return hri.isMetaRegion() && RegionReplicaUtil.isDefaultReplica(hri);
}
- private void splitMetaLogs(MasterProcedureEnv env) throws IOException {
+ /**
+ * Split hbase:meta logs using 'classic' zk-based coordination.
+ * Superseded by procedure-based WAL splitting.
+ * @see #createSplittingWalProcedures(MasterProcedureEnv, boolean)
+ */
+ private void zkCoordinatedSplitMetaLogs(MasterProcedureEnv env) throws IOException {
LOG.debug("Splitting meta WALs {}", this);
MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
AssignmentManager am = env.getMasterServices().getAssignmentManager();
@@ -303,7 +312,12 @@ public class ServerCrashProcedure
LOG.debug("Done splitting meta WALs {}", this);
}
- private void splitLogs(final MasterProcedureEnv env) throws IOException {
+ /**
+ * Split logs using 'classic' zk-based coordination.
+ * Superseded by procedure-based WAL splitting.
+ * @see #createSplittingWalProcedures(MasterProcedureEnv, boolean)
+ */
+ private void zkCoordinatedSplitLogs(final MasterProcedureEnv env) throws IOException {
LOG.debug("Splitting WALs {}", this);
MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
AssignmentManager am = env.getMasterServices().getAssignmentManager();
@@ -333,14 +347,12 @@ public class ServerCrashProcedure
currentRunningState = getCurrentState();
}
int childrenLatch = getChildrenLatch();
- status.setStatus(msg + " current State " + currentRunningState
- + (childrenLatch > 0 ? "; remaining num of running child procedures = " + childrenLatch
- : ""));
+ status.setStatus(msg + " current State " + currentRunningState + (childrenLatch > 0?
+ "; remaining num of running child procedures = " + childrenLatch: ""));
}
@Override
- protected void rollbackState(MasterProcedureEnv env, ServerCrashState state)
- throws IOException {
+ protected void rollbackState(MasterProcedureEnv env, ServerCrashState state) throws IOException {
// Can't rollback.
throw new UnsupportedOperationException("unhandled state=" + state);
}
@@ -424,7 +436,8 @@ public class ServerCrashProcedure
int size = state.getRegionsOnCrashedServerCount();
if (size > 0) {
this.regionsOnCrashedServer = new ArrayList<>(size);
- for (org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri: state.getRegionsOnCrashedServerList()) {
+ for (org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo ri:
+ state.getRegionsOnCrashedServerList()) {
this.regionsOnCrashedServer.add(ProtobufUtil.toRegionInfo(ri));
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
index c829e51..54607e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
@@ -96,7 +96,7 @@ public class SplitWALRemoteProcedure extends ServerRemoteProcedure
protected void complete(MasterProcedureEnv env, Throwable error) {
if (error == null) {
try {
- env.getMasterServices().getSplitWALManager().deleteSplitWAL(walPath);
+ env.getMasterServices().getSplitWALManager().archive(walPath);
} catch (IOException e) {
LOG.warn("Failed split of {}; ignore...", walPath, e);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index 44c8f5c..864b395 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -56,7 +56,10 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
* the absence of a global lock there is a unavoidable race here - a worker might have just finished
* its task when it is stripped of its ownership. Here we rely on the idempotency of the log
* splitting task for correctness
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+ * distributed WAL splitter, see SplitWALRemoteProcedure
*/
+@Deprecated
@InterfaceAudience.Private
public class SplitLogWorker implements Runnable {
@@ -101,10 +104,9 @@ public class SplitLogWorker implements Runnable {
// encountered a bad non-retry-able persistent error.
try {
SplitLogWorkerCoordination splitLogWorkerCoordination =
- server.getCoordinatedStateManager() == null ? null
- : server.getCoordinatedStateManager().getSplitLogWorkerCoordination();
+ server.getCoordinatedStateManager().getSplitLogWorkerCoordination();
if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), fs, conf,
- p, sequenceIdChecker, splitLogWorkerCoordination, factory, server)) {
+ p, sequenceIdChecker, splitLogWorkerCoordination, factory, server)) {
return Status.PREEMPTED;
}
} catch (InterruptedIOException iioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
index d6009e3..ff39531 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java
@@ -38,7 +38,10 @@ import org.apache.hadoop.hbase.util.CancelableProgressable;
/**
* Handles log splitting a wal
* Used by the zk-based distributed log splitting. Created by ZKSplitLogWorkerCoordination.
- */
+ * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
+ * distributed WAL splitter, see SplitWALManager
+ */
+@Deprecated
@InterfaceAudience.Private
public class WALSplitterHandler extends EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(WALSplitterHandler.class);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
index 2b38494..f35bae2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;
-
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
@@ -56,9 +55,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@@ -86,9 +83,6 @@ public final class WALSplitUtil {
* the splitLogFile() part. If the master crashes then this function might get called multiple
* times.
* <p>
- * @param logfile
- * @param conf
- * @throws IOException
*/
public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException {
Path walDir = CommonFSUtils.getWALRootDir(conf);
@@ -99,20 +93,9 @@ public final class WALSplitUtil {
} else {
walPath = new Path(walDir, logfile);
}
- finishSplitLogFile(walDir, oldLogDir, walPath, conf);
- }
-
- static void finishSplitLogFile(Path walDir, Path oldWALDir, Path walPath,
- Configuration conf) throws IOException {
- List<Path> processedLogs = new ArrayList<>();
- List<Path> corruptedLogs = new ArrayList<>();
FileSystem walFS = walDir.getFileSystem(conf);
- if (ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS)) {
- corruptedLogs.add(walPath);
- } else {
- processedLogs.add(walPath);
- }
- archiveWALs(corruptedLogs, processedLogs, oldWALDir, walFS, conf);
+ boolean corrupt = ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS);
+ archive(walPath, corrupt, oldLogDir, walFS, conf);
Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, walPath.getName());
walFS.delete(stagingDir, true);
}
@@ -121,40 +104,40 @@ public final class WALSplitUtil {
* Moves processed logs to a oldLogDir after successful processing Moves corrupted logs (any log
* that couldn't be successfully parsed to corruptDir (.corrupt) for later investigation
*/
- private static void archiveWALs(final List<Path> corruptedWALs, final List<Path> processedWALs,
- final Path oldWALDir, final FileSystem walFS, final Configuration conf) throws IOException {
- final Path corruptDir =
- new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
- if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
- LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
- corruptDir);
- }
- if (!walFS.mkdirs(corruptDir)) {
- LOG.info("Unable to mkdir {}", corruptDir);
- }
- walFS.mkdirs(oldWALDir);
-
- // this method can get restarted or called multiple times for archiving
- // the same log files.
- for (Path corruptedWAL : corruptedWALs) {
- Path p = new Path(corruptDir, corruptedWAL.getName());
- if (walFS.exists(corruptedWAL)) {
- if (!walFS.rename(corruptedWAL, p)) {
- LOG.warn("Unable to move corrupted log {} to {}", corruptedWAL, p);
- } else {
- LOG.warn("Moved corrupted log {} to {}", corruptedWAL, p);
- }
+ static void archive(final Path wal, final boolean corrupt, final Path oldWALDir,
+ final FileSystem walFS, final Configuration conf) throws IOException {
+ Path dir;
+ Path target;
+ if (corrupt) {
+ dir = new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
+ if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
+ LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", dir);
}
+ target = new Path(dir, wal.getName());
+ } else {
+ dir = oldWALDir;
+ target = AbstractFSWAL.getWALArchivePath(oldWALDir, wal);
}
+ mkdir(walFS, dir);
+ moveWAL(walFS, wal, target);
+ }
- for (Path p : processedWALs) {
- Path newPath = AbstractFSWAL.getWALArchivePath(oldWALDir, p);
- if (walFS.exists(p)) {
- if (!CommonFSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
- LOG.warn("Unable to move {} to {}", p, newPath);
- } else {
- LOG.info("Archived processed log {} to {}", p, newPath);
- }
+ private static void mkdir(FileSystem fs, Path dir) throws IOException {
+ if (!fs.mkdirs(dir)) {
+ LOG.warn("Failed mkdir {}", dir);
+ }
+ }
+
+ /**
+ * Move WAL. Used to move processed WALs to archive or bad WALs to corrupt WAL dir.
+ * WAL may have already been moved; makes allowance.
+ */
+ public static void moveWAL(FileSystem fs, Path p, Path targetDir) throws IOException {
+ if (fs.exists(p)) {
+ if (!CommonFSUtils.renameAndSetModifyTime(fs, p, targetDir)) {
+ LOG.warn("Failed move of {} to {}", p, targetDir);
+ } else {
+ LOG.info("Moved {} to {}", p, targetDir);
}
}
}
@@ -171,7 +154,6 @@ public final class WALSplitUtil {
* @param tmpDirName of the directory used to sideline old recovered edits file
* @param conf configuration
* @return Path to file into which to dump split log edits.
- * @throws IOException
*/
@SuppressWarnings("deprecation")
@VisibleForTesting
@@ -216,7 +198,7 @@ public final class WALSplitUtil {
/**
* Get the completed recovered edits file path, renaming it to be by last edit in the file from
* its first edit. Then we could use the name to skip recovered edits when doing
- * {@link HRegion#replayRecoveredEditsIfAny}.
+ * HRegion#replayRecoveredEditsIfAny(Map, CancelableProgressable, MonitoredTask).
* @return dstPath take file's last edit log seq num as the name
*/
static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditWALSeqNum) {
@@ -303,7 +285,6 @@ public final class WALSplitUtil {
* @param walFS WAL FileSystem used to retrieving split edits files.
* @param regionDir WAL region dir to look for recovered edits files under.
* @return Files in passed <code>regionDir</code> as a sorted set.
- * @throws IOException
*/
public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
final Path regionDir) throws IOException {
@@ -349,7 +330,6 @@ public final class WALSplitUtil {
* @param fs the file system used to rename bad edits file.
* @param edits Edits file to move aside.
* @return The name of the moved aside file.
- * @throws IOException
*/
public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
throws IOException {
@@ -452,9 +432,9 @@ public final class WALSplitUtil {
}
private final ClientProtos.MutationProto.MutationType type;
- public final Mutation mutation;
- public final long nonceGroup;
- public final long nonce;
+ @SuppressWarnings("checkstyle:VisibilityModifier") public final Mutation mutation;
+ @SuppressWarnings("checkstyle:VisibilityModifier") public final long nonceGroup;
+ @SuppressWarnings("checkstyle:VisibilityModifier") public final long nonce;
@Override
public int compareTo(final MutationReplay d) {
@@ -483,12 +463,9 @@ public final class WALSplitUtil {
/**
* This function is used to construct mutations from a WALEntry. It also reconstructs WALKey &
* WALEdit from the passed in WALEntry
- * @param entry
- * @param cells
* @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
* extracted from the passed in WALEntry.
* @return list of Pair<MutationType, Mutation> to be replayed
- * @throws IOException
*/
public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry,
CellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
@@ -516,7 +493,9 @@ public final class WALSplitUtil {
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
Cell cell = cells.current();
- if (val != null) val.add(cell);
+ if (val != null) {
+ val.add(cell);
+ }
boolean isNewRowOrType =
previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
@@ -575,8 +554,6 @@ public final class WALSplitUtil {
* @param tableName the table name
* @param encodedRegionName the encoded region name
* @param familyName the column family name
- * @param seqId the sequence id which used to generate file name
- * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name
* @return Path to recovered.hfiles directory of the region's column family.
*/
static Path tryCreateRecoveredHFilesDir(FileSystem rootFS, Configuration conf,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 8384ab5..657eb22 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.wal;
-import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -30,6 +29,7 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
+import edu.umd.cs.findbugs.annotations.Nullable;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
-import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -63,24 +62,26 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
+import javax.validation.constraints.Null;
/**
* Split RegionServer WAL files. Splits the WAL into new files,
* one per region, to be picked up on Region reopen. Deletes the split WAL when finished.
- * See {@link #split(Path, Path, Path, FileSystem, Configuration, WALFactory)} or
- * {@link #splitLogFile(Path, FileStatus, FileSystem, Configuration, CancelableProgressable,
- * LastSequenceId, SplitLogWorkerCoordination, WALFactory, RegionServerServices)} for
- * entry-point.
+ * Create an instance and call {@link #splitWAL(FileStatus, CancelableProgressable)} per file or
+ * use static helper methods.
*/
@InterfaceAudience.Private
public class WALSplitter {
private static final Logger LOG = LoggerFactory.getLogger(WALSplitter.class);
+ public static final String SPLIT_SKIP_ERRORS_KEY = "hbase.hlog.split.skip.errors";
- /** By default we retry errors in splitting, rather than skipping. */
+ /**
+ * By default we retry errors in splitting, rather than skipping.
+ */
public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
// Parameters for split process
- protected final Path walDir;
+ protected final Path walRootDir;
protected final FileSystem walFS;
protected final Configuration conf;
final Path rootDir;
@@ -100,8 +101,6 @@ public class WALSplitter {
private final WALFactory walFactory;
- private MonitoredTask status;
-
// For checking the latest flushed sequence id
protected final LastSequenceId sequenceIdChecker;
@@ -132,17 +131,28 @@ public class WALSplitter {
public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize";
public final static String SPLIT_WAL_WRITER_THREADS =
- "hbase.regionserver.hlog.splitlog.writer.threads";
+ "hbase.regionserver.hlog.splitlog.writer.threads";
+
+ private final int numWriterThreads;
+ private final long bufferSize;
+ private final boolean splitWriterCreationBounded;
+ private final boolean hfile;
+ private final boolean skipErrors;
+
+ WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir,
+ FileSystem walFS, Path rootDir, FileSystem rootFS) {
+ this(factory, conf, walRootDir, walFS, rootDir, rootFS, null, null, null);
+ }
@VisibleForTesting
- WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
- Path rootDir, FileSystem rootFS, LastSequenceId idChecker,
+ WALSplitter(final WALFactory factory, Configuration conf, Path walRootDir,
+ FileSystem walFS, Path rootDir, FileSystem rootFS, LastSequenceId idChecker,
SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) {
this.conf = HBaseConfiguration.create(conf);
String codecClassName =
- conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
+ conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
- this.walDir = walDir;
+ this.walRootDir = walRootDir;
this.walFS = walFS;
this.rootDir = rootDir;
this.rootFS = rootFS;
@@ -150,32 +160,17 @@ public class WALSplitter {
this.splitLogWorkerCoordination = splitLogWorkerCoordination;
this.rsServices = rsServices;
this.walFactory = factory;
- PipelineController controller = new PipelineController();
this.tmpDirName =
conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
-
-
// if we limit the number of writers opened for sinking recovered edits
- boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
- boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
- long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
- int numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
-
- if (splitToHFile) {
- entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
- outputSink =
- new BoundedRecoveredHFilesOutputSink(this, controller, entryBuffers, numWriterThreads);
- } else if (splitWriterCreationBounded) {
- entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
- outputSink =
- new BoundedRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
- } else {
- entryBuffers = new EntryBuffers(controller, bufferSize);
- outputSink = new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
- }
+ this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
+ this.bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
+ this.numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
+ this.hfile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
+ this.skipErrors = conf.getBoolean(SPLIT_SKIP_ERRORS_KEY, SPLIT_SKIP_ERRORS_DEFAULT);
}
- WALFactory getWalFactory(){
+ WALFactory getWalFactory() {
return this.walFactory;
}
@@ -193,17 +188,25 @@ public class WALSplitter {
/**
* Splits a WAL file.
+ * Used by old {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} and tests.
+ * Not used by new procedure-based WAL splitter.
+ *
* @return false if it is interrupted by the progress-able.
*/
public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
SplitLogWorkerCoordination splitLogWorkerCoordination, WALFactory factory,
RegionServerServices rsServices) throws IOException {
+ Preconditions.checkNotNull(splitLogWorkerCoordination,
+ "Can't be null; needed to propagate WAL corruption if any found");
Path rootDir = CommonFSUtils.getRootDir(conf);
FileSystem rootFS = rootDir.getFileSystem(conf);
- WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker,
- splitLogWorkerCoordination, rsServices);
- return s.splitLogFile(logfile, reporter);
+ WALSplitter splitter = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker,
+ splitLogWorkerCoordination, rsServices);
+ // splitWAL returns a data structure with whether split is finished and if the file is corrupt.
+ // We don't need to propagate corruption flag here because it is propagated by the
+ // SplitLogWorkerCoordination.
+ return splitter.splitWAL(logfile, reporter).isFinished();
}
/**
@@ -214,85 +217,123 @@ public class WALSplitter {
* @return List of output files created by the split.
*/
@VisibleForTesting
- public static List<Path> split(Path walDir, Path logDir, Path oldLogDir, FileSystem walFS,
+ public static List<Path> split(Path walRootDir, Path walsDir, Path archiveDir, FileSystem walFS,
Configuration conf, final WALFactory factory) throws IOException {
Path rootDir = CommonFSUtils.getRootDir(conf);
FileSystem rootFS = rootDir.getFileSystem(conf);
- final FileStatus[] logfiles =
- SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null);
+ WALSplitter splitter = new WALSplitter(factory, conf, walRootDir, walFS, rootDir, rootFS);
+ final FileStatus[] wals =
+ SplitLogManager.getFileList(conf, Collections.singletonList(walsDir), null);
List<Path> splits = new ArrayList<>();
- if (ArrayUtils.isNotEmpty(logfiles)) {
- for (FileStatus logfile : logfiles) {
- WALSplitter s =
- new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, null, null, null);
- if (s.splitLogFile(logfile, null)) {
- finishSplitLogFile(walDir, oldLogDir, logfile.getPath(), conf);
- if (s.outputSink.splits != null) {
- splits.addAll(s.outputSink.splits);
+ if (ArrayUtils.isNotEmpty(wals)) {
+ for (FileStatus wal: wals) {
+ SplitWALResult splitWALResult = splitter.splitWAL(wal, null);
+ if (splitWALResult.isFinished()) {
+ WALSplitUtil.archive(wal.getPath(), splitWALResult.isCorrupt(), archiveDir, walFS, conf);
+ if (splitter.outputSink.splits != null) {
+ splits.addAll(splitter.outputSink.splits);
}
}
}
}
- if (!walFS.delete(logDir, true)) {
- throw new IOException("Unable to delete src dir: " + logDir);
+ if (!walFS.delete(walsDir, true)) {
+ throw new IOException("Unable to delete src dir " + walsDir);
}
return splits;
}
/**
- * WAL splitting implementation, splits one log file.
- * @param logfile should be an actual log file.
+ * Data structure returned as result by #splitWAL(FileStatus, CancelableProgressable).
+ * Test {@link #isFinished()} to see if we are done with the WAL and {@link #isCorrupt()} for if
+ * the WAL is corrupt.
+ */
+ static final class SplitWALResult {
+ private final boolean finished;
+ private final boolean corrupt;
+
+ private SplitWALResult(boolean finished, boolean corrupt) {
+ this.finished = finished;
+ this.corrupt = corrupt;
+ }
+
+ public boolean isFinished() {
+ return finished;
+ }
+
+ public boolean isCorrupt() {
+ return corrupt;
+ }
+ }
+
+ /**
+ * Setup the output sinks and entry buffers ahead of splitting WAL.
+ */
+ private void createOutputSinkAndEntryBuffers() {
+ PipelineController controller = new PipelineController();
+ if (this.hfile) {
+ this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize);
+ this.outputSink = new BoundedRecoveredHFilesOutputSink(this, controller,
+ this.entryBuffers, this.numWriterThreads);
+ } else if (this.splitWriterCreationBounded) {
+ this.entryBuffers = new BoundedEntryBuffers(controller, this.bufferSize);
+ this.outputSink = new BoundedRecoveredEditsOutputSink(this, controller,
+ this.entryBuffers, this.numWriterThreads);
+ } else {
+ this.entryBuffers = new EntryBuffers(controller, this.bufferSize);
+ this.outputSink = new RecoveredEditsOutputSink(this, controller,
+ this.entryBuffers, this.numWriterThreads);
+ }
+ }
+
+ /**
+ * WAL splitting implementation, splits one WAL file.
+ * @param walStatus should be for an actual WAL file.
*/
@VisibleForTesting
- boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
- Preconditions.checkState(status == null);
- Preconditions.checkArgument(logfile.isFile(),
- "passed in file status is for something other than a regular file.");
- boolean isCorrupted = false;
- boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
- SPLIT_SKIP_ERRORS_DEFAULT);
+ SplitWALResult splitWAL(FileStatus walStatus, CancelableProgressable cancel) throws IOException {
+ Path wal = walStatus.getPath();
+ Preconditions.checkArgument(walStatus.isFile(), "Not a regular file " + wal.toString());
+ boolean corrupt = false;
int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
- Path logPath = logfile.getPath();
boolean outputSinkStarted = false;
- boolean progressFailed = false;
+ boolean cancelled = false;
int editsCount = 0;
int editsSkipped = 0;
-
- status = TaskMonitor.get().createStatus(
- "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
+ MonitoredTask status =
+ TaskMonitor.get().createStatus("Splitting " + wal + " to temporary staging area.");
status.enableStatusJournal(true);
- Reader logFileReader = null;
- this.fileBeingSplit = logfile;
+ Reader walReader = null;
+ this.fileBeingSplit = walStatus;
long startTS = EnvironmentEdgeManager.currentTime();
+ long length = walStatus.getLen();
+ String lengthStr = StringUtils.humanSize(length);
+ createOutputSinkAndEntryBuffers();
try {
- long logLength = logfile.getLen();
- LOG.info("Splitting WAL={}, size={} ({} bytes)", logPath, StringUtils.humanSize(logLength),
- logLength);
- status.setStatus("Opening log file " + logPath);
- if (reporter != null && !reporter.progress()) {
- progressFailed = true;
- return false;
+ String logStr = "Splitting " + wal + ", size=" + lengthStr + " (" + length + "bytes)";
+ LOG.info(logStr);
+ status.setStatus(logStr);
+ if (cancel != null && !cancel.progress()) {
+ cancelled = true;
+ return new SplitWALResult(false, corrupt);
}
- logFileReader = getReader(logfile, skipErrors, reporter);
- if (logFileReader == null) {
- LOG.warn("Nothing to split in WAL={}", logPath);
- return true;
+ walReader = getReader(walStatus, this.skipErrors, cancel);
+ if (walReader == null) {
+ LOG.warn("Nothing in {}; empty?", wal);
+ return new SplitWALResult(true, corrupt);
}
- long openCost = EnvironmentEdgeManager.currentTime() - startTS;
- LOG.info("Open WAL={} cost {} ms", logPath, openCost);
+ LOG.info("Open {} took {}ms", wal, EnvironmentEdgeManager.currentTime() - startTS);
int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
int numOpenedFilesLastCheck = 0;
- outputSink.setReporter(reporter);
+ outputSink.setReporter(cancel);
outputSink.setStatus(status);
outputSink.startWriterThreads();
outputSinkStarted = true;
Entry entry;
- Long lastFlushedSequenceId = -1L;
startTS = EnvironmentEdgeManager.currentTime();
- while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) {
+ while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) {
byte[] region = entry.getKey().getEncodedRegionName();
String encodedRegionNameAsStr = Bytes.toString(region);
- lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
+ Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
if (lastFlushedSequenceId == null) {
if (!(isRegionDirPresentUnderRoot(entry.getKey().getTableName(),
encodedRegionNameAsStr))) {
@@ -301,8 +342,7 @@ public class WALSplitter {
// region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits
// will get skipped by the seqId check below.
// See more details at https://issues.apache.org/jira/browse/HBASE-24189
- LOG.info("{} no longer available in the FS. Skipping all edits for this region.",
- encodedRegionNameAsStr);
+ LOG.info("{} no longer in filesystem; skipping all edits.", encodedRegionNameAsStr);
lastFlushedSequenceId = Long.MAX_VALUE;
} else {
if (sequenceIdChecker != null) {
@@ -315,7 +355,7 @@ public class WALSplitter {
regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
lastFlushedSequenceId = ids.getLastFlushedSequenceId();
if (LOG.isDebugEnabled()) {
- LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
+ LOG.debug("Last flushed sequenceid for " + encodedRegionNameAsStr + ": "
+ TextFormat.shortDebugString(ids));
}
}
@@ -344,9 +384,9 @@ public class WALSplitter {
String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits()))
+ " edits, skipped " + editsSkipped + " edits.";
status.setStatus("Split " + countsStr);
- if (reporter != null && !reporter.progress()) {
- progressFailed = true;
- return false;
+ if (cancel != null && !cancel.progress()) {
+ cancelled = true;
+ return new SplitWALResult(false, corrupt);
}
}
}
@@ -355,68 +395,64 @@ public class WALSplitter {
iie.initCause(ie);
throw iie;
} catch (CorruptedLogFileException e) {
- LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
- if (splitLogWorkerCoordination != null) {
+ LOG.warn("Could not parse, corrupt WAL={}", wal, e);
+ // If splitLogWorkerCoordination, then its old-school zk-coordinated splitting so update
+ // zk. Otherwise, it is the newer procedure-based WAL split which has no zk component.
+ if (this.splitLogWorkerCoordination != null) {
// Some tests pass in a csm of null.
- splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
- } else {
- // for tests only
- ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
+ splitLogWorkerCoordination.markCorrupted(walRootDir, wal.getName(), walFS);
}
- isCorrupted = true;
+ corrupt = true;
} catch (IOException e) {
e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
throw e;
} finally {
- final String log = "Finishing writing output logs and closing down";
+ final String log = "Finishing writing output for " + wal + " so closing down";
LOG.debug(log);
status.setStatus(log);
try {
- if (null != logFileReader) {
- logFileReader.close();
+ if (null != walReader) {
+ walReader.close();
}
} catch (IOException exception) {
- LOG.warn("Could not close WAL reader", exception);
+ LOG.warn("Could not close {} reader", wal, exception);
}
try {
if (outputSinkStarted) {
- // Set progress_failed to true as the immediate following statement will reset its value
- // when close() throws exception, progress_failed has the right value
- progressFailed = true;
- progressFailed = outputSink.close() == null;
+ // Set cancelled to true as the immediate following statement will reset its value.
+ // If close() throws an exception, cancelled will have the right value
+ cancelled = true;
+ cancelled = outputSink.close() == null;
}
} finally {
long processCost = EnvironmentEdgeManager.currentTime() - startTS;
// See if length got updated post lease recovery
String msg = "Processed " + editsCount + " edits across " +
- outputSink.getNumberOfRecoveredRegions() + " regions cost " + processCost +
- " ms; edits skipped=" + editsSkipped + "; WAL=" + logPath + ", size=" +
- StringUtils.humanSize(logfile.getLen()) + ", length=" + logfile.getLen() +
- ", corrupted=" + isCorrupted + ", progress failed=" + progressFailed;
+ outputSink.getNumberOfRecoveredRegions() + " Regions in " + processCost +
+ " ms; skipped=" + editsSkipped + "; WAL=" + wal + ", size=" + lengthStr +
+ ", length=" + length + ", corrupted=" + corrupt + ", cancelled=" + cancelled;
LOG.info(msg);
status.markComplete(msg);
if (LOG.isDebugEnabled()) {
- LOG.debug("WAL split completed for {} , Journal Log: {}", logPath,
- status.prettyPrintJournal());
+ LOG.debug("Completed split of {}, journal: {}", wal, status.prettyPrintJournal());
}
}
}
- return !progressFailed;
+ return new SplitWALResult(!cancelled, corrupt);
}
- private boolean isRegionDirPresentUnderRoot(TableName tableName, String regionName)
- throws IOException {
- Path regionDirPath = CommonFSUtils.getRegionDir(this.rootDir, tableName, regionName);
- return this.rootFS.exists(regionDirPath);
+ private boolean isRegionDirPresentUnderRoot(TableName tn, String region) throws IOException {
+ return this.rootFS.exists(CommonFSUtils.getRegionDir(this.rootDir, tn, region));
}
/**
* Create a new {@link Reader} for reading logs to split.
+ * @return Returns null if file has length zero or file can't be found.
*/
- private Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
+ protected Reader getReader(FileStatus walStatus, boolean skipErrors, CancelableProgressable cancel)
throws IOException, CorruptedLogFileException {
- Path path = file.getPath();
- long length = file.getLen();
+ Path path = walStatus.getPath();
+ long length = walStatus.getLen();
Reader in;
// Check for possibly empty file. With appends, currently Hadoop reports a
@@ -427,9 +463,9 @@ public class WALSplitter {
}
try {
- RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, reporter);
+ RecoverLeaseFSUtils.recoverFileLease(walFS, path, conf, cancel);
try {
- in = getReader(path, reporter);
+ in = getReader(path, cancel);
} catch (EOFException e) {
if (length <= 0) {
// TODO should we ignore an empty, not-last log file if skip.errors
@@ -451,8 +487,8 @@ public class WALSplitter {
if (!skipErrors || e instanceof InterruptedIOException) {
throw e; // Don't mark the file corrupted if interrupted, or not skipErrors
}
- throw new CorruptedLogFileException("skipErrors=true Could not open wal "
- + path + " ignoring", e);
+ throw new CorruptedLogFileException("skipErrors=true; could not open " + path +
+ ", skipping", e);
}
return in;
}
@@ -463,14 +499,14 @@ public class WALSplitter {
return in.next();
} catch (EOFException eof) {
// truncated files are expected if a RS crashes (see HBASE-2643)
- LOG.info("EOF from wal {}. Continuing.", path);
+ LOG.info("EOF from {}; continuing.", path);
return null;
} catch (IOException e) {
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
if (e.getCause() != null && (e.getCause() instanceof ParseException
|| e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
- LOG.warn("Parse exception from wal {}. Continuing", path, e);
+ LOG.warn("Parse exception from {}; continuing", path, e);
return null;
}
if (!skipErrors) {
@@ -493,7 +529,7 @@ public class WALSplitter {
* Create a new {@link Reader} for reading logs to split.
* @return new Reader instance, caller should close
*/
- protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
+ private Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
return walFactory.createReader(walFS, curLogFile, reporter);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
index 2b91ba9..24fdbc4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,42 +18,23 @@
package org.apache.hadoop.hbase.master;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_acquired;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_done;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_err;
-import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_task_resigned;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAdder;
-import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.StartMiniClusterOption;
@@ -65,9 +45,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
-import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
@@ -77,12 +55,10 @@ import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.junit.After;
import org.junit.AfterClass;
@@ -93,7 +69,6 @@ import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
@@ -177,83 +152,6 @@ public abstract class AbstractTestDLS {
}
@Test
- public void testRecoveredEdits() throws Exception {
- conf.setLong("hbase.regionserver.hlog.blocksize", 30 * 1024); // create more than one wal
- startCluster(NUM_RS);
-
- int numLogLines = 10000;
- SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
- // turn off load balancing to prevent regions from moving around otherwise
- // they will consume recovered.edits
- master.balanceSwitch(false);
- FileSystem fs = master.getMasterFileSystem().getFileSystem();
-
- List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
-
- Path rootdir = CommonFSUtils.getRootDir(conf);
-
- int numRegions = 50;
- try (Table t = installTable(numRegions)) {
- List<RegionInfo> regions = null;
- HRegionServer hrs = null;
- for (int i = 0; i < NUM_RS; i++) {
- hrs = rsts.get(i).getRegionServer();
- regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
- // At least one RS will have >= to average number of regions.
- if (regions.size() >= numRegions / NUM_RS) {
- break;
- }
- }
- Path logDir = new Path(rootdir,
- AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
-
- LOG.info("#regions = " + regions.size());
- Iterator<RegionInfo> it = regions.iterator();
- while (it.hasNext()) {
- RegionInfo region = it.next();
- if (region.getTable().getNamespaceAsString()
- .equals(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR)) {
- it.remove();
- }
- }
-
- makeWAL(hrs, regions, numLogLines, 100);
-
- slm.splitLogDistributed(logDir);
-
- int count = 0;
- for (RegionInfo hri : regions) {
- @SuppressWarnings("deprecation")
- Path editsdir = WALSplitUtil
- .getRegionDirRecoveredEditsDir(CommonFSUtils.getWALRegionDir(conf,
- tableName, hri.getEncodedName()));
- LOG.debug("Checking edits dir " + editsdir);
- FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
- @Override
- public boolean accept(Path p) {
- if (WALSplitUtil.isSequenceIdFile(p)) {
- return false;
- }
- return true;
- }
- });
- LOG.info("Files {}", Arrays.stream(files).map(f -> f.getPath().toString()).
- collect(Collectors.joining(",")));
- assertTrue("Edits dir should have more than a one file", files.length > 1);
- for (int i = 0; i < files.length; i++) {
- int c = countWAL(files[i].getPath(), fs, conf);
- count += c;
- }
- LOG.info(count + " edits in " + files.length + " recovered edits files.");
- }
-
- // check that the log file is moved
- assertFalse(fs.exists(logDir));
- assertEquals(numLogLines, count);
- }
- }
-
- @Test
public void testMasterStartsUpWithLogSplittingWork() throws Exception {
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_RS - 1);
startCluster(NUM_RS);
@@ -303,71 +201,6 @@ public abstract class AbstractTestDLS {
}
}
- /**
- * The original intention of this test was to force an abort of a region server and to make sure
- * that the failure path in the region servers is properly evaluated. But it is difficult to
- * ensure that the region server doesn't finish the log splitting before it aborts. Also now,
- * there is this code path where the master will preempt the region server when master detects
- * that the region server has aborted.
- * @throws Exception
- */
- // Was marked flaky before Distributed Log Replay cleanup.
- @Test
- public void testWorkerAbort() throws Exception {
- LOG.info("testWorkerAbort");
- startCluster(3);
- int numLogLines = 10000;
- SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
- FileSystem fs = master.getMasterFileSystem().getFileSystem();
-
- List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
- HRegionServer hrs = findRSToKill(false);
- Path rootdir = CommonFSUtils.getRootDir(conf);
- final Path logDir = new Path(rootdir,
- AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
-
- try (Table t = installTable(40)) {
- makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100);
-
- new Thread() {
- @Override
- public void run() {
- try {
- waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
- } catch (InterruptedException e) {
- }
- for (RegionServerThread rst : rsts) {
- rst.getRegionServer().abort("testing");
- break;
- }
- }
- }.start();
- FileStatus[] logfiles = fs.listStatus(logDir);
- TaskBatch batch = new TaskBatch();
- slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
- // waitForCounter but for one of the 2 counters
- long curt = System.currentTimeMillis();
- long waitTime = 80000;
- long endt = curt + waitTime;
- while (curt < endt) {
- if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
- tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
- tot_wkr_preempt_task.sum()) == 0) {
- Thread.sleep(100);
- curt = System.currentTimeMillis();
- } else {
- assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
- tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
- tot_wkr_preempt_task.sum()));
- return;
- }
- }
- fail("none of the following counters went up in " + waitTime + " milliseconds - " +
- "tot_wkr_task_resigned, tot_wkr_task_err, " +
- "tot_wkr_final_transition_failed, tot_wkr_task_done, " + "tot_wkr_preempt_task");
- }
- }
-
@Test
public void testThreeRSAbort() throws Exception {
LOG.info("testThreeRSAbort");
@@ -409,75 +242,6 @@ public abstract class AbstractTestDLS {
}
}
- @Test
- public void testDelayedDeleteOnFailure() throws Exception {
- LOG.info("testDelayedDeleteOnFailure");
- startCluster(1);
- final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager();
- final FileSystem fs = master.getMasterFileSystem().getFileSystem();
- final Path logDir =
- new Path(new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME),
- ServerName.valueOf("x", 1, 1).toString());
- fs.mkdirs(logDir);
- ExecutorService executor = null;
- try {
- final Path corruptedLogFile = new Path(logDir, "x");
- FSDataOutputStream out;
- out = fs.create(corruptedLogFile);
- out.write(0);
- out.write(Bytes.toBytes("corrupted bytes"));
- out.close();
- ZKSplitLogManagerCoordination coordination =
- (ZKSplitLogManagerCoordination) (master.getCoordinatedStateManager())
- .getSplitLogManagerCoordination();
- coordination.setIgnoreDeleteForTesting(true);
- executor = Executors.newSingleThreadExecutor();
- Runnable runnable = new Runnable() {
- @Override
- public void run() {
- try {
- // since the logDir is a fake, corrupted one, so the split log worker
- // will finish it quickly with error, and this call will fail and throw
- // an IOException.
- slm.splitLogDistributed(logDir);
- } catch (IOException ioe) {
- try {
- assertTrue(fs.exists(corruptedLogFile));
- // this call will block waiting for the task to be removed from the
- // tasks map which is not going to happen since ignoreZKDeleteForTesting
- // is set to true, until it is interrupted.
- slm.splitLogDistributed(logDir);
- } catch (IOException e) {
- assertTrue(Thread.currentThread().isInterrupted());
- return;
- }
- fail("did not get the expected IOException from the 2nd call");
- }
- fail("did not get the expected IOException from the 1st call");
- }
- };
- Future<?> result = executor.submit(runnable);
- try {
- result.get(2000, TimeUnit.MILLISECONDS);
- } catch (TimeoutException te) {
- // it is ok, expected.
- }
- waitForCounter(tot_mgr_wait_for_zk_delete, 0, 1, 10000);
- executor.shutdownNow();
- executor = null;
-
- // make sure the runnable is finished with no exception thrown.
- result.get();
- } finally {
- if (executor != null) {
- // interrupt the thread in case the test fails in the middle.
- // it has no effect if the thread is already terminated.
- executor.shutdownNow();
- }
- fs.delete(logDir, true);
- }
- }
-
private Table installTable(int nrs) throws Exception {
return installTable(nrs, 0);
}
@@ -504,8 +268,9 @@ public abstract class AbstractTestDLS {
NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
LOG.debug("Verifying only catalog and namespace regions are assigned\n");
if (regions.size() != 2) {
- for (String oregion : regions)
+ for (String oregion : regions) {
LOG.debug("Region still online: " + oregion);
+ }
}
assertEquals(2 + existingRegions, regions.size());
LOG.debug("Enabling table\n");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
index 89c422d..acf85a4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,9 +16,9 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.assignment;
-
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
+import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.mockito.ArgumentMatchers.any;
-
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MasterWalManager;
import org.apache.hadoop.hbase.master.MockNoopMasterServices;
import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.SplitWALManager;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
@@ -61,9 +62,7 @@ import org.apache.zookeeper.KeeperException;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -81,6 +80,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrEx
public class MockMasterServices extends MockNoopMasterServices {
private final MasterFileSystem fileSystemManager;
private final MasterWalManager walManager;
+ private final SplitWALManager splitWALManager;
private final AssignmentManager assignmentManager;
private final TableStateManager tableStateManager;
@@ -102,6 +102,10 @@ public class MockMasterServices extends MockNoopMasterServices {
Superusers.initialize(conf);
this.fileSystemManager = new MasterFileSystem(conf);
this.walManager = new MasterWalManager(this);
+ this.splitWALManager =
+ conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)?
+ null: new SplitWALManager(this);
+
// Mock an AM.
this.assignmentManager = new AssignmentManager(this, new MockRegionStateStore(this)) {
@Override
@@ -146,8 +150,8 @@ public class MockMasterServices extends MockNoopMasterServices {
// to make our test work.
this.connection =
HConnectionTestingUtility.getMockedConnectionAndDecorate(getConfiguration(),
- Mockito.mock(AdminProtos.AdminService.BlockingInterface.class), ri, MOCK_MASTER_SERVERNAME,
- RegionInfoBuilder.FIRST_META_REGIONINFO);
+ Mockito.mock(AdminProtos.AdminService.BlockingInterface.class), ri,
+ MOCK_MASTER_SERVERNAME, RegionInfoBuilder.FIRST_META_REGIONINFO);
// Set hbase.rootdir into test dir.
Path rootdir = CommonFSUtils.getRootDir(getConfiguration());
CommonFSUtils.setRootDir(getConfiguration(), rootdir);
@@ -366,4 +370,8 @@ public class MockMasterServices extends MockNoopMasterServices {
}
return builder.build();
}
+
+ @Override public SplitWALManager getSplitWALManager() {
+ return splitWALManager;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupMetaWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupMetaWAL.java
index fd1e533..63b611d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupMetaWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupMetaWAL.java
@@ -77,7 +77,7 @@ public class TestCleanupMetaWAL {
Path walPath = new Path(fs.getWALRootDir(), HConstants.HREGION_LOGDIR_NAME);
for (FileStatus status : CommonFSUtils.listStatus(fs.getFileSystem(), walPath)) {
if (status.getPath().toString().contains(SPLITTING_EXT)) {
- fail("Should not have splitting wal dir here:" + status);
+ fail("Splitting WAL dir should have been cleaned up: " + status);
}
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
index 7e93932..f36c5b9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.wal;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -84,7 +83,7 @@ public class TestWALReaderOnSecureWAL {
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
- conf.setBoolean("hbase.hlog.split.skip.errors", true);
+ conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
}
@@ -168,21 +167,14 @@ public class TestWALReaderOnSecureWAL {
wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
assertFalse(true);
} catch (IOException ioe) {
- // expected IOE
+ System.out.println("Expected ioe " + ioe.getMessage());
}
FileStatus[] listStatus = fs.listStatus(walPath.getParent());
Path rootdir = CommonFSUtils.getRootDir(conf);
- try {
- WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null);
- s.splitLogFile(listStatus[0], null);
- Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
- "corrupt");
- assertTrue(fs.exists(file));
- // assertFalse("log splitting should have failed", true);
- } catch (IOException ioe) {
- assertTrue("WAL should have been sidelined", false);
- }
+ WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null);
+ WALSplitter.SplitWALResult swr = s.splitWAL(listStatus[0], null);
+ assertTrue(swr.isCorrupt());
wals.close();
}
@@ -219,7 +211,7 @@ public class TestWALReaderOnSecureWAL {
Path rootdir = CommonFSUtils.getRootDir(conf);
try {
WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null);
- s.splitLogFile(listStatus[0], null);
+ s.splitWAL(listStatus[0], null);
Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
"corrupt");
assertTrue(!fs.exists(file));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index 43cf81f..5f22b45 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
@@ -58,6 +57,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
@@ -91,12 +91,10 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
-
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@@ -105,18 +103,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
*/
@Category({RegionServerTests.class, LargeTests.class})
public class TestWALSplit {
-
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALSplit.class);
-
- {
- // Uncomment the following lines if more verbosity is needed for
- // debugging (see HBASE-12285 for details).
- //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
- //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
- //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
- }
private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);
private static Configuration conf;
@@ -143,7 +132,6 @@ public class TestWALSplit {
private static final byte[] VALUE = Bytes.toBytes("v1");
private static final String WAL_FILE_PREFIX = "wal.dat.";
private static List<String> REGIONS = new ArrayList<>();
- private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
private static String ROBBER;
private static String ZOMBIE;
private static String [] GROUP = new String [] {"supergroup"};
@@ -223,8 +211,6 @@ public class TestWALSplit {
/**
* Simulates splitting a WAL out from under a regionserver that is still trying to write it.
* Ensures we do not lose edits.
- * @throws IOException
- * @throws InterruptedException
*/
@Test
public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
@@ -553,7 +539,7 @@ public class TestWALSplit {
@Test
public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
- conf.setBoolean(HBASE_SKIP_ERRORS, true);
+ conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
generateWALs(Integer.MAX_VALUE);
corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
Corruptions.APPEND_GARBAGE, true);
@@ -562,7 +548,7 @@ public class TestWALSplit {
@Test
public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
- conf.setBoolean(HBASE_SKIP_ERRORS, true);
+ conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
generateWALs(Integer.MAX_VALUE);
corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
@@ -571,7 +557,7 @@ public class TestWALSplit {
@Test
public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
- conf.setBoolean(HBASE_SKIP_ERRORS, true);
+ conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
generateWALs(Integer.MAX_VALUE);
corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
@@ -587,7 +573,7 @@ public class TestWALSplit {
@Test
public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
- conf.setBoolean(HBASE_SKIP_ERRORS, true);
+ conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
List<FaultyProtobufLogReader.FailureType> failureTypes = Arrays
.asList(FaultyProtobufLogReader.FailureType.values()).stream()
.filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList());
@@ -654,14 +640,14 @@ public class TestWALSplit {
@Test (expected = IOException.class)
public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
throws IOException {
- conf.setBoolean(HBASE_SKIP_ERRORS, false);
+ conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
}
@Test
public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
throws IOException {
- conf.setBoolean(HBASE_SKIP_ERRORS, false);
+ conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
try {
splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
} catch (IOException e) {
@@ -673,7 +659,7 @@ public class TestWALSplit {
private void ignoreCorruption(final Corruptions corruption, final int entryCount,
final int expectedCount) throws IOException {
- conf.setBoolean(HBASE_SKIP_ERRORS, false);
+ conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
final String REGION = "region__1";
REGIONS.clear();
@@ -698,7 +684,7 @@ public class TestWALSplit {
in.close();
// should not have stored the EOF files as corrupt
- FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
+ FileStatus[] archivedLogs = fs.exists(CORRUPTDIR)? fs.listStatus(CORRUPTDIR): new FileStatus[0];
assertEquals(0, archivedLogs.length);
}
@@ -717,7 +703,7 @@ public class TestWALSplit {
@Test
public void testLogsGetArchivedAfterSplit() throws IOException {
- conf.setBoolean(HBASE_SKIP_ERRORS, false);
+ conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
generateWALs(-1);
useDifferentDFSClient();
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
@@ -793,7 +779,7 @@ public class TestWALSplit {
@Test
public void testIOEOnOutputThread() throws Exception {
- conf.setBoolean(HBASE_SKIP_ERRORS, false);
+ conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
generateWALs(-1);
useDifferentDFSClient();
@@ -847,7 +833,7 @@ public class TestWALSplit {
t.setDaemon(true);
t.start();
try {
- logSplitter.splitLogFile(logfiles[largestLogFile], null);
+ logSplitter.splitWAL(logfiles[largestLogFile], null);
fail("Didn't throw!");
} catch (IOException ioe) {
assertTrue(ioe.toString().contains("Injected"));
@@ -944,7 +930,7 @@ public class TestWALSplit {
try {
conf.setInt("hbase.splitlog.report.period", 1000);
boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
- null, wals, null);
+ Mockito.mock(SplitLogWorkerCoordination.class), wals, null);
assertFalse("Log splitting should failed", ret);
assertTrue(count.get() > 0);
} catch (IOException e) {
@@ -1002,9 +988,7 @@ public class TestWALSplit {
makeRegionDirs(regions);
// Create a splitter that reads and writes the data without touching disk
- WALSplitter logSplitter =
- new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
-
+ WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs) {
/* Produce a mock writer that doesn't write anywhere */
@Override
protected Writer createWriter(Path logfile) throws IOException {
@@ -1039,8 +1023,8 @@ public class TestWALSplit {
/* Produce a mock reader that generates fake entries */
@Override
- protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
- throws IOException {
+ protected Reader getReader(FileStatus file, boolean skipErrors,
+ CancelableProgressable reporter) throws IOException, CorruptedLogFileException {
Reader mockReader = Mockito.mock(Reader.class);
Mockito.doAnswer(new Answer<Entry>() {
int index = 0;
@@ -1064,7 +1048,7 @@ public class TestWALSplit {
}
};
- logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
+ logSplitter.splitWAL(fs.getFileStatus(logPath), null);
// Verify number of written edits per region
Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
@@ -1119,7 +1103,7 @@ public class TestWALSplit {
@Test
public void testSplitLogFileFirstLineCorruptionLog()
throws IOException {
- conf.setBoolean(HBASE_SKIP_ERRORS, true);
+ conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
generateWALs(1, 10, -1);
FileStatus logfile = fs.listStatus(WALDIR)[0];
@@ -1175,7 +1159,7 @@ public class TestWALSplit {
}
};
try{
- logSplitter.splitLogFile(logfiles[0], null);
+ logSplitter.splitWAL(logfiles[0], null);
} catch (IOException e) {
LOG.info(e.toString(), e);
fail("Throws IOException when spliting "
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
index c7b45fe..bb02af3 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaTableLocator.java
@@ -221,8 +221,8 @@ public final class MetaTableLocator {
LOG.warn("Tried to set null ServerName in hbase:meta; skipping -- ServerName required");
return;
}
- LOG.info("Setting hbase:meta (replicaId={}) location in ZooKeeper as {}", replicaId,
- serverName);
+ LOG.info("Setting hbase:meta (replicaId={}) location in ZooKeeper as {}, state={}", replicaId,
+ serverName, state);
// Make the MetaRegionServer pb and then get its bytes and save this as
// the znode content.
MetaRegionServer pbrsr = MetaRegionServer.newBuilder()
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
index f933070..63e2857 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
@@ -32,7 +32,10 @@ import org.slf4j.LoggerFactory;
/**
* Common methods and attributes used by SplitLogManager and SplitLogWorker running distributed
* splitting of WAL logs.
+ * @deprecated since 2.4.0 and 3.0.0 replaced by procedure-based WAL splitting; see
+ * SplitWALManager.
*/
+@Deprecated
@InterfaceAudience.Private
public final class ZKSplitLog {
private static final Logger LOG = LoggerFactory.getLogger(ZKSplitLog.class);