You are viewing a plain-text version of this content; the canonical HTML version is available at the original archive location (the hyperlink was not preserved in this plain-text export).
Posted to commits@hbase.apache.org by te...@apache.org on 2012/04/07 17:30:19 UTC
svn commit: r1310787 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Author: tedyu
Date: Sat Apr 7 15:30:19 2012
New Revision: 1310787
URL: http://svn.apache.org/viewvc?rev=1310787&view=rev
Log:
HBASE-5689 Skipping RecoveredEdits may cause data loss (Chunhui)
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1310787&r1=1310786&r2=1310787&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Apr 7 15:30:19 2012
@@ -2697,7 +2697,6 @@ public class HRegion implements HeapSize
long seqid = minSeqId;
NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
if (files == null || files.isEmpty()) return seqid;
- boolean checkSafeToSkip = true;
for (Path edits: files) {
if (edits == null || !this.fs.exists(edits)) {
LOG.warn("Null or non-existent edits file: " + edits);
@@ -2705,22 +2704,15 @@ public class HRegion implements HeapSize
}
if (isZeroLengthThenDelete(this.fs, edits)) continue;
- if (checkSafeToSkip) {
- Path higher = files.higher(edits);
- long maxSeqId = Long.MAX_VALUE;
- if (higher != null) {
- // Edit file name pattern, HLog.EDITFILES_NAME_PATTERN: "-?[0-9]+"
- String fileName = higher.getName();
- maxSeqId = Math.abs(Long.parseLong(fileName));
- }
- if (maxSeqId <= minSeqId) {
- String msg = "Maximum possible sequenceid for this log is " + maxSeqId
- + ", skipped the whole file, path=" + edits;
- LOG.debug(msg);
- continue;
- } else {
- checkSafeToSkip = false;
- }
+ long maxSeqId = Long.MAX_VALUE;
+ String fileName = edits.getName();
+ maxSeqId = Math.abs(Long.parseLong(fileName));
+ if (maxSeqId <= minSeqId) {
+ String msg = "Maximum sequenceid for this log is " + maxSeqId
+ + " and minimum sequenceid for the region is " + minSeqId
+ + ", skipped the whole file, path=" + edits;
+ LOG.debug(msg);
+ continue;
}
try {
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1310787&r1=1310786&r2=1310787&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Sat Apr 7 15:30:19 2012
@@ -424,6 +424,7 @@ public class HLogSplitter {
}
}
wap.w.append(entry);
+ outputSink.updateRegionMaximumEditLogSeqNum(entry);
editsCount++;
// If sufficient edits have passed OR we've opened a few files, check if
// we should report progress.
@@ -453,7 +454,8 @@ public class HLogSplitter {
throw e;
} finally {
int n = 0;
- for (Object o : logWriters.values()) {
+ for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
+ Object o = logWritersEntry.getValue();
long t1 = EnvironmentEdgeManager.currentTimeMillis();
if ((t1 - last_report_at) > period) {
last_report_at = t;
@@ -469,7 +471,8 @@ public class HLogSplitter {
WriterAndPath wap = (WriterAndPath)o;
wap.w.close();
LOG.debug("Closed " + wap.p);
- Path dst = getCompletedRecoveredEditsFilePath(wap.p);
+ Path dst = getCompletedRecoveredEditsFilePath(wap.p, outputSink
+ .getRegionMaximumEditLogSeqNum(logWritersEntry.getKey()));
if (!dst.equals(wap.p) && fs.exists(dst)) {
LOG.warn("Found existing old edits file. It could be the "
+ "result of a previous failed split attempt. Deleting " + dst
@@ -486,6 +489,7 @@ public class HLogSplitter {
if (!fs.rename(wap.p, dst)) {
throw new IOException("Failed renaming " + wap.p + " to " + dst);
}
+ LOG.debug("Rename " + wap.p + " to " + dst);
}
}
String msg = "Processed " + editsCount + " edits across " + n + " regions" +
@@ -679,16 +683,16 @@ public class HLogSplitter {
}
/**
- * Convert path to a file under RECOVERED_EDITS_DIR directory without
- * RECOVERED_LOG_TMPFILE_SUFFIX
+ * Get the completed recovered edits file path, renaming it to be by last edit
+ * in the file from its first edit. Then we could use the name to skip
+ * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}.
* @param srcPath
- * @return dstPath without RECOVERED_LOG_TMPFILE_SUFFIX
+ * @param maximumEditLogSeqNum
+ * @return dstPath take file's last edit log seq num as the name
*/
- static Path getCompletedRecoveredEditsFilePath(Path srcPath) {
- String fileName = srcPath.getName();
- if (fileName.endsWith(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)) {
- fileName = fileName.split(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)[0];
- }
+ static Path getCompletedRecoveredEditsFilePath(Path srcPath,
+ Long maximumEditLogSeqNum) {
+ String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
return new Path(srcPath.getParent(), fileName);
}
@@ -1025,6 +1029,7 @@ public class HLogSplitter {
}
}
+
private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
List<Entry> entries = buffer.entryBuffer;
if (entries.isEmpty()) {
@@ -1048,6 +1053,7 @@ public class HLogSplitter {
}
}
wap.w.append(logEntry);
+ outputSink.updateRegionMaximumEditLogSeqNum(logEntry);
editsCount++;
}
// Pass along summary statistics
@@ -1136,6 +1142,8 @@ public class HLogSplitter {
class OutputSink {
private final Map<byte[], WriterAndPath> logWriters = Collections.synchronizedMap(
new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR));
+ private final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
+ .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
private final List<WriterThread> writerThreads = Lists.newArrayList();
/* Set of regions which we've decided should not output edits */
@@ -1202,8 +1210,11 @@ public class HLogSplitter {
List<Path> paths = new ArrayList<Path>();
List<IOException> thrown = Lists.newArrayList();
closeLogWriters(thrown);
- for (WriterAndPath wap : logWriters.values()) {
- Path dst = getCompletedRecoveredEditsFilePath(wap.p);
+ for (Map.Entry<byte[], WriterAndPath> logWritersEntry : logWriters
+ .entrySet()) {
+ WriterAndPath wap = logWritersEntry.getValue();
+ Path dst = getCompletedRecoveredEditsFilePath(wap.p,
+ regionMaximumEditLogSeqNum.get(logWritersEntry.getKey()));
try {
if (!dst.equals(wap.p) && fs.exists(dst)) {
LOG.warn("Found existing old edits file. It could be the "
@@ -1221,6 +1232,7 @@ public class HLogSplitter {
if (!fs.rename(wap.p, dst)) {
throw new IOException("Failed renaming " + wap.p + " to " + dst);
}
+ LOG.debug("Rename " + wap.p + " to " + dst);
}
} catch (IOException ioe) {
LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
@@ -1288,6 +1300,18 @@ public class HLogSplitter {
}
/**
+ * Update region's maximum edit log SeqNum.
+ */
+ void updateRegionMaximumEditLogSeqNum(Entry entry) {
+ regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(),
+ entry.getKey().getLogSeqNum());
+ }
+
+ Long getRegionMaximumEditLogSeqNum(byte[] region) {
+ return regionMaximumEditLogSeqNum.get(region);
+ }
+
+ /**
* @return a map from encoded region ID to the number of edits written out
* for that region.
*/
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1310787&r1=1310786&r2=1310787&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Sat Apr 7 15:30:19 2012
@@ -54,9 +54,11 @@ import org.apache.hadoop.hbase.Multithre
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
@@ -67,6 +69,7 @@ import org.apache.hadoop.hbase.filter.Nu
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
@@ -136,6 +139,95 @@ public class TestHRegion extends HBaseTe
SchemaMetrics.validateMetricChanges(startingMetrics);
}
+ public void testDataCorrectnessReplayingRecoveredEdits() throws Exception {
+ final int NUM_MASTERS = 1;
+ final int NUM_RS = 3;
+ TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
+
+ try {
+ final byte[] TABLENAME = Bytes
+ .toBytes("testDataCorrectnessReplayingRecoveredEdits");
+ final byte[] FAMILY = Bytes.toBytes("family");
+ MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+ HMaster master = cluster.getMaster();
+
+ // Create table
+ HTableDescriptor desc = new HTableDescriptor(TABLENAME);
+ desc.addFamily(new HColumnDescriptor(FAMILY));
+ HBaseAdmin hbaseAdmin = TEST_UTIL.getHBaseAdmin();
+ hbaseAdmin.createTable(desc);
+
+ assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
+
+ // Put data: r1->v1
+ HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
+ putDataAndVerify(table, "r1", FAMILY, "v1", 1);
+
+ // Move region to target server
+ HRegionInfo regionInfo = table.getRegionLocation("r1").getRegionInfo();
+ int originServerNum = cluster.getServerWith(regionInfo.getRegionName());
+ HRegionServer originServer = cluster.getRegionServer(originServerNum);
+ int targetServerNum = NUM_RS - 1 - originServerNum;
+ HRegionServer targetServer = cluster.getRegionServer(targetServerNum);
+ hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(),
+ Bytes.toBytes(targetServer.getServerName().getServerName()));
+ do {
+ Thread.sleep(1);
+ } while (cluster.getServerWith(regionInfo.getRegionName()) == originServerNum);
+
+ // Put data: r2->v2
+ putDataAndVerify(table, "r2", FAMILY, "v2", 2);
+
+ // Move region to origin server
+ hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(),
+ Bytes.toBytes(originServer.getServerName().getServerName()));
+ do {
+ Thread.sleep(1);
+ } while (cluster.getServerWith(regionInfo.getRegionName()) == targetServerNum);
+
+ // Put data: r3->v3
+ putDataAndVerify(table, "r3", FAMILY, "v3", 3);
+
+ // Kill target server
+ targetServer.kill();
+ cluster.getRegionServerThreads().get(targetServerNum).join();
+ // Wait until finish processing of shutdown
+ while (master.getServerManager().areDeadServersInProgress()) {
+ Thread.sleep(5);
+ }
+ // Kill origin server
+ originServer.kill();
+ cluster.getRegionServerThreads().get(originServerNum).join();
+
+ // Put data: r4->v4
+ putDataAndVerify(table, "r4", FAMILY, "v4", 4);
+
+ } finally {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+ }
+
+ private void putDataAndVerify(HTable table, String row, byte[] family,
+ String value, int verifyNum) throws IOException {
+ System.out.println("=========Putting data :" + row);
+ Put put = new Put(Bytes.toBytes(row));
+ put.add(family, Bytes.toBytes("q1"), Bytes.toBytes(value));
+ table.put(put);
+ ResultScanner resultScanner = table.getScanner(new Scan());
+ List<Result> results = new ArrayList<Result>();
+ while (true) {
+ Result r = resultScanner.next();
+ if (r == null)
+ break;
+ results.add(r);
+ }
+ resultScanner.close();
+ if (results.size() != verifyNum) {
+ System.out.println(results);
+ }
+ assertEquals(verifyNum, results.size());
+ }
+
//////////////////////////////////////////////////////////////////////////////
// New tests that doesn't spin up a mini cluster but rather just test the
// individual code pieces in the HRegion. Putting files locally in