Posted to commits@hbase.apache.org by en...@apache.org on 2014/03/26 18:32:20 UTC

svn commit: r1581957 - in /hbase/branches/0.96/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/HRegion.java test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Author: enis
Date: Wed Mar 26 17:32:20 2014
New Revision: 1581957

URL: http://svn.apache.org/r1581957
Log:
HBASE-10829 Flush is skipped after log replay if the last recovered edits file is skipped
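
The change: HRegion.replayRecoveredEditsIfAny() walks the recovered.edits files in sequence-id order, and the seqid it accumulates decides whether the region is flushed once replay finishes. replayRecoveredEdits() returns -1 for a file from which nothing was replayed, and the old code assigned that return value straight to seqid, so a skipped last file wiped out the ids from the earlier files, the post-replay flush was skipped, and the replayed edits were left only in the memstore. Below is a minimal, self-contained sketch of the accumulation pattern; the names (replayFile, the sample ids) are illustrative stand-ins, not code from the patch:

    // Hedged sketch, not HBase code: shows why a trailing "all skipped"
    // file (sentinel -1) must not clobber the accumulated sequence id.
    import java.util.Arrays;
    import java.util.List;

    public class SeqIdAccumulation {
      // Stand-in for replayRecoveredEdits(): returns the last sequence id
      // replayed from one file, or -1 if every edit in it was skipped.
      static long replayFile(long result) {
        return result;
      }

      public static void main(String[] args) {
        // Three recovered-edits files; the last one replays nothing (-1),
        // e.g. because it holds only a compaction marker.
        List<Long> files = Arrays.asList(1010L, 1020L, -1L);
        long minSeqId = 999;

        long buggy = minSeqId;
        long fixed = minSeqId;
        for (long f : files) {
          buggy = replayFile(f);                  // before: plain assignment
          fixed = Math.max(fixed, replayFile(f)); // after: HBASE-10829 guard
        }
        // buggy == -1: the "did replay advance the seqid?" flush check fails.
        // fixed == 1020: the replayed edits get flushed as expected.
        System.out.println("buggy=" + buggy + " fixed=" + fixed);
      }
    }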

Modified:
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1581957&r1=1581956&r2=1581957&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Mar 26 17:32:20 2014
@@ -2899,7 +2899,8 @@ public class HRegion implements HeapSize
       }
 
       try {
-        seqid = replayRecoveredEdits(edits, maxSeqIdInStores, reporter);
+        // Replay the edits. Replay can return -1 if everything is skipped; only keep the greater seqid.
+        seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
       } catch (IOException e) {
         boolean skipErrors = conf.getBoolean(
             HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
@@ -3021,6 +3022,7 @@ public class HRegion implements HeapSize
           if (firstSeqIdInLog == -1) {
             firstSeqIdInLog = key.getLogSeqNum();
           }
+          currentEditSeqId = key.getLogSeqNum();
           boolean flush = false;
           for (KeyValue kv: val.getKeyValues()) {
             // Check this edit is for me. Also, guard against writing the special
@@ -3055,7 +3057,6 @@ public class HRegion implements HeapSize
               skippedEdits++;
               continue;
             }
-            currentEditSeqId = key.getLogSeqNum();
             // Once we are over the limit, restoreEdit will keep returning true to
             // flush -- but don't flush until we've played all the kvs that make up
             // the WALEdit.
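
The second hunk above is the other half of the fix: currentEditSeqId, whose final value is what replayRecoveredEdits() returns, is now taken from the WAL key as soon as each entry is read, before the per-KeyValue skip checks, rather than only when a KeyValue is actually applied. A file whose edits are all skipped therefore reports its real last sequence id instead of -1, which the Math.max guard then propagates. A rough, self-contained sketch of the loop shape after the change (plain arrays stand in for WAL entries; the names are illustrative, not from the patch):

    // Hedged sketch of the replay loop after the change; this is not the
    // real HBase WAL reader API.
    public class ReplayLoopSketch {
      static long replay(long[] entrySeqIds, boolean[] allKvsSkipped) {
        long currentEditSeqId = -1; // -1 means "no entry seen at all"
        for (int i = 0; i < entrySeqIds.length; i++) {
          // HBASE-10829: record the key's sequence id before any per-KV
          // skip checks, so a fully skipped file still reports its last
          // sequence id instead of -1.
          currentEditSeqId = entrySeqIds[i];
          if (allKvsSkipped[i]) {
            continue; // e.g. a compaction marker, or KVs not for this region
          }
          // restoreEdit(...) would be applied to the accepted KeyValues here
        }
        return currentEditSeqId;
      }

      public static void main(String[] args) {
        // A file holding one entirely skipped entry now yields 1050, not -1:
        System.out.println(replay(new long[] {1050}, new boolean[] {true}));
      }
    }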

Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1581957&r1=1581956&r2=1581957&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Wed Mar 26 17:32:20 2014
@@ -139,10 +139,11 @@ import org.mockito.Mockito;
 import org.apache.hadoop.hbase.DroppedSnapshotException;
 
 import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
 
 /**
  * Basic stand-alone testing of HRegion.  No clusters!
- * 
+ *
  * A lot of the meta information for an HRegion now lives inside other HRegions
  * or in the HBaseMaster, so only basic testing is possible.
  */
@@ -206,7 +207,7 @@ public class TestHRegion {
    * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
    * in current memstore. The fix is removing all conditions except abort check so we ensure 2
    * flushes for region close."
-   * @throws IOException 
+   * @throws IOException
    */
   @Test (timeout=60000)
   public void testCloseCarryingSnapshot() throws IOException {
@@ -255,6 +256,7 @@ public class TestHRegion {
     // Inject our faulty LocalFileSystem
     conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
     user.runAs(new PrivilegedExceptionAction<Object>() {
+      @Override
       public Object run() throws Exception {
         // Make sure it worked (above is sensitive to caching details in hadoop core)
         FileSystem fs = FileSystem.get(conf);
@@ -521,6 +523,68 @@
   }
 
   @Test
+  public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception {
+    String method = "testSkipRecoveredEditsReplayTheLastFileIgnored";
+    TableName tableName = TableName.valueOf(method);
+    byte[] family = Bytes.toBytes("family");
+    this.region = initHRegion(tableName, method, CONF, family);
+    try {
+      Path regiondir = region.getRegionFileSystem().getRegionDir();
+      FileSystem fs = region.getRegionFileSystem().getFileSystem();
+      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
+
+      assertEquals(0, region.getStoreFileList(
+        region.getStores().keySet().toArray(new byte[0][])).size());
+
+      Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
+
+      long maxSeqId = 1050;
+      long minSeqId = 1000;
+
+      for (long i = minSeqId; i <= maxSeqId; i += 10) {
+        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
+        fs.create(recoveredEdits);
+        HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
+
+        long time = System.nanoTime();
+        WALEdit edit = null;
+        if (i == maxSeqId) {
+          edit = WALEdit.createCompaction(CompactionDescriptor.newBuilder()
+              .setTableName(ByteString.copyFrom(tableName.getName()))
+              .setFamilyName(ByteString.copyFrom(regionName))
+              .setEncodedRegionName(ByteString.copyFrom(regionName))
+              .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
+              .build());
+        } else {
+          edit = new WALEdit();
+          edit.add(new KeyValue(row, family, Bytes.toBytes(i), time,
+              KeyValue.Type.Put, Bytes.toBytes(i)));
+        }
+        writer.append(new HLog.Entry(new HLogKey(regionName, tableName, i, time,
+            HConstants.DEFAULT_CLUSTER_ID), edit));
+        writer.close();
+      }
+
+      long recoverSeqId = 1030;
+      Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+      MonitoredTask status = TaskMonitor.get().createStatus(method);
+      for (Store store : region.getStores().values()) {
+        maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), recoverSeqId - 1);
+      }
+      long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
+      assertEquals(maxSeqId, seqId);
+
+      // assert that the files are flushed
+      assertEquals(1, region.getStoreFileList(
+        region.getStores().keySet().toArray(new byte[0][])).size());
+
+    } finally {
+      HRegion.closeHRegion(this.region);
+      this.region = null;
+    }
+  }
+
+  @Test
   public void testRecoveredEditsReplayCompaction() throws Exception {
     String method = name.getMethodName();
     TableName tableName = TableName.valueOf(method);
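
The new test, testSkipRecoveredEditsReplayTheLastFileIgnored, pins both halves of the fix: it writes recovered-edits files with sequence ids 1000 through 1050, makes the last one a compaction-marker WALEdit that replay skips, and primes maxSeqIdInStores to 1029 so only the later Put files are replayed. replayRecoveredEditsIfAny() must then return 1050, the sequence id of the skipped last file, and the region must end up with exactly one store file, proving the post-replay flush was no longer skipped.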