You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/25 00:30:32 UTC

svn commit: r1188419 [3/3] - in /hbase/branches/0.89-fb: ./ src/main/java/org/apache/hadoop/hbase/mapreduce/ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/regions...

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java?rev=1188419&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java Mon Oct 24 22:30:31 2011
@@ -0,0 +1,455 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.master.SplitLogManager.Task;
+import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@link SplitLogManager}: creating tasks in ZooKeeper, acquiring
+ * task nodes that already exist when the manager starts ("orphans"),
+ * timeout-driven resubmission and its limits, and cleanup of task nodes.
+ */
+public class TestSplitLogManager {
+  private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
+  static {
+    Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
+  }
+
+  private ZooKeeperWrapper zkw;
+  private SplitLogManager slm;
+  private Configuration conf;
+
+  private final static HBaseTestingUtility TEST_UTIL =
+    new HBaseTestingUtility();
+
+  // Handed to every SplitLogManager; reset in setup(), set in teardown().
+  static AtomicBoolean stopper = new AtomicBoolean();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniZKCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniZKCluster();
+  }
+
+  /**
+   * Gives each test its own configuration copy, a base znode containing only
+   * an empty split-log znode, and zeroed split-log counters.
+   */
+  @Before
+  public void setup() throws Exception {
+    stopper.set(false);
+    // Several tests tune timeouts and resubmit limits. Work on a private copy
+    // of the shared configuration so those settings cannot leak from one
+    // test into the next.
+    conf = new Configuration(TEST_UTIL.getConfiguration());
+    zkw = ZooKeeperWrapper.createInstance(conf, "split-log-manager-tests");
+    zkw.deleteChildrenRecursively(zkw.parentZNode);
+    zkw.createZNodeIfNotExists(zkw.parentZNode);
+    assertTrue(zkw.checkExists(zkw.parentZNode) != -1);
+    LOG.debug(zkw.parentZNode + " created");
+    zkw.createZNodeIfNotExists(zkw.splitLogZNode);
+    assertTrue(zkw.checkExists(zkw.splitLogZNode) != -1);
+    LOG.debug(zkw.splitLogZNode + " created");
+
+    resetCounters();
+  }
+
+  /**
+   * Signals the stopper, stops the manager if the test created one, and
+   * then closes the ZooKeeper connection.
+   */
+  @After
+  public void teardown() throws IOException, KeeperException {
+    stopper.set(true);
+    try {
+      // slm is still null when a test fails before constructing it; an NPE
+      // here would hide the original failure.
+      if (slm != null) {
+        slm.stop();
+      }
+    } finally {
+      zkw.close();
+    }
+  }
+
+  /**
+   * Polls {@code ctr} every 10ms until it moves away from {@code oldval},
+   * then asserts that its value is exactly {@code newval}. Fails the test if
+   * the counter still reads {@code oldval} after {@code timems} milliseconds
+   * or if the waiting thread is interrupted.
+   */
+  private void waitForCounter(AtomicLong ctr, long oldval, long newval,
+      long timems) {
+    long endt = System.currentTimeMillis() + timems;
+    while (ctr.get() == oldval) {
+      if (System.currentTimeMillis() >= endt) {
+        LOG.debug("Wait for counter failed");
+        assertTrue("counter still " + oldval + " after " + timems
+            + "ms; expected it to reach " + newval, false);
+      }
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to the caller and stop waiting.
+        Thread.currentThread().interrupt();
+        assertTrue("interrupted while waiting for counter to reach "
+            + newval, false);
+      }
+    }
+    assertEquals(newval, ctr.get());
+  }
+
+  /**
+   * Installs task {@code name} into {@code batch} through the manager and
+   * blocks until the corresponding task znode has been created.
+   *
+   * @return the encoded path of the created task znode
+   */
+  private String submitTaskAndWait(TaskBatch batch, String name)
+  throws KeeperException, InterruptedException {
+    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
+    NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
+    zkw.registerListener(listener);
+    // Set an exists-watch so the NodeCreated event reaches the listener.
+    zkw.watchAndCheckExists(tasknode);
+
+    slm.installTask(name, batch);
+    assertEquals(1, batch.installed);
+    assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch);
+    assertEquals(1L, tot_mgr_node_create_queued.get());
+
+    LOG.debug("waiting for task node creation");
+    listener.waitForCreation();
+    LOG.debug("task created");
+    return tasknode;
+  }
+
+  /**
+   * Test whether the splitlog correctly creates a task in zookeeper
+   * @throws Exception
+   */
+  @Test
+  public void testTaskCreation() throws Exception {
+    LOG.info("TestTaskCreation - test the creation of a task in zk");
+
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    TaskBatch batch = new TaskBatch();
+
+    String tasknode = submitTaskAndWait(batch, "foo/1");
+
+    byte[] data = zkw.getData("", tasknode);
+    LOG.info("Task node created " + new String(data));
+    assertTrue(TaskState.TASK_UNASSIGNED.equals(data, "dummy-master"));
+  }
+
+  /**
+   * A task node already OWNED by a worker when the manager starts is picked
+   * up as an orphan and resubmitted once its owner stops heartbeating.
+   */
+  @Test
+  public void testOrphanTaskAcquisition() throws Exception {
+    LOG.info("TestOrphanTaskAcquisition");
+
+    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
+    zkw.getZooKeeper().create(tasknode,
+        TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    int to = 1000;
+    conf.setInt("hbase.splitlog.manager.timeout", to);
+    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+    // Allow two monitor periods of slack on top of the timeout.
+    to = to + 2 * 100;
+
+
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
+    Task task = slm.findOrCreateOrphanTask(tasknode);
+    assertTrue(task.isOrphan());
+    waitForCounter(tot_mgr_heartbeat, 0, 1, 100);
+    assertFalse(task.isUnassigned());
+    long curt = System.currentTimeMillis();
+    assertTrue((task.last_update <= curt) &&
+        (task.last_update > (curt - 1000)));
+    LOG.info("waiting for manager to resubmit the orphan task");
+    waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
+    assertTrue(task.isUnassigned());
+    waitForCounter(tot_mgr_rescan, 0, 1, to + 100);
+  }
+
+  /** An orphan task found in UNASSIGNED state is resubmitted at startup. */
+  @Test
+  public void testUnassignedOrphan() throws Exception {
+    LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
+        " startup");
+    String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
+    //create an unassigned orphan task
+    zkw.getZooKeeper().create(tasknode,
+        TaskState.TASK_UNASSIGNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+    int version = zkw.checkExists(tasknode);
+
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
+    Task task = slm.findOrCreateOrphanTask(tasknode);
+    assertTrue(task.isOrphan());
+    assertTrue(task.isUnassigned());
+    // wait for RESCAN node to be created
+    waitForCounter(tot_mgr_rescan, 0, 1, 500);
+    Task task2 = slm.findOrCreateOrphanTask(tasknode);
+    assertTrue(task == task2);
+    LOG.debug("task = " + task);
+    assertEquals(1L, tot_mgr_resubmit.get());
+    assertEquals(1, task.incarnation);
+    assertEquals(0, task.unforcedResubmits);
+    assertTrue(task.isOrphan());
+    assertTrue(task.isUnassigned());
+    assertTrue(zkw.checkExists(tasknode) > version);
+  }
+
+  /**
+   * With hbase.splitlog.max.resubmit = 2, a task whose owners keep timing
+   * out is resubmitted exactly twice and then the threshold counter fires.
+   */
+  @Test
+  public void testMultipleResubmits() throws Exception {
+    LOG.info("TestMultipleResbmits - no indefinite resubmissions");
+
+    int to = 1000;
+    conf.setInt("hbase.splitlog.manager.timeout", to);
+    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+    to = to + 2 * 100;
+
+    conf.setInt("hbase.splitlog.max.resubmit", 2);
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    TaskBatch batch = new TaskBatch();
+
+    String tasknode = submitTaskAndWait(batch, "foo/1");
+    int version = zkw.checkExists(tasknode);
+
+    zkw.setData(tasknode, TaskState.TASK_OWNED.get("worker1"));
+    waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
+    waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
+    int version1 = zkw.checkExists(tasknode);
+    assertTrue(version1 > version);
+    zkw.setData(tasknode, TaskState.TASK_OWNED.get("worker2"));
+    waitForCounter(tot_mgr_heartbeat, 1, 2, 1000);
+    waitForCounter(tot_mgr_resubmit, 1, 2, to + 100);
+    int version2 = zkw.checkExists(tasknode);
+    assertTrue(version2 > version1);
+    zkw.setData(tasknode, TaskState.TASK_OWNED.get("worker3"));
+    // The heartbeat counter is already 2 here; wait for worker3's heartbeat.
+    waitForCounter(tot_mgr_heartbeat, 2, 3, 1000);
+    waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + 100);
+    Thread.sleep(to + 100);
+    assertEquals(2L, tot_mgr_resubmit.get());
+  }
+
+  /** RESCAN nodes created on resubmission are eventually deleted. */
+  @Test
+  public void testRescanCleanup() throws Exception {
+    LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
+
+    int to = 1000;
+    conf.setInt("hbase.splitlog.manager.timeout", to);
+    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+    to = to + 2 * 100;
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    TaskBatch batch = new TaskBatch();
+
+    String tasknode = submitTaskAndWait(batch, "foo/1");
+    int version = zkw.checkExists(tasknode);
+
+    zkw.setData(tasknode, TaskState.TASK_OWNED.get("worker1"));
+    waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
+    waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
+    int version1 = zkw.checkExists(tasknode);
+    assertTrue(version1 > version);
+    byte[] taskstate = zkw.getData("", tasknode);
+    assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
+        taskstate));
+    waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
+  }
+
+  /** A task moved to DONE completes its batch and its znode is deleted. */
+  @Test
+  public void testTaskDone() throws Exception {
+    LOG.info("TestTaskDone - cleanup task node once in DONE state");
+
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    TaskBatch batch = new TaskBatch();
+    String tasknode = submitTaskAndWait(batch, "foo/1");
+    zkw.setData(tasknode, TaskState.TASK_DONE.get("worker"));
+    synchronized (batch) {
+      while (batch.installed != batch.done) {
+        batch.wait();
+      }
+    }
+    waitForCounter(tot_mgr_task_deleted, 0, 1, 1000);
+    assertTrue(zkw.checkExists(tasknode) == -1);
+  }
+
+  /**
+   * With resubmission disabled, a task moved to ERR is counted as an error
+   * in its batch and its znode is deleted.
+   */
+  @Test
+  public void testTaskErr() throws Exception {
+    LOG.info("TestTaskErr - cleanup task node once in ERR state");
+
+    // Only affects this test's private configuration copy (see setup()).
+    conf.setInt("hbase.splitlog.max.resubmit", 0);
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    TaskBatch batch = new TaskBatch();
+
+    String tasknode = submitTaskAndWait(batch, "foo/1");
+    zkw.setData(tasknode, TaskState.TASK_ERR.get("worker"));
+    synchronized (batch) {
+      while (batch.installed != batch.error) {
+        batch.wait();
+      }
+    }
+    waitForCounter(tot_mgr_task_deleted, 0, 1, 1000);
+    assertTrue(zkw.checkExists(tasknode) == -1);
+  }
+
+  /** A task moved to RESIGNED is resubmitted as UNASSIGNED by the manager. */
+  @Test
+  public void testTaskResigned() throws Exception {
+    LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
+
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    TaskBatch batch = new TaskBatch();
+    String tasknode = submitTaskAndWait(batch, "foo/1");
+    zkw.setData(tasknode, TaskState.TASK_RESIGNED.get("worker"));
+    int version = zkw.checkExists(tasknode);
+
+    waitForCounter(tot_mgr_resubmit, 0, 1, 1000);
+    int version1 = zkw.checkExists(tasknode);
+    assertTrue(version1 > version);
+
+    byte[] taskstate = zkw.getData("", tasknode);
+    assertTrue(Arrays.equals(taskstate,
+        TaskState.TASK_UNASSIGNED.get("dummy-master")));
+  }
+
+  /**
+   * Once every task is unassigned, the manager resubmits them after the
+   * unassigned timeout.
+   */
+  @Test
+  public void testUnassignedTimeout() throws Exception {
+    LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
+        " resubmit");
+
+    // create an orphan task in OWNED state
+    String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
+    zkw.getZooKeeper().create(tasknode1,
+        TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    int to = 1000;
+    conf.setInt("hbase.splitlog.manager.timeout", to);
+    conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
+    conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+
+
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
+
+
+    // submit another task which will stay in unassigned mode
+    TaskBatch batch = new TaskBatch();
+    submitTaskAndWait(batch, "foo/1");
+
+    // keep heartbeating the orphan owned node every 100ms for 3 * to ms
+    for (int i = 0; i < (3 * to)/100; i++) {
+      Thread.sleep(100);
+      zkw.setData(tasknode1,
+          TaskState.TASK_OWNED.get("dummy-worker"));
+    }
+
+
+    // since we have stopped heartbeating the owned node therefore it should
+    // get resubmitted
+    LOG.info("waiting for manager to resubmit the orphan task");
+    waitForCounter(tot_mgr_resubmit, 0, 1, to + 500);
+
+    // now all the nodes are unassigned. manager should post another rescan
+    waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + 500);
+  }
+
+  /**
+   * handleDeadWorker() resubmits the tasks owned by the dead worker, even
+   * with resubmission otherwise disabled.
+   */
+  @Test
+  public void testDeadWorker() throws Exception {
+    LOG.info("testDeadWorker");
+
+    conf.setInt("hbase.splitlog.max.resubmit", 0);
+    slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+    slm.finishInitialization();
+    TaskBatch batch = new TaskBatch();
+
+    String tasknode = submitTaskAndWait(batch, "foo/1");
+    int version = zkw.checkExists(tasknode);
+
+    zkw.setData(tasknode, TaskState.TASK_OWNED.get("worker1"));
+    waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
+    slm.handleDeadWorker("worker1");
+    waitForCounter(tot_mgr_resubmit, 0, 1, 1000);
+    waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, 1000);
+
+    int version1 = zkw.checkExists(tasknode);
+    assertTrue(version1 > version);
+    byte[] taskstate = zkw.getData("", tasknode);
+    assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
+        taskstate));
+  }
+
+  /**
+   * ZooKeeper watcher that lets a caller block until a specific znode path
+   * reports a NodeCreated event. All other event types are ignored.
+   */
+  public static class NodeCreationListener implements Watcher {
+    private static final Log LOG = LogFactory
+        .getLog(NodeCreationListener.class);
+
+    // Released once when the watched node is created.
+    private Semaphore lock;
+    private String node;
+    protected ZooKeeperWrapper watcher;
+
+    public NodeCreationListener(ZooKeeperWrapper watcher, String node) {
+      this.watcher = watcher;
+      this.lock = new Semaphore(0);
+      this.node = node;
+    }
+
+    private void nodeCreated(String path) {
+      if (path.equals(node)) {
+        LOG.debug("nodeCreated(" + path + ")");
+        lock.release();
+      }
+    }
+
+    /** Blocks until the watched node's NodeCreated event has been seen. */
+    public void waitForCreation() throws InterruptedException {
+      lock.acquire();
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      switch (event.getType()) {
+      case NodeCreated:
+        nodeCreated(event.getPath());
+        break;
+      case NodeDeleted:
+        // no-op
+        break;
+      case NodeDataChanged:
+        // no-op
+        break;
+      case NodeChildrenChanged:
+        // no-op
+        break;
+      default:
+        // None and any other event types are ignored.
+        break;
+      }
+    }
+  }
+}

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java?rev=1188419&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java Mon Oct 24 22:30:31 2011
@@ -0,0 +1,283 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+
+/**
+ * Tests for {@link SplitLogWorker}: acquiring unassigned task znodes, two
+ * workers racing for one task, and preemption when a task znode the worker
+ * owns is changed underneath it.
+ */
+public class TestSplitLogWorker {
+  private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class);
+  static {
+    Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
+  }
+  private final static HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+  private ZooKeeperWrapper zkw;
+  // Worker started by the current test, if any; stopped in teardown().
+  private SplitLogWorker slw;
+
+  /**
+   * Polls {@code ctr} every 10ms until it moves away from {@code oldval},
+   * then asserts that its value is exactly {@code newval}. Fails the test if
+   * the counter still reads {@code oldval} after {@code timems} milliseconds
+   * or if the waiting thread is interrupted.
+   */
+  private void waitForCounter(AtomicLong ctr, long oldval, long newval,
+      long timems) {
+    long endt = System.currentTimeMillis() + timems;
+    while (ctr.get() == oldval) {
+      if (System.currentTimeMillis() >= endt) {
+        assertTrue("counter still " + oldval + " after " + timems
+            + "ms; expected it to reach " + newval, false);
+      }
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to the caller and stop waiting.
+        Thread.currentThread().interrupt();
+        assertTrue("interrupted while waiting for counter to reach "
+            + newval, false);
+      }
+    }
+    assertEquals(newval, ctr.get());
+  }
+
+  /**
+   * Stops {@code worker} and fails the test if its thread is still alive
+   * 3 seconds later.
+   */
+  private static void stopWorker(SplitLogWorker worker)
+      throws InterruptedException {
+    worker.stop();
+    worker.worker.join(3000);
+    assertTrue("could not stop the worker thread", !worker.worker.isAlive());
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniZKCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniZKCluster();
+  }
+
+  /**
+   * Gives each test a base znode that contains only an empty split-log znode,
+   * plus zeroed split-log counters.
+   */
+  @Before
+  public void setup() throws Exception {
+    slw = null;
+    zkw = ZooKeeperWrapper.createInstance(TEST_UTIL.getConfiguration(),
+        "split-log-worker-tests");
+    zkw.deleteChildrenRecursively(zkw.parentZNode);
+    zkw.createZNodeIfNotExists(zkw.parentZNode);
+    assertTrue(zkw.checkExists(zkw.parentZNode) != -1);
+    LOG.debug(zkw.parentZNode + " created");
+    zkw.createZNodeIfNotExists(zkw.splitLogZNode);
+    assertTrue(zkw.checkExists(zkw.splitLogZNode) != -1);
+    LOG.debug(zkw.splitLogZNode + " created");
+    resetCounters();
+  }
+
+  /**
+   * Stops the test's worker, if any, while its ZooKeeper connection is still
+   * open, then closes the connection even if stopping failed.
+   */
+  @After
+  public void teardown() throws Exception {
+    try {
+      if (slw != null) {
+        stopWorker(slw);
+      }
+    } finally {
+      zkw.close();
+    }
+  }
+
+  // Task body that reports progress every second until progress() returns
+  // false or the thread is interrupted; either way it returns PREEMPTED.
+  SplitLogWorker.TaskExecutor neverEndingTask =
+    new SplitLogWorker.TaskExecutor() {
+
+      @Override
+      public Status exec(String name, CancelableProgressable p) {
+        while (true) {
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            return Status.PREEMPTED;
+          }
+          if (!p.progress()) {
+            return Status.PREEMPTED;
+          }
+        }
+      }
+
+  };
+
+  /** A task node that exists before the worker starts is acquired by it. */
+  @Test
+  public void testAcquireTaskAtStartup() throws Exception {
+    LOG.info("testAcquireTaskAtStartup");
+
+    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"),
+        TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), "rs",
+        neverEndingTask);
+    slw.start();
+    waitForCounter(tot_wkr_task_acquired, 0, 1, 100);
+    assertTrue(TaskState.TASK_OWNED.equals(
+        zkw.getData("", ZKSplitLog.getEncodedNodeName(zkw, "tatas")), "rs"));
+  }
+
+  /** Of two workers racing for one task, exactly one wins and owns it. */
+  @Test
+  public void testRaceForTask() throws Exception {
+    LOG.info("testRaceForTask");
+
+    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "trft"),
+        TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    SplitLogWorker slw1 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
+        "svr1", neverEndingTask);
+    SplitLogWorker slw2 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
+        "svr2", neverEndingTask);
+    slw1.start();
+    slw2.start();
+    try {
+      waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+      waitForCounter(tot_wkr_failed_to_grab_task_lost_race, 0, 1, 1000);
+      // Read the task state once so both ownership checks see the same data.
+      byte[] data = zkw.getData("", ZKSplitLog.getEncodedNodeName(zkw, "trft"));
+      assertTrue(TaskState.TASK_OWNED.equals(data, "svr1")
+          || TaskState.TASK_OWNED.equals(data, "svr2"));
+    } finally {
+      // Neither worker is tracked by teardown(), so always stop both here.
+      try {
+        stopWorker(slw1);
+      } finally {
+        stopWorker(slw2);
+      }
+    }
+  }
+
+  /**
+   * A task created after the worker starts is acquired, and is preempted
+   * when its znode is reset to UNASSIGNED.
+   */
+  @Test
+  public void testPreemptTask() throws Exception {
+    LOG.info("testPreemptTask");
+
+    slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
+        "tpt_svr", neverEndingTask);
+    slw.start();
+    Thread.yield(); // let the worker start
+    Thread.sleep(100);
+
+    // this time create a task node after starting the splitLogWorker
+    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
+        TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+    assertEquals(1, slw.taskReadySeq);
+    assertTrue(TaskState.TASK_OWNED.equals(
+        zkw.getData("", ZKSplitLog.getEncodedNodeName(zkw, "tpt_task")),
+        "tpt_svr"));
+
+    zkw.setData(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
+        TaskState.TASK_UNASSIGNED.get("manager"));
+    waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
+  }
+
+  /**
+   * After its current task is taken over by another worker, the worker
+   * picks up the next unassigned task.
+   */
+  @Test
+  public void testMultipleTasks() throws Exception {
+    LOG.info("testMultipleTasks");
+    slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
+        "tmt_svr", neverEndingTask);
+    slw.start();
+    Thread.yield(); // let the worker start
+    Thread.sleep(100);
+
+    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
+        TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+    // now the worker is busy doing the above task
+
+    // create another task
+    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"),
+        TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    // preempt the first task, have it owned by another worker
+    zkw.setData(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
+        TaskState.TASK_OWNED.get("another-worker"));
+    waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
+
+    waitForCounter(tot_wkr_task_acquired, 1, 2, 1000);
+    assertEquals(2, slw.taskReadySeq);
+    assertTrue(TaskState.TASK_OWNED.equals(
+        zkw.getData("", ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2")),
+        "tmt_svr"));
+  }
+
+  // Disabled: the @Test annotation is commented out, so JUnit does not run
+  // this method.
+  // @Test
+  public void testRescan() throws Exception {
+    LOG.info("testRescan");
+    slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
+        "svr", neverEndingTask);
+    slw.start();
+    Thread.yield(); // let the worker start
+    Thread.sleep(100);
+
+    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "task"),
+        TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+    // now the worker is busy doing the above task
+
+    // preempt the task by resetting it to UNASSIGNED
+    zkw.setData(ZKSplitLog.getEncodedNodeName(zkw, "task"),
+        TaskState.TASK_UNASSIGNED.get("manager"));
+    waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
+
+    // create a RESCAN node
+    zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"),
+        TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT_SEQUENTIAL);
+
+    waitForCounter(tot_wkr_task_acquired, 1, 2, 1000);
+    // RESCAN node might not have been processed if the worker became busy
+    // with the above task. preempt the task again so that now the RESCAN
+    // node is processed
+    zkw.setData(ZKSplitLog.getEncodedNodeName(zkw, "task"),
+        TaskState.TASK_UNASSIGNED.get("manager"));
+    waitForCounter(tot_wkr_preempt_task, 1, 2, 1000);
+    waitForCounter(tot_wkr_task_acquired_rescan, 0, 1, 1000);
+
+    List<String> nodes = zkw.listChildrenNoWatch(zkw.splitLogZNode);
+    LOG.debug(nodes);
+    int num = 0;
+    for (String node : nodes) {
+      num++;
+      if (node.startsWith("RESCAN")) {
+        assertTrue(TaskState.TASK_DONE.equals(
+            zkw.getData("", ZKSplitLog.getEncodedNodeName(zkw, node)), "svr"));
+      }
+    }
+    assertEquals(2, num);
+  }
+}

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Mon Oct 24 22:30:31 2011
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.FileNotFoundException;
@@ -46,6 +45,7 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.ipc.RemoteException;
 import org.junit.After;
