You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@hbase.apache.org by nk...@apache.org on 2012/10/03 17:25:01 UTC

svn commit: r1393537 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java main/java/org/apache/hadoop/hbase/master/SplitLogManager.java test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java

Author: nkeywal
Date: Wed Oct  3 15:25:01 2012
New Revision: 1393537

URL: http://svn.apache.org/viewvc?rev=1393537&view=rev
Log:
HBASE-6738  Too aggressive task resubmission from the distributed log manager

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java?rev=1393537&r1=1393536&r2=1393537&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java Wed Oct  3 15:25:01 2012
@@ -110,7 +110,7 @@ public class MasterFileSystem {
       conf.getBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
     if (this.distributedLogSplitting) {
       this.splitLogManager = new SplitLogManager(master.getZooKeeper(),
-          master.getConfiguration(), master, master.getServerName());
+          master.getConfiguration(), master, services, master.getServerName());
       this.splitLogManager.finishInitialization(masterRecovery);
     } else {
       this.splitLogManager = null;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1393537&r1=1393536&r2=1393537&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Wed Oct  3 15:25:01 2012
@@ -98,12 +98,13 @@ import org.apache.zookeeper.data.Stat;
 public class SplitLogManager extends ZooKeeperListener {
   private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
 
-  public static final int DEFAULT_TIMEOUT = 25000; // 25 sec
+  public static final int DEFAULT_TIMEOUT = 120000;
   public static final int DEFAULT_ZK_RETRIES = 3;
   public static final int DEFAULT_MAX_RESUBMIT = 3;
   public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min
 
   private final Stoppable stopper;
+  private final MasterServices master;
   private final ServerName serverName;
   private final TaskFinisher taskFinisher;
   private FileSystem fs;
@@ -116,11 +117,11 @@ public class SplitLogManager extends Zoo
   private long lastNodeCreateTime = Long.MAX_VALUE;
   public boolean ignoreZKDeleteForTesting = false;
 
-  private ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
+  private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
   private TimeoutMonitor timeoutMonitor;
 
   private volatile Set<ServerName> deadWorkers = null;
-  private Object deadWorkersLock = new Object();
+  private final Object deadWorkersLock = new Object();
 
   /**
    * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher, Configuration,
@@ -135,8 +136,8 @@ public class SplitLogManager extends Zoo
    * @param serverName
    */
   public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
-      Stoppable stopper, ServerName serverName) {
-    this(zkw, conf, stopper, serverName, new TaskFinisher() {
+       Stoppable stopper, MasterServices master, ServerName serverName) {
+    this(zkw, conf, stopper,  master, serverName, new TaskFinisher() {
       @Override
       public Status finish(ServerName workerName, String logfile) {
         try {
@@ -162,18 +163,19 @@ public class SplitLogManager extends Zoo
    * @param tf task finisher 
    */
   public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
-      Stoppable stopper, ServerName serverName, TaskFinisher tf) {
+        Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf) {
     super(zkw);
     this.taskFinisher = tf;
     this.conf = conf;
     this.stopper = stopper;
+    this.master = master;
     this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
     this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
     this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
     this.unassignedTimeout =
       conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
-    LOG.debug("timeout = " + timeout);
-    LOG.debug("unassigned timeout = " + unassignedTimeout);
+    LOG.info("timeout = " + timeout);
+    LOG.info("unassigned timeout = " + unassignedTimeout);
 
     this.serverName = serverName;
     this.timeoutMonitor =
@@ -551,8 +553,18 @@ public class SplitLogManager extends Zoo
     }
     int version;
     if (directive != FORCE) {
-      if ((EnvironmentEdgeManager.currentTimeMillis() - task.last_update) <
-          timeout) {
+      // We're going to resubmit:
+      //  1) immediately if the worker server is now marked as dead
+      //  2) after a configurable timeout if the server is not marked as dead but has still not
+      //       finished the task. This allows to continue if the worker cannot actually handle it,
+      //       for any reason.
+      final long time = EnvironmentEdgeManager.currentTimeMillis() - task.last_update;
+      final boolean alive = master.getServerManager() != null ?
+          master.getServerManager().isServerOnline(task.cur_worker_name) : true;
+      if (alive && time < timeout) {
+        LOG.trace("Skipping the resubmit of " + task.toString() + "  because the server " +
+            task.cur_worker_name + " is not marked as dead, we waited for " + time +
+            " while the timeout is " + timeout);
         return false;
       }
       if (task.unforcedResubmits >= resubmit_threshold) {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java?rev=1393537&r1=1393536&r2=1393537&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java Wed Oct  3 15:25:01 2012
@@ -48,6 +48,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.Stoppable;
@@ -64,15 +65,20 @@ import org.apache.zookeeper.KeeperExcept
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 
 @Category(MediumTests.class)
 public class TestSplitLogManager {
   private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
   private final ServerName DUMMY_MASTER = new ServerName("dummy-master,1,1");
+  private final ServerManager sm = Mockito.mock(ServerManager.class);
+  private final MasterServices master =  Mockito.mock(MasterServices.class);
+
   static {
     Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
   }
@@ -103,14 +109,6 @@ public class TestSplitLogManager {
 
   };
 
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-  }
-
   @Before
   public void setup() throws Exception {
     TEST_UTIL = new HBaseTestingUtility();
@@ -128,6 +126,11 @@ public class TestSplitLogManager {
 
     stopped = false;
     resetCounters();
+
+    // By default, we let the test manage the error as before, so the server
+    //  does not appear as dead from the master point of view, only from the split log pov.
+    Mockito.when(sm.isServerOnline(Mockito.any(ServerName.class))).thenReturn(true);
+    Mockito.when(master.getServerManager()).thenReturn(sm);
   }
 
   @After
@@ -194,8 +197,9 @@ public class TestSplitLogManager {
    */
   @Test
   public void testTaskCreation() throws Exception {
+
     LOG.info("TestTaskCreation - test the creation of a task in zk");
-    slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
     slm.finishInitialization();
     TaskBatch batch = new TaskBatch();
 
@@ -221,7 +225,7 @@ public class TestSplitLogManager {
     conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
     to = to + 2 * 100;
 
-    slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
     slm.finishInitialization();
     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
     Task task = slm.findOrCreateOrphanTask(tasknode);
@@ -248,7 +252,7 @@ public class TestSplitLogManager {
         CreateMode.PERSISTENT);
     int version = ZKUtil.checkExists(zkw, tasknode);
 
-    slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
     slm.finishInitialization();
     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
     Task task = slm.findOrCreateOrphanTask(tasknode);
@@ -277,7 +281,7 @@ public class TestSplitLogManager {
     to = to + 2 * 100;
 
     conf.setInt("hbase.splitlog.max.resubmit", 2);
-    slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
     slm.finishInitialization();
     TaskBatch batch = new TaskBatch();
 
@@ -312,7 +316,7 @@ public class TestSplitLogManager {
 
     conf.setInt("hbase.splitlog.manager.timeout", 1000);
     conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
-    slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
     slm.finishInitialization();
     TaskBatch batch = new TaskBatch();
 
@@ -328,26 +332,21 @@ public class TestSplitLogManager {
         return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get());
       }
     }, 0, 1, 5*60000); // wait long enough
-    if (tot_mgr_resubmit_failed.get() == 0) {
-      int version1 = ZKUtil.checkExists(zkw, tasknode);
-      assertTrue(version1 > version);
-      byte[] taskstate = ZKUtil.getData(zkw, tasknode);
-      slt = SplitLogTask.parseFrom(taskstate);
-      assertTrue(slt.isUnassigned(DUMMY_MASTER));
-      
-      waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
-    } else {
-      LOG.warn("Could not run test. Lost ZK connection?");
-    }
+    Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.get());
+    int version1 = ZKUtil.checkExists(zkw, tasknode);
+    assertTrue(version1 > version);
+    byte[] taskstate = ZKUtil.getData(zkw, tasknode);
+    slt = SplitLogTask.parseFrom(taskstate);
+    assertTrue(slt.isUnassigned(DUMMY_MASTER));
 
-    return;
+    waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
   }
 
   @Test
   public void testTaskDone() throws Exception {
     LOG.info("TestTaskDone - cleanup task node once in DONE state");
 
-    slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
     slm.finishInitialization();
     TaskBatch batch = new TaskBatch();
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -368,7 +367,7 @@ public class TestSplitLogManager {
     LOG.info("TestTaskErr - cleanup task node once in ERR state");
 
     conf.setInt("hbase.splitlog.max.resubmit", 0);
-    slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
     slm.finishInitialization();
     TaskBatch batch = new TaskBatch();
 
@@ -376,6 +375,7 @@ public class TestSplitLogManager {
     final ServerName worker1 = new ServerName("worker1,1,1");
     SplitLogTask slt = new SplitLogTask.Err(worker1);
     ZKUtil.setData(zkw, tasknode, slt.toByteArray());
+
     synchronized (batch) {
       while (batch.installed != batch.error) {
         batch.wait();
@@ -390,7 +390,7 @@ public class TestSplitLogManager {
   public void testTaskResigned() throws Exception {
     LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
     assertEquals(tot_mgr_resubmit.get(), 0);
-    slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
     slm.finishInitialization();
     assertEquals(tot_mgr_resubmit.get(), 0);
     TaskBatch batch = new TaskBatch();
@@ -431,7 +431,7 @@ public class TestSplitLogManager {
     conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
 
 
-    slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
     slm.finishInitialization();
     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
 
@@ -462,7 +462,7 @@ public class TestSplitLogManager {
     LOG.info("testDeadWorker");
 
     conf.setLong("hbase.splitlog.max.resubmit", 0);
-    slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
     slm.finishInitialization();
     TaskBatch batch = new TaskBatch();
 
@@ -487,9 +487,34 @@ public class TestSplitLogManager {
   }
 
   @Test
+  public void testWorkerCrash() throws Exception {
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
+    slm.finishInitialization();
+    TaskBatch batch = new TaskBatch();
+
+    String tasknode = submitTaskAndWait(batch, "foo/1");
+    final ServerName worker1 = new ServerName("worker1,1,1");
+
+    SplitLogTask slt = new SplitLogTask.Owned(worker1);
+    ZKUtil.setData(zkw, tasknode, slt.toByteArray());
+    if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
+
+    // Not yet resubmitted.
+    Assert.assertEquals(0, tot_mgr_resubmit.get());
+
+    // This server becomes dead
+    Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
+
+    Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded).
+
+    // It has been resubmitted
+    Assert.assertEquals(1, tot_mgr_resubmit.get());
+  }
+
+  @Test
   public void testEmptyLogDir() throws Exception {
     LOG.info("testEmptyLogDir");
-    slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
     slm.finishInitialization();
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
@@ -508,7 +533,7 @@ public class TestSplitLogManager {
     conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
     conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0);
 
-    slm = new SplitLogManager(zkw, conf, stopper, DUMMY_MASTER, null);
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
     slm.finishInitialization();
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     final Path logDir = new Path(fs.getWorkingDirectory(),