You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/02/27 00:09:31 UTC
[10/37] hbase git commit: HBASE-15302 Reenable the other tests
disabled by HBASE-14678
http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
new file mode 100644
index 0000000..125f5a1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.EnableTableState;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.TruncateTableState;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, LargeTests.class})
+public class TestMasterFailoverWithProcedures {
+ private static final Log LOG = LogFactory.getLog(TestMasterFailoverWithProcedures.class);
+
+ protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ /**
+ * Shrinks the procedure-store WAL retry/wait budget so sync failures and
+ * roll failures surface immediately; the failover tests are slow enough
+ * without the default retry counts.
+ */
+ private static void setupConf(Configuration conf) {
+ // don't waste time retrying with the roll, the test is already slow enough.
+ conf.setInt("hbase.procedure.store.wal.max.retries.before.roll", 1);
+ conf.setInt("hbase.procedure.store.wal.wait.before.roll", 0);
+ conf.setInt("hbase.procedure.store.wal.max.roll.retries", 1);
+ conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 1);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ setupConf(UTIL.getConfiguration());
+ // Two masters so a backup can take over when the active one is killed/aborts.
+ UTIL.startMiniCluster(2, 1);
+
+ final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ // Start every test with kill-injection disabled; individual tests turn it on.
+ ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, false);
+ ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, false);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ UTIL.shutdownMiniCluster();
+ } catch (Exception e) {
+ // Log-and-continue: a shutdown hiccup must not mask the test's own result.
+ LOG.warn("failure shutting down cluster", e);
+ }
+ }
+
+ /**
+ * Verifies WAL lease recovery: a second WALProcedureStore (posing as a new
+ * active master) takes over the log directory lease, which must abort the
+ * current master's store, stop the current master, and let the real backup
+ * master come up.
+ */
+ @Test(timeout=60000)
+ public void testWalRecoverLease() throws Exception {
+ final ProcedureStore masterStore = getMasterProcedureExecutor().getStore();
+ assertTrue("expected WALStore for this test", masterStore instanceof WALProcedureStore);
+
+ HMaster firstMaster = UTIL.getHBaseCluster().getMaster();
+ // Abort latch for the master store: counted down when the store aborts.
+ final CountDownLatch masterStoreAbort = new CountDownLatch(1);
+ masterStore.registerListener(new ProcedureStore.ProcedureStoreListener() {
+ @Override
+ public void postSync() {}
+
+ @Override
+ public void abortProcess() {
+ LOG.debug("Abort store of Master");
+ masterStoreAbort.countDown();
+ }
+ });
+
+ // Start up a fake master: its new WAL store will take the lease
+ // and the active master should abort.
+ HMaster backupMaster3 = Mockito.mock(HMaster.class);
+ Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
+ Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
+ final WALProcedureStore backupStore3 = new WALProcedureStore(firstMaster.getConfiguration(),
+ firstMaster.getMasterFileSystem().getFileSystem(),
+ ((WALProcedureStore)masterStore).getLogDir(),
+ new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
+ // Abort latch for the test store (fired when the real backup fences it out).
+ final CountDownLatch backupStore3Abort = new CountDownLatch(1);
+ backupStore3.registerListener(new ProcedureStore.ProcedureStoreListener() {
+ @Override
+ public void postSync() {}
+
+ @Override
+ public void abortProcess() {
+ LOG.debug("Abort store of backupMaster3");
+ backupStore3Abort.countDown();
+ backupStore3.stop(true);
+ }
+ });
+ backupStore3.start(1);
+ backupStore3.recoverLease();
+
+ // Try to trigger a command on the master (WAL lease expired on the active one);
+ // the submit must fail because the active master's store lost its lease.
+ HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(TableName.valueOf("mtb"), "f");
+ HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, null);
+ LOG.debug("submit proc");
+ try {
+ getMasterProcedureExecutor().submitProcedure(
+ new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions));
+ fail("expected RuntimeException 'sync aborted'");
+ } catch (RuntimeException e) {
+ LOG.info("got " + e.getMessage());
+ }
+ LOG.debug("wait master store abort");
+ masterStoreAbort.await();
+
+ // Now the real backup master should start up, fencing out backupStore3.
+ LOG.debug("wait backup master to startup");
+ waitBackupMaster(UTIL, firstMaster);
+ assertEquals(true, firstMaster.isStopped());
+
+ // wait the store in here to abort (the test will fail due to timeout if it doesn't)
+ LOG.debug("wait the store to abort");
+ backupStore3.getStoreTracker().setDeleted(1, false);
+ try {
+ backupStore3.delete(1);
+ fail("expected RuntimeException 'sync aborted'");
+ } catch (RuntimeException e) {
+ LOG.info("got " + e.getMessage());
+ }
+ backupStore3Abort.await();
+ }
+
+ /**
+ * Tests proper fencing in case the current WAL store is fenced.
+ * NOTE(review): unlike the other tests in this class, no {@code timeout} is
+ * set on this @Test — confirm whether that is intentional.
+ */
+ @Test
+ public void testWALfencingWithoutWALRolling() throws IOException {
+ testWALfencing(false);
+ }
+
+ /**
+ * Tests proper fencing in case the current WAL store does not receive writes until after the
+ * new WAL does a couple of WAL rolls.
+ * NOTE(review): no {@code timeout} on this @Test, unlike the sibling tests —
+ * confirm whether that is intentional.
+ */
+ @Test
+ public void testWALfencingWithWALRolling() throws IOException {
+ testWALfencing(true);
+ }
+
+ /**
+ * Shared body for the fencing tests: starts a second WALProcedureStore over
+ * the same log dir and asserts the first store can no longer write.
+ *
+ * @param walRolls if true, drive the second store through many insert/delete
+ * cycles so it rolls (and deletes) WALs before the first store tries to write
+ */
+ public void testWALfencing(boolean walRolls) throws IOException {
+ final ProcedureStore procStore = getMasterProcedureExecutor().getStore();
+ assertTrue("expected WALStore for this test", procStore instanceof WALProcedureStore);
+
+ HMaster firstMaster = UTIL.getHBaseCluster().getMaster();
+
+ // cause WAL rolling after a delete in WAL:
+ firstMaster.getConfiguration().setLong("hbase.procedure.store.wal.roll.threshold", 1);
+
+ HMaster backupMaster3 = Mockito.mock(HMaster.class);
+ Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
+ Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
+ final WALProcedureStore procStore2 = new WALProcedureStore(firstMaster.getConfiguration(),
+ firstMaster.getMasterFileSystem().getFileSystem(),
+ ((WALProcedureStore)procStore).getLogDir(),
+ new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
+
+ // start a second store which should fence the first one out
+ LOG.info("Starting new WALProcedureStore");
+ procStore2.start(1);
+ procStore2.recoverLease();
+
+ // Before writing back to the first WAL store, optionally do a couple of WAL
+ // rolls (which causes the old WAL files to be deleted).
+ if (walRolls) {
+ LOG.info("Inserting into second WALProcedureStore, causing WAL rolls");
+ for (int i = 0; i < 512; i++) {
+ // insert something to the second store then delete it, causing a WAL roll(s)
+ Procedure proc2 = new TestProcedure(i);
+ procStore2.insert(proc2, null);
+ procStore2.delete(proc2.getProcId()); // delete the procedure so that the WAL is removed later
+ }
+ }
+
+ // Now, insert something to the first store, should fail.
+ // If the store does a WAL roll and continue with another logId without checking higher logIds
+ // it will incorrectly succeed.
+ LOG.info("Inserting into first WALProcedureStore");
+ try {
+ procStore.insert(new TestProcedure(11), null);
+ fail("Inserting into Procedure Store should have failed");
+ } catch (Exception ex) {
+ LOG.info("Received expected exception", ex);
+ }
+ }
+
+ // ==========================================================================
+ // Test Create Table
+ // ==========================================================================
+ /** Kills the master mid-CreateTable (at ASSIGN_REGIONS) and verifies recovery. */
+ @Test(timeout=60000)
+ public void testCreateWithFailover() throws Exception {
+ // TODO: Should we try every step? (master failover takes long time)
+ // It is already covered by TestCreateTableProcedure
+ // but without the master restart, only the executor/store is restarted.
+ // Without Master restart we may not find bug in the procedure code
+ // like missing "wait" for resources to be available (e.g. RS)
+ testCreateWithFailoverAtStep(CreateTableState.CREATE_TABLE_ASSIGN_REGIONS.ordinal());
+ }
+
+ /**
+ * Submits a CreateTableProcedure with kill-injection enabled, restarts the
+ * executor through {@code step} states, triggers a real master failover, and
+ * validates the table was created correctly by the new master.
+ */
+ private void testCreateWithFailoverAtStep(final int step) throws Exception {
+ final TableName tableName = TableName.valueOf("testCreateWithFailoverAtStep" + step);
+
+ // create the table
+ ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, true);
+ ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, true);
+
+ // Start the Create procedure && kill the executor
+ byte[][] splitKeys = null;
+ HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, "f1", "f2");
+ HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, splitKeys);
+ long procId = procExec.submitProcedure(
+ new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
+ testRecoveryAndDoubleExecution(UTIL, procId, step, CreateTableState.values());
+
+ MasterProcedureTestingUtility.validateTableCreation(
+ UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
+ }
+
+ // ==========================================================================
+ // Test Delete Table
+ // ==========================================================================
+ /** Kills the master mid-DeleteTable (at UNASSIGN_REGIONS) and verifies recovery. */
+ @Test(timeout=60000)
+ public void testDeleteWithFailover() throws Exception {
+ // TODO: Should we try every step? (master failover takes long time)
+ // It is already covered by TestDeleteTableProcedure
+ // but without the master restart, only the executor/store is restarted.
+ // Without Master restart we may not find bug in the procedure code
+ // like missing "wait" for resources to be available (e.g. RS)
+ testDeleteWithFailoverAtStep(DeleteTableState.DELETE_TABLE_UNASSIGN_REGIONS.ordinal());
+ }
+
+ /**
+ * Creates and disables a table, then submits a DeleteTableProcedure with
+ * kill-injection enabled, restarts the executor through {@code step} states,
+ * triggers a master failover, and validates the table is fully deleted.
+ */
+ private void testDeleteWithFailoverAtStep(final int step) throws Exception {
+ final TableName tableName = TableName.valueOf("testDeleteWithFailoverAtStep" + step);
+
+ // create the table
+ byte[][] splitKeys = null;
+ HRegionInfo[] regions = MasterProcedureTestingUtility.createTable(
+ getMasterProcedureExecutor(), tableName, splitKeys, "f1", "f2");
+ MasterProcedureTestingUtility.validateTableCreation(
+ UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
+ // the table must be disabled before it can be deleted
+ UTIL.getHBaseAdmin().disableTable(tableName);
+
+ ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, true);
+ ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, true);
+
+ // Start the Delete procedure && kill the executor
+ long procId = procExec.submitProcedure(
+ new DeleteTableProcedure(procExec.getEnvironment(), tableName));
+ testRecoveryAndDoubleExecution(UTIL, procId, step, DeleteTableState.values());
+
+ MasterProcedureTestingUtility.validateTableDeletion(
+ UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
+ }
+
+ // ==========================================================================
+ // Test Truncate Table
+ // ==========================================================================
+ /** Kills the master mid-TruncateTable (at ADD_TO_META, preserving splits). */
+ @Test(timeout=90000)
+ public void testTruncateWithFailover() throws Exception {
+ // TODO: Should we try every step? (master failover takes long time)
+ // It is already covered by TestTruncateTableProcedure
+ // but without the master restart, only the executor/store is restarted.
+ // Without Master restart we may not find bug in the procedure code
+ // like missing "wait" for resources to be available (e.g. RS)
+ testTruncateWithFailoverAtStep(true, TruncateTableState.TRUNCATE_TABLE_ADD_TO_META.ordinal());
+ }
+
+ /**
+ * Creates a pre-split table with data, disables it, then truncates it with
+ * kill-injection and a master failover at {@code step}; afterwards verifies
+ * the region layout (split-preserving or not), that the table is empty, and
+ * that it is writable again.
+ */
+ private void testTruncateWithFailoverAtStep(final boolean preserveSplits, final int step)
+ throws Exception {
+ final TableName tableName = TableName.valueOf("testTruncateWithFailoverAtStep" + step);
+
+ // create the table
+ final String[] families = new String[] { "f1", "f2" };
+ final byte[][] splitKeys = new byte[][] {
+ Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c")
+ };
+ HRegionInfo[] regions = MasterProcedureTestingUtility.createTable(
+ getMasterProcedureExecutor(), tableName, splitKeys, families);
+ // load and verify that there are rows in the table
+ MasterProcedureTestingUtility.loadData(
+ UTIL.getConnection(), tableName, 100, splitKeys, families);
+ assertEquals(100, UTIL.countRows(tableName));
+ // disable the table
+ UTIL.getHBaseAdmin().disableTable(tableName);
+
+ ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
+
+ // Start the Truncate procedure && kill the executor
+ long procId = procExec.submitProcedure(
+ new TruncateTableProcedure(procExec.getEnvironment(), tableName, preserveSplits));
+ testRecoveryAndDoubleExecution(UTIL, procId, step, TruncateTableState.values());
+
+ // disable kill-injection before waiting for assignment to settle
+ ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
+ UTIL.waitUntilAllRegionsAssigned(tableName);
+
+ // validate the table regions and layout
+ if (preserveSplits) {
+ assertEquals(1 + splitKeys.length, UTIL.getHBaseAdmin().getTableRegions(tableName).size());
+ } else {
+ // NOTE(review): toArray(new HRegionInfo[1]) would leave a null element if
+ // the list were empty; the following assert pins the length to 1 anyway.
+ regions = UTIL.getHBaseAdmin().getTableRegions(tableName).toArray(new HRegionInfo[1]);
+ assertEquals(1, regions.length);
+ }
+ MasterProcedureTestingUtility.validateTableCreation(
+ UTIL.getHBaseCluster().getMaster(), tableName, regions, families);
+
+ // verify that there are no rows in the table
+ assertEquals(0, UTIL.countRows(tableName));
+
+ // verify that the table is read/writable
+ MasterProcedureTestingUtility.loadData(
+ UTIL.getConnection(), tableName, 50, splitKeys, families);
+ assertEquals(50, UTIL.countRows(tableName));
+ }
+
+ // ==========================================================================
+ // Test Disable Table
+ // ==========================================================================
+ /** Kills the master mid-DisableTable (at MARK_REGIONS_OFFLINE) and verifies recovery. */
+ @Test(timeout=60000)
+ public void testDisableTableWithFailover() throws Exception {
+ // TODO: Should we try every step? (master failover takes long time)
+ // It is already covered by TestDisableTableProcedure
+ // but without the master restart, only the executor/store is restarted.
+ // Without Master restart we may not find bug in the procedure code
+ // like missing "wait" for resources to be available (e.g. RS)
+ testDisableTableWithFailoverAtStep(
+ DisableTableState.DISABLE_TABLE_MARK_REGIONS_OFFLINE.ordinal());
+ }
+
+ /**
+ * Creates a table, then disables it with kill-injection and a master
+ * failover at {@code step}; validates the table ends up disabled.
+ */
+ private void testDisableTableWithFailoverAtStep(final int step) throws Exception {
+ final TableName tableName = TableName.valueOf("testDisableTableWithFailoverAtStep" + step);
+
+ // create the table
+ final byte[][] splitKeys = new byte[][] {
+ Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c")
+ };
+ MasterProcedureTestingUtility.createTable(
+ getMasterProcedureExecutor(), tableName, splitKeys, "f1", "f2");
+
+ ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
+
+ // Start the Disable procedure && kill the executor
+ long procId = procExec.submitProcedure(
+ new DisableTableProcedure(procExec.getEnvironment(), tableName, false));
+ testRecoveryAndDoubleExecution(UTIL, procId, step, DisableTableState.values());
+
+ MasterProcedureTestingUtility.validateTableIsDisabled(
+ UTIL.getHBaseCluster().getMaster(), tableName);
+ }
+
+ // ==========================================================================
+ // Test Enable Table
+ // ==========================================================================
+ /** Kills the master mid-EnableTable (at MARK_REGIONS_ONLINE) and verifies recovery. */
+ @Test(timeout=60000)
+ public void testEnableTableWithFailover() throws Exception {
+ // TODO: Should we try every step? (master failover takes long time)
+ // It is already covered by TestEnableTableProcedure
+ // but without the master restart, only the executor/store is restarted.
+ // Without Master restart we may not find bug in the procedure code
+ // like missing "wait" for resources to be available (e.g. RS)
+ testEnableTableWithFailoverAtStep(
+ EnableTableState.ENABLE_TABLE_MARK_REGIONS_ONLINE.ordinal());
+ }
+
+ /**
+ * Creates and disables a table, then re-enables it with kill-injection and a
+ * master failover at {@code step}; validates the table ends up enabled.
+ */
+ private void testEnableTableWithFailoverAtStep(final int step) throws Exception {
+ final TableName tableName = TableName.valueOf("testEnableTableWithFailoverAtStep" + step);
+
+ // create the table
+ final byte[][] splitKeys = new byte[][] {
+ Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c")
+ };
+ MasterProcedureTestingUtility.createTable(
+ getMasterProcedureExecutor(), tableName, splitKeys, "f1", "f2");
+ UTIL.getHBaseAdmin().disableTable(tableName);
+
+ ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
+
+ // Start the Enable procedure && kill the executor
+ long procId = procExec.submitProcedure(
+ new EnableTableProcedure(procExec.getEnvironment(), tableName, false));
+ testRecoveryAndDoubleExecution(UTIL, procId, step, EnableTableState.values());
+
+ MasterProcedureTestingUtility.validateTableIsEnabled(
+ UTIL.getHBaseCluster().getMaster(), tableName);
+ }
+
+ // ==========================================================================
+ // Test Helpers
+ // ==========================================================================
+ /**
+ * Replays the procedure through {@code lastStepBeforeFailover} executor
+ * restarts (one per state, with kill-injection), then performs a real master
+ * failover and asserts the new master finishes the procedure successfully.
+ *
+ * @param states the procedure's state enum values, used only for logging
+ */
+ public static <TState> void testRecoveryAndDoubleExecution(final HBaseTestingUtility testUtil,
+ final long procId, final int lastStepBeforeFailover, TState[] states) throws Exception {
+ ProcedureExecutor<MasterProcedureEnv> procExec =
+ testUtil.getHBaseCluster().getMaster().getMasterProcedureExecutor();
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
+
+ for (int i = 0; i < lastStepBeforeFailover; ++i) {
+ LOG.info("Restart "+ i +" exec state: " + states[i]);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
+ ProcedureTestingUtility.restart(procExec);
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
+ }
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
+
+ LOG.info("Trigger master failover");
+ masterFailover(testUtil);
+
+ // Re-fetch the executor: it belongs to the newly promoted master now.
+ procExec = testUtil.getHBaseCluster().getMaster().getMasterProcedureExecutor();
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
+ ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
+ }
+
+ // ==========================================================================
+ // Master failover utils
+ // ==========================================================================
+ /**
+ * Kills the active master of the mini cluster and blocks until a backup
+ * master has become active and initialized.
+ */
+ public static void masterFailover(final HBaseTestingUtility testUtil)
+ throws Exception {
+ MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster();
+
+ // Kill the master
+ HMaster oldMaster = cluster.getMaster();
+ cluster.killMaster(cluster.getMaster().getServerName());
+
+ // Wait the secondary
+ waitBackupMaster(testUtil, oldMaster);
+ }
+
+ /**
+ * Polls until the cluster reports an active, initialized master that is not
+ * {@code oldMaster}.
+ * NOTE(review): both loops are unbounded; callers rely on the enclosing
+ * @Test timeout to bail out if no backup ever comes up.
+ */
+ public static void waitBackupMaster(final HBaseTestingUtility testUtil,
+ final HMaster oldMaster) throws Exception {
+ MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster();
+
+ HMaster newMaster = cluster.getMaster();
+ while (newMaster == null || newMaster == oldMaster) {
+ Thread.sleep(250);
+ newMaster = cluster.getMaster();
+ }
+
+ // a new master object exists; now wait until it is actually usable
+ while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) {
+ Thread.sleep(250);
+ }
+ }
+
+ // ==========================================================================
+ // Helpers
+ // ==========================================================================
+ /** @return the procedure environment of the current active master's executor */
+ private MasterProcedureEnv getMasterProcedureEnv() {
+ return getMasterProcedureExecutor().getEnvironment();
+ }
+
+ /** @return the procedure executor of the current active master (re-resolve after failover) */
+ private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
+ return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
+ }
+
+ /** @return the master's backing FileSystem */
+ private FileSystem getFileSystem() {
+ return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+ }
+
+ /** @return the HBase root directory of the mini cluster */
+ private Path getRootDir() {
+ return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+ }
+
+ /** @return the master's temp directory */
+ private Path getTempDir() {
+ return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getTempDir();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobFlushSnapshotFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobFlushSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobFlushSnapshotFromClient.java
new file mode 100644
index 0000000..fe297edc
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobFlushSnapshotFromClient.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.snapshot;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test creating/using/deleting snapshots from the client
+ * <p>
+ * This is an end-to-end test for the snapshot utility
+ *
+ * TODO This is essentially a clone of TestSnapshotFromClient. This is worth refactoring this
+ * because there will be a few more flavors of snapshots that need to run these tests.
+ */
+@Category({ClientTests.class, LargeTests.class})
+public class TestMobFlushSnapshotFromClient extends TestFlushSnapshotFromClient {
+ // Log under this class's own name (was mistakenly TestFlushSnapshotFromClient.class).
+ private static final Log LOG = LogFactory.getLog(TestMobFlushSnapshotFromClient.class);
+
+ /** Starts a 3-server mini cluster configured for MOB snapshot testing. */
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ setupConf(UTIL.getConfiguration());
+ UTIL.startMiniCluster(3);
+ }
+
+ /**
+ * Applies the parent's snapshot configuration to {@code conf} and disables
+ * the MOB file cache.
+ */
+ protected static void setupConf(Configuration conf) {
+ TestFlushSnapshotFromClient.setupConf(conf);
+ // Write to the passed-in conf: the original wrote to UTIL.getConfiguration(),
+ // silently ignoring the parameter.
+ conf.setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
+ }
+
+ /** Creates a MOB-enabled table instead of the parent's plain table. */
+ @Override
+ protected void createTable() throws Exception {
+ MobSnapshotTestingUtils.createMobTable(UTIL, TABLE_NAME, 1, TEST_FAM);
+ }
+
+ /** Verifies the row count using MOB-aware scanning. */
+ @Override
+ protected void verifyRowCount(final HBaseTestingUtility util, final TableName tableName,
+ long expectedRows) throws IOException {
+ MobSnapshotTestingUtils.verifyMobRowCount(util, tableName, expectedRows);
+ }
+
+ /** Counts rows using MOB-aware scanning. */
+ @Override
+ protected int countRows(final Table table, final byte[]... families) throws IOException {
+ return MobSnapshotTestingUtils.countMobRows(table, families);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
new file mode 100644
index 0000000..67fc60a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -0,0 +1,1320 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.FaultySequenceFileLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
+import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.apache.hadoop.ipc.RemoteException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Testing {@link WAL} splitting code.
+ */
+@Category({RegionServerTests.class, LargeTests.class})
+public class TestWALSplit {
+ {
+ // Uncomment the following lines if more verbosity is needed for
+ // debugging (see HBASE-12285 for details).
+ //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+ //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+ //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+ }
+ private final static Log LOG = LogFactory.getLog(TestWALSplit.class);
+
+ private static Configuration conf;
+ private FileSystem fs;
+
+ protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ // Per-test directories, (re)initialized in setUp().
+ private Path HBASEDIR;
+ private Path WALDIR;
+ private Path OLDLOGDIR;
+ private Path CORRUPTDIR;
+ private Path TABLEDIR;
+
+ private static final int NUM_WRITERS = 10;
+ private static final int ENTRIES = 10; // entries per writer per region
+
+ private static final String FILENAME_BEING_SPLIT = "testfile";
+ private static final TableName TABLE_NAME =
+ TableName.valueOf("t1");
+ private static final byte[] FAMILY = "f1".getBytes();
+ private static final byte[] QUALIFIER = "q1".getBytes();
+ private static final byte[] VALUE = "v1".getBytes();
+ private static final String WAL_FILE_PREFIX = "wal.dat.";
+ private static List<String> REGIONS = new ArrayList<String>();
+ private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
+ // Fake test user names; initialized in setUpBeforeClass().
+ private static String ROBBER;
+ private static String ZOMBIE;
+ private static String [] GROUP = new String [] {"supergroup"};
+ private RecoveryMode mode;
+
+ /** Ways a WAL file is deliberately damaged via corruptWAL(). */
+ static enum Corruptions {
+ INSERT_GARBAGE_ON_FIRST_LINE,
+ INSERT_GARBAGE_IN_THE_MIDDLE,
+ APPEND_GARBAGE,
+ TRUNCATE,
+ TRUNCATE_TRAILER
+ }
+
+ /** Boots a shared 2-datanode mini DFS cluster and wires fake users/groups into conf. */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf = TEST_UTIL.getConfiguration();
+ conf.setClass("hbase.regionserver.hlog.writer.impl",
+ InstrumentedLogWriter.class, Writer.class);
+ conf.setBoolean("dfs.support.broken.append", true);
+ conf.setBoolean("dfs.support.append", true);
+ // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
+ System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
+ // Create fake mapping of user to group and set it to the conf.
+ Map<String, String []> u2g_map = new HashMap<String, String []>(2);
+ ROBBER = User.getCurrent().getName() + "-robber";
+ ZOMBIE = User.getCurrent().getName() + "-zombie";
+ u2g_map.put(ROBBER, GROUP);
+ u2g_map.put(ZOMBIE, GROUP);
+ DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
+ conf.setInt("dfs.heartbeat.interval", 1);
+ TEST_UTIL.startMiniDFSCluster(2);
+ }
+
+ /** Shuts down the mini DFS cluster started in setUpBeforeClass(). */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniDFSCluster();
+ }
+
+ // Supplies the current test method name; used to name per-test WAL dirs and factories.
+ @Rule
+ public TestName name = new TestName();
+ private WALFactory wals = null;
+
+ /** Resets directories, the region list, and the WALFactory before each test. */
+ @Before
+ public void setUp() throws Exception {
+ LOG.info("Cleaning up cluster for new test.");
+ fs = TEST_UTIL.getDFSCluster().getFileSystem();
+ HBASEDIR = TEST_UTIL.createRootDir();
+ OLDLOGDIR = new Path(HBASEDIR, HConstants.HREGION_OLDLOGDIR_NAME);
+ CORRUPTDIR = new Path(HBASEDIR, HConstants.CORRUPT_DIR_NAME);
+ TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
+ REGIONS.clear();
+ Collections.addAll(REGIONS, "bbb", "ccc");
+ InstrumentedLogWriter.activateFailure = false;
+ this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
+ RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
+ wals = new WALFactory(conf, null, name.getMethodName());
+ WALDIR = new Path(HBASEDIR, DefaultWALProvider.getWALDirectoryName(name.getMethodName()));
+ //fs.mkdirs(WALDIR);
+ }
+
+ /** Closes the WALFactory (tolerating expected failures) and wipes the test root dir. */
+ @After
+ public void tearDown() throws Exception {
+ try {
+ wals.close();
+ } catch(IOException exception) {
+ // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
+ LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" +
+ " you see a failure look here.");
+ LOG.debug("exception details", exception);
+ } finally {
+ wals = null;
+ fs.delete(HBASEDIR, true);
+ }
+ }
+
+ /**
+ * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
+ * Ensures we do not lose edits. A "zombie" thread keeps appending while a "robber" user
+ * runs the split; afterwards the split output must contain every acknowledged edit.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test (timeout=300000)
+ public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
+ final AtomicLong counter = new AtomicLong(0);
+ AtomicBoolean stop = new AtomicBoolean(false);
+ // Region we'll write edits to and then later examine to make sure they all made it in.
+ final String region = REGIONS.get(0);
+ final int numWriters = 3;
+ Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
+ try {
+ long startCount = counter.get();
+ zombie.start();
+ // Wait till writer starts going.
+ while (startCount == counter.get()) Threads.sleep(1);
+ // Give it a second to write a few appends.
+ Threads.sleep(1000);
+ final Configuration conf2 = HBaseConfiguration.create(this.conf);
+ final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
+ int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
+ @Override
+ public Integer run() throws Exception {
+ StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR)
+ .append("):\n");
+ for (FileStatus status : fs.listStatus(WALDIR)) {
+ ls.append("\t").append(status.toString()).append("\n");
+ }
+ LOG.debug(ls);
+ LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
+ WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
+ LOG.info("Finished splitting out from under zombie.");
+ Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+ assertEquals("wrong number of split files for region", numWriters, logfiles.length);
+ int count = 0;
+ for (Path logfile: logfiles) {
+ count += countWAL(logfile);
+ }
+ return count;
+ }
+ });
+ LOG.info("zombie=" + counter.get() + ", robber=" + count);
+ // One extra entry is tolerated: the zombie may have an append in flight when split runs.
+ assertTrue("The log file could have at most 1 extra log entry, but can't have less. " +
+ "Zombie could write " + counter.get() + " and logfile had only " + count,
+ counter.get() == count || counter.get() + 1 == count);
+ } finally {
+ stop.set(true);
+ zombie.interrupt();
+ Threads.threadDumpingIsAlive(zombie);
+ }
+ }
+
+ /**
+ * This thread will keep writing to a 'wal' file even after the split process has started.
+ * It simulates a region server that was considered dead but woke up and wrote some more to the
+ * last log entry. Does its writing as an alternate user in another filesystem instance to
+ * simulate better it being a regionserver.
+ */
+ class ZombieLastLogWriterRegionServer extends Thread {
+ // Shared with the test thread: number of edits successfully appended so far.
+ final AtomicLong editsCount;
+ // Shared stop flag; the test sets it to end the write loop.
+ final AtomicBoolean stop;
+ final int numOfWriters;
+ /**
+ * Region to write edits for.
+ */
+ final String region;
+ final User user;
+
+ public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
+ final String region, final int writers)
+ throws IOException, InterruptedException {
+ super("ZombieLastLogWriterRegionServer");
+ setDaemon(true);
+ this.stop = stop;
+ this.editsCount = counter;
+ this.region = region;
+ this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
+ numOfWriters = writers;
+ }
+
+ @Override
+ public void run() {
+ try {
+ doWriting();
+ } catch (IOException e) {
+ LOG.warn(getName() + " Writer exiting " + e);
+ } catch (InterruptedException e) {
+ LOG.warn(getName() + " Writer exiting " + e);
+ }
+ }
+
+ // Generates the initial WALs as the zombie user, then keeps appending until stopped.
+ private void doWriting() throws IOException, InterruptedException {
+ this.user.runAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose
+ // index we supply here.
+ int walToKeepOpen = numOfWriters - 1;
+ // The below method writes numOfWriters files each with ENTRIES entries for a total of
+ // numOfWriters * ENTRIES added per column family in the region.
+ Writer writer = null;
+ try {
+ writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
+ } catch (IOException e1) {
+ throw new RuntimeException("Failed", e1);
+ }
+ // Update counter so has all edits written so far.
+ editsCount.addAndGet(numOfWriters * ENTRIES);
+ loop(writer);
+ // If we've been interrupted, then things should have shifted out from under us.
+ // closing should error
+ try {
+ writer.close();
+ fail("Writing closing after parsing should give an error.");
+ } catch (IOException exception) {
+ LOG.debug("ignoring error when closing final writer.", exception);
+ }
+ return null;
+ }
+ });
+ }
+
+ // Appends edits one at a time until stopped or the writer fails (expected once split runs).
+ private void loop(final Writer writer) {
+ byte [] regionBytes = Bytes.toBytes(this.region);
+ while (!stop.get()) {
+ try {
+ long seq = appendEntry(writer, TABLE_NAME, regionBytes,
+ ("r" + editsCount.get()).getBytes(), regionBytes, QUALIFIER, VALUE, 0);
+ long count = editsCount.incrementAndGet();
+ LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ //
+ }
+ } catch (IOException ex) {
+ LOG.error(getName() + " ex " + ex.toString());
+ if (ex instanceof RemoteException) {
+ LOG.error("Juliet: got RemoteException " + ex.getMessage() +
+ " while writing " + (editsCount.get() + 1));
+ } else {
+ LOG.error(getName() + " failed to write....at " + editsCount.get());
+ fail("Failed to write " + editsCount.get());
+ }
+ break;
+ } catch (Throwable t) {
+ LOG.error(getName() + " HOW? " + t);
+ LOG.debug("exception details", t);
+ break;
+ }
+ }
+ LOG.info(getName() + " Writer exiting");
+ }
+ }
+
+ /**
+ * Verifies the recovered.edits path for a meta edit lands under the meta region directory.
+ * @throws IOException
+ * @see https://issues.apache.org/jira/browse/HBASE-3020
+ */
+ @Test (timeout=300000)
+ public void testRecoveredEditsPathForMeta() throws IOException {
+   byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
+   Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
+   Path regiondir = new Path(tdir,
+       HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
+   fs.mkdirs(regiondir);
+   long now = System.currentTimeMillis();
+   Entry entry =
+       new Entry(new WALKey(encoded,
+           TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
+       new WALEdit());
+   Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR,
+       FILENAME_BEING_SPLIT);
+   String parentOfParent = p.getParent().getParent().getName();
+   // JUnit convention: expected value first, actual second (was reversed).
+   assertEquals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), parentOfParent);
+ }
+
+ /**
+ * Test old recovered edits file doesn't break WALSplitter.
+ * This is useful in upgrading old instances.
+ */
+ @Test (timeout=300000)
+ public void testOldRecoveredEditsFileSidelined() throws IOException {
+   byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
+   Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
+   Path regiondir = new Path(tdir,
+       HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
+   fs.mkdirs(regiondir);
+   long now = System.currentTimeMillis();
+   Entry entry =
+       new Entry(new WALKey(encoded,
+           TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
+       new WALEdit());
+   Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+   // JUnit convention: expected value first, actual second (was reversed).
+   assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
+   fs.createNewFile(parent); // create a recovered.edits file
+
+   Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR,
+       FILENAME_BEING_SPLIT);
+   String parentOfParent = p.getParent().getParent().getName();
+   assertEquals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), parentOfParent);
+   WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
+ }
+
+ /** Re-initializes {@code fs} so subsequent calls act as a different DFS client. */
+ private void useDifferentDFSClient() throws IOException {
+ // make fs act as a different client now
+ // initialize will create a new DFSClient with a new client ID
+ fs.initialize(fs.getUri(), conf);
+ }
+
+ /** Splits a single WAL and verifies the split output is edit-for-edit equal to the input. */
+ @Test (timeout=300000)
+ public void testSplitPreservesEdits() throws IOException{
+ final String REGION = "region__1";
+ REGIONS.clear();
+ REGIONS.add(REGION);
+
+ generateWALs(1, 10, -1);
+ useDifferentDFSClient();
+ WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
+ Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+ assertEquals(1, splitLog.length);
+
+ assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
+ }
+
+ /**
+ * Runs a split and counts the resulting entries for every region in REGIONS.
+ * @param expectedFiles number of split files expected per region
+ * @param expectedEntries -1 to not assert
+ * @return the count across all regions
+ */
+ private int splitAndCount(final int expectedFiles, final int expectedEntries)
+ throws IOException {
+ useDifferentDFSClient();
+ WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ int result = 0;
+ for (String region : REGIONS) {
+ Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+ assertEquals(expectedFiles, logfiles.length);
+ int count = 0;
+ for (Path logfile: logfiles) {
+ count += countWAL(logfile);
+ }
+ if (-1 != expectedEntries) {
+ assertEquals(expectedEntries, count);
+ }
+ result += count;
+ }
+ return result;
+ }
+
+ /** Split must tolerate empty WAL files that were closed cleanly. */
+ @Test (timeout=300000)
+ public void testEmptyLogFiles() throws IOException {
+ testEmptyLogFiles(true);
+ }
+
+ /** Split must tolerate empty WAL files that were left open. */
+ @Test (timeout=300000)
+ public void testEmptyOpenLogFiles() throws IOException {
+ testEmptyLogFiles(false);
+ }
+
+ /** Injects two empty WAL files around real ones and checks they are skipped by split. */
+ private void testEmptyLogFiles(final boolean close) throws IOException {
+ // we won't create the hlog dir until getWAL got called, so
+ // make dir here when testing empty log file
+ fs.mkdirs(WALDIR);
+ injectEmptyFile(".empty", close);
+ generateWALs(Integer.MAX_VALUE);
+ injectEmptyFile("empty", close);
+ splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
+ }
+
+ /** A still-open WAL (reported as zero length) that has data must still be split. */
+ @Test (timeout=300000)
+ public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
+ // generate logs but leave wal.dat.5 open.
+ generateWALs(5);
+ splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
+ }
+
+ // NOTE(review): method name has a typo ("Traling" -> "Trailing"); kept as-is since the
+ // test name is its public identifier for test runners.
+ /** Garbage appended after the last entry must not lose any of the real entries. */
+ @Test (timeout=300000)
+ public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
+ conf.setBoolean(HBASE_SKIP_ERRORS, true);
+ generateWALs(Integer.MAX_VALUE);
+ corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
+ Corruptions.APPEND_GARBAGE, true);
+ splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
+ }
+
+ /** A WAL corrupted at its start is dropped entirely when skip-errors is on. */
+ @Test (timeout=300000)
+ public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
+ conf.setBoolean(HBASE_SKIP_ERRORS, true);
+ generateWALs(Integer.MAX_VALUE);
+ corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
+ Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
+ splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); //1 corrupt
+ }
+
+ /** A WAL corrupted in the middle should still yield at least its first half of entries. */
+ @Test (timeout=300000)
+ public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
+   conf.setBoolean(HBASE_SKIP_ERRORS, true);
+   generateWALs(Integer.MAX_VALUE);
+   corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
+       Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
+   // the entries in the original logs are alternating regions
+   // considering the sequence file header, the middle corruption should
+   // affect at least half of the entries
+   int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
+   // Ceiling of ENTRIES/2. The original wrote Math.ceil(ENTRIES / 2), where the
+   // division is integer division, so ceil() was a no-op (wrong for odd ENTRIES).
+   int firstHalfEntries = (ENTRIES + 1) / 2 - 1;
+   int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
+   assertTrue("The file up to the corrupted area hasn't been parsed",
+       REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
+ }
+
+ /** For every faulty-reader failure type, all WALs must end up archived in CORRUPTDIR. */
+ @Test (timeout=300000)
+ public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
+ conf.setBoolean(HBASE_SKIP_ERRORS, true);
+ for (FaultySequenceFileLogReader.FailureType failureType :
+ FaultySequenceFileLogReader.FailureType.values()) {
+ final Set<String> walDirContents = splitCorruptWALs(failureType);
+ final Set<String> archivedLogs = new HashSet<String>();
+ final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
+ for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
+ archived.append("\n\t").append(log.toString());
+ archivedLogs.add(log.getPath().getName());
+ }
+ LOG.debug(archived.toString());
+ assertEquals(failureType.name() + ": expected to find all of our wals corrupt.",
+ walDirContents, archivedLogs);
+ }
+ }
+
+ /**
+ * Installs a faulty WAL reader, regenerates WALs, and runs a split so every WAL is
+ * treated as corrupt. Restores the original reader class in all cases.
+ * @param failureType where the faulty reader should fail
+ * @return set of wal names present prior to split attempt.
+ * @throws IOException if the split process fails
+ */
+ private Set<String> splitCorruptWALs(final FaultySequenceFileLogReader.FailureType failureType)
+ throws IOException {
+ Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
+ Reader.class);
+ InstrumentedLogWriter.activateFailure = false;
+
+ try {
+ conf.setClass("hbase.regionserver.hlog.reader.impl",
+ FaultySequenceFileLogReader.class, Reader.class);
+ conf.set("faultysequencefilelogreader.failuretype", failureType.name());
+ // Clean up from previous tests or previous loop
+ try {
+ wals.shutdown();
+ } catch (IOException exception) {
+ // since we're splitting out from under the factory, we should expect some closing failures.
+ LOG.debug("Ignoring problem closing WALFactory.", exception);
+ }
+ wals.close();
+ try {
+ for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
+ fs.delete(log.getPath(), true);
+ }
+ } catch (FileNotFoundException exception) {
+ LOG.debug("no previous CORRUPTDIR to clean.");
+ }
+ // change to the faulty reader
+ wals = new WALFactory(conf, null, name.getMethodName());
+ generateWALs(-1);
+ // Our reader will render all of these files corrupt.
+ final Set<String> walDirContents = new HashSet<String>();
+ for (FileStatus status : fs.listStatus(WALDIR)) {
+ walDirContents.add(status.getPath().getName());
+ }
+ useDifferentDFSClient();
+ WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ return walDirContents;
+ } finally {
+ conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
+ Reader.class);
+ }
+ }
+
+ /** With skip-errors off, a corrupt WAL must make the split throw. */
+ @Test (timeout=300000, expected = IOException.class)
+ public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
+ throws IOException {
+ conf.setBoolean(HBASE_SKIP_ERRORS, false);
+ splitCorruptWALs(FaultySequenceFileLogReader.FailureType.BEGINNING);
+ }
+
+ /** With skip-errors off, a failed split must leave all WAL files in place. */
+ @Test (timeout=300000)
+ public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
+ throws IOException {
+ conf.setBoolean(HBASE_SKIP_ERRORS, false);
+ try {
+ splitCorruptWALs(FaultySequenceFileLogReader.FailureType.BEGINNING);
+ } catch (IOException e) {
+ // The throw is the expected path; we only care that the WAL dir is untouched after.
+ LOG.debug("split with 'skip errors' set to 'false' correctly threw");
+ }
+ assertEquals("if skip.errors is false all files should remain in place",
+ NUM_WRITERS, fs.listStatus(WALDIR).length);
+ }
+
+ /**
+ * Corrupts a single WAL, splits it, and checks the split output entry count while
+ * verifying the file was NOT sidelined as corrupt (EOF-style damage is tolerated).
+ * @param corruption how to damage the WAL
+ * @param entryCount entries written before corruption
+ * @param expectedCount entries expected to survive the split
+ */
+ private void ignoreCorruption(final Corruptions corruption, final int entryCount,
+     final int expectedCount) throws IOException {
+   conf.setBoolean(HBASE_SKIP_ERRORS, false);
+
+   final String REGION = "region__1";
+   REGIONS.clear();
+   REGIONS.add(REGION);
+
+   Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
+   generateWALs(1, entryCount, -1);
+   corruptWAL(c1, corruption, true);
+
+   useDifferentDFSClient();
+   WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+
+   Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+   assertEquals(1, splitLog.length);
+
+   // Count surviving entries; the entries themselves are not inspected.
+   int actualCount = 0;
+   Reader in = wals.createReader(fs, splitLog[0]);
+   while (in.next() != null) ++actualCount;
+   assertEquals(expectedCount, actualCount);
+   in.close();
+
+   // should not have stored the EOF files as corrupt
+   FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
+   // JUnit convention: expected value first, actual second (was reversed).
+   assertEquals(0, archivedLogs.length);
+ }
+
+ /** A truncated WAL loses only its last (partial) entry; the rest must split cleanly. */
+ @Test (timeout=300000)
+ public void testEOFisIgnored() throws IOException {
+ int entryCount = 10;
+ ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount-1);
+ }
+
+ /** A WAL with a truncated trailer must keep every entry after split. */
+ @Test (timeout=300000)
+ public void testCorruptWALTrailer() throws IOException {
+ int entryCount = 10;
+ ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
+ }
+
+ /** After a successful split all input WALs must be moved to the old-logs dir. */
+ @Test (timeout=300000)
+ public void testLogsGetArchivedAfterSplit() throws IOException {
+ conf.setBoolean(HBASE_SKIP_ERRORS, false);
+ generateWALs(-1);
+ useDifferentDFSClient();
+ WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
+ assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
+ }
+
+ /** Baseline: splitting clean WALs yields every written entry. */
+ @Test (timeout=300000)
+ public void testSplit() throws IOException {
+ generateWALs(-1);
+ splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
+ }
+
+ /** The WAL source directory must be empty/gone after a successful split. */
+ @Test (timeout=300000)
+ public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
+ throws IOException {
+ generateWALs(-1);
+ useDifferentDFSClient();
+ WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ FileStatus [] statuses = null;
+ try {
+ statuses = fs.listStatus(WALDIR);
+ if (statuses != null) {
+ fail("Files left in log dir: " +
+ Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
+ }
+ } catch (FileNotFoundException e) {
+ // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
+ }
+ }
+
+ /** An instrumented writer failure while flushing split output must fail the split. */
+ @Test(timeout=300000, expected = IOException.class)
+ public void testSplitWillFailIfWritingToRegionFails() throws Exception {
+ //leave 5th log open so we could append the "trap"
+ Writer writer = generateWALs(4);
+ useDifferentDFSClient();
+
+ String region = "break";
+ Path regiondir = new Path(TABLEDIR, region);
+ fs.mkdirs(regiondir);
+
+ InstrumentedLogWriter.activateFailure = false;
+ appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
+ ("r" + 999).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
+ writer.close();
+
+ try {
+ InstrumentedLogWriter.activateFailure = true;
+ WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ } catch (IOException e) {
+ assertTrue(e.getMessage().
+ contains("This exception is instrumented and should only be thrown for testing"));
+ throw e;
+ } finally {
+ // Always reset the static flag so later tests are not poisoned.
+ InstrumentedLogWriter.activateFailure = false;
+ }
+ }
+
+ /** Splitting edits for a region whose directory was deleted must not recreate it. */
+ @Test (timeout=300000)
+ public void testSplitDeletedRegion() throws IOException {
+ REGIONS.clear();
+ String region = "region_that_splits";
+ REGIONS.add(region);
+
+ generateWALs(1);
+ useDifferentDFSClient();
+
+ Path regiondir = new Path(TABLEDIR, region);
+ fs.delete(regiondir, true);
+ WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ assertFalse(fs.exists(regiondir));
+ }
+
+ /**
+ * An IOException thrown on the split's output (writer) thread must propagate to the
+ * caller of splitLogFile rather than being swallowed by the output threads.
+ */
+ @Test (timeout=300000)
+ public void testIOEOnOutputThread() throws Exception {
+   conf.setBoolean(HBASE_SKIP_ERRORS, false);
+
+   generateWALs(-1);
+   useDifferentDFSClient();
+   FileStatus[] logfiles = fs.listStatus(WALDIR);
+   assertTrue("There should be some log file",
+       logfiles != null && logfiles.length > 0);
+   // wals with no entries (like the one we don't use in the factory)
+   // won't cause a failure since nothing will ever be written.
+   // pick the largest one since it's most likely to have entries.
+   int largestLogFile = 0;
+   long largestSize = 0;
+   for (int i = 0; i < logfiles.length; i++) {
+     if (logfiles[i].getLen() > largestSize) {
+       largestLogFile = i;
+       largestSize = logfiles[i].getLen();
+     }
+   }
+   assertTrue("There should be some log greater than size 0.", 0 < largestSize);
+   // Set up a splitter that will throw an IOE on the output side
+   WALSplitter logSplitter = new WALSplitter(wals,
+       conf, HBASEDIR, fs, null, null, this.mode) {
+     @Override
+     protected Writer createWriter(Path logfile) throws IOException {
+       Writer mockWriter = Mockito.mock(Writer.class);
+       Mockito.doThrow(new IOException("Injected")).when(
+           mockWriter).append(Mockito.<Entry>any());
+       return mockWriter;
+     }
+   };
+   // Set up a background thread dumper. Needs a thread to depend on and then we need to run
+   // the thread dumping in a background thread so it does not hold up the test.
+   final AtomicBoolean stop = new AtomicBoolean(false);
+   final Thread someOldThread = new Thread("Some-old-thread") {
+     @Override
+     public void run() {
+       while(!stop.get()) Threads.sleep(10);
+     }
+   };
+   someOldThread.setDaemon(true);
+   someOldThread.start();
+   final Thread t = new Thread("Background-thread-dumper") {
+     @Override // was missing; added for consistency with the anonymous Thread above
+     public void run() {
+       try {
+         Threads.threadDumpingIsAlive(someOldThread);
+       } catch (InterruptedException e) {
+         e.printStackTrace();
+       }
+     }
+   };
+   t.setDaemon(true);
+   t.start();
+   try {
+     logSplitter.splitLogFile(logfiles[largestLogFile], null);
+     fail("Didn't throw!");
+   } catch (IOException ioe) {
+     assertTrue(ioe.toString().contains("Injected"));
+   } finally {
+     // Setting this to true will turn off the background thread dumper.
+     stop.set(true);
+   }
+ }
+
+ /**
+ * Runs a full split through an instrumented filesystem and asserts the split still
+ * completes (all WALs archived, WAL dir removed) despite the injected HDFS failures.
+ * @param spiedFs should be instrumented for failure.
+ */
+ private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
+ generateWALs(-1);
+ useDifferentDFSClient();
+
+ try {
+ WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
+ assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
+ assertFalse(fs.exists(WALDIR));
+ } catch (IOException e) {
+ fail("There shouldn't be any exception but: " + e.toString());
+ }
+ }
+
+ // Test for HBASE-3412: split must survive WAL files moved out from under it.
+ @Test (timeout=300000)
+ public void testMovedWALDuringRecovery() throws Exception {
+ // This partial mock will throw LEE for every file simulating
+ // files that were moved
+ FileSystem spiedFs = Mockito.spy(fs);
+ // The "File does not exist" part is very important,
+ // that's how it comes out of HDFS
+ Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
+ when(spiedFs).append(Mockito.<Path>any());
+ retryOverHdfsProblem(spiedFs);
+ }
+
+ /** Split must retry opens that fail with transient block-recovery errors from HDFS. */
+ @Test (timeout=300000)
+ public void testRetryOpenDuringRecovery() throws Exception {
+   FileSystem spiedFs = Mockito.spy(fs);
+   // The "Cannot obtain block length", "Could not obtain the last block",
+   // and "Blocklist for [^ ]* has changed.*" part is very important,
+   // that's how it comes out of HDFS. If HDFS changes the exception
+   // message, this test needs to be adjusted accordingly.
+   //
+   // When DFSClient tries to open a file, HDFS needs to locate
+   // the last block of the file and get its length. However, if the
+   // last block is under recovery, HDFS may have problem to obtain
+   // the block length, in which case, retry may help.
+   Mockito.doAnswer(new Answer<FSDataInputStream>() {
+     private final String[] errors = new String[] {
+         "Cannot obtain block length", "Could not obtain the last block",
+         "Blocklist for " + OLDLOGDIR + " has changed"};
+     private int count = 0;
+
+     @Override // was missing on this Answer implementation
+     public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
+       if (count < 3) {
+         throw new IOException(errors[count++]);
+       }
+       return (FSDataInputStream)invocation.callRealMethod();
+     }
+   }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
+   retryOverHdfsProblem(spiedFs);
+ }
+
+ /** A reporter that cancels progress must abort the split and return false. */
+ @Test (timeout=300000)
+ public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
+   generateWALs(1, 10, -1);
+   FileStatus logfile = fs.listStatus(WALDIR)[0];
+   useDifferentDFSClient();
+
+   final AtomicInteger count = new AtomicInteger();
+
+   // Reporter that records each progress call and always asks for cancellation.
+   CancelableProgressable localReporter
+       = new CancelableProgressable() {
+     @Override
+     public boolean progress() {
+       count.getAndIncrement();
+       return false;
+     }
+   };
+
+   FileSystem spiedFs = Mockito.spy(fs);
+   Mockito.doAnswer(new Answer<FSDataInputStream>() {
+     @Override // was missing on this Answer implementation
+     public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
+       Thread.sleep(1500); // Sleep a while and wait report status invoked
+       return (FSDataInputStream)invocation.callRealMethod();
+     }
+   }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
+
+   try {
+     conf.setInt("hbase.splitlog.report.period", 1000);
+     boolean ret = WALSplitter.splitLogFile(
+         HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, this.mode, wals);
+     assertFalse("Log splitting should have failed", ret);
+     assertTrue(count.get() > 0);
+   } catch (IOException e) {
+     fail("There shouldn't be any exception but: " + e.toString());
+   } finally {
+     // reset it back to its default value
+     conf.setInt("hbase.splitlog.report.period", 59000);
+   }
+ }
+
+ /**
+ * Test log split process with fake data and lots of edits to trigger threading
+ * issues.
+ */
+ @Test (timeout=300000)
+ public void testThreading() throws Exception {
+ doTestThreading(20000, 128*1024*1024, 0);
+ }
+
+ /**
+ * Test blocking behavior of the log split process if writers are writing slower
+ * than the reader is reading.
+ */
+ @Test (timeout=300000)
+ public void testThreadingSlowWriterSmallBuffer() throws Exception {
+ doTestThreading(200, 1024, 50);
+ }
+
  /**
   * Sets up a log splitter with a mock reader and writer. The mock reader generates
   * a specified number of edits spread across 5 regions. The mock writer optionally
   * sleeps for each edit it is fed.
   *
   * After the split is complete, verifies that the statistics show the correct number
   * of edits output into each region.
   *
   * @param numFakeEdits number of fake edits to push through pipeline
   * @param bufferSize size of in-memory buffer
   * @param writerSlowness writer threads will sleep this many ms per edit
   */
  private void doTestThreading(final int numFakeEdits,
      final int bufferSize,
      final int writerSlowness) throws Exception {

    Configuration localConf = new Configuration(conf);
    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);

    // Create a fake log file (we'll override the reader to produce a stream of edits)
    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
    FSDataOutputStream out = fs.create(logPath);
    out.close();

    // Make region dirs for our destination regions so the output doesn't get skipped
    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
    makeRegionDirs(regions);

    // Create a splitter that reads and writes the data without touching disk
    WALSplitter logSplitter = new WALSplitter(wals,
        localConf, HBASEDIR, fs, null, null, this.mode) {

      /* Produce a mock writer that doesn't write anywhere */
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doAnswer(new Answer<Void>() {
          int expectedIndex = 0;

          @Override
          public Void answer(InvocationOnMock invocation) {
            // Optionally throttle each append so the in-memory buffer fills up.
            if (writerSlowness > 0) {
              try {
                Thread.sleep(writerSlowness);
              } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
              }
            }
            Entry entry = (Entry) invocation.getArguments()[0];
            WALEdit edit = entry.getEdit();
            List<Cell> cells = edit.getCells();
            assertEquals(1, cells.size());
            Cell cell = cells.get(0);

            // Check that the edits come in the right order.
            assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(),
                cell.getRowLength()));
            expectedIndex++;
            return null;
          }
        }).when(mockWriter).append(Mockito.<Entry>any());
        return mockWriter;
      }

      /* Produce a mock reader that generates fake entries */
      @Override
      protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
          throws IOException {
        Reader mockReader = Mockito.mock(Reader.class);
        Mockito.doAnswer(new Answer<Entry>() {
          int index = 0;

          @Override
          public Entry answer(InvocationOnMock invocation) throws Throwable {
            if (index >= numFakeEdits) return null;

            // Generate r0 through r4 in round robin fashion
            int regionIdx = index % regions.size();
            byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};

            Entry ret = createTestEntry(TABLE_NAME, region,
                Bytes.toBytes((int)(index / regions.size())),
                FAMILY, QUALIFIER, VALUE, index);
            index++;
            return ret;
          }
        }).when(mockReader).next();
        return mockReader;
      }
    };

    logSplitter.splitLogFile(fs.getFileStatus(logPath), null);

    // Verify number of written edits per region
    Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
    for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
      LOG.info("Got " + entry.getValue() + " output edits for region " +
          Bytes.toString(entry.getKey()));
      assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
    }
    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
  }
