You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2014/03/26 18:26:45 UTC
svn commit: r1581954 - in /hbase/branches/0.98/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Author: enis
Date: Wed Mar 26 17:26:44 2014
New Revision: 1581954
URL: http://svn.apache.org/r1581954
Log:
HBASE-10829 Flush is skipped after log replay if the last recovered edits file is skipped
Modified:
hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1581954&r1=1581953&r2=1581954&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Mar 26 17:26:44 2014
@@ -3076,7 +3076,8 @@ public class HRegion implements HeapSize
}
try {
- seqid = replayRecoveredEdits(edits, maxSeqIdInStores, reporter);
+ // Replay the edits. Replay can return -1 if everything is skipped, so only update seqid if the result is greater.
+ seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
} catch (IOException e) {
boolean skipErrors = conf.getBoolean(
HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
@@ -3203,6 +3204,7 @@ public class HRegion implements HeapSize
if (firstSeqIdInLog == -1) {
firstSeqIdInLog = key.getLogSeqNum();
}
+ currentEditSeqId = key.getLogSeqNum();
boolean flush = false;
for (KeyValue kv: val.getKeyValues()) {
// Check this edit is for me. Also, guard against writing the special
@@ -3237,7 +3239,6 @@ public class HRegion implements HeapSize
skippedEdits++;
continue;
}
- currentEditSeqId = key.getLogSeqNum();
// Once we are over the limit, restoreEdit will keep returning true to
// flush -- but don't flush until we've played all the kvs that make up
// the WALEdit.
Modified: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1581954&r1=1581953&r2=1581954&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Wed Mar 26 17:26:44 2014
@@ -140,10 +140,11 @@ import org.junit.rules.TestName;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
/**
* Basic stand-alone testing of HRegion. No clusters!
- *
+ *
* A lot of the meta information for an HRegion now lives inside other HRegions
* or in the HBaseMaster, so only basic testing is possible.
*/
@@ -209,7 +210,7 @@ public class TestHRegion {
* case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
* in current memstore. The fix is removing all conditions except abort check so we ensure 2
* flushes for region close."
- * @throws IOException
+ * @throws IOException
*/
@Test (timeout=60000)
public void testCloseCarryingSnapshot() throws IOException {
@@ -258,6 +259,7 @@ public class TestHRegion {
// Inject our faulty LocalFileSystem
conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
user.runAs(new PrivilegedExceptionAction<Object>() {
+ @Override
public Object run() throws Exception {
// Make sure it worked (above is sensitive to caching details in hadoop core)
FileSystem fs = FileSystem.get(conf);
@@ -524,6 +526,67 @@ public class TestHRegion {
}
@Test
+ public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception {
+   // HBASE-10829: the newest recovered-edits file holds only a compaction marker (no puts), yet
+   // replay must still return the highest sequence id seen and flush the puts it did apply.
+   String method = name.getMethodName();
+   TableName tableName = TableName.valueOf(method);
+   byte[] family = Bytes.toBytes("family");
+   this.region = initHRegion(tableName, method, CONF, family);
+   try {
+     Path regiondir = region.getRegionFileSystem().getRegionDir();
+     FileSystem fs = region.getRegionFileSystem().getFileSystem();
+     byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
+     assertEquals(0, region.getStoreFileList(
+       region.getStores().keySet().toArray(new byte[0][])).size());
+     Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
+     long maxSeqId = 1050;
+     long minSeqId = 1000;
+     // One recovered-edits file per sequence id: 1000, 1010, ..., 1050.
+     for (long i = minSeqId; i <= maxSeqId; i += 10) {
+       long time = System.nanoTime();
+       WALEdit edit;
+       if (i == maxSeqId) {
+         edit = WALEdit.createCompaction(CompactionDescriptor.newBuilder()
+           .setTableName(ByteString.copyFrom(tableName.getName()))
+           .setFamilyName(ByteString.copyFrom(regionName))
+           .setEncodedRegionName(ByteString.copyFrom(regionName))
+           .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
+           .build());
+       } else {
+         edit = new WALEdit();
+         edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
+           .toBytes(i)));
+       }
+       Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
+       fs.create(recoveredEdits).close(); // close the returned stream instead of leaking it
+       HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
+       try {
+         writer.append(new HLog.Entry(new HLogKey(regionName, tableName, i, time,
+           HConstants.DEFAULT_CLUSTER_ID), edit));
+       } finally {
+         writer.close(); // release the writer even if the append fails
+       }
+     }
+     // Report recoverSeqId - 1 (1029) as the max sequence id already held by every store.
+     long recoverSeqId = 1030;
+     Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+     MonitoredTask status = TaskMonitor.get().createStatus(method);
+     for (Store store : region.getStores().values()) {
+       maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), recoverSeqId - 1);
+     }
+     long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
+     assertEquals(maxSeqId, seqId);
+     // assert that the files are flushed
+     assertEquals(1, region.getStoreFileList(
+       region.getStores().keySet().toArray(new byte[0][])).size());
+   } finally {
+     HRegion.closeHRegion(this.region);
+     this.region = null;
+   }
+ }
+
+ @Test
public void testRecoveredEditsReplayCompaction() throws Exception {
String method = name.getMethodName();
TableName tableName = TableName.valueOf(method);