You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/04/07 17:31:00 UTC

svn commit: r1310788 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/regionserver/HRegion.java main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Author: tedyu
Date: Sat Apr  7 15:30:59 2012
New Revision: 1310788

URL: http://svn.apache.org/viewvc?rev=1310788&view=rev
Log:
HBASE-5689 Skipping RecoveredEdits may cause data loss (Chunhui)

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1310788&r1=1310787&r2=1310788&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Sat Apr  7 15:30:59 2012
@@ -2718,7 +2718,6 @@ public class HRegion implements HeapSize
     long seqid = minSeqId;
     NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
     if (files == null || files.isEmpty()) return seqid;
-    boolean checkSafeToSkip = true;
     for (Path edits: files) {
       if (edits == null || !this.fs.exists(edits)) {
         LOG.warn("Null or non-existent edits file: " + edits);
@@ -2726,22 +2725,15 @@ public class HRegion implements HeapSize
       }
       if (isZeroLengthThenDelete(this.fs, edits)) continue;
 
-      if (checkSafeToSkip) {
-        Path higher = files.higher(edits);
-        long maxSeqId = Long.MAX_VALUE;
-        if (higher != null) {
-          // Edit file name pattern, HLog.EDITFILES_NAME_PATTERN: "-?[0-9]+"
-          String fileName = higher.getName();
-          maxSeqId = Math.abs(Long.parseLong(fileName));
-        }
-        if (maxSeqId <= minSeqId) {
-          String msg = "Maximum possible sequenceid for this log is " + maxSeqId
-              + ", skipped the whole file, path=" + edits;
-          LOG.debug(msg);
-          continue;
-        } else {
-          checkSafeToSkip = false;
-        }
+      long maxSeqId = Long.MAX_VALUE;
+      String fileName = edits.getName();
+      maxSeqId = Math.abs(Long.parseLong(fileName));
+      if (maxSeqId <= minSeqId) {
+        String msg = "Maximum sequenceid for this log is " + maxSeqId
+            + " and minimum sequenceid for the region is " + minSeqId
+            + ", skipped the whole file, path=" + edits;
+        LOG.debug(msg);
+        continue;
       }
 
       try {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1310788&r1=1310787&r2=1310788&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Sat Apr  7 15:30:59 2012
@@ -426,6 +426,7 @@ public class HLogSplitter {
           }
         }
         wap.w.append(entry);
+        outputSink.updateRegionMaximumEditLogSeqNum(entry);
         editsCount++;
         // If sufficient edits have passed OR we've opened a few files, check if
         // we should report progress.
@@ -455,7 +456,8 @@ public class HLogSplitter {
       throw e;
     } finally {
       int n = 0;
-      for (Object o : logWriters.values()) {
+      for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
+        Object o = logWritersEntry.getValue();
         long t1 = EnvironmentEdgeManager.currentTimeMillis();
         if ((t1 - last_report_at) > period) {
           last_report_at = t;
@@ -471,7 +473,8 @@ public class HLogSplitter {
         WriterAndPath wap = (WriterAndPath)o;
         wap.w.close();
         LOG.debug("Closed " + wap.p);
-        Path dst = getCompletedRecoveredEditsFilePath(wap.p);
+        Path dst = getCompletedRecoveredEditsFilePath(wap.p, outputSink
+            .getRegionMaximumEditLogSeqNum(logWritersEntry.getKey()));
         if (!dst.equals(wap.p) && fs.exists(dst)) {
           LOG.warn("Found existing old edits file. It could be the "
               + "result of a previous failed split attempt. Deleting " + dst
@@ -488,6 +491,7 @@ public class HLogSplitter {
           if (!fs.rename(wap.p, dst)) {
             throw new IOException("Failed renaming " + wap.p + " to " + dst);
           }
+          LOG.debug("Rename " + wap.p + " to " + dst);
         }
       }
       String msg = "Processed " + editsCount + " edits across " + n + " regions" +
@@ -681,16 +685,16 @@ public class HLogSplitter {
   }
 
   /**
-   * Convert path to a file under RECOVERED_EDITS_DIR directory without
-   * RECOVERED_LOG_TMPFILE_SUFFIX
+   * Get the completed recovered edits file path, renaming the file after the
+   * seq num of its last edit instead of its first edit. The name can then be
+   * used to skip recovered edits in {@link HRegion#replayRecoveredEditsIfAny}.
    * @param srcPath
-   * @return dstPath without RECOVERED_LOG_TMPFILE_SUFFIX
+   * @param maximumEditLogSeqNum
+   * @return dstPath named after the file's last edit log seq num
    */
-  static Path getCompletedRecoveredEditsFilePath(Path srcPath) {
-    String fileName = srcPath.getName();
-    if (fileName.endsWith(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)) {
-      fileName = fileName.split(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)[0];
-    }
+  static Path getCompletedRecoveredEditsFilePath(Path srcPath,
+      Long maximumEditLogSeqNum) {
+    String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
     return new Path(srcPath.getParent(), fileName);
   }
 
@@ -1027,6 +1031,7 @@ public class HLogSplitter {
       }
     }
 
+
     private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
       List<Entry> entries = buffer.entryBuffer;
       if (entries.isEmpty()) {
@@ -1050,6 +1055,7 @@ public class HLogSplitter {
             }
           }
           wap.w.append(logEntry);
+          outputSink.updateRegionMaximumEditLogSeqNum(logEntry);
           editsCount++;
         }
         // Pass along summary statistics
@@ -1138,6 +1144,8 @@ public class HLogSplitter {
   class OutputSink {
     private final Map<byte[], WriterAndPath> logWriters = Collections.synchronizedMap(
           new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR));
+    private final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
+        .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
     private final List<WriterThread> writerThreads = Lists.newArrayList();
 
     /* Set of regions which we've decided should not output edits */
@@ -1204,8 +1212,11 @@ public class HLogSplitter {
       List<Path> paths = new ArrayList<Path>();
       List<IOException> thrown = Lists.newArrayList();
       closeLogWriters(thrown);
-      for (WriterAndPath wap : logWriters.values()) {
-        Path dst = getCompletedRecoveredEditsFilePath(wap.p);
+      for (Map.Entry<byte[], WriterAndPath> logWritersEntry : logWriters
+          .entrySet()) {
+        WriterAndPath wap = logWritersEntry.getValue();
+        Path dst = getCompletedRecoveredEditsFilePath(wap.p,
+            regionMaximumEditLogSeqNum.get(logWritersEntry.getKey()));
         try {
           if (!dst.equals(wap.p) && fs.exists(dst)) {
             LOG.warn("Found existing old edits file. It could be the "
@@ -1223,6 +1234,7 @@ public class HLogSplitter {
             if (!fs.rename(wap.p, dst)) {
               throw new IOException("Failed renaming " + wap.p + " to " + dst);
             }
+            LOG.debug("Rename " + wap.p + " to " + dst);
           }
         } catch (IOException ioe) {
           LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
@@ -1290,6 +1302,18 @@ public class HLogSplitter {
     }
 
     /**
+     * Update region's maximum edit log SeqNum.
+     */
+    void updateRegionMaximumEditLogSeqNum(Entry entry) {
+      regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(),
+          entry.getKey().getLogSeqNum());
+    }
+
+    Long getRegionMaximumEditLogSeqNum(byte[] region) {
+      return regionMaximumEditLogSeqNum.get(region);
+    }
+
+    /**
      * @return a map from encoded region ID to the number of edits written out
      * for that region.
      */

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1310788&r1=1310787&r2=1310788&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Sat Apr  7 15:30:59 2012
@@ -54,9 +54,11 @@ import org.apache.hadoop.hbase.Multithre
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
@@ -67,6 +69,7 @@ import org.apache.hadoop.hbase.filter.Nu
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
@@ -136,6 +139,95 @@ public class TestHRegion extends HBaseTe
     SchemaMetrics.validateMetricChanges(startingMetrics);
   }
 
+  public void testDataCorrectnessReplayingRecoveredEdits() throws Exception {
+    final int NUM_MASTERS = 1;
+    final int NUM_RS = 3;
+    TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
+
+    try {
+      final byte[] TABLENAME = Bytes
+          .toBytes("testDataCorrectnessReplayingRecoveredEdits");
+      final byte[] FAMILY = Bytes.toBytes("family");
+      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+      HMaster master = cluster.getMaster();
+
+      // Create table
+      HTableDescriptor desc = new HTableDescriptor(TABLENAME);
+      desc.addFamily(new HColumnDescriptor(FAMILY));
+      HBaseAdmin hbaseAdmin = TEST_UTIL.getHBaseAdmin();
+      hbaseAdmin.createTable(desc);
+
+      assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
+
+      // Put data: r1->v1
+      HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
+      putDataAndVerify(table, "r1", FAMILY, "v1", 1);
+
+      // Move region to target server
+      HRegionInfo regionInfo = table.getRegionLocation("r1").getRegionInfo();
+      int originServerNum = cluster.getServerWith(regionInfo.getRegionName());
+      HRegionServer originServer = cluster.getRegionServer(originServerNum);
+      int targetServerNum = NUM_RS - 1 - originServerNum;
+      HRegionServer targetServer = cluster.getRegionServer(targetServerNum);
+      hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(),
+          Bytes.toBytes(targetServer.getServerName().getServerName()));
+      do {
+        Thread.sleep(1);
+      } while (cluster.getServerWith(regionInfo.getRegionName()) == originServerNum);
+
+      // Put data: r2->v2
+      putDataAndVerify(table, "r2", FAMILY, "v2", 2);
+
+      // Move region to origin server
+      hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(),
+          Bytes.toBytes(originServer.getServerName().getServerName()));
+      do {
+        Thread.sleep(1);
+      } while (cluster.getServerWith(regionInfo.getRegionName()) == targetServerNum);
+
+      // Put data: r3->v3
+      putDataAndVerify(table, "r3", FAMILY, "v3", 3);
+
+      // Kill target server
+      targetServer.kill();
+      cluster.getRegionServerThreads().get(targetServerNum).join();
+      // Wait until the master finishes processing the server shutdown
+      while (master.getServerManager().areDeadServersInProgress()) {
+        Thread.sleep(5);
+      }
+      // Kill origin server
+      originServer.kill();
+      cluster.getRegionServerThreads().get(originServerNum).join();
+
+      // Put data: r4->v4
+      putDataAndVerify(table, "r4", FAMILY, "v4", 4);
+
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  private void putDataAndVerify(HTable table, String row, byte[] family,
+      String value, int verifyNum) throws IOException {
+    System.out.println("=========Putting data :" + row);
+    Put put = new Put(Bytes.toBytes(row));
+    put.add(family, Bytes.toBytes("q1"), Bytes.toBytes(value));
+    table.put(put);
+    ResultScanner resultScanner = table.getScanner(new Scan());
+    List<Result> results = new ArrayList<Result>();
+    while (true) {
+      Result r = resultScanner.next();
+      if (r == null)
+        break;
+      results.add(r);
+    }
+    resultScanner.close();
+    if (results.size() != verifyNum) {
+      System.out.println(results);
+    }
+    assertEquals(verifyNum, results.size());
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // New tests that doesn't spin up a mini cluster but rather just test the
   // individual code pieces in the HRegion. Putting files locally in