You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2020/06/17 22:02:41 UTC
[hbase] branch branch-2.3 updated: HBASE-24577 Doc WALSplitter
classes (#1913)
This is an automated email from the ASF dual-hosted git repository.
stack pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new 2eae886 HBASE-24577 Doc WALSplitter classes (#1913)
2eae886 is described below
commit 2eae886e3267543221a25a45a429a563dd17816d
Author: Michael Stack <sa...@users.noreply.github.com>
AuthorDate: Wed Jun 17 14:59:30 2020 -0700
HBASE-24577 Doc WALSplitter classes (#1913)
Signed-off-by: Anoop Sam John <an...@apache.org>
Signed-off-by: Viraj Jasani <vj...@apache.org>
Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
.../wal/BoundedRecoveredHFilesOutputSink.java | 19 +++++---
.../org/apache/hadoop/hbase/wal/OutputSink.java | 3 ++
.../hadoop/hbase/wal/RecoveredEditsOutputSink.java | 12 +++--
.../org/apache/hadoop/hbase/wal/WALSplitter.java | 57 ++++++++++++++--------
.../hadoop/hbase/wal/TestWALSplitToHFile.java | 4 +-
5 files changed, 61 insertions(+), 34 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
index c340265..02c89a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
-
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
@@ -29,7 +28,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
@@ -51,13 +49,16 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * A WALSplitter sink that outputs {@link org.apache.hadoop.hbase.io.hfile.HFile}s.
+ * Runs with a bounded number of HFile writers at any one time rather than let the count run up.
+ * @see BoundedRecoveredEditsOutputSink for a sink implementation that writes intermediate
+ * recovered.edits files.
+ */
@InterfaceAudience.Private
public class BoundedRecoveredHFilesOutputSink extends OutputSink {
private static final Logger LOG = LoggerFactory.getLogger(BoundedRecoveredHFilesOutputSink.class);
- public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile";
- public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;
-
private final WALSplitter walSplitter;
// Since the splitting process may create multiple output files, we need a map
@@ -80,6 +81,8 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
Map<String, CellSet> familyCells = new HashMap<>();
Map<String, Long> familySeqIds = new HashMap<>();
boolean isMetaTable = buffer.tableName.equals(META_TABLE_NAME);
+ // First iterate all Cells to find which column families are present and to stamp Cell with
+ // sequence id.
for (WAL.Entry entry : buffer.entries) {
long seqId = entry.getKey().getSequenceId();
List<Cell> cells = entry.getEdit().getCells();
@@ -99,12 +102,13 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
}
}
- // The key point is create a new writer for each column family, write edits then close writer.
+ // Create a new hfile writer for each column family, write edits then close writer.
String regionName = Bytes.toString(buffer.encodedRegionName);
for (Map.Entry<String, CellSet> cellsEntry : familyCells.entrySet()) {
String familyName = cellsEntry.getKey();
StoreFileWriter writer = createRecoveredHFileWriter(buffer.tableName, regionName,
familySeqIds.get(familyName), familyName, isMetaTable);
+ LOG.trace("Created {}", writer.getPath());
openingWritersNum.incrementAndGet();
try {
for (Cell cell : cellsEntry.getValue()) {
@@ -118,6 +122,7 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
openingWritersNum.decrementAndGet();
} finally {
writer.close();
+ LOG.trace("Closed {}, edits={}", writer.getPath(), familyCells.size());
}
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
index de62c4d..bdc7772 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java
@@ -56,6 +56,9 @@ public abstract class OutputSink {
protected final AtomicLong totalSkippedEdits = new AtomicLong();
+ /**
+ * List of all the files produced by this sink
+ */
protected final List<Path> splits = new ArrayList<>();
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
index c5cfd31..798c716 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
@@ -34,12 +33,15 @@ import org.apache.hadoop.io.MultipleIOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
* Class that manages the output streams from the log splitting process.
- * Every region only has one recovered edits.
+ * Every region only has one recovered edits file PER split WAL (if we split
+ * multiple WALs during a log-splitting session, on open, a Region may
+ * have multiple recovered.edits files to replay -- one per split WAL).
+ * @see BoundedRecoveredEditsOutputSink which is like this class but imposes upper bound on
+ * the number of writers active at one time (makes for better throughput).
*/
@InterfaceAudience.Private
class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
@@ -81,6 +83,7 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
if (ret == null) {
return null;
}
+ LOG.trace("Created {}", ret.path);
writers.put(Bytes.toString(region), ret);
return ret;
}
@@ -106,6 +109,7 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
for (RecoveredEditsWriter writer : writers.values()) {
closeCompletionService.submit(() -> {
Path dst = closeRecoveredEditsWriter(writer, thrown);
+ LOG.trace("Closed {}", dst);
splits.add(dst);
return null;
});
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 6dcf3d3..ae0347f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,10 +17,7 @@
*/
package org.apache.hadoop.hbase.wal;
-import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.DEFAULT_WAL_SPLIT_TO_HFILE;
-import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile;
-
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -63,17 +60,19 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
-
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
+
/**
- * This class is responsible for splitting up a bunch of regionserver commit log
- * files that are no longer being written to, into new files, one per region, for
- * recovering data on startup. Delete the old log files when finished.
+ * Split RegionServer WAL files. Splits the WAL into new files,
+ * one per region, to be picked up on Region reopen. Deletes the split WAL when finished.
+ * See {@link #split(Path, Path, Path, FileSystem, Configuration, WALFactory)} or
+ * {@link #splitLogFile(Path, FileStatus, FileSystem, Configuration, CancelableProgressable,
+ * LastSequenceId, SplitLogWorkerCoordination, WALFactory, RegionServerServices)} for
+ * entry-point.
*/
@InterfaceAudience.Private
public class WALSplitter {
@@ -96,7 +95,12 @@ public class WALSplitter {
OutputSink outputSink;
private EntryBuffers entryBuffers;
+ /**
+ * Coordinator for split log. Used by the zk-based log splitter.
+ * Not used by the procedure v2-based log splitter.
+ */
private SplitLogWorkerCoordination splitLogWorkerCoordination;
+
private final WALFactory walFactory;
private MonitoredTask status;
@@ -115,7 +119,20 @@ public class WALSplitter {
private final String tmpDirName;
+ /**
+ * Split WAL directly to hfiles instead of into intermediary 'recovered.edits' files.
+ */
+ public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile";
+ public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;
+
+ /**
+ * True if we are to run with bounded amount of writers rather than let the count blossom.
+ * Default is 'false'. Does not apply if you have set 'hbase.wal.split.to.hfile' as that
+ * is always bounded. Only applies when you are doing recovery to 'recovered.edits'
+ * files (the old default). Bounded writing tends to have higher throughput.
+ */
public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
+
public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize";
public final static String SPLIT_WAL_WRITER_THREADS =
"hbase.regionserver.hlog.splitlog.writer.threads";
@@ -184,11 +201,7 @@ public class WALSplitter {
}
/**
- * Splits a WAL file into region's recovered-edits directory.
- * This is the main entry point for distributed log splitting from SplitLogWorker.
- * <p>
- * If the log file has N regions then N recovered.edits files will be produced.
- * <p>
+ * Splits a WAL file.
* @return false if it is interrupted by the progress-able.
*/
public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
@@ -202,10 +215,13 @@ public class WALSplitter {
return s.splitLogFile(logfile, reporter);
}
- // A wrapper to split one log folder using the method used by distributed
- // log splitting. Used by tools and unit tests. It should be package private.
- // It is public only because TestWALObserver is in a different package,
- // which uses this method to do log splitting.
+ /**
+ * Split a folder of WAL files. Delete the directory when done.
+ * Used by tools and unit tests. It should be package private.
+ * It is public only because TestWALObserver is in a different package,
+ * which uses this method to do log splitting.
+ * @return List of output files created by the split.
+ */
@VisibleForTesting
public static List<Path> split(Path walDir, Path logDir, Path oldLogDir, FileSystem walFS,
Configuration conf, final WALFactory factory) throws IOException {
@@ -233,7 +249,7 @@ public class WALSplitter {
}
/**
- * log splitting implementation, splits one log file.
+ * WAL splitting implementation, splits one log file.
* @param logfile should be an actual log file.
*/
@VisibleForTesting
@@ -285,7 +301,8 @@ public class WALSplitter {
String encodedRegionNameAsStr = Bytes.toString(region);
lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
if (lastFlushedSequenceId == null) {
- if (!(isRegionDirPresentUnderRoot(entry.getKey().getTableName(), encodedRegionNameAsStr))) {
+ if (!(isRegionDirPresentUnderRoot(entry.getKey().getTableName(),
+ encodedRegionNameAsStr))) {
// The region directory itself is not present in the FS. This indicates that
// the region/table is already removed. We can just skip all the edits for this
// region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
index dabfd42..ce4a472 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,13 +18,12 @@
package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
-import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
+import static org.apache.hadoop.hbase.wal.WALSplitter.WAL_SPLIT_TO_HFILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
-
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;