+
+ // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
+ @Test (timeout=300000)
+ public void testSplitLogFileDeletedRegionDir() throws IOException {
+ LOG.info("testSplitLogFileDeletedRegionDir");
+ final String REGION = "region__1";
+ REGIONS.clear();
+ REGIONS.add(REGION);
+
+ generateWALs(1, 10, -1);
+ useDifferentDFSClient();
+
+ Path regiondir = new Path(TABLEDIR, REGION);
+ LOG.info("Region directory is" + regiondir);
+ fs.delete(regiondir, true);
+ WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+ assertFalse(fs.exists(regiondir));
+ }
+
  /**
   * Splitting a zero-length WAL should produce no table output but still archive
   * the (empty) log into the old-logs directory.
   */
  @Test (timeout=300000)
  public void testSplitLogFileEmpty() throws IOException {
    LOG.info("testSplitLogFileEmpty");
    // we won't create the hlog dir until getWAL got called, so
    // make dir here when testing empty log file
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", true);
    useDifferentDFSClient();

    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    // No edits were split, so no table directory should have been created.
    assertFalse(fs.exists(tdir));

    // The archived log should itself contain zero entries.
    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
  }
+
  /** Splits one WAL containing edits for every region in REGIONS and verifies the counts. */
  @Test (timeout=300000)
  public void testSplitLogFileMultipleRegions() throws IOException {
    LOG.info("testSplitLogFileMultipleRegions");
    generateWALs(1, 10, -1);
    splitAndCount(1, 10);
  }
