Posted to commits@hbase.apache.org by ti...@apache.org on 2019/05/30 02:50:53 UTC
[hbase] branch branch-2 updated: HBASE-22454 refactor WALSplitter
This is an automated email from the ASF dual-hosted git repository.
tianjy pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 34514cc HBASE-22454 refactor WALSplitter
34514cc is described below
commit 34514ccf0ab29b921044d18d751e8507d6ee23dc
Author: Jingyun Tian <ti...@gmail.com>
AuthorDate: Wed May 29 11:07:52 2019 +0800
HBASE-22454 refactor WALSplitter
---
.../ZKSplitLogManagerCoordination.java | 4 +-
.../master/assignment/AssignmentManagerUtil.java | 4 +-
.../assignment/MergeTableRegionsProcedure.java | 6 +-
.../hbase/master/assignment/RegionStateStore.java | 4 +-
.../assignment/SplitTableRegionProcedure.java | 8 +-
.../master/procedure/DisableTableProcedure.java | 4 +-
.../apache/hadoop/hbase/regionserver/HRegion.java | 25 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 15 +-
.../RegionReplicaReplicationEndpoint.java | 4 +-
.../org/apache/hadoop/hbase/util/HBaseFsck.java | 4 +-
.../wal/BoundedLogWriterCreationOutputSink.java | 151 +++
.../org/apache/hadoop/hbase/wal/EntryBuffers.java | 158 +++
.../hbase/wal/LogRecoveredEditsOutputSink.java | 460 +++++++
.../org/apache/hadoop/hbase/wal/OutputSink.java | 252 ++++
.../org/apache/hadoop/hbase/wal/WALSplitUtil.java | 523 +++++++
.../org/apache/hadoop/hbase/wal/WALSplitter.java | 1423 +-------------------
.../hadoop/hbase/master/AbstractTestDLS.java | 6 +-
.../TestDeleteColumnFamilyProcedureFromClient.java | 6 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 14 +-
.../regionserver/TestHRegionReplayEvents.java | 2 +-
.../hbase/regionserver/TestRecoveredEdits.java | 4 +-
.../TestRecoveredEditsReplayAndAbort.java | 4 +-
.../regionserver/wal/AbstractTestWALReplay.java | 14 +-
.../hbase/snapshot/TestRestoreSnapshotHelper.java | 6 +-
.../hadoop/hbase/wal/TestReadWriteSeqIdFiles.java | 16 +-
.../apache/hadoop/hbase/wal/TestWALMethods.java | 27 +-
.../org/apache/hadoop/hbase/wal/TestWALSplit.java | 14 +-
27 files changed, 1680 insertions(+), 1478 deletions(-)
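The moved helpers keep their signatures; callers only swap the class name, as the hunks below show. A minimal before/after sketch of the call-site change (walFS and regionDir are placeholder variables, not identifiers from this commit):

    // Before this commit
    long maxSeqId = WALSplitter.getMaxRegionSequenceId(walFS, regionDir);
    WALSplitter.writeRegionSequenceIdFile(walFS, regionDir, maxSeqId);

    // After this commit: same signatures, new home in WALSplitUtil
    long maxSeqId = WALSplitUtil.getMaxRegionSequenceId(walFS, regionDir);
    WALSplitUtil.writeRegionSequenceIdFile(walFS, regionDir, maxSeqId);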
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
index fac1532..3ff722a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective;
import org.apache.hadoop.hbase.master.SplitLogManager.Task;
import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKMetadata;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
@@ -92,7 +92,7 @@ public class ZKSplitLogManagerCoordination extends ZKListener implements
@Override
public Status finish(ServerName workerName, String logfile) {
try {
- WALSplitter.finishSplitLogFile(logfile, conf);
+ WALSplitUtil.finishSplitLogFile(logfile, conf);
} catch (IOException e) {
LOG.warn("Could not finish splitting of log file " + logfile, e);
return Status.ERR;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java
index 4f9343c..d401141 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
@@ -229,7 +229,7 @@ final class AssignmentManagerUtil {
}
static void checkClosedRegion(MasterProcedureEnv env, RegionInfo regionInfo) throws IOException {
- if (WALSplitter.hasRecoveredEdits(env.getMasterConfiguration(), regionInfo)) {
+ if (WALSplitUtil.hasRecoveredEdits(env.getMasterConfiguration(), regionInfo)) {
throw new IOException("Recovered.edits are found in Region: " + regionInfo +
", abort split/merge to prevent data loss");
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
index 241e8f9..11ae8fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java
@@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -777,11 +777,11 @@ public class MergeTableRegionsProcedure
long maxSequenceId = -1L;
for (RegionInfo region : regionsToMerge) {
maxSequenceId =
- Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId(
+ Math.max(maxSequenceId, WALSplitUtil.getMaxRegionSequenceId(
walFS, getWALRegionDir(env, region)));
}
if (maxSequenceId > 0) {
- WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, mergedRegion),
+ WALSplitUtil.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, mergedRegion),
maxSequenceId);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
index ce4bc38..a8491fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
@@ -219,7 +219,7 @@ public class RegionStateStore {
private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException {
FileSystem walFS = master.getMasterWalManager().getFileSystem();
long maxSeqId =
- WALSplitter.getMaxRegionSequenceId(walFS, FSUtils.getWALRegionDir(
+ WALSplitUtil.getMaxRegionSequenceId(walFS, FSUtils.getWALRegionDir(
master.getConfiguration(), region.getTable(), region.getEncodedName()));
return maxSeqId > 0 ? maxSeqId + 1 : HConstants.NO_SEQNUM;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
index 657f397..346905a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -68,7 +68,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -876,11 +876,11 @@ public class SplitTableRegionProcedure
private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException {
FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem();
long maxSequenceId =
- WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, getParentRegion()));
+ WALSplitUtil.getMaxRegionSequenceId(walFS, getWALRegionDir(env, getParentRegion()));
if (maxSequenceId > 0) {
- WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughterOneRI),
+ WALSplitUtil.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughterOneRI),
maxSequenceId);
- WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughterTwoRI),
+ WALSplitUtil.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughterTwoRI),
maxSequenceId);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
index 9cacc5d..85a8226 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,7 +117,7 @@ public class DisableTableProcedure
for (RegionInfo region : env.getAssignmentManager().getRegionStates()
.getRegionsOfTable(tableName)) {
long maxSequenceId =
- WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, region));
+ WALSplitUtil.getMaxRegionSequenceId(walFS, getWALRegionDir(env, region));
long openSeqNum = maxSequenceId > 0 ? maxSequenceId + 1 : HConstants.NO_SEQNUM;
mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, openSeqNum,
EnvironmentEdgeManager.currentTime()));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 24bafab..6e14873 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -174,8 +174,8 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.wal.WALSplitter;
-import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
+import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.TraceScope;
@@ -1009,15 +1009,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// (particularly if no recovered edits, seqid will be -1).
long nextSeqId = maxSeqId + 1;
if (!isRestoredRegion) {
- long maxSeqIdFromFile =
- WALSplitter.getMaxRegionSequenceId(getWalFileSystem(), getWALRegionDirOfDefaultReplica());
+ long maxSeqIdFromFile = WALSplitUtil.getMaxRegionSequenceId(getWalFileSystem(),
+ getWALRegionDirOfDefaultReplica());
nextSeqId = Math.max(maxSeqId, maxSeqIdFromFile) + 1;
// The openSeqNum will always increase even for a read only region, as we rely on it to
// determine whether a region has been successfully reopened, so here we always need to update
// the max sequence id file.
if (RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
LOG.debug("writing seq id for {}", this.getRegionInfo().getEncodedName());
- WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), nextSeqId - 1);
+ WALSplitUtil.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(),
+ nextSeqId - 1);
}
}
@@ -1174,7 +1175,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// checking region folder exists is due to many tests which delete the table folder while a
// table is still online
if (getWalFileSystem().exists(getWALRegionDir())) {
- WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(),
+ WALSplitUtil.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(),
mvcc.getReadPoint());
}
}
@@ -4586,13 +4587,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
NavigableSet<Path> filesUnderRootDir = null;
if (!regionDir.equals(defaultRegionDir)) {
filesUnderRootDir =
- WALSplitter.getSplitEditFilesSorted(rootFS, defaultRegionDir);
+ WALSplitUtil.getSplitEditFilesSorted(rootFS, defaultRegionDir);
seqid = Math.max(seqid,
replayRecoveredEditsForPaths(minSeqIdForTheRegion, rootFS, filesUnderRootDir, reporter,
defaultRegionDir));
}
- NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(walFS, regionDir);
+ NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(walFS, regionDir);
seqid = Math.max(seqid, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS,
files, reporter, regionDir));
@@ -4605,7 +4606,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// For debugging data loss issues!
// If this flag is set, make use of the hfile archiving by making recovered.edits a fake
// column family. Have to fake out file type too by casting our recovered.edits as storefiles
- String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regionDir).getName();
+ String fakeFamilyName = WALSplitUtil.getRegionDirRecoveredEditsDir(regionDir).getName();
Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size());
for (Path file: files) {
fakeStoreFiles.add(
@@ -4682,7 +4683,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
}
if (skipErrors) {
- Path p = WALSplitter.moveAsideBadEditsFile(fs, edits);
+ Path p = WALSplitUtil.moveAsideBadEditsFile(fs, edits);
LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
+ "=true so continuing. Renamed " + edits +
" as " + p, e);
@@ -4862,7 +4863,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
coprocessorHost.postReplayWALs(this.getRegionInfo(), edits);
}
} catch (EOFException eof) {
- Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
+ Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
msg = "EnLongAddered EOF. Most likely due to Master failure during " +
"wal splitting, so we have this data in another edit. " +
"Continuing, but renaming " + edits + " as " + p;
@@ -4872,7 +4873,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
if (ioe.getCause() instanceof ParseException) {
- Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits);
+ Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
msg = "File corruption enLongAddered! " +
"Continuing, but renaming " + edits + " as " + p;
LOG.warn(msg, ioe);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index adfa84f..64ecbc2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -140,7 +140,8 @@ import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
+import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
@@ -1109,14 +1110,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @throws IOException
*/
private OperationStatus [] doReplayBatchOp(final HRegion region,
- final List<WALSplitter.MutationReplay> mutations, long replaySeqId) throws IOException {
+ final List<MutationReplay> mutations, long replaySeqId) throws IOException {
long before = EnvironmentEdgeManager.currentTime();
boolean batchContainsPuts = false, batchContainsDelete = false;
try {
- for (Iterator<WALSplitter.MutationReplay> it = mutations.iterator(); it.hasNext();) {
- WALSplitter.MutationReplay m = it.next();
+ for (Iterator<MutationReplay> it = mutations.iterator(); it.hasNext();) {
+ MutationReplay m = it.next();
- if (m.type == MutationType.PUT) {
+ if (m.getType() == MutationType.PUT) {
batchContainsPuts = true;
} else {
batchContainsDelete = true;
@@ -1160,7 +1161,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
regionServer.cacheFlusher.reclaimMemStoreMemory();
}
return region.batchReplay(mutations.toArray(
- new WALSplitter.MutationReplay[mutations.size()]), replaySeqId);
+ new MutationReplay[mutations.size()]), replaySeqId);
} finally {
updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete);
}
@@ -2217,7 +2218,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
entry.getKey().getWriteTime());
}
Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ? null : new Pair<>();
- List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
+ List<MutationReplay> edits = WALSplitUtil.getMutationsFromWALEntry(entry,
cells, walEntry, durability);
if (coprocessorHost != null) {
// Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
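Besides the package move, MutationReplay's public field access changes to an accessor (m.type becomes m.getType(), as in the hunk above). A sketch of an updated caller (the mutations list is a placeholder):

    for (MutationReplay m : mutations) {
      if (m.getType() == MutationType.PUT) {
        // replayed Put
      } else {
        // replayed Delete
      }
    }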
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index f7721e0..d1498a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -59,9 +59,9 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.EntryBuffers;
+import org.apache.hadoop.hbase.wal.OutputSink;
import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
-import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index a5c1fce..f749061 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -133,7 +133,7 @@ import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
@@ -4463,7 +4463,7 @@ public class HBaseFsck extends Configured implements Closeable {
// This is special case if a region is left after split
he.hdfsOnlyEdits = true;
FileStatus[] subDirs = fs.listStatus(regionDir.getPath());
- Path ePath = WALSplitter.getRegionDirRecoveredEditsDir(regionDir.getPath());
+ Path ePath = WALSplitUtil.getRegionDirRecoveredEditsDir(regionDir.getPath());
for (FileStatus subDir : subDirs) {
errors.progress();
String sdName = subDir.getPath().getName();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java
new file mode 100644
index 0000000..5fa7bef
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that manages the output streams from the log splitting process.
+ * Bounded means the number of output streams will be no more than the size of the thread pool.
+ */
+@InterfaceAudience.Private
+public class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BoundedLogWriterCreationOutputSink.class);
+
+ private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
+
+ public BoundedLogWriterCreationOutputSink(WALSplitter walSplitter,
+ WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
+ super(walSplitter, controller, entryBuffers, numWriters);
+ }
+
+ @Override
+ public List<Path> finishWritingAndClose() throws IOException {
+ boolean isSuccessful;
+ List<Path> result;
+ try {
+ isSuccessful = finishWriting(false);
+ } finally {
+ result = close();
+ }
+ if (isSuccessful) {
+ splits = result;
+ }
+ return splits;
+ }
+
+ @Override
+ boolean executeCloseTask(CompletionService<Void> completionService, List<IOException> thrown,
+ List<Path> paths) throws InterruptedException, ExecutionException {
+ for (final Map.Entry<byte[], WALSplitter.RegionEntryBuffer> buffer : entryBuffers.buffers
+ .entrySet()) {
+ LOG.info("Submitting writeThenClose of {}",
+ Arrays.toString(buffer.getValue().encodedRegionName));
+ completionService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ Path dst = writeThenClose(buffer.getValue());
+ paths.add(dst);
+ return null;
+ }
+ });
+ }
+ boolean progress_failed = false;
+ for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
+ Future<Void> future = completionService.take();
+ future.get();
+ if (!progress_failed && reporter != null && !reporter.progress()) {
+ progress_failed = true;
+ }
+ }
+
+ return progress_failed;
+ }
+
+ /**
+ * Since the splitting process may create multiple output files, we need the map
+ * regionRecoverStatMap to track the output count of each region.
+ * @return a map from encoded region ID to the number of edits written out for that region.
+ */
+ @Override
+ public Map<byte[], Long> getOutputCounts() {
+ Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
+ for (Map.Entry<String, Long> entry : regionRecoverStatMap.entrySet()) {
+ regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
+ }
+ return regionRecoverStatMapResult;
+ }
+
+ /**
+ * @return the number of recovered regions
+ */
+ @Override
+ public int getNumberOfRecoveredRegions() {
+ return regionRecoverStatMap.size();
+ }
+
+ /**
+ * Append the buffer to a new recovered edits file, then close it once done.
+ * @param buffer contains all entries of a certain region
+ * @throws IOException when closing the writer fails
+ */
+ @Override
+ public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException {
+ writeThenClose(buffer);
+ }
+
+ private Path writeThenClose(WALSplitter.RegionEntryBuffer buffer) throws IOException {
+ WALSplitter.WriterAndPath wap = appendBuffer(buffer, false);
+ if (wap != null) {
+ String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
+ Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
+ if (value != null) {
+ Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
+ regionRecoverStatMap.put(encodedRegionName, newValue);
+ }
+ }
+
+ Path dst = null;
+ List<IOException> thrown = new ArrayList<>();
+ if (wap != null) {
+ dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
+ }
+ if (!thrown.isEmpty()) {
+ throw MultipleIOException.createIOException(thrown);
+ }
+ return dst;
+ }
+}
\ No newline at end of file
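The putIfAbsent-then-put sequence in writeThenClose above accumulates per-region edit counts; it is safe because EntryBuffers hands each region's buffer to at most one writer at a time. A standalone sketch (not HBase code) showing the same accumulation with ConcurrentHashMap.merge, which does the read-modify-write in one atomic call:

    import java.util.concurrent.ConcurrentHashMap;

    class RegionEditCounts {
      private final ConcurrentHashMap<String, Long> counts = new ConcurrentHashMap<>();

      // Adds editsWritten to the region's running total, atomically.
      void record(String encodedRegionName, long editsWritten) {
        counts.merge(encodedRegionName, editsWritten, Long::sum);
      }
    }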
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java
new file mode 100644
index 0000000..f0974be
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
+import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class which accumulates edits and separates them into a buffer per region while simultaneously
+ * accounting for RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer
+ * threads then pull region-specific buffers from this class.
+ */
+@InterfaceAudience.Private
+public class EntryBuffers {
+ private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class);
+
+ PipelineController controller;
+
+ Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+ /*
+ * Track which regions are currently in the middle of writing. We don't allow an IO thread to pick
+ * up bytes from a region if we're already writing data for that region in a different IO thread.
+ */
+ Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+
+ long totalBuffered = 0;
+ long maxHeapUsage;
+ boolean splitWriterCreationBounded;
+
+ public EntryBuffers(PipelineController controller, long maxHeapUsage) {
+ this(controller, maxHeapUsage, false);
+ }
+
+ public EntryBuffers(PipelineController controller, long maxHeapUsage,
+ boolean splitWriterCreationBounded) {
+ this.controller = controller;
+ this.maxHeapUsage = maxHeapUsage;
+ this.splitWriterCreationBounded = splitWriterCreationBounded;
+ }
+
+ /**
+ * Append a log entry into the corresponding region buffer. Blocks if the total heap usage has
+ * crossed the specified threshold.
+ */
+ public void appendEntry(WAL.Entry entry) throws InterruptedException, IOException {
+ WALKey key = entry.getKey();
+ RegionEntryBuffer buffer;
+ long incrHeap;
+ synchronized (this) {
+ buffer = buffers.get(key.getEncodedRegionName());
+ if (buffer == null) {
+ buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
+ buffers.put(key.getEncodedRegionName(), buffer);
+ }
+ incrHeap = buffer.appendEntry(entry);
+ }
+
+ // If we crossed the chunk threshold, wait for more space to be available
+ synchronized (controller.dataAvailable) {
+ totalBuffered += incrHeap;
+ while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
+ LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
+ controller.dataAvailable.wait(2000);
+ }
+ controller.dataAvailable.notifyAll();
+ }
+ controller.checkForErrors();
+ }
+
+ /**
+ * @return RegionEntryBuffer a buffer of edits to be written.
+ */
+ synchronized RegionEntryBuffer getChunkToWrite() {
+ // The core of limiting the number of opened writers is that, when bounded, a chunk is
+ // returned only once the buffered heap size exceeds maxHeapUsage. Thus we don't need to
+ // create a writer for each region during splitting. All the logs remaining in the buffer
+ // are flushed through a thread pool after splitting, which keeps the number of writers
+ // under control.
+ if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
+ return null;
+ }
+ long biggestSize = 0;
+ byte[] biggestBufferKey = null;
+
+ for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
+ long size = entry.getValue().heapSize();
+ if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
+ biggestSize = size;
+ biggestBufferKey = entry.getKey();
+ }
+ }
+ if (biggestBufferKey == null) {
+ return null;
+ }
+
+ RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
+ currentlyWriting.add(biggestBufferKey);
+ return buffer;
+ }
+
+ void doneWriting(RegionEntryBuffer buffer) {
+ synchronized (this) {
+ boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
+ assert removed;
+ }
+ long size = buffer.heapSize();
+
+ synchronized (controller.dataAvailable) {
+ totalBuffered -= size;
+ // We may unblock writers
+ controller.dataAvailable.notifyAll();
+ }
+ }
+
+ synchronized boolean isRegionCurrentlyWriting(byte[] region) {
+ return currentlyWriting.contains(region);
+ }
+
+ public void waitUntilDrained() {
+ synchronized (controller.dataAvailable) {
+ while (totalBuffered > 0) {
+ try {
+ controller.dataAvailable.wait(2000);
+ } catch (InterruptedException e) {
+ LOG.warn("Got interrupted while waiting for EntryBuffers is drained");
+ Thread.interrupted();
+ break;
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
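A minimal usage sketch of the producer/consumer contract EntryBuffers defines, as WALSplitter's reader and WriterThreads use it from the same package (entry and outputSink are placeholders; InterruptedException/IOException handling elided):

    WALSplitter.PipelineController controller = new WALSplitter.PipelineController();
    EntryBuffers entryBuffers = new EntryBuffers(controller, 128 * 1024 * 1024);

    // Reader side: blocks once more than maxHeapUsage bytes are buffered.
    entryBuffers.appendEntry(entry);

    // Writer side: drain the largest region buffer no other thread is writing.
    WALSplitter.RegionEntryBuffer chunk = entryBuffers.getChunkToWrite();
    if (chunk != null) {
      try {
        outputSink.append(chunk);
      } finally {
        entryBuffers.doneWriting(chunk);
      }
    }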
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java
new file mode 100644
index 0000000..aa649e4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath;
+import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
+
+/**
+ * Class that manages the output streams from the log splitting process.
+ */
+@InterfaceAudience.Private
+public class LogRecoveredEditsOutputSink extends OutputSink {
+ private static final Logger LOG = LoggerFactory.getLogger(LogRecoveredEditsOutputSink.class);
+ private WALSplitter walSplitter;
+ private FileSystem walFS;
+ private Configuration conf;
+
+ public LogRecoveredEditsOutputSink(WALSplitter walSplitter,
+ WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
+ // More threads could potentially write faster at the expense
+ // of causing more disk seeks as the logs are split.
+ // After a certain number (probably around 3) the
+ // process will be bound on the reader in the current
+ // implementation anyway.
+ super(controller, entryBuffers, numWriters);
+ this.walSplitter = walSplitter;
+ this.walFS = walSplitter.walFS;
+ this.conf = walSplitter.conf;
+ }
+
+ /**
+ * @return the list of paths written, or null if we failed to report progress
+ */
+ @Override
+ public List<Path> finishWritingAndClose() throws IOException {
+ boolean isSuccessful = false;
+ List<Path> result = null;
+ try {
+ isSuccessful = finishWriting(false);
+ } finally {
+ result = close();
+ List<IOException> thrown = closeLogWriters(null);
+ if (CollectionUtils.isNotEmpty(thrown)) {
+ throw MultipleIOException.createIOException(thrown);
+ }
+ }
+ if (isSuccessful) {
+ splits = result;
+ }
+ return splits;
+ }
+
+ // delete the one with fewer wal entries
+ private void deleteOneWithFewerEntries(WALSplitter.WriterAndPath wap, Path dst)
+ throws IOException {
+ long dstMinLogSeqNum = -1L;
+ try (WAL.Reader reader = walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
+ WAL.Entry entry = reader.next();
+ if (entry != null) {
+ dstMinLogSeqNum = entry.getKey().getSequenceId();
+ }
+ } catch (EOFException e) {
+ LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst,
+ e);
+ }
+ if (wap.minLogSeqNum < dstMinLogSeqNum) {
+ LOG.warn("Found existing old edits file. It could be the result of a previous failed"
+ + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
+ + walFS.getFileStatus(dst).getLen());
+ if (!walFS.delete(dst, false)) {
+ LOG.warn("Failed deleting of old {}", dst);
+ throw new IOException("Failed deleting of old " + dst);
+ }
+ } else {
+ LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.path
+ + ", length=" + walFS.getFileStatus(wap.path).getLen());
+ if (!walFS.delete(wap.path, false)) {
+ LOG.warn("Failed deleting of {}", wap.path);
+ throw new IOException("Failed deleting of " + wap.path);
+ }
+ }
+ }
+
+ /**
+ * Close all of the output streams.
+ * @return the list of paths written.
+ */
+ List<Path> close() throws IOException {
+ Preconditions.checkState(!closeAndCleanCompleted);
+
+ final List<Path> paths = new ArrayList<>();
+ final List<IOException> thrown = Lists.newArrayList();
+ ThreadPoolExecutor closeThreadPool =
+ Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
+ private int count = 1;
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "split-log-closeStream-" + count++);
+ return t;
+ }
+ });
+ CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
+ boolean progress_failed;
+ try {
+ progress_failed = executeCloseTask(completionService, thrown, paths);
+ } catch (InterruptedException e) {
+ IOException iie = new InterruptedIOException();
+ iie.initCause(e);
+ throw iie;
+ } catch (ExecutionException e) {
+ throw new IOException(e.getCause());
+ } finally {
+ closeThreadPool.shutdownNow();
+ }
+ if (!thrown.isEmpty()) {
+ throw MultipleIOException.createIOException(thrown);
+ }
+ writersClosed = true;
+ closeAndCleanCompleted = true;
+ if (progress_failed) {
+ return null;
+ }
+ return paths;
+ }
+
+ /**
+ * @param completionService thread pool used to execute the closing tasks
+ * @param thrown list collecting the exceptions thrown while closing
+ * @param paths list collecting the paths written
+ * @return true if we failed to report progress while the close tasks ran
+ */
+ boolean executeCloseTask(CompletionService<Void> completionService, List<IOException> thrown,
+ List<Path> paths) throws InterruptedException, ExecutionException {
+ for (final Map.Entry<String, WALSplitter.SinkWriter> writersEntry : writers.entrySet()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Submitting close of " + ((WALSplitter.WriterAndPath) writersEntry.getValue()).path);
+ }
+ completionService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ WALSplitter.WriterAndPath wap = (WALSplitter.WriterAndPath) writersEntry.getValue();
+ Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
+ paths.add(dst);
+ return null;
+ }
+ });
+ }
+ boolean progress_failed = false;
+ for (int i = 0, n = this.writers.size(); i < n; i++) {
+ Future<Void> future = completionService.take();
+ future.get();
+ if (!progress_failed && reporter != null && !reporter.progress()) {
+ progress_failed = true;
+ }
+ }
+ return progress_failed;
+ }
+
+ Path closeWriter(String encodedRegionName, WALSplitter.WriterAndPath wap,
+ List<IOException> thrown) throws IOException {
+ LOG.trace("Closing {}", wap.path);
+ try {
+ wap.writer.close();
+ } catch (IOException ioe) {
+ LOG.error("Could not close log at {}", wap.path, ioe);
+ thrown.add(ioe);
+ return null;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closed wap " + wap.path + " (wrote " + wap.editsWritten + " edits, skipped "
+ + wap.editsSkipped + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)");
+ }
+ if (wap.editsWritten == 0) {
+ // just remove the empty recovered.edits file
+ if (walFS.exists(wap.path) && !walFS.delete(wap.path, false)) {
+ LOG.warn("Failed deleting empty {}", wap.path);
+ throw new IOException("Failed deleting empty " + wap.path);
+ }
+ return null;
+ }
+
+ Path dst = getCompletedRecoveredEditsFilePath(wap.path,
+ regionMaximumEditLogSeqNum.get(encodedRegionName));
+ try {
+ if (!dst.equals(wap.path) && walFS.exists(dst)) {
+ deleteOneWithFewerEntries(wap, dst);
+ }
+ // The file may be missing for unit tests which create a splitter that reads and
+ // writes the data without touching disk; skip the rename in that case.
+ // TestHLogSplit#testThreading is an example.
+ if (walFS.exists(wap.path)) {
+ if (!walFS.rename(wap.path, dst)) {
+ throw new IOException("Failed renaming " + wap.path + " to " + dst);
+ }
+ LOG.info("Rename {} to {}", wap.path, dst);
+ }
+ } catch (IOException ioe) {
+ LOG.error("Could not rename {} to {}", wap.path, dst, ioe);
+ thrown.add(ioe);
+ return null;
+ }
+ return dst;
+ }
+
+ private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
+ if (writersClosed) {
+ return thrown;
+ }
+ if (thrown == null) {
+ thrown = Lists.newArrayList();
+ }
+ try {
+ for (WriterThread writerThread : writerThreads) {
+ while (writerThread.isAlive()) {
+ writerThread.setShouldStop(true);
+ writerThread.interrupt();
+ try {
+ writerThread.join(10);
+ } catch (InterruptedException e) {
+ IOException iie = new InterruptedIOException();
+ iie.initCause(e);
+ throw iie;
+ }
+ }
+ }
+ } finally {
+ WALSplitter.WriterAndPath wap = null;
+ for (WALSplitter.SinkWriter tmpWAP : writers.values()) {
+ try {
+ wap = (WALSplitter.WriterAndPath) tmpWAP;
+ wap.writer.close();
+ } catch (IOException ioe) {
+ LOG.error("Couldn't close log at {}", wap.path, ioe);
+ thrown.add(ioe);
+ continue;
+ }
+ LOG.info("Closed log " + wap.path + " (wrote " + wap.editsWritten + " edits in "
+ + (wap.nanosSpent / 1000 / 1000) + "ms)");
+ }
+ writersClosed = true;
+ }
+
+ return thrown;
+ }
+
+ /**
+ * Get a writer and path for a log starting at the given entry. This function is threadsafe so
+ * long as multiple threads are always acting on different regions.
+ * @return null if this region shouldn't output any logs
+ */
+ WALSplitter.WriterAndPath getWriterAndPath(WAL.Entry entry, boolean reusable) throws IOException {
+ byte[] region = entry.getKey().getEncodedRegionName();
+ String regionName = Bytes.toString(region);
+ WALSplitter.WriterAndPath ret = (WALSplitter.WriterAndPath) writers.get(regionName);
+ if (ret != null) {
+ return ret;
+ }
+ // If we already decided that this region doesn't get any output
+ // we don't need to check again.
+ if (blacklistedRegions.contains(region)) {
+ return null;
+ }
+ ret = createWAP(region, entry);
+ if (ret == null) {
+ blacklistedRegions.add(region);
+ return null;
+ }
+ if (reusable) {
+ writers.put(regionName, ret);
+ }
+ return ret;
+ }
+
+ /**
+ * @return a path with a writer for that path. The caller should close it.
+ */
+ WALSplitter.WriterAndPath createWAP(byte[] region, WAL.Entry entry) throws IOException {
+ String tmpDirName = walSplitter.conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
+ HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+ Path regionedits = getRegionSplitEditsPath(entry,
+ walSplitter.getFileBeingSplit().getPath().getName(), tmpDirName, conf);
+ if (regionedits == null) {
+ return null;
+ }
+ FileSystem walFs = FSUtils.getWALFileSystem(conf);
+ if (walFs.exists(regionedits)) {
+ LOG.warn("Found old edits file. It could be the "
+ + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
+ + walFs.getFileStatus(regionedits).getLen());
+ if (!walFs.delete(regionedits, false)) {
+ LOG.warn("Failed delete of old {}", regionedits);
+ }
+ }
+ WALProvider.Writer w = walSplitter.createWriter(regionedits);
+ LOG.debug("Creating writer path={}", regionedits);
+ return new WALSplitter.WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
+ }
+
+ void filterCellByStore(WAL.Entry logEntry) {
+ Map<byte[], Long> maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores()
+ .get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
+ if (MapUtils.isEmpty(maxSeqIdInStores)) {
+ return;
+ }
+ // Create the array list for the cells that aren't filtered.
+ // We make the assumption that most cells will be kept.
+ ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
+ for (Cell cell : logEntry.getEdit().getCells()) {
+ if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+ keptCells.add(cell);
+ } else {
+ byte[] family = CellUtil.cloneFamily(cell);
+ Long maxSeqId = maxSeqIdInStores.get(family);
+ // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
+ // or the master was crashed before and we can not get the information.
+ if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
+ keptCells.add(cell);
+ }
+ }
+ }
+
+ // Anything in the keptCells array list is still live.
+ // So rather than removing the cells from the array list
+ // which would be an O(n^2) operation, we just replace the list
+ logEntry.getEdit().setCells(keptCells);
+ }
+
+ @Override
+ public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException {
+ appendBuffer(buffer, true);
+ }
+
+ WALSplitter.WriterAndPath appendBuffer(WALSplitter.RegionEntryBuffer buffer, boolean reusable)
+ throws IOException {
+ List<WAL.Entry> entries = buffer.entryBuffer;
+ if (entries.isEmpty()) {
+ LOG.warn("got an empty buffer, skipping");
+ return null;
+ }
+
+ WALSplitter.WriterAndPath wap = null;
+
+ long startTime = System.nanoTime();
+ try {
+ int editsCount = 0;
+
+ for (WAL.Entry logEntry : entries) {
+ if (wap == null) {
+ wap = getWriterAndPath(logEntry, reusable);
+ if (wap == null) {
+ // This log spews the full edit, which can be massive in the log. Enable only when
+ // debugging WAL lost-edit issues.
+ LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
+ return null;
+ }
+ }
+ filterCellByStore(logEntry);
+ if (!logEntry.getEdit().isEmpty()) {
+ wap.writer.append(logEntry);
+ this.updateRegionMaximumEditLogSeqNum(logEntry);
+ editsCount++;
+ } else {
+ wap.incrementSkippedEdits(1);
+ }
+ }
+ // Pass along summary statistics
+ wap.incrementEdits(editsCount);
+ wap.incrementNanoTime(System.nanoTime() - startTime);
+ } catch (IOException e) {
+ e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
+ LOG.error(HBaseMarkers.FATAL, "Got IOException while writing log entry to log", e);
+ throw e;
+ }
+ return wap;
+ }
+
+ @Override
+ public boolean keepRegionEvent(WAL.Entry entry) {
+ ArrayList<Cell> cells = entry.getEdit().getCells();
+ for (Cell cell : cells) {
+ if (WALEdit.isCompactionMarker(cell)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @return a map from encoded region ID to the number of edits written out for that region.
+ */
+ @Override
+ public Map<byte[], Long> getOutputCounts() {
+ TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (Map.Entry<String, WALSplitter.SinkWriter> entry : writers.entrySet()) {
+ ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
+ }
+ return ret;
+ }
+
+ @Override
+ public int getNumberOfRecoveredRegions() {
+ return writers.size();
+ }
+}
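The rule in filterCellByStore above reduces to one predicate per cell; restated for clarity with the method's own names:

    // A cell survives the filter when it is a METAFAMILY marker, when the
    // store's flushed max sequence id is unknown, or when the edit is newer
    // than what that store already flushed.
    boolean keep = CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)
        || maxSeqId == null
        || maxSeqId.longValue() < logEntry.getKey().getSequenceId();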
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
new file mode 100644
index 0000000..729ea8b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+/**
+ * Abstract class that provides a common interface to support different ways of consuming
+ * recovered edits.
+ */
+@InterfaceAudience.Private
+public abstract class OutputSink {
+ private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class);
+
+ protected WALSplitter.PipelineController controller;
+ protected EntryBuffers entryBuffers;
+
+ protected ConcurrentHashMap<String, WALSplitter.SinkWriter> writers = new ConcurrentHashMap<>();
+ protected final ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
+ new ConcurrentHashMap<>();
+
+ protected final List<WriterThread> writerThreads = Lists.newArrayList();
+
+ /* Set of regions which we've decided should not output edits */
+ protected final Set<byte[]> blacklistedRegions =
+ Collections.synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR));
+
+ protected boolean closeAndCleanCompleted = false;
+
+ protected boolean writersClosed = false;
+
+ protected final int numThreads;
+
+ protected CancelableProgressable reporter = null;
+
+ protected AtomicLong skippedEdits = new AtomicLong();
+
+ protected List<Path> splits = null;
+
+ public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
+ int numWriters) {
+ numThreads = numWriters;
+ this.controller = controller;
+ this.entryBuffers = entryBuffers;
+ }
+
+ void setReporter(CancelableProgressable reporter) {
+ this.reporter = reporter;
+ }
+
+ /**
+ * Start the threads that will pump data from the entryBuffers to the output files.
+ */
+ public synchronized void startWriterThreads() {
+ for (int i = 0; i < numThreads; i++) {
+ WriterThread t = new WriterThread(controller, entryBuffers, this, i);
+ t.start();
+ writerThreads.add(t);
+ }
+ }
+
+ /**
+ * Update region's maximum edit log SeqNum.
+ */
+ void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
+ synchronized (regionMaximumEditLogSeqNum) {
+ String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
+ Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
+ if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
+ regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
+ }
+ }
+ }
+
+ /**
+ * @return the number of currently opened writers
+ */
+ int getNumOpenWriters() {
+ return this.writers.size();
+ }
+
+ long getSkippedEdits() {
+ return this.skippedEdits.get();
+ }
+
+ /**
+ * Wait for writer threads to dump all info to the sink
+ * @return true when there is no error
+ */
+ protected boolean finishWriting(boolean interrupt) throws IOException {
+ LOG.debug("Waiting for split writer threads to finish");
+ boolean progress_failed = false;
+ for (WriterThread t : writerThreads) {
+ t.finish();
+ }
+ if (interrupt) {
+ for (WriterThread t : writerThreads) {
+ t.interrupt(); // interrupt the writer threads. We are stopping now.
+ }
+ }
+
+ for (WriterThread t : writerThreads) {
+ if (!progress_failed && reporter != null && !reporter.progress()) {
+ progress_failed = true;
+ }
+ try {
+ t.join();
+ } catch (InterruptedException ie) {
+ IOException iie = new InterruptedIOException();
+ iie.initCause(ie);
+ throw iie;
+ }
+ }
+ controller.checkForErrors();
+ LOG.info("{} split writers finished; closing.", this.writerThreads.size());
+ return (!progress_failed);
+ }
+
+ public abstract List<Path> finishWritingAndClose() throws IOException;
+
+ /**
+ * @return a map from encoded region ID to the number of edits written out for that region.
+ */
+ public abstract Map<byte[], Long> getOutputCounts();
+
+ /**
+ * @return number of regions we've recovered
+ */
+ public abstract int getNumberOfRecoveredRegions();
+
+ /**
+ * @param buffer A WAL Edit Entry
+ */
+ public abstract void append(WALSplitter.RegionEntryBuffer buffer) throws IOException;
+
+ /**
+ * WriterThreads call this method to flush any remaining edits buffered internally before closing.
+ * @return true when the underlying sink has something to flush
+ */
+ public boolean flush() throws IOException {
+ return false;
+ }
+
+ /**
+ * Some WALEdits contain only KVs that account for what happened to a region, such as region
+ * events. Not all sinks will want to get all of those edits.
+ * @return true if this sink wants to accept this region-level WALEdit.
+ */
+ public abstract boolean keepRegionEvent(WAL.Entry entry);
+
+ public static class WriterThread extends Thread {
+ private volatile boolean shouldStop = false;
+ private WALSplitter.PipelineController controller;
+ private EntryBuffers entryBuffers;
+ private OutputSink outputSink = null;
+
+ WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
+ OutputSink sink, int i) {
+ super(Thread.currentThread().getName() + "-Writer-" + i);
+ this.controller = controller;
+ this.entryBuffers = entryBuffers;
+ outputSink = sink;
+ }
+
+ @Override
+ public void run() {
+ try {
+ doRun();
+ } catch (Throwable t) {
+ LOG.error("Exiting thread", t);
+ controller.writerThreadError(t);
+ }
+ }
+
+ private void doRun() throws IOException {
+ LOG.trace("Writer thread starting");
+ while (true) {
+ WALSplitter.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
+ if (buffer == null) {
+ // No data currently available, wait on some more to show up
+ synchronized (controller.dataAvailable) {
+ if (shouldStop && !this.outputSink.flush()) {
+ return;
+ }
+ try {
+ controller.dataAvailable.wait(500);
+ } catch (InterruptedException ie) {
+ if (!shouldStop) {
+ throw new RuntimeException(ie);
+ }
+ }
+ }
+ continue;
+ }
+
+ assert buffer != null;
+ try {
+ writeBuffer(buffer);
+ } finally {
+ entryBuffers.doneWriting(buffer);
+ }
+ }
+ }
+
+ private void writeBuffer(WALSplitter.RegionEntryBuffer buffer) throws IOException {
+ outputSink.append(buffer);
+ }
+
+ void setShouldStop(boolean shouldStop) {
+ this.shouldStop = shouldStop;
+ }
+
+ void finish() {
+ synchronized (controller.dataAvailable) {
+ shouldStop = true;
+ controller.dataAvailable.notifyAll();
+ }
+ }
+ }
+}
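The lifecycle WALSplitter drives: construct a concrete sink, start the writer threads, let them drain EntryBuffers, then finish. A condensed sketch (constructor arguments are placeholders; error handling elided; finishWritingAndClose returns null when progress reporting fails, per LogRecoveredEditsOutputSink above):

    OutputSink sink =
        new LogRecoveredEditsOutputSink(walSplitter, controller, entryBuffers, numWriters);
    sink.startWriterThreads();          // spawns numWriters WriterThreads
    // ... the reader thread appends WAL entries into the shared entryBuffers ...
    List<Path> splits = sink.finishWritingAndClose();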
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
new file mode 100644
index 0000000..d518f2e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -0,0 +1,523 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
+/**
+ * This class provides static methods to support WAL splitting related work.
+ */
+@InterfaceAudience.Private
+public final class WALSplitUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(WALSplitUtil.class);
+
+ private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
+ private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
+ private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
+ private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
+ private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
+
+ private WALSplitUtil() {
+ }
+
+ /**
+ * Completes the work done by splitLogFile by archiving logs.
+ * <p>
+ * It is invoked by SplitLogManager once it knows that one of the SplitLogWorkers has completed
+ * the splitLogFile() part. If the master crashes then this function might get called multiple
+ * times.
+ * <p>
+ * @param logfile the WAL file that was split
+ * @param conf configuration
+ * @throws IOException
+ */
+ public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException {
+ Path walDir = FSUtils.getWALRootDir(conf);
+ Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ Path walPath;
+ if (FSUtils.isStartingWithPath(walDir, logfile)) {
+ walPath = new Path(logfile);
+ } else {
+ walPath = new Path(walDir, logfile);
+ }
+ finishSplitLogFile(walDir, oldLogDir, walPath, conf);
+ }
+
+ static void finishSplitLogFile(Path walDir, Path oldWALDir, Path walPath,
+ Configuration conf) throws IOException {
+ List<Path> processedLogs = new ArrayList<>();
+ List<Path> corruptedLogs = new ArrayList<>();
+ FileSystem walFS = walDir.getFileSystem(conf);
+ if (ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS)) {
+ corruptedLogs.add(walPath);
+ } else {
+ processedLogs.add(walPath);
+ }
+ archiveWALs(corruptedLogs, processedLogs, oldWALDir, walFS, conf);
+ Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, walPath.getName());
+ walFS.delete(stagingDir, true);
+ }
+
+ /**
+ * Moves processed logs to the oldLogDir after successful processing; moves corrupted logs (any
+ * log that couldn't be successfully parsed) to the corruptDir (.corrupt) for later investigation.
+ */
+ private static void archiveWALs(final List<Path> corruptedWALs, final List<Path> processedWALs,
+ final Path oldWALDir, final FileSystem walFS, final Configuration conf) throws IOException {
+ final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
+ if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
+ LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
+ corruptDir);
+ }
+ if (!walFS.mkdirs(corruptDir)) {
+ LOG.info("Unable to mkdir {}", corruptDir);
+ }
+ walFS.mkdirs(oldWALDir);
+
+ // this method can get restarted or called multiple times for archiving
+ // the same log files.
+ for (Path corruptedWAL : corruptedWALs) {
+ Path p = new Path(corruptDir, corruptedWAL.getName());
+ if (walFS.exists(corruptedWAL)) {
+ if (!walFS.rename(corruptedWAL, p)) {
+ LOG.warn("Unable to move corrupted log {} to {}", corruptedWAL, p);
+ } else {
+ LOG.warn("Moved corrupted log {} to {}", corruptedWAL, p);
+ }
+ }
+ }
+
+ for (Path p : processedWALs) {
+ Path newPath = AbstractFSWAL.getWALArchivePath(oldWALDir, p);
+ if (walFS.exists(p)) {
+ if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
+ LOG.warn("Unable to move {} to {}", p, newPath);
+ } else {
+ LOG.info("Archived processed log {} to {}", p, newPath);
+ }
+ }
+ }
+ }
+
+ /**
+ * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
+ * <code>walEntry</code>, named for the sequenceid in the passed <code>walEntry</code>: e.g.
+ * /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures existence of
+ * RECOVERED_EDITS_DIR under the region, creating it if necessary.
+ * @param walEntry walEntry to recover
+ * @param fileNameBeingSplit the file being split currently. Used to generate the tmp file name.
+ * @param tmpDirName name of the directory used to sideline the old recovered edits file
+ * @param conf configuration
+ * @return Path to file into which to dump split log edits.
+ * @throws IOException
+ */
+ @SuppressWarnings("deprecation")
+ @VisibleForTesting
+ static Path getRegionSplitEditsPath(final WAL.Entry walEntry, String fileNameBeingSplit,
+ String tmpDirName, Configuration conf) throws IOException {
+ FileSystem walFS = FSUtils.getWALFileSystem(conf);
+ Path tableDir = FSUtils.getWALTableDir(conf, walEntry.getKey().getTableName());
+ String encodedRegionName = Bytes.toString(walEntry.getKey().getEncodedRegionName());
+ Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
+ Path dir = getRegionDirRecoveredEditsDir(regionDir);
+
+ if (walFS.exists(dir) && walFS.isFile(dir)) {
+ Path tmp = new Path(tmpDirName);
+ if (!walFS.exists(tmp)) {
+ walFS.mkdirs(tmp);
+ }
+ tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
+ LOG.warn("Found existing old file: {}. It could be some "
+ + "leftover of an old installation. It should be a folder instead. "
+ + "So moving it to {}",
+ dir, tmp);
+ if (!walFS.rename(dir, tmp)) {
+ LOG.warn("Failed to sideline old file {}", dir);
+ }
+ }
+
+ if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
+ LOG.warn("mkdir failed on {}", dir);
+ }
+ // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
+ // The appended file name ends with RECOVERED_LOG_TMPFILE_SUFFIX so that the
+ // region's replayRecoveredEdits will not delete it.
+ String fileName = formatRecoveredEditsFileName(walEntry.getKey().getSequenceId());
+ fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
+ return new Path(dir, fileName);
+ }
+
+ private static String getTmpRecoveredEditsFileName(String fileName) {
+ return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
+ }
+
+ /**
+ * Get the completed recovered edits file path, renaming it from the sequence id of its first
+ * edit to that of its last edit. The name can then be used to skip recovered edits when doing
+ * {@link HRegion#replayRecoveredEditsIfAny}.
+ * @return dstPath, named for the file's last edit log sequence number
+ */
+ static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditWALSeqNum) {
+ String fileName = formatRecoveredEditsFileName(maximumEditWALSeqNum);
+ return new Path(srcPath.getParent(), fileName);
+ }
+
+ @VisibleForTesting
+ static String formatRecoveredEditsFileName(final long seqid) {
+ return String.format("%019d", seqid);
+ }
+
+ /**
+ * @param regionDir This region's directory in the filesystem.
+ * @return The directory that holds recovered edits files for the region <code>regionDir</code>
+ */
+ public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
+ return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
+ }
+
+ /**
+ * Check whether there are recovered.edits files in the region dir
+ * @param conf configuration
+ * @param regionInfo the region to check
+ * @throws IOException
+ * @return true if recovered.edits files exist in the region dir
+ */
+ public static boolean hasRecoveredEdits(final Configuration conf, final RegionInfo regionInfo)
+ throws IOException {
+ // No recovered.edits for non default replica regions
+ if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
+ return false;
+ }
+ // Only default replica region can reach here, so we can use regioninfo
+ // directly without converting it to default replica's regioninfo.
+ Path regionDir =
+ FSUtils.getWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName());
+ NavigableSet<Path> files = getSplitEditFilesSorted(FSUtils.getWALFileSystem(conf), regionDir);
+ return files != null && !files.isEmpty();
+ }
+
+ /**
+ * Returns sorted set of edit files made by splitter, excluding files with '.temp' suffix.
+ * @param walFS WAL FileSystem used to retrieve split edits files.
+ * @param regionDir WAL region dir to look for recovered edits files under.
+ * @return Files in passed <code>regionDir</code> as a sorted set.
+ * @throws IOException
+ */
+ public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
+ final Path regionDir) throws IOException {
+ NavigableSet<Path> filesSorted = new TreeSet<>();
+ Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
+ if (!walFS.exists(editsdir)) {
+ return filesSorted;
+ }
+ FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
+ @Override
+ public boolean accept(Path p) {
+ boolean result = false;
+ try {
+ // Return files and only files that match the editfile names pattern.
+ // There can be other files in this directory other than edit files.
+ // In particular, on error, we'll move aside the bad edit file giving
+ // it a timestamp suffix. See moveAsideBadEditsFile.
+ Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
+ result = walFS.isFile(p) && m.matches();
+ // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
+ // because it means a splitwal thread is still writing this file.
+ if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
+ result = false;
+ }
+ // Skip SeqId Files
+ if (isSequenceIdFile(p)) {
+ result = false;
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed isFile check on {}", p, e);
+ }
+ return result;
+ }
+ });
+ if (ArrayUtils.isNotEmpty(files)) {
+ Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
+ }
+ return filesSorted;
+ }
+
+ /**
+ * Move aside a bad edits file.
+ * @param walFS WAL FileSystem used to rename bad edits file.
+ * @param edits Edits file to move aside.
+ * @return The name of the moved aside file.
+ * @throws IOException
+ */
+ public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
+ throws IOException {
+ Path moveAsideName =
+ new Path(edits.getParent(), edits.getName() + "." + System.currentTimeMillis());
+ if (!walFS.rename(edits, moveAsideName)) {
+ LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
+ }
+ return moveAsideName;
+ }
+
+ /**
+ * Is the given file a region open sequence id file.
+ */
+ @VisibleForTesting
+ public static boolean isSequenceIdFile(final Path file) {
+ return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
+ || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
+ }
+
+ private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
+ throws IOException {
+ // TODO: Why are we using a method in here as part of our normal region open where
+ // there is no splitting involved? Fix. St.Ack 01/20/2017.
+ Path editsDir = getRegionDirRecoveredEditsDir(regionDir);
+ try {
+ FileStatus[] files = walFS.listStatus(editsDir, WALSplitUtil::isSequenceIdFile);
+ return files != null ? files : new FileStatus[0];
+ } catch (FileNotFoundException e) {
+ return new FileStatus[0];
+ }
+ }
+
+ private static long getMaxSequenceId(FileStatus[] files) {
+ long maxSeqId = -1L;
+ for (FileStatus file : files) {
+ String fileName = file.getPath().getName();
+ try {
+ maxSeqId = Math.max(maxSeqId, Long
+ .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH)));
+ } catch (NumberFormatException ex) {
+ LOG.warn("Invalid SeqId File Name={}", fileName);
+ }
+ }
+ return maxSeqId;
+ }
+
+ /**
+ * Get the max sequence id which is stored in the region directory. -1 if none.
+ */
+ public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
+ return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
+ }
+
+ /**
+ * Create a file whose name is the region's max sequence id
+ */
+ public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
+ throws IOException {
+ FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
+ long maxSeqId = getMaxSequenceId(files);
+ if (maxSeqId > newMaxSeqId) {
+ throw new IOException("The new max sequence id " + newMaxSeqId
+ + " is less than the old max sequence id " + maxSeqId);
+ }
+ // write a new seqId file
+ Path newSeqIdFile =
+ new Path(getRegionDirRecoveredEditsDir(regionDir), newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
+ if (newMaxSeqId != maxSeqId) {
+ try {
+ if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
+ throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
+ }
+ LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
+ maxSeqId);
+ } catch (FileAlreadyExistsException ignored) {
+ // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
+ }
+ }
+ // remove old ones
+ for (FileStatus status : files) {
+ if (!newSeqIdFile.equals(status.getPath())) {
+ walFS.delete(status.getPath(), false);
+ }
+ }
+ }
+
+ /** A struct used by getMutationsFromWALEntry */
+ public static class MutationReplay implements Comparable<MutationReplay> {
+ public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mutation,
+ long nonceGroup, long nonce) {
+ this.type = type;
+ this.mutation = mutation;
+ if (this.mutation.getDurability() != Durability.SKIP_WAL) {
+ // using ASYNC_WAL for replay
+ this.mutation.setDurability(Durability.ASYNC_WAL);
+ }
+ this.nonceGroup = nonceGroup;
+ this.nonce = nonce;
+ }
+
+ private final ClientProtos.MutationProto.MutationType type;
+ public final Mutation mutation;
+ public final long nonceGroup;
+ public final long nonce;
+
+ @Override
+ public int compareTo(final MutationReplay d) {
+ return this.mutation.compareTo(d.mutation);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof MutationReplay)) {
+ return false;
+ } else {
+ return this.compareTo((MutationReplay) obj) == 0;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return this.mutation.hashCode();
+ }
+
+ public ClientProtos.MutationProto.MutationType getType() {
+ return type;
+ }
+ }
+
+ /**
+ * This function is used to construct mutations from a WALEntry. It also reconstructs the WALKey
+ * and WALEdit from the passed in WALEntry.
+ * @param entry the WALEntry to process
+ * @param cells scanner over the cells associated with the entry
+ * @param logEntry pair used to return the WALKey and WALEdit instances extracted from the
+ * passed in WALEntry
+ * @param durability the durability to set on the reconstructed mutations
+ * @return list of MutationReplay to be replayed
+ * @throws IOException
+ */
+ public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry,
+ CellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
+ if (entry == null) {
+ // return an empty list
+ return Collections.emptyList();
+ }
+
+ long replaySeqId =
+ (entry.getKey().hasOrigSequenceNumber()) ? entry.getKey().getOrigSequenceNumber()
+ : entry.getKey().getLogSequenceNumber();
+ int count = entry.getAssociatedCellCount();
+ List<MutationReplay> mutations = new ArrayList<>();
+ Cell previousCell = null;
+ Mutation m = null;
+ WALKeyImpl key = null;
+ WALEdit val = null;
+ if (logEntry != null) {
+ val = new WALEdit();
+ }
+
+ for (int i = 0; i < count; i++) {
+ // Throw index out of bounds if our cell count is off
+ if (!cells.advance()) {
+ throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
+ }
+ Cell cell = cells.current();
+ if (val != null) {
+ val.add(cell);
+ }
+
+ boolean isNewRowOrType =
+ previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
+ || !CellUtil.matchingRows(previousCell, cell);
+ if (isNewRowOrType) {
+ // Create new mutation
+ if (CellUtil.isDelete(cell)) {
+ m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+ // Deletes don't have nonces.
+ mutations.add(new MutationReplay(ClientProtos.MutationProto.MutationType.DELETE, m,
+ HConstants.NO_NONCE, HConstants.NO_NONCE));
+ } else {
+ m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+ // Puts might come from increment or append, thus we need nonces.
+ long nonceGroup =
+ entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
+ long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
+ mutations.add(
+ new MutationReplay(ClientProtos.MutationProto.MutationType.PUT, m, nonceGroup, nonce));
+ }
+ }
+ if (CellUtil.isDelete(cell)) {
+ ((Delete) m).add(cell);
+ } else {
+ ((Put) m).add(cell);
+ }
+ m.setDurability(durability);
+ previousCell = cell;
+ }
+
+ // reconstruct WALKey
+ if (logEntry != null) {
+ org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto =
+ entry.getKey();
+ List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount());
+ for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
+ clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
+ }
+ key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(),
+ TableName.valueOf(walKeyProto.getTableName().toByteArray()), replaySeqId,
+ walKeyProto.getWriteTime(), clusterIds, walKeyProto.getNonceGroup(),
+ walKeyProto.getNonce(), null);
+ logEntry.setFirst(key);
+ logEntry.setSecond(val);
+ }
+
+ return mutations;
+ }
+}
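
WALSplitUtil now also owns the recovered-edits naming scheme (file names are the edit sequence
id zero-padded to 19 digits, so String.format("%019d", 2332L) yields 0000000000000002332) and
the region sequence-id file helpers. The following is a quick usage sketch of the public
sequence-id helpers shown above, assuming a local FileSystem and an illustrative region path;
it is not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WALSplitUtil;

public class SeqIdFileSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path regionDir = new Path("/tmp/seqid-sketch-region"); // illustrative path

    // No <seqid>.seqid file exists yet under recovered.edits, so the max is -1.
    System.out.println(WALSplitUtil.getMaxRegionSequenceId(fs, regionDir)); // -1

    // Creates <regionDir>/recovered.edits/42.seqid and deletes any older seqid files.
    WALSplitUtil.writeRegionSequenceIdFile(fs, regionDir, 42L);
    System.out.println(WALSplitUtil.getMaxRegionSequenceId(fs, regionDir)); // 42

    // Writing a smaller max throws IOException; both the ".seqid" suffix and the
    // legacy "_seqid" suffix are recognized as sequence-id files.
    System.out.println(WALSplitUtil.isSequenceIdFile(new Path("42.seqid"))); // true
  }
}
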
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index c436db2..300fbf6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -17,74 +17,43 @@
*/
package org.apache.hadoop.hbase.wal;
+import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile;
+
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.text.ParseException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Set;
import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
-import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
-import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -92,16 +61,10 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
-import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
-import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
/**
* This class is responsible for splitting up a bunch of regionserver commit log
* files that are no longer being written to, into new files, one per region, for
@@ -148,12 +111,11 @@ public class WALSplitter {
@VisibleForTesting
- WALSplitter(final WALFactory factory, Configuration conf, Path walDir,
- FileSystem walFS, LastSequenceId idChecker,
- SplitLogWorkerCoordination splitLogWorkerCoordination) {
+ WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
+ LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) {
this.conf = HBaseConfiguration.create(conf);
- String codecClassName = conf
- .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
+ String codecClassName =
+ conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
this.walDir = walDir;
this.walFS = walFS;
@@ -170,14 +132,27 @@ public class WALSplitter {
splitWriterCreationBounded);
int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
- if(splitWriterCreationBounded){
- outputSink = new BoundedLogWriterCreationOutputSink(
- controller, entryBuffers, numWriterThreads);
- }else {
- outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
+ if (splitWriterCreationBounded) {
+ outputSink =
+ new BoundedLogWriterCreationOutputSink(this, controller, entryBuffers, numWriterThreads);
+ } else {
+ outputSink =
+ new LogRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
}
}
+ WALFactory getWalFactory() {
+ return this.walFactory;
+ }
+
+ FileStatus getFileBeingSplit() {
+ return fileBeingSplit;
+ }
+
+ Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() {
+ return regionMaxSeqIdInStores;
+ }
+
/**
* Splits a WAL file into region's recovered-edits directory.
* This is the main entry point for distributed log splitting from SplitLogWorker.
@@ -361,357 +336,7 @@ public class WALSplitter {
}
/**
- * Completes the work done by splitLogFile by archiving logs
- * <p>
- * It is invoked by SplitLogManager once it knows that one of the
- * SplitLogWorkers have completed the splitLogFile() part. If the master
- * crashes then this function might get called multiple times.
- * <p>
- * @param logfile
- * @param conf
- * @throws IOException
- */
- public static void finishSplitLogFile(String logfile,
- Configuration conf) throws IOException {
- Path walDir = FSUtils.getWALRootDir(conf);
- Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME);
- Path logPath;
- if (FSUtils.isStartingWithPath(walDir, logfile)) {
- logPath = new Path(logfile);
- } else {
- logPath = new Path(walDir, logfile);
- }
- finishSplitLogFile(walDir, oldLogDir, logPath, conf);
- }
-
- private static void finishSplitLogFile(Path walDir, Path oldLogDir,
- Path logPath, Configuration conf) throws IOException {
- List<Path> processedLogs = new ArrayList<>();
- List<Path> corruptedLogs = new ArrayList<>();
- FileSystem walFS = walDir.getFileSystem(conf);
- if (ZKSplitLog.isCorrupted(walDir, logPath.getName(), walFS)) {
- corruptedLogs.add(logPath);
- } else {
- processedLogs.add(logPath);
- }
- archiveLogs(corruptedLogs, processedLogs, oldLogDir, walFS, conf);
- Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, logPath.getName());
- walFS.delete(stagingDir, true);
- }
-
- /**
- * Moves processed logs to a oldLogDir after successful processing Moves
- * corrupted logs (any log that couldn't be successfully parsed to corruptDir
- * (.corrupt) for later investigation
- *
- * @param corruptedLogs
- * @param processedLogs
- * @param oldLogDir
- * @param walFS WAL FileSystem to archive files on.
- * @param conf
- * @throws IOException
- */
- private static void archiveLogs(
- final List<Path> corruptedLogs,
- final List<Path> processedLogs, final Path oldLogDir,
- final FileSystem walFS, final Configuration conf) throws IOException {
- final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
- if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
- LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
- corruptDir);
- }
- if (!walFS.mkdirs(corruptDir)) {
- LOG.info("Unable to mkdir {}", corruptDir);
- }
- walFS.mkdirs(oldLogDir);
-
- // this method can get restarted or called multiple times for archiving
- // the same log files.
- for (Path corrupted : corruptedLogs) {
- Path p = new Path(corruptDir, corrupted.getName());
- if (walFS.exists(corrupted)) {
- if (!walFS.rename(corrupted, p)) {
- LOG.warn("Unable to move corrupted log {} to {}", corrupted, p);
- } else {
- LOG.warn("Moved corrupted log {} to {}", corrupted, p);
- }
- }
- }
-
- for (Path p : processedLogs) {
- Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p);
- if (walFS.exists(p)) {
- if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) {
- LOG.warn("Unable to move {} to {}", p, newPath);
- } else {
- LOG.info("Archived processed log {} to {}", p, newPath);
- }
- }
- }
- }
-
- /**
- * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
- * <code>logEntry</code> named for the sequenceid in the passed
- * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
- * This method also ensures existence of RECOVERED_EDITS_DIR under the region
- * creating it if necessary.
- * @param logEntry
- * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
- * @param tmpDirName of the directory used to sideline old recovered edits file
- * @param conf
- * @return Path to file into which to dump split log edits.
- * @throws IOException
- */
- @SuppressWarnings("deprecation")
- @VisibleForTesting
- static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit,
- String tmpDirName, Configuration conf) throws IOException {
- FileSystem walFS = FSUtils.getWALFileSystem(conf);
- Path tableDir = FSUtils.getWALTableDir(conf, logEntry.getKey().getTableName());
- String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
- Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
- Path dir = getRegionDirRecoveredEditsDir(regionDir);
-
-
- if (walFS.exists(dir) && walFS.isFile(dir)) {
- Path tmp = new Path(tmpDirName);
- if (!walFS.exists(tmp)) {
- walFS.mkdirs(tmp);
- }
- tmp = new Path(tmp,
- HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
- LOG.warn("Found existing old file: {}. It could be some "
- + "leftover of an old installation. It should be a folder instead. "
- + "So moving it to {}", dir, tmp);
- if (!walFS.rename(dir, tmp)) {
- LOG.warn("Failed to sideline old file {}", dir);
- }
- }
-
- if (!walFS.exists(dir) && !walFS.mkdirs(dir)) {
- LOG.warn("mkdir failed on {}", dir);
- }
- // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
- // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
- // region's replayRecoveredEdits will not delete it
- String fileName = formatRecoveredEditsFileName(logEntry.getKey().getSequenceId());
- fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
- return new Path(dir, fileName);
- }
-
- private static String getTmpRecoveredEditsFileName(String fileName) {
- return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
- }
-
- /**
- * Get the completed recovered edits file path, renaming it to be by last edit
- * in the file from its first edit. Then we could use the name to skip
- * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
- * @param srcPath
- * @param maximumEditLogSeqNum
- * @return dstPath take file's last edit log seq num as the name
- */
- private static Path getCompletedRecoveredEditsFilePath(Path srcPath,
- long maximumEditLogSeqNum) {
- String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
- return new Path(srcPath.getParent(), fileName);
- }
-
- @VisibleForTesting
- static String formatRecoveredEditsFileName(final long seqid) {
- return String.format("%019d", seqid);
- }
-
- private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
- private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
-
- /**
- * @param regionDir
- * This regions directory in the filesystem.
- * @return The directory that holds recovered edits files for the region
- * <code>regionDir</code>
- */
- public static Path getRegionDirRecoveredEditsDir(final Path regionDir) {
- return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
- }
-
- /**
- * Check whether there is recovered.edits in the region dir
- * @param conf conf
- * @param regionInfo the region to check
- * @throws IOException IOException
- * @return true if recovered.edits exist in the region dir
- */
- public static boolean hasRecoveredEdits(final Configuration conf,
- final RegionInfo regionInfo) throws IOException {
- // No recovered.edits for non default replica regions
- if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
- return false;
- }
- //Only default replica region can reach here, so we can use regioninfo
- //directly without converting it to default replica's regioninfo.
- Path regionDir = FSUtils.getWALRegionDir(conf, regionInfo.getTable(),
- regionInfo.getEncodedName());
- NavigableSet<Path> files = getSplitEditFilesSorted(FSUtils.getWALFileSystem(conf), regionDir);
- return files != null && !files.isEmpty();
- }
-
-
- /**
- * Returns sorted set of edit files made by splitter, excluding files
- * with '.temp' suffix.
- *
- * @param walFS WAL FileSystem used to retrieving split edits files.
- * @param regionDir WAL region dir to look for recovered edits files under.
- * @return Files in passed <code>regionDir</code> as a sorted set.
- * @throws IOException
- */
- public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem walFS,
- final Path regionDir) throws IOException {
- NavigableSet<Path> filesSorted = new TreeSet<>();
- Path editsdir = getRegionDirRecoveredEditsDir(regionDir);
- if (!walFS.exists(editsdir)) {
- return filesSorted;
- }
- FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
- @Override
- public boolean accept(Path p) {
- boolean result = false;
- try {
- // Return files and only files that match the editfile names pattern.
- // There can be other files in this directory other than edit files.
- // In particular, on error, we'll move aside the bad edit file giving
- // it a timestamp suffix. See moveAsideBadEditsFile.
- Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
- result = walFS.isFile(p) && m.matches();
- // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
- // because it means splitwal thread is writting this file.
- if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
- result = false;
- }
- // Skip SeqId Files
- if (isSequenceIdFile(p)) {
- result = false;
- }
- } catch (IOException e) {
- LOG.warn("Failed isFile check on {}", p, e);
- }
- return result;
- }
- });
- if (ArrayUtils.isNotEmpty(files)) {
- Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
- }
- return filesSorted;
- }
-
- /**
- * Move aside a bad edits file.
- *
- * @param walFS WAL FileSystem used to rename bad edits file.
- * @param edits
- * Edits file to move aside.
- * @return The name of the moved aside file.
- * @throws IOException
- */
- public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
- throws IOException {
- Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
- + System.currentTimeMillis());
- if (!walFS.rename(edits, moveAsideName)) {
- LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
- }
- return moveAsideName;
- }
-
- private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
- private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
- private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
-
- /**
- * Is the given file a region open sequence id file.
- */
- @VisibleForTesting
- public static boolean isSequenceIdFile(final Path file) {
- return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
- || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
- }
-
- private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir)
- throws IOException {
- // TODO: Why are we using a method in here as part of our normal region open where
- // there is no splitting involved? Fix. St.Ack 01/20/2017.
- Path editsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
- try {
- FileStatus[] files = walFS.listStatus(editsDir, WALSplitter::isSequenceIdFile);
- return files != null ? files : new FileStatus[0];
- } catch (FileNotFoundException e) {
- return new FileStatus[0];
- }
- }
-
- private static long getMaxSequenceId(FileStatus[] files) {
- long maxSeqId = -1L;
- for (FileStatus file : files) {
- String fileName = file.getPath().getName();
- try {
- maxSeqId = Math.max(maxSeqId, Long
- .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH)));
- } catch (NumberFormatException ex) {
- LOG.warn("Invalid SeqId File Name={}", fileName);
- }
- }
- return maxSeqId;
- }
-
- /**
- * Get the max sequence id which is stored in the region directory. -1 if none.
- */
- public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException {
- return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir));
- }
-
- /**
- * Create a file with name as region's max sequence id
- */
- public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId)
- throws IOException {
- FileStatus[] files = getSequenceIdFiles(walFS, regionDir);
- long maxSeqId = getMaxSequenceId(files);
- if (maxSeqId > newMaxSeqId) {
- throw new IOException("The new max sequence id " + newMaxSeqId +
- " is less than the old max sequence id " + maxSeqId);
- }
- // write a new seqId file
- Path newSeqIdFile = new Path(WALSplitter.getRegionDirRecoveredEditsDir(regionDir),
- newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX);
- if (newMaxSeqId != maxSeqId) {
- try {
- if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) {
- throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
- }
- LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId,
- maxSeqId);
- } catch (FileAlreadyExistsException ignored) {
- // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
- }
- }
- // remove old ones
- for (FileStatus status : files) {
- if (!newSeqIdFile.equals(status.getPath())) {
- walFS.delete(status.getPath(), false);
- }
- }
- }
-
- /**
* Create a new {@link Reader} for reading logs to split.
- *
- * @param file
- * @return A new Reader instance, caller should close
- * @throws IOException
- * @throws CorruptedLogFileException
*/
protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
throws IOException, CorruptedLogFileException {
@@ -761,7 +386,7 @@ public class WALSplitter {
}
static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
- throws CorruptedLogFileException, IOException {
+ throws CorruptedLogFileException, IOException {
try {
return in.next();
} catch (EOFException eof) {
@@ -771,9 +396,8 @@ public class WALSplitter {
} catch (IOException e) {
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
- if (e.getCause() != null &&
- (e.getCause() instanceof ParseException ||
- e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
+ if (e.getCause() != null && (e.getCause() instanceof ParseException
+ || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
LOG.warn("Parse exception from wal {}. Continuing", path, e);
return null;
}
@@ -781,19 +405,18 @@ public class WALSplitter {
throw e;
}
CorruptedLogFileException t =
- new CorruptedLogFileException("skipErrors=true Ignoring exception" +
- " while parsing wal " + path + ". Marking as corrupted");
+ new CorruptedLogFileException("skipErrors=true Ignoring exception" + " while parsing wal "
+ + path + ". Marking as corrupted");
t.initCause(e);
throw t;
}
}
/**
- * Create a new {@link Writer} for writing log splits.
+ * Create a new {@link WALProvider.Writer} for writing log splits.
* @return a new Writer instance, caller should close
*/
- protected Writer createWriter(Path logfile)
- throws IOException {
+ protected WALProvider.Writer createWriter(Path logfile) throws IOException {
return walFactory.createRecoveredEditsWriter(walFS, logfile);
}
@@ -826,7 +449,7 @@ public class WALSplitter {
// Wait/notify for when data has been produced by the writer thread,
// consumed by the reader thread, or an exception occurred
- public final Object dataAvailable = new Object();
+ final Object dataAvailable = new Object();
void writerThreadError(Throwable t) {
thrown.compareAndSet(null, t);
@@ -837,7 +460,9 @@ public class WALSplitter {
*/
void checkForErrors() throws IOException {
Throwable thrown = this.thrown.get();
- if (thrown == null) return;
+ if (thrown == null) {
+ return;
+ }
if (thrown instanceof IOException) {
throw new IOException(thrown);
} else {
@@ -847,134 +472,6 @@ public class WALSplitter {
}
/**
- * Class which accumulates edits and separates them into a buffer per region
- * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
- * a predefined threshold.
- *
- * Writer threads then pull region-specific buffers from this class.
- */
- public static class EntryBuffers {
- PipelineController controller;
-
- Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-
- /* Track which regions are currently in the middle of writing. We don't allow
- an IO thread to pick up bytes from a region if we're already writing
- data for that region in a different IO thread. */
- Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
-
- long totalBuffered = 0;
- long maxHeapUsage;
- boolean splitWriterCreationBounded;
-
- public EntryBuffers(PipelineController controller, long maxHeapUsage) {
- this(controller, maxHeapUsage, false);
- }
-
- public EntryBuffers(PipelineController controller, long maxHeapUsage,
- boolean splitWriterCreationBounded){
- this.controller = controller;
- this.maxHeapUsage = maxHeapUsage;
- this.splitWriterCreationBounded = splitWriterCreationBounded;
- }
-
- /**
- * Append a log entry into the corresponding region buffer.
- * Blocks if the total heap usage has crossed the specified threshold.
- *
- * @throws InterruptedException
- * @throws IOException
- */
- public void appendEntry(Entry entry) throws InterruptedException, IOException {
- WALKey key = entry.getKey();
-
- RegionEntryBuffer buffer;
- long incrHeap;
- synchronized (this) {
- buffer = buffers.get(key.getEncodedRegionName());
- if (buffer == null) {
- buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName());
- buffers.put(key.getEncodedRegionName(), buffer);
- }
- incrHeap= buffer.appendEntry(entry);
- }
-
- // If we crossed the chunk threshold, wait for more space to be available
- synchronized (controller.dataAvailable) {
- totalBuffered += incrHeap;
- while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
- LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
- controller.dataAvailable.wait(2000);
- }
- controller.dataAvailable.notifyAll();
- }
- controller.checkForErrors();
- }
-
- /**
- * @return RegionEntryBuffer a buffer of edits to be written.
- */
- synchronized RegionEntryBuffer getChunkToWrite() {
- // The core part of limiting opening writers is it doesn't return chunk only if the
- // heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each
- // region during splitting. It will flush all the logs in the buffer after splitting
- // through a threadpool, which means the number of writers it created is under control.
- if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
- return null;
- }
- long biggestSize = 0;
- byte[] biggestBufferKey = null;
-
- for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
- long size = entry.getValue().heapSize();
- if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
- biggestSize = size;
- biggestBufferKey = entry.getKey();
- }
- }
- if (biggestBufferKey == null) {
- return null;
- }
-
- RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
- currentlyWriting.add(biggestBufferKey);
- return buffer;
- }
-
- void doneWriting(RegionEntryBuffer buffer) {
- synchronized (this) {
- boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
- assert removed;
- }
- long size = buffer.heapSize();
-
- synchronized (controller.dataAvailable) {
- totalBuffered -= size;
- // We may unblock writers
- controller.dataAvailable.notifyAll();
- }
- }
-
- synchronized boolean isRegionCurrentlyWriting(byte[] region) {
- return currentlyWriting.contains(region);
- }
-
- public void waitUntilDrained() {
- synchronized (controller.dataAvailable) {
- while (totalBuffered > 0) {
- try {
- controller.dataAvailable.wait(2000);
- } catch (InterruptedException e) {
- LOG.warn("Got interrupted while waiting for EntryBuffers is drained");
- Thread.interrupted();
- break;
- }
- }
- }
- }
- }
-
- /**
* A buffer of some number of edits for a given region.
* This accumulates edits and also provides a memory optimization in order to
* share a single byte array instance for the table and region name.
@@ -1026,723 +523,6 @@ public class WALSplitter {
}
}
- public static class WriterThread extends Thread {
- private volatile boolean shouldStop = false;
- private PipelineController controller;
- private EntryBuffers entryBuffers;
- private OutputSink outputSink = null;
-
- WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){
- super(Thread.currentThread().getName() + "-Writer-" + i);
- this.controller = controller;
- this.entryBuffers = entryBuffers;
- outputSink = sink;
- }
-
- @Override
- public void run() {
- try {
- doRun();
- } catch (Throwable t) {
- LOG.error("Exiting thread", t);
- controller.writerThreadError(t);
- }
- }
-
- private void doRun() throws IOException {
- LOG.trace("Writer thread starting");
- while (true) {
- RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
- if (buffer == null) {
- // No data currently available, wait on some more to show up
- synchronized (controller.dataAvailable) {
- if (shouldStop && !this.outputSink.flush()) {
- return;
- }
- try {
- controller.dataAvailable.wait(500);
- } catch (InterruptedException ie) {
- if (!shouldStop) {
- throw new RuntimeException(ie);
- }
- }
- }
- continue;
- }
-
- assert buffer != null;
- try {
- writeBuffer(buffer);
- } finally {
- entryBuffers.doneWriting(buffer);
- }
- }
- }
-
- private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
- outputSink.append(buffer);
- }
-
- void finish() {
- synchronized (controller.dataAvailable) {
- shouldStop = true;
- controller.dataAvailable.notifyAll();
- }
- }
- }
-
- /**
- * The following class is an abstraction class to provide a common interface to support
- * different ways of consuming recovered edits.
- */
- public static abstract class OutputSink {
-
- protected PipelineController controller;
- protected EntryBuffers entryBuffers;
-
- protected ConcurrentHashMap<String, SinkWriter> writers = new ConcurrentHashMap<>();
- protected final ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
- new ConcurrentHashMap<>();
-
-
- protected final List<WriterThread> writerThreads = Lists.newArrayList();
-
- /* Set of regions which we've decided should not output edits */
- protected final Set<byte[]> blacklistedRegions = Collections
- .synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR));
-
- protected boolean closeAndCleanCompleted = false;
-
- protected boolean writersClosed = false;
-
- protected final int numThreads;
-
- protected CancelableProgressable reporter = null;
-
- protected AtomicLong skippedEdits = new AtomicLong();
-
- protected List<Path> splits = null;
-
- public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
- numThreads = numWriters;
- this.controller = controller;
- this.entryBuffers = entryBuffers;
- }
-
- void setReporter(CancelableProgressable reporter) {
- this.reporter = reporter;
- }
-
- /**
- * Start the threads that will pump data from the entryBuffers to the output files.
- */
- public synchronized void startWriterThreads() {
- for (int i = 0; i < numThreads; i++) {
- WriterThread t = new WriterThread(controller, entryBuffers, this, i);
- t.start();
- writerThreads.add(t);
- }
- }
-
- /**
- *
- * Update region's maximum edit log SeqNum.
- */
- void updateRegionMaximumEditLogSeqNum(Entry entry) {
- synchronized (regionMaximumEditLogSeqNum) {
- String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
- Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
- if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
- regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
- }
- }
- }
-
- /**
- * @return the number of currently opened writers
- */
- int getNumOpenWriters() {
- return this.writers.size();
- }
-
- long getSkippedEdits() {
- return this.skippedEdits.get();
- }
-
- /**
- * Wait for writer threads to dump all info to the sink
- * @return true when there is no error
- * @throws IOException
- */
- protected boolean finishWriting(boolean interrupt) throws IOException {
- LOG.debug("Waiting for split writer threads to finish");
- boolean progress_failed = false;
- for (WriterThread t : writerThreads) {
- t.finish();
- }
- if (interrupt) {
- for (WriterThread t : writerThreads) {
- t.interrupt(); // interrupt the writer threads. We are stopping now.
- }
- }
-
- for (WriterThread t : writerThreads) {
- if (!progress_failed && reporter != null && !reporter.progress()) {
- progress_failed = true;
- }
- try {
- t.join();
- } catch (InterruptedException ie) {
- IOException iie = new InterruptedIOException();
- iie.initCause(ie);
- throw iie;
- }
- }
- controller.checkForErrors();
- LOG.info("{} split writers finished; closing.", this.writerThreads.size());
- return (!progress_failed);
- }
-
- public abstract List<Path> finishWritingAndClose() throws IOException;
-
- /**
- * @return a map from encoded region ID to the number of edits written out for that region.
- */
- public abstract Map<byte[], Long> getOutputCounts();
-
- /**
- * @return number of regions we've recovered
- */
- public abstract int getNumberOfRecoveredRegions();
-
- /**
- * @param buffer A WAL Edit Entry
- * @throws IOException
- */
- public abstract void append(RegionEntryBuffer buffer) throws IOException;
-
- /**
- * WriterThread call this function to help flush internal remaining edits in buffer before close
- * @return true when underlying sink has something to flush
- */
- public boolean flush() throws IOException {
- return false;
- }
-
- /**
- * Some WALEdit's contain only KV's for account on what happened to a region.
- * Not all sinks will want to get all of those edits.
- *
- * @return Return true if this sink wants to accept this region-level WALEdit.
- */
- public abstract boolean keepRegionEvent(Entry entry);
- }
-
- /**
- * Class that manages the output streams from the log splitting process.
- */
- class LogRecoveredEditsOutputSink extends OutputSink {
-
- public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
- int numWriters) {
- // More threads could potentially write faster at the expense
- // of causing more disk seeks as the logs are split.
- // 3. After a certain setting (probably around 3) the
- // process will be bound on the reader in the current
- // implementation anyway.
- super(controller, entryBuffers, numWriters);
- }
-
- /**
- * @return null if failed to report progress
- * @throws IOException
- */
- @Override
- public List<Path> finishWritingAndClose() throws IOException {
- boolean isSuccessful = false;
- List<Path> result = null;
- try {
- isSuccessful = finishWriting(false);
- } finally {
- result = close();
- List<IOException> thrown = closeLogWriters(null);
- if (CollectionUtils.isNotEmpty(thrown)) {
- throw MultipleIOException.createIOException(thrown);
- }
- }
- if (isSuccessful) {
- splits = result;
- }
- return splits;
- }
-
- // delete the one with fewer wal entries
- private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst)
- throws IOException {
- long dstMinLogSeqNum = -1L;
- try (WAL.Reader reader = walFactory.createReader(walFS, dst)) {
- WAL.Entry entry = reader.next();
- if (entry != null) {
- dstMinLogSeqNum = entry.getKey().getSequenceId();
- }
- } catch (EOFException e) {
- LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?",
- dst, e);
- }
- if (wap.minLogSeqNum < dstMinLogSeqNum) {
- LOG.warn("Found existing old edits file. It could be the result of a previous failed"
- + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
- + walFS.getFileStatus(dst).getLen());
- if (!walFS.delete(dst, false)) {
- LOG.warn("Failed deleting of old {}", dst);
- throw new IOException("Failed deleting of old " + dst);
- }
- } else {
- LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
- + ", length=" + walFS.getFileStatus(wap.p).getLen());
- if (!walFS.delete(wap.p, false)) {
- LOG.warn("Failed deleting of {}", wap.p);
- throw new IOException("Failed deleting of " + wap.p);
- }
- }
- }
-
- /**
- * Close all of the output streams.
- * @return the list of paths written.
- */
- List<Path> close() throws IOException {
- Preconditions.checkState(!closeAndCleanCompleted);
-
- final List<Path> paths = new ArrayList<>();
- final List<IOException> thrown = Lists.newArrayList();
- ThreadPoolExecutor closeThreadPool = Threads
- .getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
- private int count = 1;
-
- @Override public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "split-log-closeStream-" + count++);
- return t;
- }
- });
- CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
- boolean progress_failed;
- try {
- progress_failed = executeCloseTask(completionService, thrown, paths);
- } catch (InterruptedException e) {
- IOException iie = new InterruptedIOException();
- iie.initCause(e);
- throw iie;
- } catch (ExecutionException e) {
- throw new IOException(e.getCause());
- } finally {
- closeThreadPool.shutdownNow();
- }
- if (!thrown.isEmpty()) {
- throw MultipleIOException.createIOException(thrown);
- }
- writersClosed = true;
- closeAndCleanCompleted = true;
- if (progress_failed) {
- return null;
- }
- return paths;
- }
-
- /**
- * @param completionService threadPool to execute the closing tasks
- * @param thrown store the exceptions
- * @param paths arrayList to store the paths written
- * @return if close tasks executed successful
- */
- boolean executeCloseTask(CompletionService<Void> completionService,
- List<IOException> thrown, List<Path> paths)
- throws InterruptedException, ExecutionException {
- for (final Map.Entry<String, SinkWriter> writersEntry : writers.entrySet()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Submitting close of " + ((WriterAndPath) writersEntry.getValue()).p);
- }
- completionService.submit(new Callable<Void>() {
- @Override public Void call() throws Exception {
- WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
- Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
- paths.add(dst);
- return null;
- }
- });
- }
- boolean progress_failed = false;
- for (int i = 0, n = this.writers.size(); i < n; i++) {
- Future<Void> future = completionService.take();
- future.get();
- if (!progress_failed && reporter != null && !reporter.progress()) {
- progress_failed = true;
- }
- }
- return progress_failed;
- }
-
- Path closeWriter(String encodedRegionName, WriterAndPath wap,
- List<IOException> thrown) throws IOException{
- LOG.trace("Closing " + wap.p);
- try {
- wap.w.close();
- } catch (IOException ioe) {
- LOG.error("Couldn't close log at " + wap.p, ioe);
- thrown.add(ioe);
- return null;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
- + " edits, skipped " + wap.editsSkipped + " edits in "
- + (wap.nanosSpent / 1000 / 1000) + "ms");
- }
- if (wap.editsWritten == 0) {
- // just remove the empty recovered.edits file
- if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) {
- LOG.warn("Failed deleting empty " + wap.p);
- throw new IOException("Failed deleting empty " + wap.p);
- }
- return null;
- }
-
- Path dst = getCompletedRecoveredEditsFilePath(wap.p,
- regionMaximumEditLogSeqNum.get(encodedRegionName));
- try {
- if (!dst.equals(wap.p) && walFS.exists(dst)) {
- deleteOneWithFewerEntries(wap, dst);
- }
- // Skip the unit tests which create a splitter that reads and
- // writes the data without touching disk.
- // TestHLogSplit#testThreading is an example.
- if (walFS.exists(wap.p)) {
- if (!walFS.rename(wap.p, dst)) {
- throw new IOException("Failed renaming " + wap.p + " to " + dst);
- }
- LOG.info("Rename " + wap.p + " to " + dst);
- }
- } catch (IOException ioe) {
- LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
- thrown.add(ioe);
- return null;
- }
- return dst;
- }
-
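
The destination that closeWriter renames to encodes the region's maximum edit sequence id,
zero-padded to 19 digits, as the String.format("%019d", ...) calls in the tests below show. A
small sketch of that naming, with a hypothetical helper name standing in for
formatRecoveredEditsFileName:

    public class RecoveredEditsNameSketch {
      // Zero-pads the maximum sequence id to 19 digits so the recovered.edits
      // file names sort lexicographically in sequence-id order.
      static String formatName(long maxSeqId) {
        return String.format("%019d", maxSeqId);
      }

      public static void main(String[] args) {
        System.out.println(formatName(1050L)); // prints 0000000000000001050
      }
    }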
- private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
- if (writersClosed) {
- return thrown;
- }
- if (thrown == null) {
- thrown = Lists.newArrayList();
- }
- try {
- for (WriterThread t : writerThreads) {
- while (t.isAlive()) {
- t.shouldStop = true;
- t.interrupt();
- try {
- t.join(10);
- } catch (InterruptedException e) {
- IOException iie = new InterruptedIOException();
- iie.initCause(e);
- throw iie;
- }
- }
- }
- } finally {
- WriterAndPath wap = null;
- for (SinkWriter tmpWAP : writers.values()) {
- try {
- wap = (WriterAndPath) tmpWAP;
- wap.w.close();
- } catch (IOException ioe) {
- LOG.error("Couldn't close log at " + wap.p, ioe);
- thrown.add(ioe);
- continue;
- }
- LOG.info(
- "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent
- / 1000 / 1000) + "ms)");
- }
- writersClosed = true;
- }
-
- return thrown;
- }
-
- /**
- * Get a writer and path for a log starting at the given entry. This function is threadsafe so
- * long as multiple threads are always acting on different regions.
- * @return null if this region shouldn't output any logs
- */
- WriterAndPath getWriterAndPath(Entry entry, boolean reusable) throws IOException {
- byte[] region = entry.getKey().getEncodedRegionName();
- String regionName = Bytes.toString(region);
- WriterAndPath ret = (WriterAndPath) writers.get(regionName);
- if (ret != null) {
- return ret;
- }
- // If we already decided that this region doesn't get any output
- // we don't need to check again.
- if (blacklistedRegions.contains(region)) {
- return null;
- }
- ret = createWAP(region, entry);
- if (ret == null) {
- blacklistedRegions.add(region);
- return null;
- }
- if (reusable) {
- writers.put(regionName, ret);
- }
- return ret;
- }
-
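
getWriterAndPath combines a positive cache (writers) with a negative cache (blacklistedRegions),
so a region that was once decided to produce no output is never probed again. A simplified,
self-contained sketch of that lookup, ignoring the reusable flag and using plain strings in
place of real writers:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class WriterLookupSketch {
      private final Map<String, String> writers = new HashMap<>();
      private final Set<String> blacklisted = new HashSet<>();

      // Positive cache first, then the negative cache, then creation.
      String getWriter(String region) {
        String cached = writers.get(region);
        if (cached != null) {
          return cached;
        }
        if (blacklisted.contains(region)) {
          return null; // already decided this region gets no output
        }
        String created = create(region);
        if (created == null) {
          blacklisted.add(region);
          return null;
        }
        writers.put(region, created);
        return created;
      }

      // Stand-in for createWAP(): returns null when the region gets no output.
      private String create(String region) {
        return region.startsWith("skip") ? null : "writer-for-" + region;
      }

      public static void main(String[] args) {
        WriterLookupSketch s = new WriterLookupSketch();
        System.out.println(s.getWriter("r1"));   // writer-for-r1 (created)
        System.out.println(s.getWriter("r1"));   // writer-for-r1 (cached)
        System.out.println(s.getWriter("skip")); // null (now blacklisted)
      }
    }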
- /**
- * @return a writer and the path it writes to; the caller should close it.
- */
- WriterAndPath createWAP(byte[] region, Entry entry) throws IOException {
- String tmpDirName = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
- HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
- Path regionedits = getRegionSplitEditsPath(entry,
- fileBeingSplit.getPath().getName(), tmpDirName, conf);
- if (regionedits == null) {
- return null;
- }
- FileSystem walFs = FSUtils.getWALFileSystem(conf);
- if (walFs.exists(regionedits)) {
- LOG.warn("Found old edits file. It could be the "
- + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
- + walFs.getFileStatus(regionedits).getLen());
- if (!walFs.delete(regionedits, false)) {
- LOG.warn("Failed delete of old {}", regionedits);
- }
- }
- Writer w = createWriter(regionedits);
- LOG.debug("Creating writer path={}", regionedits);
- return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
- }
-
- void filterCellByStore(Entry logEntry) {
- Map<byte[], Long> maxSeqIdInStores =
- regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
- if (MapUtils.isEmpty(maxSeqIdInStores)) {
- return;
- }
- // Create the array list for the cells that aren't filtered.
- // We make the assumption that most cells will be kept.
- ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
- for (Cell cell : logEntry.getEdit().getCells()) {
- if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
- keptCells.add(cell);
- } else {
- byte[] family = CellUtil.cloneFamily(cell);
- Long maxSeqId = maxSeqIdInStores.get(family);
- // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
- // or the master was crashed before and we can not get the information.
- if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
- keptCells.add(cell);
- }
- }
- }
-
- // Anything in the keptCells array list is still live.
- // So rather than removing the cells from the array list
- // which would be an O(n^2) operation, we just replace the list
- logEntry.getEdit().setCells(keptCells);
- }
-
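
filterCellByStore keeps an edit when its family has no recorded flushed sequence id, or when the
edit's sequence id is newer than the recorded flush. A standalone sketch of just that rule, with
plain strings standing in for column families:

    import java.util.HashMap;
    import java.util.Map;

    public class SeqIdFilterSketch {
      static boolean keep(Map<String, Long> maxSeqIdInStores, String family, long editSeqId) {
        Long maxSeqId = maxSeqIdInStores.get(family);
        // A null maxSeqId can mean a rolling upgrade or a crashed master, so
        // the edit is kept rather than risk dropping data.
        return maxSeqId == null || maxSeqId.longValue() < editSeqId;
      }

      public static void main(String[] args) {
        Map<String, Long> flushed = new HashMap<>();
        flushed.put("cf1", 100L);
        System.out.println(keep(flushed, "cf1", 99L));  // false: already flushed
        System.out.println(keep(flushed, "cf1", 101L)); // true: newer than flush
        System.out.println(keep(flushed, "cf2", 50L));  // true: no flush record
      }
    }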
- @Override
- public void append(RegionEntryBuffer buffer) throws IOException {
- appendBuffer(buffer, true);
- }
-
- WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException{
- List<Entry> entries = buffer.entryBuffer;
- if (entries.isEmpty()) {
- LOG.warn("got an empty buffer, skipping");
- return null;
- }
-
- WriterAndPath wap = null;
-
- long startTime = System.nanoTime();
- try {
- int editsCount = 0;
-
- for (Entry logEntry : entries) {
- if (wap == null) {
- wap = getWriterAndPath(logEntry, reusable);
- if (wap == null) {
- if (LOG.isTraceEnabled()) {
- // This log spews the full edit. Can be massive in the log. Enable only debugging
- // WAL lost edit issues.
- LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
- }
- return null;
- }
- }
- filterCellByStore(logEntry);
- if (!logEntry.getEdit().isEmpty()) {
- wap.w.append(logEntry);
- this.updateRegionMaximumEditLogSeqNum(logEntry);
- editsCount++;
- } else {
- wap.incrementSkippedEdits(1);
- }
- }
- // Pass along summary statistics
- wap.incrementEdits(editsCount);
- wap.incrementNanoTime(System.nanoTime() - startTime);
- } catch (IOException e) {
- e = e instanceof RemoteException ?
- ((RemoteException)e).unwrapRemoteException() : e;
- LOG.error(HBaseMarkers.FATAL, "Got IOException while writing log entry to log", e);
- throw e;
- }
- return wap;
- }
-
- @Override
- public boolean keepRegionEvent(Entry entry) {
- ArrayList<Cell> cells = entry.getEdit().getCells();
- for (Cell cell : cells) {
- if (WALEdit.isCompactionMarker(cell)) {
- return true;
- }
- }
- return false;
- }
-
- /**
- * @return a map from encoded region ID to the number of edits written out for that region.
- */
- @Override
- public Map<byte[], Long> getOutputCounts() {
- TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- for (Map.Entry<String, SinkWriter> entry : writers.entrySet()) {
- ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
- }
- return ret;
- }
-
- @Override
- public int getNumberOfRecoveredRegions() {
- return writers.size();
- }
- }
-
- /**
- * A LogRecoveredEditsOutputSink that bounds the number of writers kept open:
- * each region's buffered edits are written to a fresh recovered.edits file
- * whose writer is closed as soon as the buffer has been flushed.
- */
- class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {
-
- private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
-
- public BoundedLogWriterCreationOutputSink(PipelineController controller,
- EntryBuffers entryBuffers, int numWriters) {
- super(controller, entryBuffers, numWriters);
- }
-
- @Override
- public List<Path> finishWritingAndClose() throws IOException {
- boolean isSuccessful;
- List<Path> result;
- try {
- isSuccessful = finishWriting(false);
- } finally {
- result = close();
- }
- if (isSuccessful) {
- splits = result;
- }
- return splits;
- }
-
- @Override
- boolean executeCloseTask(CompletionService<Void> completionService,
- List<IOException> thrown, List<Path> paths)
- throws InterruptedException, ExecutionException {
- for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) {
- LOG.info("Submitting writeThenClose of {}",
- Arrays.toString(buffer.getValue().encodedRegionName));
- completionService.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- Path dst = writeThenClose(buffer.getValue());
- paths.add(dst);
- return null;
- }
- });
- }
- boolean progress_failed = false;
- for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
- Future<Void> future = completionService.take();
- future.get();
- if (!progress_failed && reporter != null && !reporter.progress()) {
- progress_failed = true;
- }
- }
-
- return progress_failed;
- }
-
- /**
- * Since the splitting process may create multiple output files, a map
- * (regionRecoverStatMap) tracks the output count of each region.
- * @return a map from encoded region ID to the number of edits written out for that region.
- */
- @Override
- public Map<byte[], Long> getOutputCounts() {
- Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
- for (Map.Entry<String, Long> entry : regionRecoverStatMap.entrySet()) {
- regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
- }
- return regionRecoverStatMapResult;
- }
-
- /**
- * @return the number of recovered regions
- */
- @Override
- public int getNumberOfRecoveredRegions() {
- return regionRecoverStatMap.size();
- }
-
- /**
- * Append the buffer to a new recovered edits file, then close it once all entries are written
- * @param buffer contains all entries of a certain region
- * @throws IOException when closeWriter fails
- */
- @Override
- public void append(RegionEntryBuffer buffer) throws IOException {
- writeThenClose(buffer);
- }
-
- private Path writeThenClose(RegionEntryBuffer buffer) throws IOException {
- WriterAndPath wap = appendBuffer(buffer, false);
- if (wap != null) {
- String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
- Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
- if (value != null) {
- Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
- regionRecoverStatMap.put(encodedRegionName, newValue);
- }
- }
-
- Path dst = null;
- List<IOException> thrown = new ArrayList<>();
- if (wap != null) {
- dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
- }
- if (!thrown.isEmpty()) {
- throw MultipleIOException.createIOException(thrown);
- }
- return dst;
- }
- }
-
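
writeThenClose accumulates per-region edit counts with a putIfAbsent/get/put sequence; since
Java 8 the same accumulation can be expressed in one atomic step with ConcurrentHashMap.merge.
A sketch of that alternative (the class and method names are illustrative):

    import java.util.concurrent.ConcurrentHashMap;

    public class RecoverStatSketch {
      private final ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();

      // Adds editsWritten to the region's running total in one atomic step,
      // creating the entry on first use.
      void record(String encodedRegionName, long editsWritten) {
        regionRecoverStatMap.merge(encodedRegionName, editsWritten, Long::sum);
      }

      public static void main(String[] args) {
        RecoverStatSketch s = new RecoverStatSketch();
        s.record("region-a", 10L);
        s.record("region-a", 5L);
        System.out.println(s.regionRecoverStatMap.get("region-a")); // 15
      }
    }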
/**
* Class wraps the actual writer which writes data out and related statistics
*/
@@ -1771,14 +551,14 @@ public class WALSplitter {
* Private data structure that wraps a Writer and its Path, also collecting statistics about the
* data written to this output.
*/
- private final static class WriterAndPath extends SinkWriter {
- final Path p;
- final Writer w;
+ final static class WriterAndPath extends SinkWriter {
+ final Path path;
+ final Writer writer;
final long minLogSeqNum;
- WriterAndPath(final Path p, final Writer w, final long minLogSeqNum) {
- this.p = p;
- this.w = w;
+ WriterAndPath(final Path path, final Writer writer, final long minLogSeqNum) {
+ this.path = path;
+ this.writer = writer;
this.minLogSeqNum = minLogSeqNum;
}
}
@@ -1790,125 +570,4 @@ public class WALSplitter {
super(s);
}
}
-
- /** A struct used by getMutationsFromWALEntry */
- public static class MutationReplay implements Comparable<MutationReplay> {
- public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
- this.type = type;
- this.mutation = mutation;
- if (this.mutation.getDurability() != Durability.SKIP_WAL) {
- // using ASYNC_WAL for replay
- this.mutation.setDurability(Durability.ASYNC_WAL);
- }
- this.nonceGroup = nonceGroup;
- this.nonce = nonce;
- }
-
- public final MutationType type;
- public final Mutation mutation;
- public final long nonceGroup;
- public final long nonce;
-
- @Override
- public int compareTo(final MutationReplay d) {
- return this.mutation.compareTo(d.mutation);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof MutationReplay)) {
- return false;
- } else {
- return this.compareTo((MutationReplay)obj) == 0;
- }
- }
-
- @Override
- public int hashCode() {
- return this.mutation.hashCode();
- }
- }
-
- /**
- * Constructs mutations from a WALEntry and, when logEntry is non-null, also
- * reconstructs the WALKey and WALEdit from the passed in WALEntry.
- * @param entry the WALEntry to extract mutations from
- * @param cells scanner over the cells associated with the entry
- * @param logEntry pair that receives the WALKey and WALEdit instances
- * extracted from the passed in WALEntry
- * @param durability the durability to set on each reconstructed mutation
- * @return list of MutationReplay to be replayed
- * @throws IOException if the cell scanner fails to advance
- */
- public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
- Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
- if (entry == null) {
- // return an empty list
- return Collections.emptyList();
- }
-
- long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
- entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
- int count = entry.getAssociatedCellCount();
- List<MutationReplay> mutations = new ArrayList<>();
- Cell previousCell = null;
- Mutation m = null;
- WALKeyImpl key = null;
- WALEdit val = null;
- if (logEntry != null) {
- val = new WALEdit();
- }
-
- for (int i = 0; i < count; i++) {
- // Throw index out of bounds if our cell count is off
- if (!cells.advance()) {
- throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
- }
- Cell cell = cells.current();
- if (val != null) {
- val.add(cell);
- }
-
- boolean isNewRowOrType =
- previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
- || !CellUtil.matchingRows(previousCell, cell);
- if (isNewRowOrType) {
- // Create new mutation
- if (CellUtil.isDelete(cell)) {
- m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
- // Deletes don't have nonces.
- mutations.add(new MutationReplay(
- MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
- } else {
- m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
- // Puts might come from increment or append, thus we need nonces.
- long nonceGroup = entry.getKey().hasNonceGroup()
- ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
- long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
- mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
- }
- }
- if (CellUtil.isDelete(cell)) {
- ((Delete) m).add(cell);
- } else {
- ((Put) m).add(cell);
- }
- m.setDurability(durability);
- previousCell = cell;
- }
-
- // reconstruct WALKey
- if (logEntry != null) {
- org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto =
- entry.getKey();
- List<UUID> clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount());
- for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
- clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
- }
- key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
- walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
- clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
- logEntry.setFirst(key);
- logEntry.setSecond(val);
- }
-
- return mutations;
- }
}
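
The removed getMutationsFromWALEntry starts a new Mutation whenever the row or the cell type
byte changes between consecutive cells, so a run of cells for one row and one operation type
becomes a single Put or Delete. A simplified sketch of just that grouping rule, with string
pairs standing in for Cells:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class MutationGroupingSketch {
      // Each cell is {row, type}; a new group starts on a row or type change.
      static List<List<String>> group(List<String[]> cells) {
        List<List<String>> groups = new ArrayList<>();
        String prevRow = null;
        String prevType = null;
        for (String[] cell : cells) {
          boolean newRowOrType =
              prevRow == null || !prevType.equals(cell[1]) || !prevRow.equals(cell[0]);
          if (newRowOrType) {
            groups.add(new ArrayList<>());
          }
          groups.get(groups.size() - 1).add(cell[0] + ":" + cell[1]);
          prevRow = cell[0];
          prevType = cell[1];
        }
        return groups;
      }

      public static void main(String[] args) {
        List<String[]> cells = Arrays.asList(
            new String[] { "r1", "Put" }, new String[] { "r1", "Put" },
            new String[] { "r1", "Delete" }, new String[] { "r2", "Put" });
        // [[r1:Put, r1:Put], [r1:Delete], [r2:Put]]
        System.out.println(group(cells));
      }
    }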
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
index 084dd89..8087ae8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
@@ -81,7 +81,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
@@ -226,14 +226,14 @@ public abstract class AbstractTestDLS {
for (RegionInfo hri : regions) {
Path tdir = FSUtils.getWALTableDir(conf, table);
@SuppressWarnings("deprecation")
- Path editsdir = WALSplitter
+ Path editsdir = WALSplitUtil
.getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf,
tableName, hri.getEncodedName()));
LOG.debug("checking edits dir " + editsdir);
FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
@Override
public boolean accept(Path p) {
- if (WALSplitter.isSequenceIdFile(p)) {
+ if (WALSplitUtil.isSequenceIdFile(p)) {
return false;
}
return true;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteColumnFamilyProcedureFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteColumnFamilyProcedureFromClient.java
index d5ec62d..0e90246 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteColumnFamilyProcedureFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteColumnFamilyProcedureFromClient.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -163,7 +163,7 @@ public class TestDeleteColumnFamilyProcedureFromClient {
FileStatus[] cf = fs.listStatus(fileStatus[i].getPath(), new PathFilter() {
@Override
public boolean accept(Path p) {
- if (WALSplitter.isSequenceIdFile(p)) {
+ if (WALSplitUtil.isSequenceIdFile(p)) {
return false;
}
return true;
@@ -244,7 +244,7 @@ public class TestDeleteColumnFamilyProcedureFromClient {
FileStatus[] cf = fs.listStatus(fileStatus[i].getPath(), new PathFilter() {
@Override
public boolean accept(Path p) {
- if (WALSplitter.isSequenceIdFile(p)) {
+ if (WALSplitUtil.isSequenceIdFile(p)) {
return false;
}
return true;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index c09f702..514d0d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -161,7 +161,7 @@ import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -687,7 +687,7 @@ public class TestHRegion {
FileSystem fs = region.getRegionFileSystem().getFileSystem();
byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
- Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+ Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
long maxSeqId = 1050;
long minSeqId = 1000;
@@ -738,7 +738,7 @@ public class TestHRegion {
FileSystem fs = region.getRegionFileSystem().getFileSystem();
byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
- Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+ Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
long maxSeqId = 1050;
long minSeqId = 1000;
@@ -791,7 +791,7 @@ public class TestHRegion {
Path regiondir = region.getRegionFileSystem().getRegionDir();
FileSystem fs = region.getRegionFileSystem().getFileSystem();
- Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+ Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
for (int i = 1000; i < 1050; i += 10) {
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
FSDataOutputStream dos = fs.create(recoveredEdits);
@@ -824,7 +824,7 @@ public class TestHRegion {
assertEquals(0, region.getStoreFileList(columns).size());
- Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+ Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
long maxSeqId = 1050;
long minSeqId = 1000;
@@ -940,7 +940,7 @@ public class TestHRegion {
WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
- Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+ Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
fs.create(recoveredEdits);
@@ -1065,7 +1065,7 @@ public class TestHRegion {
// now write those markers to the recovered edits again.
- Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+ Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
fs.create(recoveredEdits);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 98119db..ae4154f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -77,7 +77,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
+import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.AfterClass;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
index 4a32820..c3f6d04 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
@@ -52,7 +52,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
@@ -149,7 +149,7 @@ public class TestRecoveredEdits {
assertTrue(storeFiles.isEmpty());
region.close();
Path regionDir = region.getRegionDir(hbaseRootDir, hri);
- Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
+ Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regionDir);
// This is a little fragile getting this path to a file of 10M of edits.
Path recoveredEditsFile = new Path(
System.getProperty("test.build.classes", "target/test-classes"),
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
index e55f3ee..d9f7766 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
@@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -146,7 +146,7 @@ public class TestRecoveredEditsReplayAndAbort {
FileSystem fs = region.getRegionFileSystem().getFileSystem();
byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
- Path recoveredEditsDir = WALSplitter
+ Path recoveredEditsDir = WALSplitUtil
.getRegionDirRecoveredEditsDir(regiondir);
long maxSeqId = 1200;
long minSeqId = 1000;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index 7663313..16c5837 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -96,6 +96,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.junit.After;
@@ -902,15 +903,12 @@ public abstract class AbstractTestWALReplay {
assertTrue(listStatus.length > 0);
WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
this.fs, this.conf, null, null, null, wals);
- FileStatus[] listStatus1 = this.fs.listStatus(
- new Path(FSUtils.getWALTableDir(conf, tableName), new Path(hri.getEncodedName(),
- "recovered.edits")), new PathFilter() {
+ FileStatus[] listStatus1 = this.fs.listStatus(new Path(FSUtils.getWALTableDir(conf, tableName),
+ new Path(hri.getEncodedName(), "recovered.edits")),
+ new PathFilter() {
@Override
public boolean accept(Path p) {
- if (WALSplitter.isSequenceIdFile(p)) {
- return false;
- }
- return true;
+ return !WALSplitUtil.isSequenceIdFile(p);
}
});
int editCount = 0;
@@ -956,7 +954,7 @@ public abstract class AbstractTestWALReplay {
runWALSplit(this.conf);
// here we let the DFSInputStream throw an IOException just after the WALHeader.
- Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first();
+ Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first();
FSDataInputStream stream = fs.open(editFile);
stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
Class<? extends AbstractFSWALProvider.Reader> logReaderClass =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreSnapshotHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreSnapshotHelper.java
index acc1f55..3a21405 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreSnapshotHelper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreSnapshotHelper.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -167,13 +167,13 @@ public class TestRestoreSnapshotHelper {
region.initialize();
Path recoveredEdit =
FSUtils.getWALRegionDir(conf, tableName, region.getRegionInfo().getEncodedName());
- long maxSeqId = WALSplitter.getMaxRegionSequenceId(fs, recoveredEdit);
+ long maxSeqId = WALSplitUtil.getMaxRegionSequenceId(fs, recoveredEdit);
// open restored region without set restored flag
HRegion region2 = HRegion.newHRegion(FSUtils.getTableDir(restoreDir, tableName), null, fs,
conf, restoredRegion, htd, null);
region2.initialize();
- long maxSeqId2 = WALSplitter.getMaxRegionSequenceId(fs, recoveredEdit);
+ long maxSeqId2 = WALSplitUtil.getMaxRegionSequenceId(fs, recoveredEdit);
Assert.assertTrue(maxSeqId2 > maxSeqId);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java
index 8ae638c..6386112 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java
@@ -66,30 +66,30 @@ public class TestReadWriteSeqIdFiles {
@Test
public void test() throws IOException {
- WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1000L);
- assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR));
- WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 2000L);
- assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR));
+ WALSplitUtil.writeRegionSequenceIdFile(walFS, REGION_DIR, 1000L);
+ assertEquals(1000L, WALSplitUtil.getMaxRegionSequenceId(walFS, REGION_DIR));
+ WALSplitUtil.writeRegionSequenceIdFile(walFS, REGION_DIR, 2000L);
+ assertEquals(2000L, WALSplitUtil.getMaxRegionSequenceId(walFS, REGION_DIR));
// can not write a sequence id which is smaller
try {
- WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1500L);
+ WALSplitUtil.writeRegionSequenceIdFile(walFS, REGION_DIR, 1500L);
} catch (IOException e) {
// expected
LOG.info("Expected error", e);
}
- Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(REGION_DIR);
+ Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(REGION_DIR);
FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() {
@Override
public boolean accept(Path p) {
- return WALSplitter.isSequenceIdFile(p);
+ return WALSplitUtil.isSequenceIdFile(p);
}
});
// only one seqid file should exist
assertEquals(1, files.length);
// verify all seqId files aren't treated as recovered.edits files
- NavigableSet<Path> recoveredEdits = WALSplitter.getSplitEditFilesSorted(walFS, REGION_DIR);
+ NavigableSet<Path> recoveredEdits = WALSplitUtil.getSplitEditFilesSorted(walFS, REGION_DIR);
assertEquals(0, recoveredEdits.size());
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
index b20b3a5..741d449 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
import org.junit.ClassRule;
@@ -89,17 +88,17 @@ public class TestWALMethods {
Path regiondir = util.getDataTestDir("regiondir");
fs.delete(regiondir, true);
fs.mkdirs(regiondir);
- Path recoverededits = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
- String first = WALSplitter.formatRecoveredEditsFileName(-1);
+ Path recoverededits = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
+ String first = WALSplitUtil.formatRecoveredEditsFileName(-1);
createFile(fs, recoverededits, first);
- createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(0));
- createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(1));
- createFile(fs, recoverededits, WALSplitter
+ createFile(fs, recoverededits, WALSplitUtil.formatRecoveredEditsFileName(0));
+ createFile(fs, recoverededits, WALSplitUtil.formatRecoveredEditsFileName(1));
+ createFile(fs, recoverededits, WALSplitUtil
.formatRecoveredEditsFileName(11));
- createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(2));
- createFile(fs, recoverededits, WALSplitter
+ createFile(fs, recoverededits, WALSplitUtil.formatRecoveredEditsFileName(2));
+ createFile(fs, recoverededits, WALSplitUtil
.formatRecoveredEditsFileName(50));
- String last = WALSplitter.formatRecoveredEditsFileName(Long.MAX_VALUE);
+ String last = WALSplitUtil.formatRecoveredEditsFileName(Long.MAX_VALUE);
createFile(fs, recoverededits, last);
createFile(fs, recoverededits,
Long.toString(Long.MAX_VALUE) + "." + System.currentTimeMillis());
@@ -108,21 +107,21 @@ public class TestWALMethods {
FSUtils.setRootDir(walConf, regiondir);
(new WALFactory(walConf, "dummyLogName")).getWAL(null);
- NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
+ NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
assertEquals(7, files.size());
assertEquals(files.pollFirst().getName(), first);
assertEquals(files.pollLast().getName(), last);
assertEquals(files.pollFirst().getName(),
- WALSplitter
+ WALSplitUtil
.formatRecoveredEditsFileName(0));
assertEquals(files.pollFirst().getName(),
- WALSplitter
+ WALSplitUtil
.formatRecoveredEditsFileName(1));
assertEquals(files.pollFirst().getName(),
- WALSplitter
+ WALSplitUtil
.formatRecoveredEditsFileName(2));
assertEquals(files.pollFirst().getName(),
- WALSplitter
+ WALSplitUtil
.formatRecoveredEditsFileName(11));
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index b4e2533..696ddea 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -399,7 +399,7 @@ public class TestWALSplit {
Path regiondir = new Path(tdir,
RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
fs.mkdirs(regiondir);
- Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+ Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
fs.createNewFile(parent); // create a recovered.edits file
String parentOfParent = p.getParent().getParent().getName();
@@ -414,7 +414,7 @@ public class TestWALSplit {
new Entry(new WALKeyImpl(encoded,
TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
new WALEdit());
- Path p = WALSplitter.getRegionSplitEditsPath(entry,
+ Path p = WALSplitUtil.getRegionSplitEditsPath(entry,
FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
return p;
}
@@ -422,10 +422,10 @@ public class TestWALSplit {
@Test
public void testHasRecoveredEdits() throws IOException {
Path p = createRecoveredEditsPathForRegion();
- assertFalse(WALSplitter.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
+ assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
String renamedEdit = p.getName().split("-")[0];
fs.createNewFile(new Path(p.getParent(), renamedEdit));
- assertTrue(WALSplitter.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
+ assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
}
private void useDifferentDFSClient() throws IOException {
@@ -1157,7 +1157,7 @@ public class TestWALSplit {
// After creating writer, simulate region's
// replayRecoveredEditsIfAny() which gets SplitEditFiles of this
// region and delete them, excluding files with '.temp' suffix.
- NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
+ NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
if (files != null && !files.isEmpty()) {
for (Path file : files) {
if (!this.walFS.delete(file, false)) {
@@ -1243,12 +1243,12 @@ public class TestWALSplit {
throws IOException {
Path tdir = FSUtils.getWALTableDir(conf, table);
@SuppressWarnings("deprecation")
- Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
+ Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
Bytes.toString(Bytes.toBytes(region))));
FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
@Override
public boolean accept(Path p) {
- if (WALSplitter.isSequenceIdFile(p)) {
+ if (WALSplitUtil.isSequenceIdFile(p)) {
return false;
}
return true;