You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/25 00:30:32 UTC
svn commit: r1188419 [3/3] - in /hbase/branches/0.89-fb: ./
src/main/java/org/apache/hadoop/hbase/mapreduce/
src/main/java/org/apache/hadoop/hbase/master/
src/main/java/org/apache/hadoop/hbase/regionserver/
src/main/java/org/apache/hadoop/hbase/regions...
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java?rev=1188419&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java Mon Oct 24 22:30:31 2011
@@ -0,0 +1,455 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.master.SplitLogManager.Task;
+import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@link SplitLogManager}: task creation, orphan-task acquisition,
+ * resubmission limits, RESCAN cleanup and terminal task states, all driven
+ * against a mini ZooKeeper cluster.
+ */
+public class TestSplitLogManager {
+ private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class);
+ static {
+ // force DEBUG so the counter/ZK traces below are visible in test logs
+ Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
+ }
+
+ private ZooKeeperWrapper zkw;
+ // the manager under test; constructed inside each test, stopped in teardown
+ private SplitLogManager slm;
+ private Configuration conf;
+
+ private final static HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+
+ // shared stop flag handed to the manager; set in teardown to end its threads
+ static AtomicBoolean stopper = new AtomicBoolean();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniZKCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniZKCluster();
+ }
+
+ /**
+ * Gives each test a clean ZK namespace (parent and splitlog znodes recreated
+ * empty) and zeroed ZKSplitLog counters.
+ */
+ @Before
+ public void setup() throws Exception {
+ stopper.set(false);
+ conf = TEST_UTIL.getConfiguration();
+ zkw = ZooKeeperWrapper.createInstance(conf, "split-log-manager-tests");
+ zkw.deleteChildrenRecursively(zkw.parentZNode);
+ zkw.createZNodeIfNotExists(zkw.parentZNode);
+ assertTrue(zkw.checkExists(zkw.parentZNode) != -1);
+ LOG.debug(zkw.parentZNode + " created");
+ zkw.createZNodeIfNotExists(zkw.splitLogZNode);
+ assertTrue(zkw.checkExists(zkw.splitLogZNode) != -1);
+ LOG.debug(zkw.splitLogZNode + " created");
+
+ resetCounters();
+ }
+
+ /**
+ * Stops the manager and closes the test's ZK connection. The manager is
+ * stopped before its ZK connection is closed (the original did the reverse),
+ * and a null check guards the case where a test failed before constructing
+ * slm -- otherwise the NPE here would mask the real test failure.
+ */
+ @After
+ public void teardown() throws IOException, KeeperException {
+ stopper.set(true);
+ if (slm != null) {
+ slm.stop();
+ }
+ zkw.close();
+ }
+
+ /**
+ * Polls ctr until it moves off oldval, then asserts it landed exactly on
+ * newval; fails if it has not changed within timems milliseconds.
+ * @param ctr counter to watch
+ * @param oldval value the counter currently holds
+ * @param newval value the counter is expected to reach
+ * @param timems maximum time to wait, in milliseconds
+ */
+ private void waitForCounter(AtomicLong ctr, long oldval, long newval,
+ long timems) {
+ long curt = System.currentTimeMillis();
+ long endt = curt + timems;
+ while (curt < endt) {
+ if (ctr.get() == oldval) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ // restore interrupt status instead of swallowing it
+ Thread.currentThread().interrupt();
+ }
+ curt = System.currentTimeMillis();
+ } else {
+ assertEquals(newval, ctr.get());
+ return;
+ }
+ }
+ LOG.debug("Wait for counter failed");
+ // message overload so a timeout reports what was being waited for,
+ // rather than a bare assertTrue(false)
+ assertTrue("counter never left " + oldval + " within " + timems + "ms",
+ false);
+ }
+
+ /**
+ * Installs a split task named {@code name} into the manager as part of
+ * {@code batch}, waits for the corresponding znode to appear, and returns
+ * the znode path. Asserts the task was queued exactly once.
+ */
+ private String submitTaskAndWait(TaskBatch batch, String name)
+ throws KeeperException, InterruptedException {
+ String tasknode = ZKSplitLog.getEncodedNodeName(zkw, name);
+ NodeCreationListener listener = new NodeCreationListener(zkw, tasknode);
+ zkw.registerListener(listener);
+ // set the watch before installing so the creation event cannot be missed
+ zkw.watchAndCheckExists(tasknode);
+
+ slm.installTask(name, batch);
+ assertEquals(1, batch.installed);
+ assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch);
+ assertEquals(1L, tot_mgr_node_create_queued.get());
+
+ LOG.debug("waiting for task node creation");
+ listener.waitForCreation();
+ LOG.debug("task created");
+ return tasknode;
+ }
+
+ /**
+ * Test whether the splitlog correctly creates a task in zookeeper
+ * @throws Exception
+ */
+ @Test
+ public void testTaskCreation() throws Exception {
+ LOG.info("TestTaskCreation - test the creation of a task in zk");
+
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ TaskBatch batch = new TaskBatch();
+
+ String tasknode = submitTaskAndWait(batch, "foo/1");
+
+ // NOTE(review): first argument of getData appears to be an unused
+ // caller tag -- confirm against ZooKeeperWrapper.getData
+ byte[] data = zkw.getData("", tasknode);
+ LOG.info("Task node created " + new String(data));
+ // a freshly created task must be UNASSIGNED and attributed to this master
+ assertTrue(TaskState.TASK_UNASSIGNED.equals(data, "dummy-master"));
+ }
+
+ /**
+ * A task node that already exists in OWNED state when the manager starts
+ * must be acquired as an orphan, and resubmitted once the (stopped)
+ * heartbeats time out.
+ */
+ @Test
+ public void testOrphanTaskAcquisition() throws Exception {
+ LOG.info("TestOrphanTaskAcquisition");
+
+ // pre-create an OWNED task node before the manager exists
+ String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
+ zkw.getZooKeeper().create(tasknode,
+ TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ int to = 1000;
+ conf.setInt("hbase.splitlog.manager.timeout", to);
+ conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+ // pad the timeout by two monitor periods to avoid flakiness
+ to = to + 2 * 100;
+
+
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
+ Task task = slm.findOrCreateOrphanTask(tasknode);
+ assertTrue(task.isOrphan());
+ waitForCounter(tot_mgr_heartbeat, 0, 1, 100);
+ assertFalse(task.isUnassigned());
+ // the heartbeat must have updated last_update to roughly "now"
+ long curt = System.currentTimeMillis();
+ assertTrue((task.last_update <= curt) &&
+ (task.last_update > (curt - 1000)));
+ LOG.info("waiting for manager to resubmit the orphan task");
+ waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
+ assertTrue(task.isUnassigned());
+ waitForCounter(tot_mgr_rescan, 0, 1, to + 100);
+ }
+
+ @Test
+ public void testUnassignedOrphan() throws Exception {
+ LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" +
+ " startup");
+ String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
+ //create an unassigned orphan task
+ zkw.getZooKeeper().create(tasknode,
+ TaskState.TASK_UNASSIGNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ // remember the znode version so the resubmit (which rewrites the node)
+ // can be detected below
+ int version = zkw.checkExists(tasknode);
+
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
+ Task task = slm.findOrCreateOrphanTask(tasknode);
+ assertTrue(task.isOrphan());
+ assertTrue(task.isUnassigned());
+ // wait for RESCAN node to be created
+ waitForCounter(tot_mgr_rescan, 0, 1, 500);
+ // same path must map to the same in-memory Task object
+ Task task2 = slm.findOrCreateOrphanTask(tasknode);
+ assertTrue(task == task2);
+ LOG.debug("task = " + task);
+ assertEquals(1L, tot_mgr_resubmit.get());
+ assertEquals(1, task.incarnation);
+ // startup resubmit of an unassigned orphan is "forced", so it must not
+ // count against the resubmit limit
+ assertEquals(0, task.unforcedResubmits);
+ assertTrue(task.isOrphan());
+ assertTrue(task.isUnassigned());
+ assertTrue(zkw.checkExists(tasknode) > version);
+ }
+
+ /**
+ * With hbase.splitlog.max.resubmit=2, a task abandoned by successive
+ * workers must be resubmitted at most twice; the third timeout trips the
+ * resubmit-threshold counter instead of another resubmit.
+ */
+ @Test
+ public void testMultipleResubmits() throws Exception {
+ LOG.info("TestMultipleResubmits - no indefinite resubmissions");
+
+ int to = 1000;
+ conf.setInt("hbase.splitlog.manager.timeout", to);
+ conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+ // pad the timeout by two monitor periods to avoid flakiness
+ to = to + 2 * 100;
+
+ conf.setInt("hbase.splitlog.max.resubmit", 2);
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ TaskBatch batch = new TaskBatch();
+
+ String tasknode = submitTaskAndWait(batch, "foo/1");
+ int version = zkw.checkExists(tasknode);
+
+ zkw.setData(tasknode, TaskState.TASK_OWNED.get("worker1"));
+ waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
+ waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
+ // every resubmit rewrites the znode, bumping its version
+ int version1 = zkw.checkExists(tasknode);
+ assertTrue(version1 > version);
+ zkw.setData(tasknode, TaskState.TASK_OWNED.get("worker2"));
+ waitForCounter(tot_mgr_heartbeat, 1, 2, 1000);
+ waitForCounter(tot_mgr_resubmit, 1, 2, to + 100);
+ int version2 = zkw.checkExists(tasknode);
+ assertTrue(version2 > version1);
+ zkw.setData(tasknode, TaskState.TASK_OWNED.get("worker3"));
+ // third worker's ownership is the THIRD heartbeat (2 -> 3); the
+ // original waited on 1 -> 2 again, a copy-paste slip that passed only
+ // because that transition had already happened
+ waitForCounter(tot_mgr_heartbeat, 2, 3, 1000);
+ waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + 100);
+ Thread.sleep(to + 100);
+ // resubmit count must be capped at the configured maximum of 2
+ assertEquals(2L, tot_mgr_resubmit.get());
+ }
+
+ /**
+ * After a timed-out task is resubmitted, the RESCAN node the manager posts
+ * must itself be deleted once it has served its purpose.
+ */
+ @Test
+ public void testRescanCleanup() throws Exception {
+ LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
+
+ int to = 1000;
+ conf.setInt("hbase.splitlog.manager.timeout", to);
+ conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+ // pad the timeout by two monitor periods to avoid flakiness
+ to = to + 2 * 100;
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ TaskBatch batch = new TaskBatch();
+
+ String tasknode = submitTaskAndWait(batch, "foo/1");
+ int version = zkw.checkExists(tasknode);
+
+ zkw.setData(tasknode, TaskState.TASK_OWNED.get("worker1"));
+ waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
+ waitForCounter(tot_mgr_resubmit, 0, 1, to + 100);
+ // resubmission rewrites the task node back to UNASSIGNED by this master
+ int version1 = zkw.checkExists(tasknode);
+ assertTrue(version1 > version);
+ byte[] taskstate = zkw.getData("", tasknode);
+ assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
+ taskstate));
+ waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000);
+ }
+
+ /**
+ * A task moved to DONE must complete its batch and have its znode deleted.
+ */
+ @Test
+ public void testTaskDone() throws Exception {
+ LOG.info("TestTaskDone - cleanup task node once in DONE state");
+
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ TaskBatch batch = new TaskBatch();
+ String tasknode = submitTaskAndWait(batch, "foo/1");
+ zkw.setData(tasknode, TaskState.TASK_DONE.get("worker"));
+ // the manager notifies on the batch as tasks finish; wait until every
+ // installed task is accounted for as done
+ synchronized (batch) {
+ while (batch.installed != batch.done) {
+ batch.wait();
+ }
+ }
+ waitForCounter(tot_mgr_task_deleted, 0, 1, 1000);
+ // znode must be gone (checkExists returns -1 for a missing node)
+ assertTrue(zkw.checkExists(tasknode) == -1);
+ }
+
+ /**
+ * With resubmits disabled, a task moved to ERR must be counted as a batch
+ * error and have its znode deleted rather than being retried.
+ */
+ @Test
+ public void testTaskErr() throws Exception {
+ LOG.info("TestTaskErr - cleanup task node once in ERR state");
+
+ // forbid resubmission so ERR is terminal
+ conf.setInt("hbase.splitlog.max.resubmit", 0);
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ TaskBatch batch = new TaskBatch();
+
+ String tasknode = submitTaskAndWait(batch, "foo/1");
+ zkw.setData(tasknode, TaskState.TASK_ERR.get("worker"));
+ synchronized (batch) {
+ while (batch.installed != batch.error) {
+ batch.wait();
+ }
+ }
+ waitForCounter(tot_mgr_task_deleted, 0, 1, 1000);
+ assertTrue(zkw.checkExists(tasknode) == -1);
+ // restore the default for subsequent tests sharing this Configuration
+ conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT);
+ }
+
+ /**
+ * A task moved to RESIGNED by its worker must be resubmitted (node rewritten
+ * as UNASSIGNED by the master, with a higher znode version).
+ */
+ @Test
+ public void testTaskResigned() throws Exception {
+ LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
+
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ TaskBatch batch = new TaskBatch();
+ String tasknode = submitTaskAndWait(batch, "foo/1");
+ zkw.setData(tasknode, TaskState.TASK_RESIGNED.get("worker"));
+ int version = zkw.checkExists(tasknode);
+
+ waitForCounter(tot_mgr_resubmit, 0, 1, 1000);
+ int version1 = zkw.checkExists(tasknode);
+ assertTrue(version1 > version);
+
+ byte[] taskstate = zkw.getData("", tasknode);
+ assertTrue(Arrays.equals(taskstate,
+ TaskState.TASK_UNASSIGNED.get("dummy-master")));
+ }
+
+ /**
+ * The unassigned-timeout rescan must fire only when ALL tasks are
+ * unassigned: an actively heartbeating OWNED task holds it off, and once
+ * heartbeats stop and the task is resubmitted, the rescan is posted.
+ */
+ @Test
+ public void testUnassignedTimeout() throws Exception {
+ LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
+ " resubmit");
+
+ // create an orphan task in OWNED state
+ String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
+ zkw.getZooKeeper().create(tasknode1,
+ TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ int to = 1000;
+ conf.setInt("hbase.splitlog.manager.timeout", to);
+ conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to);
+ conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100);
+
+
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100);
+
+
+ // submit another task which will stay in unassigned mode
+ TaskBatch batch = new TaskBatch();
+ submitTaskAndWait(batch, "foo/1");
+
+ // keep updating the orphan owned node every to/2 seconds
+ // NOTE(review): comment says to/2 seconds but the loop heartbeats every
+ // 100ms for 3*to total -- the intent is just "faster than the timeout"
+ for (int i = 0; i < (3 * to)/100; i++) {
+ Thread.sleep(100);
+ zkw.setData(tasknode1,
+ TaskState.TASK_OWNED.get("dummy-worker"));
+ }
+
+
+ // since we have stopped heartbeating the owned node therefore it should
+ // get resubmitted
+ LOG.info("waiting for manager to resubmit the orphan task");
+ waitForCounter(tot_mgr_resubmit, 0, 1, to + 500);
+
+ // now all the nodes are unassigned. manager should post another rescan
+ waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + 500);
+ }
+
+ /**
+ * handleDeadWorker() must force resubmission of a task owned by the dead
+ * server even when hbase.splitlog.max.resubmit is 0 (dead-server resubmits
+ * are not subject to the limit).
+ */
+ @Test
+ public void testDeadWorker() throws Exception {
+ LOG.info("testDeadWorker");
+
+ // setInt for consistency with testTaskErr / testMultipleResubmits,
+ // which configure the same key (the original used setLong)
+ conf.setInt("hbase.splitlog.max.resubmit", 0);
+ slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null);
+ slm.finishInitialization();
+ TaskBatch batch = new TaskBatch();
+
+ String tasknode = submitTaskAndWait(batch, "foo/1");
+ int version = zkw.checkExists(tasknode);
+
+ zkw.setData(tasknode, TaskState.TASK_OWNED.get("worker1"));
+ waitForCounter(tot_mgr_heartbeat, 0, 1, 1000);
+ slm.handleDeadWorker("worker1");
+ waitForCounter(tot_mgr_resubmit, 0, 1, 1000);
+ waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, 1000);
+
+ // resubmission rewrites the node as UNASSIGNED by this master
+ int version1 = zkw.checkExists(tasknode);
+ assertTrue(version1 > version);
+ byte[] taskstate = zkw.getData("", tasknode);
+ assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"),
+ taskstate));
+ }
+
+ /**
+ * ZK Watcher that releases a semaphore when a specific znode is created,
+ * letting a test block until the node exists via {@link #waitForCreation()}.
+ */
+ public static class NodeCreationListener implements Watcher {
+ private static final Log LOG = LogFactory
+ .getLog(NodeCreationListener.class);
+
+ // released once per creation event for the watched node
+ private Semaphore lock;
+ private String node;
+ protected ZooKeeperWrapper watcher;
+
+ public NodeCreationListener(ZooKeeperWrapper watcher, String node) {
+ this.watcher = watcher;
+ this.lock = new Semaphore(0);
+ this.node = node;
+ }
+
+ private void nodeCreated(String path) {
+ // ignore creation events for any node other than the one watched
+ if (path.equals(node)) {
+ LOG.debug("nodeCreated(" + path + ")");
+ lock.release();
+ }
+ }
+
+ /** Blocks until the watched node's creation event has been seen. */
+ public void waitForCreation() throws InterruptedException {
+ lock.acquire();
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ switch (event.getType()) {
+ case NodeCreated:
+ nodeCreated(event.getPath());
+ break;
+ case NodeDeleted:
+ // no-op
+ break;
+ case NodeDataChanged:
+ // no-op
+ break;
+ case NodeChildrenChanged:
+ // no-op
+ break;
+ }
+ }
+ }
+}
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java?rev=1188419&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java Mon Oct 24 22:30:31 2011
@@ -0,0 +1,283 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+
+/**
+ * Tests for {@link SplitLogWorker}: task acquisition at startup, racing
+ * workers, preemption and multiple-task handling against a mini ZK cluster.
+ */
+public class TestSplitLogWorker {
+ private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class);
+ static {
+ // force DEBUG so counter/ZK traces are visible in test logs
+ Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
+ }
+ private final static HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+ private ZooKeeperWrapper zkw;
+ // worker under test; constructed inside each test, stopped in teardown
+ private SplitLogWorker slw;
+
+ /**
+ * Polls ctr until it moves off oldval, then asserts it landed exactly on
+ * newval; fails if it has not changed within timems milliseconds.
+ * @param ctr counter to watch
+ * @param oldval value the counter currently holds
+ * @param newval value the counter is expected to reach
+ * @param timems maximum time to wait, in milliseconds
+ */
+ private void waitForCounter(AtomicLong ctr, long oldval, long newval,
+ long timems) {
+ long curt = System.currentTimeMillis();
+ long endt = curt + timems;
+ while (curt < endt) {
+ if (ctr.get() == oldval) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ // restore interrupt status instead of swallowing it
+ Thread.currentThread().interrupt();
+ }
+ curt = System.currentTimeMillis();
+ } else {
+ assertEquals(newval, ctr.get());
+ return;
+ }
+ }
+ // message overload so a timeout reports what was being waited for,
+ // rather than a bare assertTrue(false)
+ assertTrue("counter never left " + oldval + " within " + timems + "ms",
+ false);
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniZKCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniZKCluster();
+ }
+
+ /**
+ * Gives each test a clean ZK namespace (parent and splitlog znodes recreated
+ * empty) and zeroed ZKSplitLog counters.
+ */
+ @Before
+ public void setup() throws Exception {
+ slw = null;
+ zkw = ZooKeeperWrapper.createInstance(TEST_UTIL.getConfiguration(),
+ "split-log-worker-tests");
+ zkw.deleteChildrenRecursively(zkw.parentZNode);
+ zkw.createZNodeIfNotExists(zkw.parentZNode);
+ assertTrue(zkw.checkExists(zkw.parentZNode) != -1);
+ LOG.debug(zkw.parentZNode + " created");
+ zkw.createZNodeIfNotExists(zkw.splitLogZNode);
+ assertTrue(zkw.checkExists(zkw.splitLogZNode) != -1);
+ LOG.debug(zkw.splitLogZNode + " created");
+ resetCounters();
+ }
+
+ /**
+ * Closes the ZK connection and stops the worker (if a test started one),
+ * failing the test if the worker thread does not exit within 3 seconds.
+ */
+ @After
+ public void teardown() throws Exception {
+ zkw.close();
+ if (slw != null) {
+ slw.stop();
+ slw.worker.join(3000);
+ if (slw.worker.isAlive()) {
+ // message overload instead of the original's obfuscated
+ // assertTrue("..." == null), which failed without any message
+ assertTrue("could not stop the worker thread", false);
+ }
+ }
+ }
+
+ // Task executor that never finishes on its own: it sleeps in 1s steps and
+ // only returns (as PREEMPTED) when interrupted or when the progress
+ // callback says to stop. Used so tests control task lifetime via ZK state.
+ SplitLogWorker.TaskExecutor neverEndingTask =
+ new SplitLogWorker.TaskExecutor() {
+
+ @Override
+ public Status exec(String name, CancelableProgressable p) {
+ while (true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ return Status.PREEMPTED;
+ }
+ // a false return from progress() signals cancellation
+ if (!p.progress()) {
+ return Status.PREEMPTED;
+ }
+ }
+ }
+
+ };
+
+ /**
+ * A worker started after an UNASSIGNED task node already exists must grab
+ * it and mark the node OWNED by itself.
+ */
+ @Test
+ public void testAcquireTaskAtStartup() throws Exception {
+ LOG.info("testAcquireTaskAtStartup");
+
+ zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tatas"),
+ TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), "rs",
+ neverEndingTask);
+ slw.start();
+ waitForCounter(tot_wkr_task_acquired, 0, 1, 100);
+ assertTrue(TaskState.TASK_OWNED.equals(
+ zkw.getData("", ZKSplitLog.getEncodedNodeName(zkw, "tatas")), "rs"));
+ }
+
+ /**
+ * Two workers racing for one task: exactly one must win the task and the
+ * other must record a lost race.
+ */
+ @Test
+ public void testRaceForTask() throws Exception {
+ LOG.info("testRaceForTask");
+
+ zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "trft"),
+ TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ SplitLogWorker slw1 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
+ "svr1", neverEndingTask);
+ SplitLogWorker slw2 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
+ "svr2", neverEndingTask);
+ slw1.start();
+ slw2.start();
+ waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+ waitForCounter(tot_wkr_failed_to_grab_task_lost_race, 0, 1, 1000);
+ // either worker may have won; only require that one of them owns it
+ assertTrue(TaskState.TASK_OWNED.equals(
+ zkw.getData("", ZKSplitLog.getEncodedNodeName(zkw, "trft")), "svr1")
+ || TaskState.TASK_OWNED
+ .equals(
+ zkw.getData("", ZKSplitLog.getEncodedNodeName(zkw, "trft")),
+ "svr2"));
+ // these locals are not the slw field, so teardown will not stop them;
+ // stop and join them here
+ slw1.stop();
+ slw2.stop();
+ slw1.worker.join();
+ slw2.worker.join();
+ }
+
+ /**
+ * A worker busy on a task must notice when the manager rewrites the node
+ * back to UNASSIGNED (preemption) and bump the preempt counter.
+ */
+ @Test
+ public void testPreemptTask() throws Exception {
+ LOG.info("testPreemptTask");
+
+ slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
+ "tpt_svr", neverEndingTask);
+ slw.start();
+ Thread.yield(); // let the worker start
+ Thread.sleep(100);
+
+ // this time create a task node after starting the splitLogWorker
+ zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
+ TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+ // first task-ready notification since worker start
+ assertEquals(1, slw.taskReadySeq);
+ assertTrue(TaskState.TASK_OWNED.equals(
+ zkw.getData("", ZKSplitLog.getEncodedNodeName(zkw, "tpt_task")),
+ "tpt_svr"));
+
+ // manager-style rewrite to UNASSIGNED preempts the running worker
+ zkw.setData(ZKSplitLog.getEncodedNodeName(zkw, "tpt_task"),
+ TaskState.TASK_UNASSIGNED.get("manager"));
+ waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
+ }
+
+ /**
+ * A worker preempted off its current task must go on to acquire another
+ * pending task.
+ */
+ @Test
+ public void testMultipleTasks() throws Exception {
+ LOG.info("testMultipleTasks");
+ slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
+ "tmt_svr", neverEndingTask);
+ slw.start();
+ Thread.yield(); // let the worker start
+ Thread.sleep(100);
+
+ zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
+ TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+ // now the worker is busy doing the above task
+
+ // create another task
+ zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2"),
+ TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ // preempt the first task, have it owned by another worker
+ zkw.setData(ZKSplitLog.getEncodedNodeName(zkw, "tmt_task"),
+ TaskState.TASK_OWNED.get("another-worker"));
+ waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
+
+ waitForCounter(tot_wkr_task_acquired, 1, 2, 1000);
+ // second task-ready notification since worker start
+ assertEquals(2, slw.taskReadySeq);
+ assertTrue(TaskState.TASK_OWNED.equals(
+ zkw.getData("", ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2")),
+ "tmt_svr"));
+ }
+
+ // NOTE(review): @Test is commented out, so this test is disabled -- no
+ // reason is recorded; confirm whether it is flaky or superseded before
+ // re-enabling.
+ // @Test
+ public void testRescan() throws Exception {
+ LOG.info("testRescan");
+ slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
+ "svr", neverEndingTask);
+ slw.start();
+ Thread.yield(); // let the worker start
+ Thread.sleep(100);
+
+ zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "task"),
+ TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+ // now the worker is busy doing the above task
+
+ // preempt the task, have it owned by another worker
+ zkw.setData(ZKSplitLog.getEncodedNodeName(zkw, "task"),
+ TaskState.TASK_UNASSIGNED.get("manager"));
+ waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
+
+ // create a RESCAN node
+ zkw.getZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, "RESCAN"),
+ TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL);
+
+ waitForCounter(tot_wkr_task_acquired, 1, 2, 1000);
+ // RESCAN node might not have been processed if the worker became busy
+ // with the above task. preempt the task again so that now the RESCAN
+ // node is processed
+ zkw.setData(ZKSplitLog.getEncodedNodeName(zkw, "task"),
+ TaskState.TASK_UNASSIGNED.get("manager"));
+ waitForCounter(tot_wkr_preempt_task, 1, 2, 1000);
+ waitForCounter(tot_wkr_task_acquired_rescan, 0, 1, 1000);
+
+ List<String> nodes = zkw.listChildrenNoWatch(zkw.splitLogZNode);
+ LOG.debug(nodes);
+ // expect exactly two children: the task node and the RESCAN node, with
+ // the RESCAN marked DONE by this worker
+ int num = 0;
+ for (String node : nodes) {
+ num++;
+ if (node.startsWith("RESCAN")) {
+ assertTrue(TaskState.TASK_DONE.equals(
+ zkw.getData("", ZKSplitLog.getEncodedNodeName(zkw, node)), "svr"));
+ }
+ }
+ assertEquals(2, num);
+ }
+}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1188419&r1=1188418&r2=1188419&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Mon Oct 24 22:30:31 2011
@@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.FileNotFoundException;
@@ -46,6 +45,7 @@ import org.apache.hadoop.hbase.HTableDes
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
@@ -88,6 +88,8 @@ public class TestHLogSplit {
private static final String HLOG_FILE_PREFIX = "hlog.dat.";
private static List<String> regions;
private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
+ private static final Path tabledir = new Path(hbaseDir,
+ Bytes.toString(TABLE_NAME));
static enum Corruptions {
@@ -694,6 +696,119 @@ public class TestHLogSplit {
}
}
+ // Progress reporter passed to splitLogFileToTemp: always allows the split
+ // to continue (returns true) and logs how often it was polled.
+ private CancelableProgressable reporter = new CancelableProgressable() {
+ int count = 0;
+
+ @Override
+ public boolean progress() {
+ count++;
+ LOG.debug("progress = " + count);
+ return true;
+ }
+ };
+
+ /**
+ * Splitting a single log that targets one region must produce recovered
+ * edits identical to the original log.
+ */
+ @Test
+ public void testSplitLogFileWithOneRegion() throws IOException {
+ LOG.info("testSplitLogFileWithOneRegion");
+ final String REGION = "region__1";
+ // restrict the generated edits to a single region
+ regions.removeAll(regions);
+ regions.add(REGION);
+
+ generateHLogs(1, 10, -1);
+ FileStatus logfile = fs.listStatus(hlogDir)[0];
+ fs.initialize(fs.getUri(), conf);
+ // split into a temp dir, then move results into place as the real
+ // split path does
+ HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
+ reporter);
+ HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
+ logfile.getPath().toString(), conf);
+
+ Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
+ Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
+
+ assertEquals(true, logsAreEqual(originalLog, splitLog));
+ }
+
+ /**
+ * Splitting must not fail, nor resurrect the directory, when the target
+ * region's directory has been deleted before the split runs.
+ */
+ @Test
+ public void testSplitLogFileDeletedRegionDir() throws IOException {
+ LOG.info("testSplitLogFileDeletedRegionDir");
+ final String REGION = "region__1";
+ regions.removeAll(regions);
+ regions.add(REGION);
+
+ generateHLogs(1, 10, -1);
+ FileStatus logfile = fs.listStatus(hlogDir)[0];
+ fs.initialize(fs.getUri(), conf);
+
+ // remove the region dir so the split has nowhere to place edits
+ Path regiondir = new Path(tabledir, REGION);
+ LOG.info("Region directory is" + regiondir);
+ fs.delete(regiondir, true);
+
+ HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
+ reporter);
+ HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
+ logfile.getPath().toString(), conf);
+ // This test passes if there are no exceptions when
+ // the region directory has been removed
+
+ assertTrue(!fs.exists(regiondir));
+ }
+
+ /**
+ * Splitting an empty log must produce no table output and archive a
+ * zero-entry log.
+ */
+ @Test
+ public void testSplitLogFileEmpty() throws IOException {
+ LOG.info("testSplitLogFileEmpty");
+ injectEmptyFile(".empty", true);
+ FileStatus logfile = fs.listStatus(hlogDir)[0];
+
+ fs.initialize(fs.getUri(), conf);
+
+ HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
+ reporter);
+ HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
+ logfile.getPath().toString(), conf);
+ // no recovered edits should have been written for any region
+ Path tdir = HTableDescriptor.getTableDir(hbaseDir, TABLE_NAME);
+ FileStatus[] files = this.fs.listStatus(tdir);
+ assertTrue(files == null || files.length == 0);
+
+ assertEquals(0, countHLog(fs.listStatus(oldLogDir)[0].getPath(), fs, conf));
+ }
+
+ /**
+ * Splitting a log carrying 10 edits per region must yield a recovered-edits
+ * file with exactly 10 entries for every region.
+ */
+ @Test
+ public void testSplitLogFileMultipleRegions() throws IOException {
+ LOG.info("testSplitLogFileMultipleRegions");
+ generateHLogs(1, 10, -1);
+ FileStatus logfile = fs.listStatus(hlogDir)[0];
+ fs.initialize(fs.getUri(), conf);
+
+ HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
+ reporter);
+ HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
+ logfile.getPath().toString(), conf);
+ for (String region : regions) {
+ Path recovered = getLogForRegion(hbaseDir, TABLE_NAME, region);
+ assertEquals(10, countHLog(recovered, fs, conf));
+ }
+ }
+
+ /**
+ * With skip-errors enabled, a log whose first line is garbage must be moved
+ * to the corrupt directory rather than aborting the split.
+ */
+ @Test
+ public void testSplitLogFileFirstLineCorruptionLog() throws IOException {
+ conf.setBoolean(HBASE_SKIP_ERRORS, true);
+ generateHLogs(1, 10, -1);
+ FileStatus logfile = fs.listStatus(hlogDir)[0];
+
+ corruptHLog(logfile.getPath(), Corruptions.INSERT_GARBAGE_ON_FIRST_LINE,
+ true, fs);
+
+ fs.initialize(fs.getUri(), conf);
+ HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, conf,
+ reporter);
+ HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
+ logfile.getPath().toString(), conf);
+
+ // the corrupt log must land in the configured .corrupt directory
+ final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
+ "hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
+ assertEquals(1, fs.listStatus(corruptDir).length);
+ }
+
private void flushToConsole(String s) {
System.out.println(s);
System.out.flush();