+
+ @Test (timeout=300000)
+ public void testSplitLogFileFirstLineCorruptionLog()
+ throws IOException {
+ conf.setBoolean(HBASE_SKIP_ERRORS, true);
+ generateWALs(1, 10, -1);
+ FileStatus logfile = fs.listStatus(WALDIR)[0];
+
+ corruptWAL(logfile.getPath(),
+ Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
+
+ useDifferentDFSClient();
+ WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+
+ final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
+ "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
+ assertEquals(1, fs.listStatus(corruptDir).length);
+ }
+
  /**
   * Verifies that splitting a WAL concurrently with a region's
   * replayRecoveredEditsIfAny() — simulated here by deleting the split-edits files
   * right after each writer is created — neither throws nor leaves corrupt logs.
   * @throws IOException
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-4862">HBASE-4862</a>
   */
  @Test (timeout=300000)
  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
    // Generate wals for our destination region
    String regionName = "r0";
    final Path regiondir = new Path(TABLEDIR, regionName);
    REGIONS.clear();
    REGIONS.add(regionName);
    generateWALs(-1);

    wals.getWAL(Bytes.toBytes(regionName), null);
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);

    WALSplitter logSplitter = new WALSplitter(wals,
        conf, HBASEDIR, fs, null, null, this.mode) {
      @Override
      protected Writer createWriter(Path logfile)
          throws IOException {
        Writer writer = wals.createRecoveredEditsWriter(this.fs, logfile);
        // After creating writer, simulate region's
        // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
        // region and delete them, excluding files with '.temp' suffix.
        NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
        if (files != null && !files.isEmpty()) {
          for (Path file : files) {
            if (!this.fs.delete(file, false)) {
              LOG.error("Failed delete of " + file);
            } else {
              LOG.debug("Deleted recovered.edits file=" + file);
            }
          }
        }
        return writer;
      }
    };
    try {
      logSplitter.splitLogFile(logfiles[0], null);
    } catch (IOException e) {
      LOG.info(e);
      fail("Throws IOException when spliting "
          + "log, it is most likely because writing file does not "
          + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
    }
    // Any file landing in CORRUPTDIR means the concurrent delete corrupted the split.
    if (fs.exists(CORRUPTDIR)) {
      if (fs.listStatus(CORRUPTDIR).length > 0) {
        fail("There are some corrupt logs, "
            + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
      }
    }
  }
+
  /**
   * Convenience overload: writes NUM_WRITERS WALs with ENTRIES entries each.
   * @param leaveOpen index of the writer to leave un-closed, or -1 to close all
   * @return the writer left open, or null if all were closed
   */
  private Writer generateWALs(int leaveOpen) throws IOException {
    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen);
  }