@@ -88,6 +88,8 @@ public class TestHLogSplit {
   private static final String HLOG_FILE_PREFIX = "hlog.dat.";
   private static List<String> regions;
   private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
+  // Path of the test table's directory: hbaseDir/<TABLE_NAME>.
+  // NOTE(review): static initializer — hbaseDir and TABLE_NAME must be
+  // initialized earlier in the class for this to see non-null values.
+  private static final Path tabledir = new Path(hbaseDir,
+      Bytes.toString(TABLE_NAME));
 
 
   static enum Corruptions {
@@ -694,6 +696,119 @@ public class TestHLogSplit {
     }
   }
 
+  /**
+   * Progress callback for the split calls below: logs a running call count
+   * and always returns true, so it never asks the split to stop.
+   */
+  private CancelableProgressable reporter = new CancelableProgressable() {
+    private int calls = 0;
+
+    @Override
+    public boolean progress() {
+      LOG.debug("progress = " + (++calls));
+      return true;
+    }
+  };
+
+  /**
+   * Splits a single log whose edits all target one region. The first log
+   * in oldLogDir must then match the recovered-edits log for that region.
+   */
+  @Test
+  public void testSplitLogFileWithOneRegion() throws IOException {
+    LOG.info("testSplitLogFileWithOneRegion");
+    final String REGION = "region__1";
+    regions.clear();
+    regions.add(REGION);
+
+    generateHLogs(1, 10, -1);
+    FileStatus logfile = fs.listStatus(hlogDir)[0];
+    fs.initialize(fs.getUri(), conf);
+    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
+        reporter);
+    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
+        logfile.getPath().toString(), conf);
+
+    Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
+    Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
+
+    assertTrue(logsAreEqual(originalLog, splitLog));
+  }
+
+  /**
+   * Splitting a log must not throw when the target region's directory has
+   * already been deleted, and must not recreate that directory.
+   */
+  @Test
+  public void testSplitLogFileDeletedRegionDir() throws IOException {
+    LOG.info("testSplitLogFileDeletedRegionDir");
+    final String REGION = "region__1";
+    regions.clear();
+    regions.add(REGION);
+
+    generateHLogs(1, 10, -1);
+    FileStatus logfile = fs.listStatus(hlogDir)[0];
+    fs.initialize(fs.getUri(), conf);
+
+    Path regiondir = new Path(tabledir, REGION);
+    LOG.info("Region directory is " + regiondir);
+    fs.delete(regiondir, true);
+
+    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
+        reporter);
+    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
+        logfile.getPath().toString(), conf);
+    // This test passes if there are no exceptions when
+    // the region directory has been removed
+
+    assertTrue("region directory " + regiondir + " exists after split",
+        !fs.exists(regiondir));
+  }
+
+  /**
+   * Splitting an empty log leaves the table directory missing or empty.
+   * The first log found in oldLogDir must contain zero entries.
+   */
+  @Test
+  public void testSplitLogFileEmpty() throws IOException {
+    LOG.info("testSplitLogFileEmpty");
+    injectEmptyFile(".empty", true);
+    FileStatus emptyLog = fs.listStatus(hlogDir)[0];
+
+    fs.initialize(fs.getUri(), conf);
+
+    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", emptyLog, fs, conf,
+        reporter);
+    String emptyLogName = emptyLog.getPath().toString();
+    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
+        emptyLogName, conf);
+
+    Path tableDir = HTableDescriptor.getTableDir(hbaseDir, TABLE_NAME);
+    FileStatus[] tableContents = fs.listStatus(tableDir);
+    boolean nothingWritten = tableContents == null || tableContents.length == 0;
+    assertTrue(nothingWritten);
+
+    Path archived = fs.listStatus(oldLogDir)[0].getPath();
+    assertEquals(0, countHLog(archived, fs, conf));
+  }
+
+  /**
+   * Splits one generated log and checks that countHLog reports 10 entries
+   * in the recovered-edits log of every region in {@code regions}.
+   */
+  @Test
+  public void testSplitLogFileMultipleRegions() throws IOException {
+    LOG.info("testSplitLogFileMultipleRegions");
+    generateHLogs(1, 10, -1);
+    FileStatus source = fs.listStatus(hlogDir)[0];
+    fs.initialize(fs.getUri(), conf);
+
+    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", source, fs, conf,
+        reporter);
+    String sourceName = source.getPath().toString();
+    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
+        sourceName, conf);
+    for (int i = 0; i < regions.size(); i++) {
+      Path edits = getLogForRegion(hbaseDir, TABLE_NAME, regions.get(i));
+      assertEquals(10, countHLog(edits, fs, conf));
+    }
+  }
+
+  /**
+   * With hbase.hlog.split.skip.errors enabled, splitting a log whose first
+   * line is garbage must leave exactly one entry in the corrupt-log
+   * directory (default ".corrupt" under hbase.rootdir).
+   */
+  @Test
+  public void testSplitLogFileFirstLineCorruptionLog() throws IOException {
+    LOG.info("testSplitLogFileFirstLineCorruptionLog");
+    conf.setBoolean(HBASE_SKIP_ERRORS, true);
+    generateHLogs(1, 10, -1);
+    FileStatus logfile = fs.listStatus(hlogDir)[0];
+
+    corruptHLog(logfile.getPath(), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE,
+        true, fs);
+
+    fs.initialize(fs.getUri(), conf);
+    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
+        reporter);
+    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
+        logfile.getPath().toString(), conf);
+
+    final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
+        "hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
+    assertEquals(1, fs.listStatus(corruptDir).length);
+  }
+
   private void flushToConsole(String s) {
     System.out.println(s);
     System.out.flush();