You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ga...@apache.org on 2011/10/14 02:40:41 UTC

svn commit: r1183180 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java

Author: garyh
Date: Fri Oct 14 00:40:41 2011
New Revision: 1183180

URL: http://svn.apache.org/viewvc?rev=1183180&view=rev
Log:
HBASE-4282  RegionServer should abort when WAL close fails with unflushed edits

Added:
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1183180&r1=1183179&r2=1183180&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Fri Oct 14 00:40:41 2011
@@ -133,6 +133,7 @@ public class HLog implements Syncable {
   private final String prefix;
   private final AtomicLong unflushedEntries = new AtomicLong(0);
   private volatile long syncedTillHere = 0;
+  private long lastDeferredTxid;
   private final Path oldLogDir;
   private boolean logRollRunning;
 
@@ -196,6 +197,7 @@ public class HLog implements Syncable {
 
   //number of transactions in the current Hlog.
   private final AtomicInteger numEntries = new AtomicInteger(0);
+
   // If live datanode count is lower than the default replicas value,
   // RollWriter will be triggered in each sync(So the RollWriter will be
   // triggered one by one in a short time). Using it as a workaround to slow
@@ -813,9 +815,12 @@ public class HLog implements Syncable {
       } catch (IOException e) {
         LOG.error("Failed close of HLog writer", e);
         int errors = closeErrorCount.incrementAndGet();
-        if (errors <= closeErrorsTolerated) {
+        if (errors <= closeErrorsTolerated && !hasDeferredEntries()) {
           LOG.warn("Riding over HLog close failure! error count="+errors);
         } else {
+          if (hasDeferredEntries()) {
+            LOG.error("Aborting due to unflushed edits in HLog");
+          }
           // Failed close of log file.  Means we're losing edits.  For now,
           // shut ourselves down to minimize loss.  Alternative is to try and
           // keep going.  See HBASE-930.
@@ -990,6 +995,9 @@ public class HLog implements Syncable {
       doWrite(regionInfo, logKey, logEdit, htd);
       txid = this.unflushedEntries.incrementAndGet();
       this.numEntries.incrementAndGet();
+      if (htd.isDeferredLogFlush()) {
+        lastDeferredTxid = txid;
+      }
     }
 
     // Sync if catalog region, and if not then check if that table supports
@@ -1068,6 +1076,9 @@ public class HLog implements Syncable {
         doWrite(info, logKey, edits, htd);
         this.numEntries.incrementAndGet();
         txid = this.unflushedEntries.incrementAndGet();
+        if (htd.isDeferredLogFlush()) {
+          lastDeferredTxid = txid;
+        }
       }
       // Sync if catalog region, and if not then check if that table supports
       // deferred log flushing
@@ -1789,6 +1800,11 @@ public class HLog implements Syncable {
     return coprocessorHost;
   }
 
+  /** Provide access to currently deferred sequence num for tests */
+  boolean hasDeferredEntries() {
+    return lastDeferredTxid > syncedTillHere;
+  }
+
   /**
    * Pass one or more log file names and it will either dump out a text version
    * on <code>stdout</code> or split the specified log files.

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java?rev=1183180&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java Fri Oct 14 00:40:41 2011
@@ -0,0 +1,152 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for conditions that should trigger RegionServer aborts when
+ * rolling the current HLog fails.
+ */
+public class TestLogRollAbort {
+  private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
+  private static MiniDFSCluster dfsCluster;
+  private static HBaseAdmin admin;
+  private static MiniHBaseCluster cluster;
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  // verbose logging on classes that are touched in these tests
+  {
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
+        .getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HRegionServer.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HRegion.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  // Need to override this setup so we can edit the config before it gets sent
+  // to the HDFS & HBase cluster startup.
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Tweak default timeout values down for faster recovery
+    TEST_UTIL.getConfiguration().setInt(
+        "hbase.regionserver.logroll.errors.tolerated", 2);
+    TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
+    TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
+    TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
+
+    // Increase the amount of time between client retries
+    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 5 * 1000);
+
+    // make sure log.hflush() calls syncFs() to open a pipeline
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
+    // lower the namenode & datanode heartbeat so the namenode
+    // quickly detects datanode failures
+    TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
+    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
+    // the namenode might still try to choose the recently-dead datanode
+    // for a pipeline, so try to a new pipeline multiple times
+    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 10);
+    // set periodic sync to 2 min so it doesn't run during test
+    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.optionallogflushinterval",
+        120 * 1000);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(2);
+
+    cluster = TEST_UTIL.getHBaseCluster();
+    dfsCluster = TEST_UTIL.getDFSCluster();
+    admin = TEST_UTIL.getHBaseAdmin();
+
+    // disable region rebalancing (interferes with log watching)
+    cluster.getMaster().balanceSwitch(false);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Tests that RegionServer aborts if we hit an error closing the WAL when
+   * there are unsynced WAL edits.  See HBASE-4282.
+   */
+  @Test
+  public void testRSAbortWithUnflushedEdits() throws Exception {
+    LOG.info("Starting testRSAbortWithUnflushedEdits()");
+
+    // When the META table can be opened, the region servers are running
+    new HTable(TEST_UTIL.getConfiguration(), HConstants.META_TABLE_NAME);
+
+    // Create the test table and open it
+    String tableName = this.getClass().getSimpleName();
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+    desc.setDeferredLogFlush(true);
+
+    admin.createTable(desc);
+    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+
+    HRegionServer server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
+    HLog log = server.getWAL();
+
+    assertTrue("Need HDFS-826 for this test", log.canGetCurReplicas());
+    // don't run this test without append support (HDFS-200 & HDFS-142)
+    assertTrue("Need append support for this test",
+        FSUtils.isAppendSupported(TEST_UTIL.getConfiguration()));
+
+    Put p = new Put(Bytes.toBytes("row2001"));
+    p.add(HConstants.CATALOG_FAMILY, Bytes.toBytes("col"), Bytes.toBytes(2001));
+    table.put(p);
+
+    log.sync();
+
+    p = new Put(Bytes.toBytes("row2002"));
+    p.add(HConstants.CATALOG_FAMILY, Bytes.toBytes("col"), Bytes.toBytes(2002));
+    table.put(p);
+
+    dfsCluster.restartDataNodes();
+    LOG.info("Restarted datanodes");
+
+    assertTrue("Should have an outstanding WAL edit", log.hasDeferredEntries());
+    try {
+      log.rollWriter(true);
+      fail("Log roll should have triggered FailedLogCloseException");
+    } catch (FailedLogCloseException flce) {
+      assertTrue("Should have deferred flush log edits outstanding",
+          log.hasDeferredEntries());
+    }
+  }
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java?rev=1183180&r1=1183179&r2=1183180&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java Fri Oct 14 00:40:41 2011
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -462,11 +463,10 @@ public class TestLogRolling  {
 
     // roll all datanodes in the pipeline
     dfsCluster.restartDataNodes();
-    Thread.sleep(10000);
+    Thread.sleep(1000);
     dfsCluster.waitActive();
     LOG.info("Data Nodes restarted");
 
-    //this.log.sync();
     // this write should succeed, but trigger a log roll
     writeData(table, 1003);
     long newFilenum = log.getFilenum();
@@ -474,12 +474,11 @@ public class TestLogRolling  {
     assertTrue("Missing datanode should've triggered a log roll",
         newFilenum > oldFilenum && newFilenum > curTime);
 
-    //this.log.sync();
     writeData(table, 1004);
 
     // roll all datanode again
     dfsCluster.restartDataNodes();
-    Thread.sleep(10000);
+    Thread.sleep(1000);
     dfsCluster.waitActive();
     LOG.info("Data Nodes restarted");
 
@@ -536,5 +535,11 @@ public class TestLogRolling  {
     } finally {
       scanner.close();
     }
+
+    // verify that no region servers aborted
+    for (JVMClusterUtil.RegionServerThread rsThread:
+        TEST_UTIL.getHBaseCluster().getRegionServerThreads()) {
+      assertFalse(rsThread.getRegionServer().isAborted());
+    }
   }
 }