+
+ private void makeRegionDirs(List<String> regions) throws IOException {
+ for (String region : regions) {
+ LOG.debug("Creating dir for region " + region);
+ fs.mkdirs(new Path(TABLEDIR, region));
+ }
+ }
+
  /**
   * Writes {@code writers} WAL files, each containing {@code entries} appends per
   * region in REGIONS, and optionally leaves one writer open.
   * @param leaveOpen index to leave un-closed. -1 to close all.
   * @return the writer that's still open, or null if all were closed.
   */
  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
    makeRegionDirs(REGIONS);
    fs.mkdirs(WALDIR);
    Writer [] ws = new Writer[writers];
    int seq = 0; // monotonically increasing sequence id across all writers
    for (int i = 0; i < writers; i++) {
      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
      for (int j = 0; j < entries; j++) {
        int prefix = 0;
        for (String region : REGIONS) {
          // Row key combines region name, region ordinal, writer and entry index.
          String row_key = region + prefix++ + i + j;
          appendEntry(ws[i], TABLE_NAME, region.getBytes(), row_key.getBytes(), FAMILY, QUALIFIER,
              VALUE, seq++);
        }
      }
      if (i != leaveOpen) {
        ws[i].close();
        LOG.info("Closing writer " + i);
      }
    }
    if (leaveOpen < 0 || leaveOpen >= writers) {
      return null;
    }
    return ws[leaveOpen];
  }
