You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/03/27 10:21:18 UTC

[32/50] [abbrv] hbase git commit: HBASE-19258 IntegrationTest for Backup and Restore

HBASE-19258 IntegrationTest for Backup and Restore

Signed-off-by: tedyu <yu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/64ef1208
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/64ef1208
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/64ef1208

Branch: refs/heads/HBASE-19064
Commit: 64ef12080899dad4ab60652a76c68fb59ded04f2
Parents: ff5250c
Author: Vladimir Rodionov <vr...@hortonworks.com>
Authored: Mon Mar 26 14:23:34 2018 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Mon Mar 26 15:46:30 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/backup/impl/BackupManager.java |  40 ++-
 .../hbase/IntegrationTestBackupRestore.java     | 316 +++++++++++++------
 2 files changed, 234 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/64ef1208/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
index 8bebc91..90677fe 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManager.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupHFileCleaner;
 import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
 import org.apache.hadoop.hbase.backup.BackupObserver;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.backup.regionserver.LogRollRegionServerProcedureM
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -120,9 +122,13 @@ public class BackupManager implements Closeable {
         classes + "," + masterProcedureClass);
     }
 
+    plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
+    conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, (plugins == null ? "" : plugins + ",") +
+      BackupHFileCleaner.class.getName());
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Added log cleaner: " + cleanerClass + "\n" + "Added master procedure manager: "
-          + masterProcedureClass);
+      LOG.debug("Added log cleaner: {}. Added master procedure manager: {}."
+        +"Added master procedure manager: {}", cleanerClass, masterProcedureClass,
+        BackupHFileCleaner.class.getName());
     }
   }
 
@@ -150,8 +156,8 @@ public class BackupManager implements Closeable {
     conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
       (coproc == null ? "" : coproc + ",") + regionObserverClass);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Added region procedure manager: " + regionProcedureClass
-        + ". Added region observer: " + regionObserverClass);
+      LOG.debug("Added region procedure manager: {}. Added region observer: {}",
+        regionProcedureClass, regionObserverClass);
     }
   }
 
@@ -222,7 +228,7 @@ public class BackupManager implements Closeable {
           tableList.add(hTableDescriptor.getTableName());
         }
 
-        LOG.info("Full backup all the tables available in the cluster: " + tableList);
+        LOG.info("Full backup all the tables available in the cluster: {}", tableList);
       }
     }
 
@@ -256,9 +262,9 @@ public class BackupManager implements Closeable {
   public void initialize() throws IOException {
     String ongoingBackupId = this.getOngoingBackupId();
     if (ongoingBackupId != null) {
-      LOG.info("There is a ongoing backup " + ongoingBackupId
-        + ". Can not launch new backup until no ongoing backup remains.");
-      throw new BackupException("There is ongoing backup.");
+      LOG.info("There is a ongoing backup {}"
+        + ". Can not launch new backup until no ongoing backup remains.", ongoingBackupId);
+      throw new BackupException("There is ongoing backup seesion.");
     }
   }
 
@@ -273,7 +279,7 @@ public class BackupManager implements Closeable {
    * @throws IOException exception
    */
   public ArrayList<BackupImage> getAncestors(BackupInfo backupInfo) throws IOException {
-    LOG.debug("Getting the direct ancestors of the current backup " + backupInfo.getBackupId());
+    LOG.debug("Getting the direct ancestors of the current backup {}", backupInfo.getBackupId());
 
     ArrayList<BackupImage> ancestors = new ArrayList<>();
 
@@ -309,25 +315,25 @@ public class BackupManager implements Closeable {
         if (BackupManifest.canCoverImage(ancestors, image)) {
           LOG.debug("Met the backup boundary of the current table set:");
           for (BackupImage image1 : ancestors) {
-            LOG.debug("  BackupID=" + image1.getBackupId() + ", BackupDir=" + image1.getRootDir());
+            LOG.debug("  BackupID={}, BackupDir={}", image1.getBackupId(),  image1.getRootDir());
           }
         } else {
           Path logBackupPath =
               HBackupFileSystem.getBackupPath(backup.getBackupRootDir(), backup.getBackupId());
           LOG.debug("Current backup has an incremental backup ancestor, "
-              + "touching its image manifest in " + logBackupPath.toString()
-              + " to construct the dependency.");
+              + "touching its image manifest in {}"
+              + " to construct the dependency.", logBackupPath.toString());
           BackupManifest lastIncrImgManifest = new BackupManifest(conf, logBackupPath);
           BackupImage lastIncrImage = lastIncrImgManifest.getBackupImage();
           ancestors.add(lastIncrImage);
 
           LOG.debug(
-            "Last dependent incremental backup image: " + "{BackupID=" + lastIncrImage.getBackupId()
-            + "," + "BackupDir=" + lastIncrImage.getRootDir() + "}");
+            "Last dependent incremental backup image: {BackupID={}" +
+                "BackupDir={}}", lastIncrImage.getBackupId(), lastIncrImage.getRootDir());
         }
       }
     }
