Posted to commits@hbase.apache.org by te...@apache.org on 2013/04/21 02:27:35 UTC

svn commit: r1470253 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/regionserver/wal/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hadoop/hbase/regionserver/wal/

Author: tedyu
Date: Sun Apr 21 00:27:35 2013
New Revision: 1470253

URL: http://svn.apache.org/r1470253
Log:
HBASE-8354 Backport HBASE-7878 'recoverFileLease does not check return value of recoverLease' to 0.94 (Liang Xie)
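
The gist of HBASE-7878 is that DistributedFileSystem#recoverLease reports
success through its boolean return value, which the old code ignored: a false
return means the NameNode has only started lease recovery, so the hlog is not
yet safe to read. A minimal sketch of the corrected pattern, assuming a
DistributedFileSystem handle (the patch below reaches recoverLease via
reflection so 0.94 still compiles against Hadoop versions that lack the
method; waitForLeaseRecovery is a hypothetical name):

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    // Sketch only: poll recoverLease() until the NameNode confirms recovery.
    static void waitForLeaseRecovery(DistributedFileSystem dfs, Path p)
        throws IOException, InterruptedException {
      boolean recovered = false;
      while (!recovered) {
        recovered = dfs.recoverLease(p);    // false: recovery still in progress
        if (!recovered) Thread.sleep(1000); // back off before asking again
      }
    }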


Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1470253&r1=1470252&r2=1470253&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Sun Apr 21 00:27:35 2013
@@ -34,6 +34,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -177,6 +178,20 @@ public class HLogSplitter {
    */
   public List<Path> splitLog()
       throws IOException {
+    return splitLog((CountDownLatch) null);
+  }
+  
+  /**
+   * Split up a bunch of regionserver commit log files that are no longer being
+   * written to, into new files, one per region, for the region to replay on
+   * startup. Delete the old log files when finished.
+   *
+   * @param latch if non-null, awaited after the splits complete and before the
+   *          orphaned-hlog check, so tests can delay that check
+   * @return the list of splits
+   * @throws IOException if corrupted hlogs are not tolerated
+   */
+  public List<Path> splitLog(CountDownLatch latch)
+      throws IOException {
     Preconditions.checkState(!hasSplit,
         "An HLogSplitter instance may only be used once");
     hasSplit = true;
@@ -200,7 +215,7 @@ public class HLogSplitter {
     }
     logAndReport("Splitting " + logfiles.length + " hlog(s) in "
     + srcDir.toString());
-    splits = splitLog(logfiles);
+    splits = splitLog(logfiles, latch);
 
     splitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
     String msg = "hlog file splitting completed in " + splitTime +
@@ -260,7 +275,8 @@ public class HLogSplitter {
    * After the process is complete, the log files are archived to a separate
    * directory.
    */
-  private List<Path> splitLog(final FileStatus[] logfiles) throws IOException {
+  private List<Path> splitLog(final FileStatus[] logfiles, CountDownLatch latch)
+      throws IOException {
     List<Path> processedLogs = new ArrayList<Path>();
     List<Path> corruptedLogs = new ArrayList<Path>();
     List<Path> splits = null;
@@ -307,10 +323,19 @@ public class HLogSplitter {
       }
       status.setStatus("Log splits complete. Checking for orphaned logs.");
       
-      if (fs.listStatus(srcDir).length > processedLogs.size()
+      if (latch != null) {
+        try {
+          latch.await();
+        } catch (InterruptedException ie) {
+          LOG.warn("wait for latch interrupted");
+          Thread.currentThread().interrupt();
+        }
+      }
+      FileStatus[] currFiles = fs.listStatus(srcDir);
+      if (currFiles.length > processedLogs.size()
           + corruptedLogs.size()) {
         throw new OrphanHLogAfterSplitException(
-            "Discovered orphan hlog after split. Maybe the "
+          "Discovered orphan hlog after split. Maybe the "
             + "HRegionServer was not dead when we started");
       }
     } finally {

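The CountDownLatch threaded through splitLog() above exists so tests can hold
back the orphan-hlog check until they have planted an extra log file in
srcDir. A sketch of driving it, mirroring the TestHLogSplit usage further
down (startZombieWriter is a hypothetical stand-in for the test's zombie
writer thread):

    CountDownLatch latch = new CountDownLatch(1);
    startZombieWriter(latch); // hypothetical: creates an orphan hlog, then counts down
    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, hlogDir, oldLogDir, fs);
    logSplitter.splitLog(latch); // blocks on the latch before listing srcDir
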
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java?rev=1470253&r1=1470252&r2=1470253&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java Sun Apr 21 00:27:35 2013
@@ -51,6 +51,8 @@ public class FSHDFSUtils extends FSUtils
    */
   public static final long LEASE_SOFTLIMIT_PERIOD = 60 * 1000;
 
+  public static final String TEST_TRIGGER_DFS_APPEND = "hbase.test.trigger.dfs.append";
+
   @Override
   public void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
   throws IOException{
@@ -68,26 +70,37 @@ public class FSHDFSUtils extends FSUtils
 
     // Trying recovery
     boolean recovered = false;
+    long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 300000);
+    // conf parameter passed from unit test, indicating whether fs.append() should be triggered
+    boolean triggerAppend = conf.getBoolean(TEST_TRIGGER_DFS_APPEND, false);
+    Exception ex = null;
     while (!recovered) {
       try {
         try {
-          if (fs instanceof DistributedFileSystem) {
-            DistributedFileSystem dfs = (DistributedFileSystem)fs;
-            DistributedFileSystem.class.getMethod("recoverLease",
-              new Class[] {Path.class}).invoke(dfs, p);
-          } else {
-            throw new Exception("Not a DistributedFileSystem");
+          DistributedFileSystem dfs = (DistributedFileSystem) fs;
+          if (triggerAppend) throw new IOException();
+          try {
+            recovered = (Boolean) DistributedFileSystem.class.getMethod(
+              "recoverLease", new Class[] { Path.class }).invoke(dfs, p);
+            if (!recovered) LOG.debug("recoverLease returned false");
+          } catch (InvocationTargetException ite) {
+            // function was properly called, but threw its own exception
+            throw (IOException) ite.getCause();
           }
-        } catch (InvocationTargetException ite) {
-          // function was properly called, but threw its own exception
-          throw (IOException) ite.getCause();
         } catch (Exception e) {
           LOG.debug("Failed fs.recoverLease invocation, " + e.toString() +
             ", trying fs.append instead");
+          ex = e;
+        }
+        if (ex != null || System.currentTimeMillis() - startWaiting > recoveryTimeout) {
+          LOG.debug("trying fs.append for " + p + " with " + ex);
+          ex = null; // assume the following append() call would succeed
           FSDataOutputStream out = fs.append(p);
           out.close();
+          recovered = true;
+          LOG.debug("fs.append passed");
         }
-        recovered = true;
+        if (recovered) break;
       } catch (IOException e) {
         e = RemoteExceptionHandler.checkIOException(e);
         if (e instanceof AlreadyBeingCreatedException) {
@@ -111,9 +124,9 @@ public class FSHDFSUtils extends FSUtils
       }
       try {
         Thread.sleep(1000);
-      } catch (InterruptedException ex) {
+      } catch (InterruptedException ie) {
         InterruptedIOException iioe = new InterruptedIOException();
-        iioe.initCause(ex);
+        iioe.initCause(ie);
         throw iioe;
       }
     }

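With this change, recoverFileLease() retries recoverLease() once per second
and only falls back to the append-and-close trick (opening the file for
append and immediately closing it, which takes over the lease and closes the
file) when recoverLease() threw, or once "hbase.lease.recovery.timeout"
(default 300000 ms) has elapsed. A sketch of tuning the timeout; the
two-minute value is just an example:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    Configuration conf = HBaseConfiguration.create();
    // Poll recoverLease() for up to 2 minutes before falling back to
    // fs.append(); this patch's default is 300000 ms (5 minutes).
    conf.setInt("hbase.lease.recovery.timeout", 120000);
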
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1470253&r1=1470252&r2=1470253&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Sun Apr 21 00:27:35 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -101,6 +102,7 @@ public class TestHLog  {
     // Make block sizes small.
     TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
     // needed for testAppendClose()
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
     TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
     // quicker heartbeat interval for faster DN death notification
     TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
@@ -370,17 +372,29 @@ public class TestHLog  {
     }
   }
   
-  // For this test to pass, requires:
-  // 1. HDFS-200 (append support)
-  // 2. HDFS-988 (SafeMode should freeze file operations
-  //              [FSNamesystem.nextGenerationStampForBlock])
-  // 3. HDFS-142 (on restart, maintain pendingCreates)
+  /*
+   * We pass different values to recoverFileLease() so that different code paths are covered.
+   * 
+   * For this test to pass, requires:
+   * 1. HDFS-200 (append support)
+   * 2. HDFS-988 (SafeMode should freeze file operations
+   *              [FSNamesystem.nextGenerationStampForBlock])
+   * 3. HDFS-142 (on restart, maintain pendingCreates)
+   */
   @Test
   public void testAppendClose() throws Exception {
+    testAppendClose(true);
+    testAppendClose(false);
+  }
+
+  /*
+   * @param triggerDirectAppend whether to trigger a direct call to fs.append()
+   */
+  public void testAppendClose(final boolean triggerDirectAppend) throws Exception {
     byte [] tableName = Bytes.toBytes(getName());
     HRegionInfo regioninfo = new HRegionInfo(tableName,
              HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
-    Path subdir = new Path(dir, "hlogdir");
+    Path subdir = new Path(dir, "hlogdir" + triggerDirectAppend);
     Path archdir = new Path(dir, "hlogdir_archive");
     HLog wal = new HLog(fs, subdir, archdir, conf);
     final int total = 20;
@@ -454,6 +468,7 @@ public class TestHLog  {
       public Exception exception = null;
       public void run() {
           try {
+            rlConf.setBoolean(FSHDFSUtils.TEST_TRIGGER_DFS_APPEND, triggerDirectAppend);
             FSUtils.getInstance(fs, rlConf)
               .recoverFileLease(recoveredFs, walPath, rlConf);
           } catch (IOException e) {

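The new TEST_TRIGGER_DFS_APPEND flag makes recoverFileLease() throw before it
ever calls recoverLease(), forcing the fs.append() fallback to run;
testAppendClose() now executes once with each setting so both code paths are
covered. A sketch of exercising the two paths directly (fs, walPath and conf
as in the test above):

    // recoverLease() path: flag unset (the default).
    conf.setBoolean(FSHDFSUtils.TEST_TRIGGER_DFS_APPEND, false);
    FSUtils.getInstance(fs, conf).recoverFileLease(fs, walPath, conf);

    // fs.append() fallback path: flag set.
    conf.setBoolean(FSHDFSUtils.TEST_TRIGGER_DFS_APPEND, true);
    FSUtils.getInstance(fs, conf).recoverFileLease(fs, walPath, conf);
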
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1470253&r1=1470252&r2=1470253&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Sun Apr 21 00:27:35 2013
@@ -33,6 +33,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -119,12 +120,11 @@ public class TestHLogSplit {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.getConfiguration().
-            setStrings("hbase.rootdir", hbaseDir.toString());
-    TEST_UTIL.getConfiguration().
-            setClass("hbase.regionserver.hlog.writer.impl",
-                InstrumentedSequenceFileLogWriter.class, HLog.Writer.class);
-
+    TEST_UTIL.getConfiguration().setStrings("hbase.rootdir", hbaseDir.toString());
+    TEST_UTIL.getConfiguration().setClass("hbase.regionserver.hlog.writer.impl",
+      InstrumentedSequenceFileLogWriter.class, HLog.Writer.class);
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
     TEST_UTIL.startMiniDFSCluster(2);
   }
 
@@ -189,11 +189,12 @@ public class TestHLogSplit {
 
     generateHLogs(-1);
 
+    CountDownLatch latch = new CountDownLatch(1);
     try {
-    (new ZombieNewLogWriterRegionServer(stop)).start();
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        hbaseDir, hlogDir, oldLogDir, fs);
-    logSplitter.splitLog();
+      (new ZombieNewLogWriterRegionServer(latch, stop)).start();
+      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, hbaseDir, hlogDir, oldLogDir,
+        fs);
+      logSplitter.splitLog(latch);
     } finally {
       stop.set(true);
     }
@@ -553,16 +554,23 @@ public class TestHLogSplit {
     AtomicBoolean stop = new AtomicBoolean(false);
     generateHLogs(-1);
     fs.initialize(fs.getUri(), conf);
-    Thread zombie = new ZombieNewLogWriterRegionServer(stop);
+    CountDownLatch latch = new CountDownLatch(1);
+    Thread zombie = new ZombieNewLogWriterRegionServer(latch, stop);
 
+    List<Path> splits = null;
     try {
       zombie.start();
       try {
         HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
             hbaseDir, hlogDir, oldLogDir, fs);
-        logSplitter.splitLog();
-      } catch (IOException ex) {/* expected */}
-      int logFilesNumber = fs.listStatus(hlogDir).length;
+        splits = logSplitter.splitLog(latch);
+      } catch (IOException ex) {
+        /* expected */
+        LOG.warn("testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted", ex);
+      }
+      FileStatus[] files = fs.listStatus(hlogDir);
+      if (files == null) fail("no files in " + hlogDir + " with splits " + splits);
+      int logFilesNumber = files.length;
 
       assertEquals("Log files should not be archived if there's an extra file after split",
               NUM_WRITERS + 1, logFilesNumber);
@@ -959,8 +967,10 @@ public class TestHLogSplit {
    */
   class ZombieNewLogWriterRegionServer extends Thread {
     AtomicBoolean stop;
-    public ZombieNewLogWriterRegionServer(AtomicBoolean stop) {
+    CountDownLatch latch;
+    public ZombieNewLogWriterRegionServer(CountDownLatch latch, AtomicBoolean stop) {
       super("ZombieNewLogWriterRegionServer");
+      this.latch = latch;
       this.stop = stop;
     }
 
@@ -977,7 +987,7 @@ public class TestHLogSplit {
       try {
 
         while (!fs.exists(recoveredEdits) && !stop.get()) {
-          flushToConsole("Juliet: split not started, sleeping a bit...");
+          LOG.info("Juliet: split not started, sleeping a bit...");
           Threads.sleep(10);
         }
  
@@ -987,8 +997,10 @@ public class TestHLogSplit {
         appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(),
             ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0);
         writer.close();
-        flushToConsole("Juliet file creator: created file " + julietLog);
+        LOG.info("Juliet file creator: created file " + julietLog);
+        latch.countDown();
       } catch (IOException e1) {
+        LOG.error("Failed to create file " + julietLog, e1);
         assertTrue("Failed to create file " + julietLog, false);
       }
     }
@@ -1192,7 +1204,7 @@ public class TestHLogSplit {
       }
       if (i != leaveOpen) {
         writer[i].close();
-        flushToConsole("Closing writer " + i);
+        LOG.info("Closing writer " + i);
       }
     }
   }
@@ -1305,8 +1317,9 @@ public class TestHLogSplit {
                           byte[] row, byte[] family, byte[] qualifier,
                           byte[] value, long seq)
           throws IOException {
-
+    LOG.info(Thread.currentThread().getName() + " append");
     writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
+    LOG.info(Thread.currentThread().getName() + " sync");
     writer.sync();
     return seq;
   }
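
Condensed, the handshake this patch adds between the zombie writer and the
splitter looks like the sketch below (createOrphanHLog is a hypothetical
stand-in for the Juliet-file writer above). The splitter now blocks on the
latch before listing the log directory, so the extra file is guaranteed to be
visible and OrphanHLogAfterSplitException fires deterministically rather than
depending on thread timing:

    final CountDownLatch latch = new CountDownLatch(1);
    Thread zombie = new Thread("ZombieNewLogWriterRegionServer") {
      public void run() {
        createOrphanHLog(); // hypothetical: writes the extra "juliet" hlog
        latch.countDown();  // release the splitter's orphan-hlog check
      }
    };
    zombie.start();
    logSplitter.splitLog(latch); // expected: OrphanHLogAfterSplitException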