+
+ private Path[] getLogForRegion(Path rootdir, TableName table, String region)
+ throws IOException {
+ Path tdir = FSUtils.getTableDir(rootdir, table);
+ @SuppressWarnings("deprecation")
+ Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
+ Bytes.toString(region.getBytes())));
+ FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
+ @Override
+ public boolean accept(Path p) {
+ if (WALSplitter.isSequenceIdFile(p)) {
+ return false;
+ }
+ return true;
+ }
+ });
+ Path[] paths = new Path[files.length];
+ for (int i = 0; i < files.length; i++) {
+ paths[i] = files[i].getPath();
+ }
+ return paths;
+ }
+
  /**
   * Rewrites the file at {@code path} with its bytes damaged according to
   * {@code corruption}.
   * @param path WAL file to corrupt
   * @param corruption which kind of damage to inflict
   * @param close whether to close the rewritten file (false leaves it flushed but open)
   * @throws IOException on filesystem errors
   */
  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
    FSDataOutputStream out;
    int fileSize = (int) fs.listStatus(path)[0].getLen();

    // Slurp the whole original file into memory so we can rewrite it damaged.
    FSDataInputStream in = fs.open(path);
    byte[] corrupted_bytes = new byte[fileSize];
    in.readFully(0, corrupted_bytes, 0, fileSize);
    in.close();

    switch (corruption) {
      case APPEND_GARBAGE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes);
        out.write("-----".getBytes()); // junk appended past the trailer
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_ON_FIRST_LINE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(0); // a single bogus leading byte
        out.write(corrupted_bytes);
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_IN_THE_MIDDLE:
        fs.delete(path, false);
        out = fs.create(path);
        int middle = (int) Math.floor(corrupted_bytes.length / 2);
        out.write(corrupted_bytes, 0, middle);
        out.write(0); // bogus byte spliced into the middle
        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
        closeOrFlush(close, out);
        break;

      case TRUNCATE:
        fs.delete(path, false);
        out = fs.create(path);
        // Drop the trailer plus some extra bytes so the last entry is cut short.
        out.write(corrupted_bytes, 0, fileSize
            - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
        closeOrFlush(close, out);
        break;

      case TRUNCATE_TRAILER:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT);// trailer is truncated.
        closeOrFlush(close, out);
        break;
    }
  }