-    LOG.debug("Got " + ancestors.size() + " ancestors for the current backup.");
+    LOG.debug("Got {} ancestors for the current backup.", ancestors.size());
     return ancestors;
   }
 
@@ -391,8 +397,8 @@ public class BackupManager implements Closeable {
           if (lastWarningOutputTime == 0
               || (System.currentTimeMillis() - lastWarningOutputTime) > 60000) {
             lastWarningOutputTime = System.currentTimeMillis();
-            LOG.warn("Waiting to acquire backup exclusive lock for "
-                + (lastWarningOutputTime - startTime) / 1000 + "s");
+            LOG.warn("Waiting to acquire backup exclusive lock for {}s",
+                +(lastWarningOutputTime - startTime) / 1000);
           }
         } else {
           throw e;

http://git-wip-us.apache.org/repos/asf/hbase/blob/64ef1208/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBackupRestore.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBackupRestore.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBackupRestore.java
index c28895c..7d5fca5 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBackupRestore.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBackupRestore.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
@@ -38,13 +40,22 @@ import org.apache.hadoop.hbase.backup.RestoreRequest;
 import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
 import org.apache.hadoop.hbase.backup.impl.BackupManager;
 import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
+import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
+import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
+import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
+import org.apache.hadoop.hbase.chaos.policies.Policy;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -52,7 +63,6 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 /**
  * An integration test to detect regressions in HBASE-7912. Create
@@ -65,18 +75,31 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 public class IntegrationTestBackupRestore extends IntegrationTestBase {
   private static final String CLASS_NAME = IntegrationTestBackupRestore.class.getSimpleName();
   protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBackupRestore.class);
-  protected static final TableName TABLE_NAME1 = TableName.valueOf(CLASS_NAME + ".table1");
-  protected static final TableName TABLE_NAME2 = TableName.valueOf(CLASS_NAME + ".table2");
+  protected static final String NUMBER_OF_TABLES_KEY = "num_tables";
   protected static final String COLUMN_NAME = "f";
   protected static final String REGION_COUNT_KEY = "regions_per_rs";
   protected static final String REGIONSERVER_COUNT_KEY = "region_servers";
+  protected static final String ROWS_PER_ITERATION_KEY = "rows_in_iteration";
+  protected static final String NUM_ITERATIONS_KEY = "num_iterations";
   protected static final int DEFAULT_REGION_COUNT = 10;
-  protected static final int DEFAULT_REGIONSERVER_COUNT = 2;
+  protected static final int DEFAULT_REGIONSERVER_COUNT = 5;
+  protected static final int DEFAULT_NUMBER_OF_TABLES = 1;
+  protected static final int DEFAULT_NUM_ITERATIONS = 10;
+  protected static final int DEFAULT_ROWS_IN_ITERATION = 500000;
+  protected static final String SLEEP_TIME_KEY = "sleeptime";
+  // short default interval because tests don't run very long.
+  protected static final long SLEEP_TIME_DEFAULT = 50000L;
+
+  protected static int rowsInIteration;
   protected static int regionsCountPerServer;
   protected static int regionServerCount;
-  protected static final String NB_ROWS_IN_BATCH_KEY = "rows_in_batch";
-  protected static final int DEFAULT_NB_ROWS_IN_BATCH = 20000;
-  private static int rowsInBatch;
+
+  protected static int numIterations;
+  protected static int numTables;
+  protected static TableName[] tableNames;
+  protected long sleepTime;
+  protected static Object lock = new Object();
+
   private static String BACKUP_ROOT_DIR = "backupIT";
 
   @Override
@@ -87,24 +110,22 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
     regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT);
     regionServerCount =
         conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT);
-    rowsInBatch = conf.getInt(NB_ROWS_IN_BATCH_KEY, DEFAULT_NB_ROWS_IN_BATCH);
+    rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION);
+    numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS);
+    numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES);
+    sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT);
     enableBackup(conf);
-    LOG.info(String.format("Initializing cluster with %d region servers.", regionServerCount));
+    LOG.info("Initializing cluster with {} region servers.", regionServerCount);
     util.initializeCluster(regionServerCount);
-    LOG.info("Cluster initialized");
-    util.deleteTableIfAny(TABLE_NAME1);
-    util.deleteTableIfAny(TABLE_NAME2);
-    LOG.info("Cluster ready");
+    LOG.info("Cluster initialized and ready");
   }
 
   @After
   public void tearDown() throws IOException {
     LOG.info("Cleaning up after test.");
     if(util.isDistributedCluster()) {
-      util.deleteTableIfAny(TABLE_NAME1);
-      LOG.info("Cleaning up after test. TABLE1 done");
-      util.deleteTableIfAny(TABLE_NAME2);
-      LOG.info("Cleaning up after test. TABLE2 done");
+      deleteTablesIfAny();
+      LOG.info("Cleaning up after test. Deleted tables");
       cleanUpBackupDir();
     }
     LOG.info("Restoring cluster.");
@@ -112,6 +133,30 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
     LOG.info("Cluster restored.");
   }
 
+  @Override
+  public void setUpMonkey() throws Exception {
+    Policy p = new PeriodicRandomActionPolicy(sleepTime,
+      new RestartRandomRsExceptMetaAction(sleepTime));
+    this.monkey = new PolicyBasedChaosMonkey(util, p);
+    startMonkey();
+  }
+
+  private void deleteTablesIfAny() throws IOException {
+    for (TableName table : tableNames) {
+      util.deleteTableIfAny(table);
+    }
+  }
+
+  private void createTables() throws Exception {
+    tableNames = new TableName[numTables];
+    for (int i = 0; i < numTables; i++) {
+      tableNames[i] = TableName.valueOf(CLASS_NAME + ".table." + i);
+    }
+    for (TableName table : tableNames) {
+      createTable(table);
+    }
+  }
+
   private void enableBackup(Configuration conf) {
     // Enable backup
     conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
@@ -127,25 +172,53 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
   @Test
   public void testBackupRestore() throws Exception {
     BACKUP_ROOT_DIR = util.getDataTestDirOnTestFS() + Path.SEPARATOR + BACKUP_ROOT_DIR;
-    createTable(TABLE_NAME1);
-    createTable(TABLE_NAME2);
-    runTest();
+    createTables();
+    runTestMulti();
+  }
+
+  private void runTestMulti() throws IOException {
+    LOG.info("IT backup & restore started");
+    Thread[] workers = new Thread[numTables];
+    for (int i = 0; i < numTables; i++) {
+      final TableName table = tableNames[i];
+      Runnable r = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            runTestSingle(table);
+          } catch (IOException e) {
+            LOG.error("Failed", e);
+            Assert.fail(e.getMessage());
+          }
+        }
+      };
+      workers[i] = new Thread(r);
+      workers[i].start();
+    }
+    // Wait all workers to finish
+    for (Thread t : workers) {
+      Uninterruptibles.joinUninterruptibly(t);
+    }
+    LOG.info("IT backup & restore finished");
   }
 
   private void createTable(TableName tableName) throws Exception {
     long startTime, endTime;
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    HColumnDescriptor[] columns =
-        new HColumnDescriptor[]{new HColumnDescriptor(COLUMN_NAME)};
-    LOG.info(String.format("Creating table %s with %d splits.", tableName,
-      regionsCountPerServer));
+
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+
+    TableDescriptor desc = builder.build();
+    ColumnFamilyDescriptorBuilder cbuilder =
+        ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_NAME.getBytes(Charset.defaultCharset()));
+    ColumnFamilyDescriptor[] columns = new ColumnFamilyDescriptor[] { cbuilder.build() };
+    LOG.info("Creating table {} with {} splits.", tableName,
+      regionsCountPerServer * regionServerCount);
     startTime = System.currentTimeMillis();
     HBaseTestingUtility.createPreSplitLoadTestTable(util.getConfiguration(), desc, columns,
       regionsCountPerServer);
     util.waitTableAvailable(tableName);
     endTime = System.currentTimeMillis();
-    LOG.info(String.format("Pre-split table created successfully in %dms.",
-      (endTime - startTime)));
+    LOG.info("Pre-split table created successfully in {}ms.", (endTime - startTime));
   }
 
   private void loadData(TableName table, int numRows) throws IOException {
@@ -157,77 +230,102 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
     conn.getAdmin().flush(TableName.valueOf(table.getName()));
   }
 
-  private void runTest() throws IOException {
-    // Check if backup is enabled
-    if (!BackupManager.isBackupEnabled(getConf())) {
-      LOG.error(BackupRestoreConstants.ENABLE_BACKUP);
-      System.exit(EXIT_FAILURE);
-    }
+  private String backup(BackupRequest request, BackupAdmin client)
+      throws IOException {
+    String backupId = client.backupTables(request);
+    return backupId;
+  }
+
+  private void restore(RestoreRequest request, BackupAdmin client)
+      throws IOException {
+    client.restore(request);
+  }
+
+  private void merge(String[] backupIds, BackupAdmin client)
+      throws IOException {
+    client.mergeBackups(backupIds);
+  }
+
+  private void runTestSingle(TableName table) throws IOException {
 
-    LOG.info(BackupRestoreConstants.VERIFY_BACKUP);
+    List<String> backupIds = new ArrayList<String>();
+    List<Integer> tableSizes = new ArrayList<Integer>();
 
     try (Connection conn = util.getConnection();
-         Admin admin = conn.getAdmin();
-         BackupAdmin client = new BackupAdminImpl(conn)) {
-      // #0- insert some data to table TABLE_NAME1, TABLE_NAME2
-      loadData(TABLE_NAME1, rowsInBatch);
-      loadData(TABLE_NAME2, rowsInBatch);
-      // #1 - create full backup for all tables
-      LOG.info("create full backup image for all tables");
-      List<TableName> tables = Lists.newArrayList(TABLE_NAME1, TABLE_NAME2);
+        Admin admin = conn.getAdmin();
+        BackupAdmin client = new BackupAdminImpl(conn);) {
 
+      // #0- insert some data to table 'table'
+      loadData(table, rowsInIteration);
+      tableSizes.add(rowsInIteration);
+
+      // #1 - create full backup for table first
+      LOG.info("create full backup image for {}", table);
+      List<TableName> tables = Lists.newArrayList(table);
       BackupRequest.Builder builder = new BackupRequest.Builder();
-      BackupRequest request =
-          builder.withBackupType(BackupType.FULL).withTableList(tables)
-              .withTargetRootDir(BACKUP_ROOT_DIR).build();
-      String backupIdFull = client.backupTables(request);
+      BackupRequest request = builder.withBackupType(BackupType.FULL).withTableList(tables)
+          .withTargetRootDir(BACKUP_ROOT_DIR).build();
+
+      String backupIdFull = backup(request, client);
       assertTrue(checkSucceeded(backupIdFull));
-      // #2 - insert some data to table
-      loadData(TABLE_NAME1, rowsInBatch);
-      loadData(TABLE_NAME2, rowsInBatch);
 
-      try (HTable t1 = (HTable) conn.getTable(TABLE_NAME1)) {
-        Assert.assertEquals(util.countRows(t1), rowsInBatch * 2);
-      }
-      try (HTable t2 = (HTable) conn.getTable(TABLE_NAME2)) {
-        Assert.assertEquals(util.countRows(t2), rowsInBatch * 2);
-      }
+      backupIds.add(backupIdFull);
+      // Now continue with incremental backups
+      int count = 1;
+      while (count++ < numIterations) {
 
-      // #3 - incremental backup for tables
-      tables = Lists.newArrayList(TABLE_NAME1, TABLE_NAME2);
-      builder = new BackupRequest.Builder();
-      request =
-          builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables)
-              .withTargetRootDir(BACKUP_ROOT_DIR).build();
-      String backupIdIncMultiple = client.backupTables(request);
-      assertTrue(checkSucceeded(backupIdIncMultiple));
-      // #4 - restore full backup for all tables, without overwrite
-      TableName[] tablesRestoreFull = new TableName[] { TABLE_NAME1, TABLE_NAME2 };
-      client.restore(createRestoreRequest(BACKUP_ROOT_DIR, backupIdFull, false, tablesRestoreFull,
-        null, true));
-      // #5.1 - check tables for full restore
-      assertTrue(admin.tableExists(TABLE_NAME1));
-      assertTrue(admin.tableExists(TABLE_NAME2));
-      // #5.2 - checking row count of tables for full restore
-      HTable hTable = (HTable) conn.getTable(TABLE_NAME1);
-      Assert.assertEquals(util.countRows(hTable), rowsInBatch);
-      hTable.close();
-      hTable = (HTable) conn.getTable(TABLE_NAME2);
-      Assert.assertEquals(util.countRows(hTable), rowsInBatch);
-      hTable.close();
-      // #6 - restore incremental backup for multiple tables, with overwrite
-      TableName[] tablesRestoreIncMultiple = new TableName[] { TABLE_NAME1, TABLE_NAME2 };
-      client.restore(createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple, false,
-        tablesRestoreIncMultiple, null, true));
-      hTable = (HTable) conn.getTable(TABLE_NAME1);
-      Assert.assertEquals(util.countRows(hTable), rowsInBatch * 2);
-      hTable.close();
-      hTable = (HTable) conn.getTable(TABLE_NAME2);
-      Assert.assertEquals(util.countRows(hTable), rowsInBatch * 2);
+        // Load data
+        loadData(table, rowsInIteration);
+        tableSizes.add(rowsInIteration * count);
+        // Do incremental backup
+        builder = new BackupRequest.Builder();
+        request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables)
+            .withTargetRootDir(BACKUP_ROOT_DIR).build();
+        String backupId = backup(request, client);
+        assertTrue(checkSucceeded(backupId));
+        backupIds.add(backupId);
+
+        // Restore incremental backup for table, with overwrite for previous backup
+        String previousBackupId = backupIds.get(backupIds.size() - 2);
+        restoreVerifyTable(conn, client, table, previousBackupId, rowsInIteration * (count - 1));
+        // Restore incremental backup for table, with overwrite for last backup
+        restoreVerifyTable(conn, client, table, backupId, rowsInIteration * count);
+      }
+      // Now merge all incremental and restore
+      String[] incBackupIds = allIncremental(backupIds);
+      merge(incBackupIds, client);
+      // Restore last one
+      String backupId = incBackupIds[incBackupIds.length - 1];
+      // restore incremental backup for table, with overwrite
+      TableName[] tablesRestoreIncMultiple = new TableName[] { table };
+      restore(createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null,
+        true), client);
+      Table hTable = conn.getTable(table);
+      Assert.assertEquals(util.countRows(hTable), rowsInIteration * numIterations);
       hTable.close();
+      LOG.info("{} loop {} finished.", Thread.currentThread().getName(), (count-1));
     }
   }
 
+  private void restoreVerifyTable(Connection conn, BackupAdmin client, TableName table,
+      String backupId, long expectedRows) throws IOException {
+
+    TableName[] tablesRestoreIncMultiple = new TableName[] { table };
+    restore(createRestoreRequest(BACKUP_ROOT_DIR, backupId, false,
+      tablesRestoreIncMultiple, null, true), client);
+    Table hTable = conn.getTable(table);
+    Assert.assertEquals(expectedRows, util.countRows(hTable));
+    hTable.close();
+  }
+
+  private String[] allIncremental(List<String> backupIds) {
+    int size = backupIds.size();
+    backupIds = backupIds.subList(1, size);
+    String[] arr = new String[size - 1];
+    backupIds.toArray(arr);
+    return arr;
+  }
+
   /**
    *
    * @param backupId pass backup ID to check status of
@@ -260,18 +358,18 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
       TableName[] fromTables, TableName[] toTables, boolean isOverwrite) {
     RestoreRequest.Builder builder = new RestoreRequest.Builder();
     return builder.withBackupRootDir(backupRootDir)
-                                    .withBackupId(backupId)
-                                    .withCheck(check)
-                                    .withFromTables(fromTables)
-                                    .withToTables(toTables)
-                                    .withOvewrite(isOverwrite).build();
+        .withBackupId(backupId)
+        .withCheck(check)
+        .withFromTables(fromTables)
+        .withToTables(toTables)
+        .withOvewrite(isOverwrite).build();
   }
 
   @Override
   public void setUpCluster() throws Exception {
     util = getTestingUtil(getConf());
     enableBackup(getConf());
-    LOG.debug("Initializing/checking cluster has " + regionServerCount + " servers");
+    LOG.debug("Initializing/checking cluster has {} servers",regionServerCount);
     util.initializeCluster(regionServerCount);
     LOG.debug("Done initializing/checking cluster");
   }
@@ -282,14 +380,12 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
    */
   @Override
   public int runTestFromCommandLine() throws Exception {
-     // Check if backup is enabled
+    // Check if backup is enabled
     if (!BackupManager.isBackupEnabled(getConf())) {
       System.err.println(BackupRestoreConstants.ENABLE_BACKUP);
       return -1;
     }
-
     System.out.println(BackupRestoreConstants.VERIFY_BACKUP);
-
     testBackupRestore();
     return 0;
   }
@@ -308,11 +404,16 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
 
   @Override
   protected void addOptions() {
-    addOptWithArg(REGIONSERVER_COUNT_KEY, "Total number of region servers. Default: '"
-        + DEFAULT_REGIONSERVER_COUNT + "'");
+    addOptWithArg(REGIONSERVER_COUNT_KEY,
+      "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'");
     addOptWithArg(REGION_COUNT_KEY, "Total number of regions. Default: " + DEFAULT_REGION_COUNT);
-    addOptWithArg(NB_ROWS_IN_BATCH_KEY, "Total number of data rows to be loaded (per table/batch."
-        + " Total number of batches=2). Default: " + DEFAULT_NB_ROWS_IN_BATCH);
+    addOptWithArg(ROWS_PER_ITERATION_KEY,
+      "Total number of data rows to be loaded during one iteration." + " Default: "
+          + DEFAULT_ROWS_IN_ITERATION);
+    addOptWithArg(NUM_ITERATIONS_KEY,
+      "Total number iterations." + " Default: " + DEFAULT_NUM_ITERATIONS);
+    addOptWithArg(NUMBER_OF_TABLES_KEY,
+      "Total number of tables in the test." + " Default: " + DEFAULT_NUMBER_OF_TABLES);
 
   }
 
@@ -325,13 +426,18 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase {
     regionServerCount =
         Integer.parseInt(cmd.getOptionValue(REGIONSERVER_COUNT_KEY,
           Integer.toString(DEFAULT_REGIONSERVER_COUNT)));
-    rowsInBatch =
-        Integer.parseInt(cmd.getOptionValue(NB_ROWS_IN_BATCH_KEY,
-          Integer.toString(DEFAULT_NB_ROWS_IN_BATCH)));
+    rowsInIteration =
+        Integer.parseInt(cmd.getOptionValue(ROWS_PER_ITERATION_KEY,
+          Integer.toString(DEFAULT_ROWS_IN_ITERATION)));
+    numIterations = Integer.parseInt(cmd.getOptionValue(NUM_ITERATIONS_KEY,
+      Integer.toString(DEFAULT_NUM_ITERATIONS)));
+    numTables = Integer.parseInt(cmd.getOptionValue(NUMBER_OF_TABLES_KEY,
+      Integer.toString(DEFAULT_NUMBER_OF_TABLES)));
+
     LOG.info(MoreObjects.toStringHelper("Parsed Options").
       add(REGION_COUNT_KEY, regionsCountPerServer)
-        .add(REGIONSERVER_COUNT_KEY, regionServerCount).add(NB_ROWS_IN_BATCH_KEY, rowsInBatch)
-        .toString());
+      .add(REGIONSERVER_COUNT_KEY, regionServerCount).add(ROWS_PER_ITERATION_KEY, rowsInIteration)
+      .toString());
   }
 
   /**