You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/02/18 12:49:29 UTC
hbase git commit: HBASE-14949 Resolve name conflict when splitting if there are duplicated WAL entries
Repository: hbase
Updated Branches:
refs/heads/master 6f8c7dca1 -> d2ba87509
HBASE-14949 Resolve name conflict when splitting if there are duplicated WAL entries
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d2ba8750
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d2ba8750
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d2ba8750
Branch: refs/heads/master
Commit: d2ba87509b8d193f58183beff4ab76c7edf47e11
Parents: 6f8c7dc
Author: zhangduo <zh...@apache.org>
Authored: Thu Feb 18 10:31:01 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Feb 18 19:48:52 2016 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/wal/WALSplitter.java | 88 +++++++++++-----
.../hbase/regionserver/wal/TestWALReplay.java | 105 ++++++++++++++++---
2 files changed, 154 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d2ba8750/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 8abd950..54b82b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -176,6 +176,10 @@ public class WALSplitter {
// Min batch size when replay WAL edits
private final int minBatchSize;
+ // the file being split currently
+ private FileStatus fileBeingSplit;
+
+ @VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
FileSystem fs, LastSequenceId idChecker,
CoordinatedStateManager csm, RecoveryMode mode) {
@@ -267,6 +271,7 @@ public class WALSplitter {
* log splitting implementation, splits one log file.
* @param logfile should be an actual log file.
*/
+ @VisibleForTesting
boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
Preconditions.checkState(status == null);
Preconditions.checkArgument(logfile.isFile(),
@@ -285,6 +290,7 @@ public class WALSplitter {
TaskMonitor.get().createStatus(
"Splitting log file " + logfile.getPath() + "into a temporary staging area.");
Reader in = null;
+ this.fileBeingSplit = logfile;
try {
long logLength = logfile.getLen();
LOG.info("Splitting wal: " + logPath + ", length=" + logLength);
@@ -349,7 +355,7 @@ public class WALSplitter {
}
lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
}
- if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
+ if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
editsSkipped++;
continue;
}
@@ -435,7 +441,7 @@ public class WALSplitter {
finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
}
- static void finishSplitLogFile(Path rootdir, Path oldLogDir,
+ private static void finishSplitLogFile(Path rootdir, Path oldLogDir,
Path logPath, Configuration conf) throws IOException {
List<Path> processedLogs = new ArrayList<Path>();
List<Path> corruptedLogs = new ArrayList<Path>();
@@ -509,12 +515,13 @@ public class WALSplitter {
* @param fs
* @param logEntry
* @param rootDir HBase root dir.
+ * @param fileBeingSplit the file being split currently. Used to generate tmp file name.
* @return Path to file into which to dump split log edits.
* @throws IOException
*/
@SuppressWarnings("deprecation")
- static Path getRegionSplitEditsPath(final FileSystem fs,
- final Entry logEntry, final Path rootDir, boolean isCreate)
+ private static Path getRegionSplitEditsPath(final FileSystem fs,
+ final Entry logEntry, final Path rootDir, FileStatus fileBeingSplit)
throws IOException {
Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
@@ -542,17 +549,18 @@ public class WALSplitter {
}
}
- if (isCreate && !fs.exists(dir)) {
- if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
+ if (!fs.exists(dir) && !fs.mkdirs(dir)) {
+ LOG.warn("mkdir failed on " + dir);
}
+ // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
// Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
// region's replayRecoveredEdits will not delete it
- String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
- fileName = getTmpRecoveredEditsFileName(fileName);
+ String fileName = formatRecoveredEditsFileName(logEntry.getKey().getSequenceId());
+ fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileBeingSplit.getPath().getName());
return new Path(dir, fileName);
}
- static String getTmpRecoveredEditsFileName(String fileName) {
+ private static String getTmpRecoveredEditsFileName(String fileName) {
return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
}
@@ -564,12 +572,13 @@ public class WALSplitter {
* @param maximumEditLogSeqNum
* @return dstPath take file's last edit log seq num as the name
*/
- static Path getCompletedRecoveredEditsFilePath(Path srcPath,
- Long maximumEditLogSeqNum) {
+ private static Path getCompletedRecoveredEditsFilePath(Path srcPath,
+ long maximumEditLogSeqNum) {
String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
return new Path(srcPath.getParent(), fileName);
}
+ @VisibleForTesting
static String formatRecoveredEditsFileName(final long seqid) {
return String.format("%019d", seqid);
}
@@ -1175,9 +1184,9 @@ public class WALSplitter {
synchronized (regionMaximumEditLogSeqNum) {
Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
.getEncodedRegionName());
- if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
+ if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
- .getLogSeqNum());
+ .getSequenceId());
}
}
}
@@ -1296,6 +1305,39 @@ public class WALSplitter {
return splits;
}
+ // delete the one with fewer wal entries
+ private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException {
+ long dstMinLogSeqNum = -1L;
+ try (WAL.Reader reader = walFactory.createReader(fs, dst)) {
+ WAL.Entry entry = reader.next();
+ if (entry != null) {
+ dstMinLogSeqNum = entry.getKey().getSequenceId();
+ }
+ } catch (EOFException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Got EOF when reading first WAL entry from " + dst + ", an empty or broken WAL file?",
+ e);
+ }
+ }
+ if (wap.minLogSeqNum < dstMinLogSeqNum) {
+ LOG.warn("Found existing old edits file. It could be the result of a previous failed"
+ + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
+ + fs.getFileStatus(dst).getLen());
+ if (!fs.delete(dst, false)) {
+ LOG.warn("Failed deleting of old " + dst);
+ throw new IOException("Failed deleting of old " + dst);
+ }
+ } else {
+ LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
+ + ", length=" + fs.getFileStatus(wap.p).getLen());
+ if (!fs.delete(wap.p, false)) {
+ LOG.warn("Failed deleting of " + wap.p);
+ throw new IOException("Failed deleting of " + wap.p);
+ }
+ }
+ }
+
/**
* Close all of the output streams.
* @return the list of paths written.
@@ -1351,13 +1393,7 @@ public class WALSplitter {
regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
try {
if (!dst.equals(wap.p) && fs.exists(dst)) {
- LOG.warn("Found existing old edits file. It could be the "
- + "result of a previous failed split attempt. Deleting " + dst + ", length="
- + fs.getFileStatus(dst).getLen());
- if (!fs.delete(dst, false)) {
- LOG.warn("Failed deleting of old " + dst);
- throw new IOException("Failed deleting of old " + dst);
- }
+ deleteOneWithFewerEntries(wap, dst);
}
// Skip the unit tests which create a splitter that reads and
// writes the data without touching disk.
@@ -1482,7 +1518,7 @@ public class WALSplitter {
* @return a path with a write for that path. caller should close.
*/
private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
- Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
+ Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, fileBeingSplit);
if (regionedits == null) {
return null;
}
@@ -1496,7 +1532,7 @@ public class WALSplitter {
}
Writer w = createWriter(regionedits);
LOG.debug("Creating writer path=" + regionedits);
- return new WriterAndPath(regionedits, w);
+ return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
}
private void filterCellByStore(Entry logEntry) {
@@ -1516,7 +1552,7 @@ public class WALSplitter {
Long maxSeqId = maxSeqIdInStores.get(family);
// Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
// or the master was crashed before and we can not get the information.
- if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getLogSeqNum()) {
+ if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
keptCells.add(cell);
}
}
@@ -1623,10 +1659,12 @@ public class WALSplitter {
private final static class WriterAndPath extends SinkWriter {
final Path p;
final Writer w;
+ final long minLogSeqNum;
- WriterAndPath(final Path p, final Writer w) {
+ WriterAndPath(final Path p, final Writer w, final long minLogSeqNum) {
this.p = p;
this.w = w;
+ this.minLogSeqNum = minLogSeqNum;
}
}
@@ -1819,7 +1857,7 @@ public class WALSplitter {
}
if (maxStoreSequenceIds != null) {
Long maxStoreSeqId = maxStoreSequenceIds.get(family);
- if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
+ if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getSequenceId()) {
// skip current kv if column family doesn't exist anymore or already flushed
skippedCells.add(cell);
continue;
http://git-wip-us.apache.org/repos/asf/hbase/blob/d2ba8750/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 40e5baa..dbc06ff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -33,6 +33,7 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@@ -1034,6 +1035,56 @@ public class TestWALReplay {
assertEquals(result.size(), region2.get(g).size());
}
+ /**
+ * testcase for https://issues.apache.org/jira/browse/HBASE-14949.
+ */
+ private void testNameConflictWhenSplit(boolean largeFirst) throws IOException {
+ final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
+ final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+ final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+ final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
+ deleteDir(basedir);
+
+ final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
+ HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
+ HBaseTestingUtility.closeRegionAndWAL(region);
+ final byte[] family = htd.getColumnFamilies()[0].getName();
+ final byte[] rowName = tableName.getName();
+ FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1);
+ FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2);
+
+ Path largeFile = new Path(logDir, "wal-1");
+ Path smallFile = new Path(logDir, "wal-2");
+ writerWALFile(largeFile, Arrays.asList(entry1, entry2));
+ writerWALFile(smallFile, Arrays.asList(entry2));
+ FileStatus first, second;
+ if (largeFirst) {
+ first = fs.getFileStatus(largeFile);
+ second = fs.getFileStatus(smallFile);
+ } else {
+ first = fs.getFileStatus(smallFile);
+ second = fs.getFileStatus(largeFile);
+ }
+ WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null,
+ RecoveryMode.LOG_SPLITTING, wals);
+ WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null,
+ RecoveryMode.LOG_SPLITTING, wals);
+ WAL wal = createWAL(this.conf);
+ region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
+ assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());
+ assertEquals(2, region.get(new Get(rowName)).size());
+ }
+
+ @Test
+ public void testNameConflictWhenSplit0() throws IOException {
+ testNameConflictWhenSplit(true);
+ }
+
+ @Test
+ public void testNameConflictWhenSplit1() throws IOException {
+ testNameConflictWhenSplit(false);
+ }
+
static class MockWAL extends FSHLog {
boolean doCompleteCacheFlush = false;
@@ -1102,27 +1153,42 @@ public class TestWALReplay {
}
}
+ private WALKey createWALKey(final TableName tableName, final HRegionInfo hri,
+ final MultiVersionConcurrencyControl mvcc) {
+ return new WALKey(hri.getEncodedNameAsBytes(), tableName, 999, mvcc);
+ }
+
+ private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee,
+ int index) {
+ byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index));
+ byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index));
+ WALEdit edit = new WALEdit();
+ edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
+ return edit;
+ }
+
+ private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
+ byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
+ int index) throws IOException {
+ FSWALEntry entry =
+ new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc), createWALEdit(
+ rowName, family, ee, index), htd, hri, true);
+ entry.stampRegionSequenceId();
+ return entry;
+ }
+
private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
- final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc)
- throws IOException {
- String familyStr = Bytes.toString(family);
+ final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException {
for (int j = 0; j < count; j++) {
- byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
- byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
- WALEdit edit = new WALEdit();
- edit.add(new KeyValue(rowName, family, qualifierBytes,
- ee.currentTime(), columnBytes));
- wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,999, mvcc),
- edit, true);
+ wal.append(htd, hri, createWALKey(tableName, hri, mvcc),
+ createWALEdit(rowName, family, ee, j), true);
}
wal.sync();
}
- static List<Put> addRegionEdits (final byte [] rowName, final byte [] family,
- final int count, EnvironmentEdge ee, final Region r,
- final String qualifierPrefix)
- throws IOException {
+ static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
+ EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException {
List<Put> puts = new ArrayList<Put>();
for (int j = 0; j < count; j++) {
byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
@@ -1183,4 +1249,15 @@ public class TestWALReplay {
htd.addFamily(c);
return htd;
}
+
+ private void writerWALFile(Path file, List<FSWALEntry> entries) throws IOException {
+ fs.mkdirs(file.getParent());
+ ProtobufLogWriter writer = new ProtobufLogWriter();
+ writer.init(fs, file, conf, true);
+ for (FSWALEntry entry : entries) {
+ writer.append(entry);
+ }
+ writer.sync();
+ writer.close();
+ }
}