+
  /**
   * Closes the stream, or merely flushes it, depending on {@code close}. The flush
   * is invoked reflectively — hflush() where available, falling back to sync() —
   * so the test works against multiple Hadoop versions.
   */
  private void closeOrFlush(boolean close, FSDataOutputStream out)
      throws IOException {
    if (close) {
      out.close();
    } else {
      // Locate whichever flush method this Hadoop version provides.
      Method syncMethod = null;
      try {
        syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
      } catch (NoSuchMethodException e) {
        try {
          syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
        } catch (NoSuchMethodException ex) {
          throw new IOException("This version of Hadoop supports " +
              "neither Syncable.sync() nor Syncable.hflush().");
        }
      }
      try {
        syncMethod.invoke(out, new Object[]{});
      } catch (Exception e) {
        throw new IOException(e);
      }
      // The stream is intentionally left open so callers can simulate an unclosed WAL.
    }
  }
+
+ private int countWAL(Path log) throws IOException {
+ int count = 0;
+ Reader in = wals.createReader(fs, log);
+ while (in.next() != null) {
+ count++;
+ }
+ in.close();
+ return count;
+ }
+
+ public static long appendEntry(Writer writer, TableName table, byte[] region,
+ byte[] row, byte[] family, byte[] qualifier,
+ byte[] value, long seq)
+ throws IOException {
+ LOG.info(Thread.currentThread().getName() + " append");
+ writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
+ LOG.info(Thread.currentThread().getName() + " sync");
+ writer.sync();
+ return seq;
+ }
+
+ private static Entry createTestEntry(
+ TableName table, byte[] region,
+ byte[] row, byte[] family, byte[] qualifier,
+ byte[] value, long seq) {
+ long time = System.nanoTime();
+ WALEdit edit = new WALEdit();
+ seq++;
+ edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
+ return new Entry(new WALKey(region, table, seq, time,
+ HConstants.DEFAULT_CLUSTER_ID), edit);
+ }
+
+ private void injectEmptyFile(String suffix, boolean closeFile)
+ throws IOException {
+ Writer writer = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix),
+ conf);
+ if (closeFile) writer.close();
+ }
+
+ private boolean logsAreEqual(Path p1, Path p2) throws IOException {
+ Reader in1, in2;
+ in1 = wals.createReader(fs, p1);
+ in2 = wals.createReader(fs, p2);
+ Entry entry1;
+ Entry entry2;
+ while ((entry1 = in1.next()) != null) {
+ entry2 = in2.next();
+ if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
+ (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
+ return false;
+ }
+ }
+ in1.close();
+ in2.close();
+ return true;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitCompressed.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitCompressed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitCompressed.java
new file mode 100644
index 0000000..f47951a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitCompressed.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.wal;
+
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
@Category({RegionServerTests.class, LargeTests.class})
public class TestWALSplitCompressed extends TestWALSplit {

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TestWALSplit.setUpBeforeClass();
    // Enable WAL compression for every inherited test. NOTE(review): this runs
    // AFTER the parent's class setup, so it presumably relies on each test
    // re-reading TEST_UTIL's configuration in its per-test setUp — confirm;
    // otherwise the flag should be set before calling the parent setup.
    TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/30cec72f/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java b/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java
new file mode 100644
index 0000000..3f4af05
--- /dev/null
+++ b/hbase-shell/src/test/java/org/apache/hadoop/hbase/client/TestReplicationShell.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.jruby.embed.PathType;
+import org.junit.Test;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
@Category({ ClientTests.class, LargeTests.class })
public class TestReplicationShell extends AbstractTestShell {
  // Runs the JRuby shell test suite restricted to the replication admin tests.
  // Currently @Ignore'd (see reason below); re-enable once the hang is diagnosed.
  @Ignore ("Disabled because hangs on occasion.. about 10% of the time") @Test
  public void testRunShellTests() throws IOException {
    // Restrict the ruby runner to just the replication admin tests.
    System.setProperty("shell.test.include", "replication_admin_test.rb");
    // Start all ruby tests
    jruby.runScriptlet(PathType.ABSOLUTE, "src/test/ruby/tests_runner.rb");
  }
}
\ No newline at end of file