Posted to commits@hbase.apache.org by te...@apache.org on 2016/10/05 23:30:10 UTC

[04/10] hbase git commit: HBASE-16727 Backup refactoring: remove MR dependencies from HMaster (Vladimir Rodionov)

http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
new file mode 100644
index 0000000..d10713d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupManifest.java
@@ -0,0 +1,791 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupType;
+import org.apache.hadoop.hbase.backup.util.BackupClientUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.BackupProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+
+/**
+ * Backup manifest contains all the metadata of a backup image. The manifest info is bundled as a
+ * manifest file together with the data, so that each backup image contains all the info needed
+ * for restore.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BackupManifest {
+
+  private static final Log LOG = LogFactory.getLog(BackupManifest.class);
+
+  // manifest file name
+  public static final String MANIFEST_FILE_NAME = ".backup.manifest";
+
+  // manifest file version, current is 1.0
+  public static final String MANIFEST_VERSION = "1.0";
+
+  // backup image; the dependency graph is made up of a series of backup images
+
+  public static class BackupImage implements Comparable<BackupImage> {
+
+    private String backupId;
+    private BackupType type;
+    private String rootDir;
+    private List<TableName> tableList;
+    private long startTs;
+    private long completeTs;
+    private ArrayList<BackupImage> ancestors;
+
+    public BackupImage() {
+      super();
+    }
+
+    public BackupImage(String backupId, BackupType type, String rootDir,
+        List<TableName> tableList, long startTs, long completeTs) {
+      this.backupId = backupId;
+      this.type = type;
+      this.rootDir = rootDir;
+      this.tableList = tableList;
+      this.startTs = startTs;
+      this.completeTs = completeTs;
+    }
+
+    static BackupImage fromProto(BackupProtos.BackupImage im) {
+      String backupId = im.getBackupId();
+      String rootDir = im.getRootDir();
+      long startTs = im.getStartTs();
+      long completeTs = im.getCompleteTs();
+      List<HBaseProtos.TableName> tableListList = im.getTableListList();
+      List<TableName> tableList = new ArrayList<TableName>();
+      for(HBaseProtos.TableName tn : tableListList) {
+        tableList.add(ProtobufUtil.toTableName(tn));
+      }
+      
+      List<BackupProtos.BackupImage> ancestorList = im.getAncestorsList();
+      
+      BackupType type =
+          im.getBackupType() == BackupProtos.BackupType.FULL ? BackupType.FULL:
+            BackupType.INCREMENTAL;
+
+      BackupImage image = new BackupImage(backupId, type, rootDir, tableList, startTs, completeTs);
+      for(BackupProtos.BackupImage img: ancestorList) {
+        image.addAncestor(fromProto(img));
+      }
+      return image;
+    }
+
+    BackupProtos.BackupImage toProto() {
+      BackupProtos.BackupImage.Builder builder = BackupProtos.BackupImage.newBuilder();
+      builder.setBackupId(backupId);
+      builder.setCompleteTs(completeTs);
+      builder.setStartTs(startTs);
+      builder.setRootDir(rootDir);
+      if (type == BackupType.FULL) {
+        builder.setBackupType(BackupProtos.BackupType.FULL);
+      } else{
+        builder.setBackupType(BackupProtos.BackupType.INCREMENTAL);
+      }
+
+      for (TableName name: tableList) {
+        builder.addTableList(ProtobufUtil.toProtoTableName(name));
+      }
+
+      if (ancestors != null){
+        for (BackupImage im: ancestors){
+          builder.addAncestors(im.toProto());
+        }
+      }
+
+      return builder.build();
+    }
+
+    public String getBackupId() {
+      return backupId;
+    }
+
+    public void setBackupId(String backupId) {
+      this.backupId = backupId;
+    }
+
+    public BackupType getType() {
+      return type;
+    }
+
+    public void setType(BackupType type) {
+      this.type = type;
+    }
+
+    public String getRootDir() {
+      return rootDir;
+    }
+
+    public void setRootDir(String rootDir) {
+      this.rootDir = rootDir;
+    }
+
+    public List<TableName> getTableNames() {
+      return tableList;
+    }
+
+    public void setTableList(List<TableName> tableList) {
+      this.tableList = tableList;
+    }
+
+    public long getStartTs() {
+      return startTs;
+    }
+
+    public void setStartTs(long startTs) {
+      this.startTs = startTs;
+    }
+
+    public long getCompleteTs() {
+      return completeTs;
+    }
+
+    public void setCompleteTs(long completeTs) {
+      this.completeTs = completeTs;
+    }
+
+    public ArrayList<BackupImage> getAncestors() {
+      if (this.ancestors == null) {
+        this.ancestors = new ArrayList<BackupImage>();
+      }
+      return this.ancestors;
+    }
+
+    public void addAncestor(BackupImage backupImage) {
+      this.getAncestors().add(backupImage);
+    }
+
+    public boolean hasAncestor(String token) {
+      for (BackupImage image : this.getAncestors()) {
+        if (image.getBackupId().equals(token)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public boolean hasTable(TableName table) {
+      for (TableName t : tableList) {
+        if (t.equals(table)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public int compareTo(BackupImage other) {
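+      // Backup IDs have the form <prefix>_<timestamp>; order by prefix first, then by timestamp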
+      String thisBackupId = this.getBackupId();
+      String otherBackupId = other.getBackupId();
+      int index1 = thisBackupId.lastIndexOf("_");
+      int index2 = otherBackupId.lastIndexOf("_");
+      String name1 = thisBackupId.substring(0, index1);
+      String name2 = otherBackupId.substring(0, index2);
+      if(name1.equals(name2)) {
+        Long thisTS = Long.valueOf(thisBackupId.substring(index1 + 1));
+        Long otherTS = Long.valueOf(otherBackupId.substring(index2 + 1));
+        return thisTS.compareTo(otherTS);
+      } else {
+        return name1.compareTo(name2);
+      }
+    }
+  }
+
+  // manifest version
+  private String version = MANIFEST_VERSION;
+
+  // hadoop hbase configuration
+  protected Configuration config = null;
+
+  // backup root directory
+  private String rootDir = null;
+
+  // backup image directory
+  private String tableBackupDir = null;
+
+  // backup log directory if this is an incremental backup
+  private String logBackupDir = null;
+
+  // backup token
+  private String backupId;
+
+  // backup type, full or incremental
+  private BackupType type;
+
+  // the table list for the backup
+  private ArrayList<TableName> tableList;
+
+  // actual start timestamp of the backup process
+  private long startTs;
+
+  // actual complete timestamp of the backup process
+  private long completeTs;
+
+  // the region server timestamp for tables:
+  // <table, <rs, timestamp>>
+  private Map<TableName, HashMap<String, Long>> incrTimeRanges;
+
+  // dependency of this backup, including all the dependent images to do PIT recovery
+  private Map<String, BackupImage> dependency;
+  
+  /**
+   * Construct a manifest for an ongoing backup.
+   * @param backupCtx The ongoing backup context
+   */
+  public BackupManifest(BackupInfo backupCtx) {
+    this.backupId = backupCtx.getBackupId();
+    this.type = backupCtx.getType();
+    this.rootDir = backupCtx.getTargetRootDir();
+    if (this.type == BackupType.INCREMENTAL) {
+      this.logBackupDir = backupCtx.getHLogTargetDir();
+    }
+    this.startTs = backupCtx.getStartTs();
+    this.completeTs = backupCtx.getEndTs();
+    this.loadTableList(backupCtx.getTableNames());
+  }
+  
+  
+  /**
+   * Construct a table-level manifest for a backup of the named table.
+   * @param backupCtx The ongoing backup context
+   * @param table the table being backed up
+   */
+  public BackupManifest(BackupInfo backupCtx, TableName table) {
+    this.backupId = backupCtx.getBackupId();
+    this.type = backupCtx.getType();
+    this.rootDir = backupCtx.getTargetRootDir();
+    this.tableBackupDir = backupCtx.getBackupStatus(table).getTargetDir();
+    if (this.type == BackupType.INCREMENTAL) {
+      this.logBackupDir = backupCtx.getHLogTargetDir();
+    }
+    this.startTs = backupCtx.getStartTs();
+    this.completeTs = backupCtx.getEndTs();
+    List<TableName> tables = new ArrayList<TableName>();
+    tables.add(table);
+    this.loadTableList(tables);
+  }
+
+  /**
+   * Construct manifest from a backup directory.
+   * @param conf configuration
+   * @param backupPath backup path
+   * @throws IOException if the backup path's file system cannot be obtained
+   */
+  public BackupManifest(Configuration conf, Path backupPath) throws IOException {
+    this(backupPath.getFileSystem(conf), backupPath);
+  }
+
+  /**
+   * Construct manifest from a backup directory.
+   * @param fs the FileSystem of the backup location
+   * @param backupPath backup path
+   * @throws BackupException exception
+   */
+  public BackupManifest(FileSystem fs, Path backupPath) throws BackupException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Loading manifest from: " + backupPath.toString());
+    }
+    // The input backupDir may not exactly be the backup table dir.
+    // It could be the backup log dir where there is also a manifest file stored.
+    // This variable's purpose is to keep the correct and original location so
+    // that we can store/persist it.
+    this.tableBackupDir = backupPath.toString();
+    this.config = fs.getConf();
+    try {
+
+      FileStatus[] subFiles = BackupClientUtil.listStatus(fs, backupPath, null);
+      if (subFiles == null) {
+        String errorMsg = backupPath.toString() + " does not exist";
+        LOG.error(errorMsg);
+        throw new IOException(errorMsg);
+      }
+      for (FileStatus subFile : subFiles) {
+        if (subFile.getPath().getName().equals(MANIFEST_FILE_NAME)) {
+
+          // load and set manifest fields from the file content
+          long len = subFile.getLen();
+          byte[] pbBytes = new byte[(int) len];
+          try (FSDataInputStream in = fs.open(subFile.getPath())) {
+            in.readFully(pbBytes);
+          }
+          BackupProtos.BackupManifest proto = null;
+          try {
+            proto = parseFrom(pbBytes);
+          } catch (Exception e) {
+            throw new BackupException(e);
+          }
+          this.version = proto.getVersion();
+          this.backupId = proto.getBackupId();
+          this.type = BackupType.valueOf(proto.getType().name());
+          // Here the parameter backupDir is where the manifest file is.
+          // There should always be a manifest file under:
+          // backupRootDir/namespace/table/backupId/.backup.manifest
+          // If backupPath's parent is the WALs dir, root is that dir's parent;
+          // otherwise root is three levels above backupPath (root/namespace/table/backupId).
+          Path p = backupPath.getParent();
+          if (p.getName().equals(HConstants.HREGION_LOGDIR_NAME)) {
+            this.rootDir = p.getParent().toString();
+          } else {
+            this.rootDir = p.getParent().getParent().toString();
+          }
+
+          loadTableList(proto);
+          this.startTs = proto.getStartTs();
+          this.completeTs = proto.getCompleteTs();
+          loadIncrementalTimestampMap(proto);
+          loadDependency(proto);
+          // TODO: merge will be implemented in a future jira
+          LOG.debug("Loaded manifest instance from manifest file: "
+              + BackupClientUtil.getPath(subFile.getPath()));
+          return;
+        }
+      }
+      String errorMsg = "No manifest file found in: " + backupPath.toString();
+      throw new IOException(errorMsg);
+
+    } catch (IOException e) {
+      throw new BackupException(e);
+    }
+  }
+  
+  private void loadIncrementalTimestampMap(BackupProtos.BackupManifest proto) {
+    List<BackupProtos.TableServerTimestamp> list = proto.getTstMapList();
+    if (list == null || list.isEmpty()) {
+      return;
+    }
+    this.incrTimeRanges = new HashMap<TableName, HashMap<String, Long>>();
+    for(BackupProtos.TableServerTimestamp tst: list){
+      TableName tn = ProtobufUtil.toTableName(tst.getTable());
+      HashMap<String, Long> map = this.incrTimeRanges.get(tn);
+      if(map == null){
+        map = new HashMap<String, Long>();
+        this.incrTimeRanges.put(tn, map);
+      }
+      List<BackupProtos.ServerTimestamp> listSt = tst.getServerTimestampList();
+      for(BackupProtos.ServerTimestamp stm: listSt) {
+        map.put(stm.getServer(), stm.getTimestamp());
+      }
+    }
+  }
+
+  private void loadDependency(BackupProtos.BackupManifest proto) {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("load dependency for: "+proto.getBackupId());
+    }
+
+    dependency = new HashMap<String, BackupImage>();
+    List<BackupProtos.BackupImage> list = proto.getDependentBackupImageList();
+    for (BackupProtos.BackupImage im : list) {
+      BackupImage bim = BackupImage.fromProto(im);
+      if(im.getBackupId() != null){
+        dependency.put(im.getBackupId(), bim);
+      } else{
+        LOG.warn("Load dependency for backup manifest: "+ backupId+ 
+          ". Null backup id in dependent image");
+      }
+    }
+  }
+
+  private void loadTableList(BackupProtos.BackupManifest proto) {
+    this.tableList = new ArrayList<TableName>();
+    List<HBaseProtos.TableName> list = proto.getTableListList();
+    for (HBaseProtos.TableName name: list) {
+      this.tableList.add(ProtobufUtil.toTableName(name));
+    }
+  }
+
+  public BackupType getType() {
+    return type;
+  }
+
+  public void setType(BackupType type) {
+    this.type = type;
+  }
+
+  /**
+   * Loads table list.
+   * @param tableList Table list
+   */
+  private void loadTableList(List<TableName> tableList) {
+
+    this.tableList = this.getTableList();
+    if (this.tableList.size() > 0) {
+      this.tableList.clear();
+    }
+    this.tableList.addAll(tableList);
+
+    LOG.debug(tableList.size() + " tables exist in table set.");
+  }
+
+  /**
+   * Get the table set of this image.
+   * @return The table set list
+   */
+  public ArrayList<TableName> getTableList() {
+    if (this.tableList == null) {
+      this.tableList = new ArrayList<TableName>();
+    }
+    return this.tableList;
+  }
+
+  /**
+   * Persist the manifest file.
+   * @throws BackupException if storing the manifest file fails
+   */
+  public void store(Configuration conf) throws BackupException {
+    byte[] data = toByteArray();
+
+    // write the file, overwrite if already exist
+    Path manifestFilePath =
+        new Path(new Path((this.tableBackupDir != null ? this.tableBackupDir : this.logBackupDir))
+            ,MANIFEST_FILE_NAME);
+    try (FSDataOutputStream out =
+        manifestFilePath.getFileSystem(conf).create(manifestFilePath, true)) {
+      out.write(data);
+    } catch (IOException e) {
+      throw new BackupException(e);
+    }
+
+    LOG.info("Manifest file stored to " + manifestFilePath);
+  }
+
+  /**
+   * Protobuf serialization
+   * @return The manifest serialized using pb
+   */
+  public byte[] toByteArray() {
+    BackupProtos.BackupManifest.Builder builder = BackupProtos.BackupManifest.newBuilder();
+    builder.setVersion(this.version);
+    builder.setBackupId(this.backupId);
+    builder.setType(BackupProtos.BackupType.valueOf(this.type.name()));
+    setTableList(builder);
+    builder.setStartTs(this.startTs);
+    builder.setCompleteTs(this.completeTs);
+    setIncrementalTimestampMap(builder);
+    setDependencyMap(builder);
+    return builder.build().toByteArray();
+  }
+
+  private void setIncrementalTimestampMap(BackupProtos.BackupManifest.Builder builder) {
+    if (this.incrTimeRanges == null) {
+      return;
+    }
+    for (Entry<TableName, HashMap<String,Long>> entry: this.incrTimeRanges.entrySet()) {
+      TableName key = entry.getKey();
+      HashMap<String, Long> value = entry.getValue();
+      BackupProtos.TableServerTimestamp.Builder tstBuilder =
+          BackupProtos.TableServerTimestamp.newBuilder();
+      tstBuilder.setTable(ProtobufUtil.toProtoTableName(key));
+
+      for (String s : value.keySet()) {
+        BackupProtos.ServerTimestamp.Builder stBuilder = BackupProtos.ServerTimestamp.newBuilder();
+        stBuilder.setServer(s);
+        stBuilder.setTimestamp(value.get(s));
+        tstBuilder.addServerTimestamp(stBuilder.build());
+      }
+      builder.addTstMap(tstBuilder.build());
+    }
+  }
+
+  private void setDependencyMap(BackupProtos.BackupManifest.Builder builder) {
+    for (BackupImage image: getDependency().values()) {
+      builder.addDependentBackupImage(image.toProto());
+    }
+  }
+
+  private void setTableList(BackupProtos.BackupManifest.Builder builder) {
+    for(TableName name: tableList){
+      builder.addTableList(ProtobufUtil.toProtoTableName(name));
+    }
+  }
+
+  /**
+   * Parse protobuf from byte array
+   * @param pbBytes A pb serialized BackupManifest instance
+   * @return An instance of BackupProtos.BackupManifest made from <code>pbBytes</code>
+   * @throws DeserializationException if the bytes cannot be parsed
+   */
+  private static BackupProtos.BackupManifest parseFrom(final byte[] pbBytes)
+      throws DeserializationException {
+    BackupProtos.BackupManifest proto;
+    try {
+      proto = BackupProtos.BackupManifest.parseFrom(pbBytes);
+    } catch (InvalidProtocolBufferException e) {
+      throw new DeserializationException(e);
+    }
+    return proto;
+  }
+
+  /**
+   * Get manifest file version
+   * @return version
+   */
+  public String getVersion() {
+    return version;
+  }
+
+  /**
+   * Get this backup image.
+   * @return the backup image.
+   */
+  public BackupImage getBackupImage() {
+    return this.getDependency().get(this.backupId);
+  }
+
+  /**
+   * Add dependent backup image for this backup.
+   * @param image The direct dependent backup image
+   */
+  public void addDependentImage(BackupImage image) {
+    this.getDependency().get(this.backupId).addAncestor(image);
+    this.setDependencyMap(this.getDependency(), image);
+  }
+
+
+
+  /**
+   * Get all dependent backup images. The image of this backup itself is also included.
+   * @return The dependent backup images map
+   */
+  public Map<String, BackupImage> getDependency() {
+    if (this.dependency == null) {
+      this.dependency = new HashMap<String, BackupImage>();
+      LOG.debug(this.rootDir + " " + this.backupId + " " + this.type);
+      this.dependency.put(this.backupId,
+        new BackupImage(this.backupId, this.type, this.rootDir, tableList, this.startTs,
+            this.completeTs));
+    }
+    return this.dependency;
+  }
+
+  /**
+   * Set the incremental timestamp map directly.
+   * @param incrTimestampMap timestamp map
+   */
+  public void setIncrTimestampMap(HashMap<TableName, HashMap<String, Long>> incrTimestampMap) {
+    this.incrTimeRanges = incrTimestampMap;
+  }
+
+
+  public Map<TableName, HashMap<String, Long>> getIncrTimestampMap() {
+    if (this.incrTimeRanges == null) {
+      this.incrTimeRanges = new HashMap<TableName, HashMap<String, Long>>();
+    }
+    return this.incrTimeRanges;
+  }
+
+
+  /**
+   * Get the image list of this backup for restore in time order.
+   * @param reverse If true, then output in reverse order, otherwise in time order from old to new
+   * @return the backup image list for restore in time order
+   */
+  public ArrayList<BackupImage> getRestoreDependentList(boolean reverse) {
+    TreeMap<Long, BackupImage> restoreImages = new TreeMap<Long, BackupImage>();
+    for (BackupImage image : this.getDependency().values()) {
+      restoreImages.put(Long.valueOf(image.startTs), image);
+    }
+    return new ArrayList<BackupImage>(reverse ? (restoreImages.descendingMap().values())
+        : (restoreImages.values()));
+  }
+
+  /**
+   * Get the dependent image list for a specific table of this backup, in time order from old to
+   * new, for restoring to this backup image's level.
+   * @param table table
+   * @return the backup image list for a table in time order
+   */
+  public ArrayList<BackupImage> getDependentListByTable(TableName table) {
+    ArrayList<BackupImage> tableImageList = new ArrayList<BackupImage>();
+    ArrayList<BackupImage> imageList = getRestoreDependentList(true);
+    for (BackupImage image : imageList) {
+      if (image.hasTable(table)) {
+        tableImageList.add(image);
+        if (image.getType() == BackupType.FULL) {
+          break;
+        }
+      }
+    }
+    Collections.reverse(tableImageList);
+    return tableImageList;
+  }
+
+  /**
+   * Get the full dependent image list in the whole dependency scope for a specific table of this
+   * backup in time order from old to new.
+   * @param table table
+   * @return the full backup image list for a table in time order in the whole scope of the
+   *         dependency of this image
+   */
+  public ArrayList<BackupImage> getAllDependentListByTable(TableName table) {
+    ArrayList<BackupImage> tableImageList = new ArrayList<BackupImage>();
+    ArrayList<BackupImage> imageList = getRestoreDependentList(false);
+    for (BackupImage image : imageList) {
+      if (image.hasTable(table)) {
+        tableImageList.add(image);
+      }
+    }
+    return tableImageList;
+  }
+
+
+  /**
+   * Recursively set the dependency map of the backup images.
+   * @param map The dependency map
+   * @param image The backup image
+   */
+  private void setDependencyMap(Map<String, BackupImage> map, BackupImage image) {
+    if (image == null) {
+      return;
+    } else {
+      map.put(image.getBackupId(), image);
+      for (BackupImage img : image.getAncestors()) {
+        setDependencyMap(map, img);
+      }
+    }
+  }
+
+  /**
+   * Check whether backup image1 could cover backup image2 or not.
+   * @param image1 backup image 1
+   * @param image2 backup image 2
+   * @return true if image1 can cover image2, otherwise false
+   */
+  public static boolean canCoverImage(BackupImage image1, BackupImage image2) {
+    // image1 can cover image2 only when the following conditions are satisfied:
+    // - image1 must not be an incremental image;
+    // - image1 must be taken after image2 has been taken;
+    // - table set of image1 must cover the table set of image2.
+    if (image1.getType() == BackupType.INCREMENTAL) {
+      return false;
+    }
+    if (image1.getStartTs() < image2.getStartTs()) {
+      return false;
+    }
+    List<TableName> image1TableList = image1.getTableNames();
+    List<TableName> image2TableList = image2.getTableNames();
+    boolean found = false;
+    for (int i = 0; i < image2TableList.size(); i++) {
+      found = false;
+      for (int j = 0; j < image1TableList.size(); j++) {
+        if (image2TableList.get(i).equals(image1TableList.get(j))) {
+          found = true;
+          break;
+        }
+      }
+      if (!found) {
+        return false;
+      }
+    }
+
+    LOG.debug("Backup image " + image1.getBackupId() + " can cover " + image2.getBackupId());
+    return true;
+  }
+
+  /**
+   * Check whether backup image set could cover a backup image or not.
+   * @param fullImages The backup image set
+   * @param image The target backup image
+   * @return true if fullImages can cover image, otherwise false
+   */
+  public static boolean canCoverImage(ArrayList<BackupImage> fullImages, BackupImage image) {
+    // fullImages can cover image only when the following conditions are satisfied:
+    // - each image of fullImages must not be an incremental image;
+    // - each image of fullImages must be taken after image has been taken;
+    // - the combined table set of fullImages must cover the table set of image.
+    for (BackupImage image1 : fullImages) {
+      if (image1.getType() == BackupType.INCREMENTAL) {
+        return false;
+      }
+      if (image1.getStartTs() < image.getStartTs()) {
+        return false;
+      }
+    }
+
+    ArrayList<String> image1TableList = new ArrayList<String>();
+    for (BackupImage image1 : fullImages) {
+      List<TableName> tableList = image1.getTableNames();
+      for (TableName table : tableList) {
+        image1TableList.add(table.getNameAsString());
+      }
+    }
+    ArrayList<String> image2TableList = new ArrayList<String>();
+    List<TableName> tableList = image.getTableNames();
+    for (TableName table : tableList) {
+      image2TableList.add(table.getNameAsString());
+    }
+
+    for (int i = 0; i < image2TableList.size(); i++) {
+      if (!image1TableList.contains(image2TableList.get(i))) {
+        return false;
+      }
+    }
+
+    LOG.debug("Full image set can cover image " + image.getBackupId());
+    return true;
+  }
+  
+  public BackupInfo toBackupInfo() {
+    BackupInfo info = new BackupInfo();
+    info.setType(type);
+    TableName[] tables = new TableName[tableList.size()];
+    info.addTables(getTableList().toArray(tables));
+    info.setBackupId(backupId);
+    info.setStartTs(startTs);
+    info.setTargetRootDir(rootDir);
+    if(type == BackupType.INCREMENTAL) {
+      info.setHlogTargetDir(logBackupDir);
+    }
+    return info;
+  }
+}
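
A minimal usage sketch of the manifest API above (the backup path and table
name below are hypothetical, for illustration only): load a stored manifest
and compute the restore chain for one table.

    Configuration conf = HBaseConfiguration.create();
    Path backupPath = new Path("hdfs://ns1/backup-root/default/t1/backup_1475710000000");
    BackupManifest manifest = new BackupManifest(conf, backupPath); // reads .backup.manifest
    // Images needed to restore 't1' to this backup's level, oldest (full) image first:
    for (BackupManifest.BackupImage image :
        manifest.getDependentListByTable(TableName.valueOf("t1"))) {
      System.out.println(image.getBackupId() + " " + image.getType());
    }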

http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupRestoreConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupRestoreConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupRestoreConstants.java
new file mode 100644
index 0000000..ac1d2bc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupRestoreConstants.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.impl;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * BackupRestoreConstants holds constants used by HBase backup and restore
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public final class BackupRestoreConstants {
+
+
+  // delimiter in table name list in the restore command
+  public static final String TABLENAME_DELIMITER_IN_COMMAND = ",";
+
+  public static final String CONF_STAGING_ROOT = "snapshot.export.staging.root";
+
+  public static final String BACKUPID_PREFIX = "backup_";
+
+  public enum BackupCommand {
+    CREATE, CANCEL, DELETE, DESCRIBE, HISTORY, STATUS, CONVERT, MERGE, STOP, SHOW, HELP,
+    PROGRESS, SET, SET_ADD, SET_REMOVE, SET_DELETE, SET_DESCRIBE, SET_LIST
+  }
+
+  private BackupRestoreConstants() {
+    // Can't be instantiated with this ctor.
+  }
+}
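
A quick sketch of how these constants fit together (values illustrative):
backup ids are BACKUPID_PREFIX plus a timestamp, so a session's start time can
be recovered from the id itself, and command words parse into BackupCommand.

    String backupId = BackupRestoreConstants.BACKUPID_PREFIX + System.currentTimeMillis();
    long startTs = Long.parseLong(
        backupId.substring(BackupRestoreConstants.BACKUPID_PREFIX.length()));
    BackupRestoreConstants.BackupCommand cmd =
        BackupRestoreConstants.BackupCommand.valueOf("CREATE");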

http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
new file mode 100644
index 0000000..d05d54c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTable.java
@@ -0,0 +1,926 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
+import org.apache.hadoop.hbase.backup.util.BackupClientUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.BackupProtos;
+
+/**
+ * This class provides the API for the 'hbase:backup' system table
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class BackupSystemTable implements Closeable {
+
+  static class WALItem {
+    String backupId;
+    String walFile;
+    String backupRoot;
+
+    WALItem(String backupId, String walFile, String backupRoot) {
+      this.backupId = backupId;
+      this.walFile = walFile;
+      this.backupRoot = backupRoot;
+    }
+
+    public String getBackupId() {
+      return backupId;
+    }
+
+    public String getWalFile() {
+      return walFile;
+    }
+
+    public String getBackupRoot() {
+      return backupRoot;
+    }
+
+    @Override
+    public String toString() {
+      return "/" + backupRoot + "/" + backupId + "/" + walFile;
+    }
+
+  }
+
+  private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
+  private final static TableName tableName = TableName.BACKUP_TABLE_NAME;
+  // Stores backup sessions (contexts)
+  final static byte[] SESSIONS_FAMILY = "session".getBytes();
+  // Stores other meta
+  final static byte[] META_FAMILY = "meta".getBytes();
+  // Connection to HBase cluster, shared
+  // among all instances
+  private final Connection connection;
+
+  public BackupSystemTable(Connection conn) throws IOException {
+    this.connection = conn;
+  }
+
+  public void close() {
+    // do nothing
+  }
+
+  /**
+   * Updates status (state) of a backup session in hbase:backup table
+   * @param context context
+   * @throws IOException exception
+   */
+  public void updateBackupInfo(BackupInfo context) throws IOException {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("update backup status in hbase:backup for: " + context.getBackupId()
+          + " set status=" + context.getState());
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Put put = BackupSystemTableHelper.createPutForBackupContext(context);
+      table.put(put);
+    }
+  }
+
+  /**
+   * Deletes backup status from hbase:backup table
+   * @param backupId backup id
+   * @throws IOException exception
+   */
+
+  public void deleteBackupInfo(String backupId) throws IOException {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("delete backup status in hbase:backup for " + backupId);
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Delete del = BackupSystemTableHelper.createDeleteForBackupInfo(backupId);
+      table.delete(del);
+    }
+  }
+
+  /**
+   * Reads backup status object (instance of BackupInfo) from hbase:backup table
+   * @param backupId - backupId
+   * @return Current status of the backup session, or null if not found
+   */
+
+  public BackupInfo readBackupInfo(String backupId) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("read backup status from hbase:backup for: " + backupId);
+    }
+
+    try (Table table = connection.getTable(tableName)) {
+      Get get = BackupSystemTableHelper.createGetForBackupContext(backupId);
+      Result res = table.get(get);
+      if (res.isEmpty()) {
+        return null;
+      }
+      return BackupSystemTableHelper.resultToBackupInfo(res);
+    }
+  }
+
+  /**
+   * Read the start code (timestamp) of the last successful backup. Returns null if there is no
+   * start code stored in hbase:backup or the stored value has length 0. Both cases indicate
+   * that no backup has completed successfully so far.
+   * @param backupRoot root directory path to backup
+   * @return the timestamp of last successful backup
+   * @throws IOException exception
+   */
+  public String readBackupStartCode(String backupRoot) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("read backup start code from hbase:backup");
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Get get = BackupSystemTableHelper.createGetForStartCode(backupRoot);
+      Result res = table.get(get);
+      if (res.isEmpty()) {
+        return null;
+      }
+      Cell cell = res.listCells().get(0);
+      byte[] val = CellUtil.cloneValue(cell);
+      if (val.length == 0) {
+        return null;
+      }
+      return new String(val);
+    }
+  }
+
+  /**
+   * Write the start code (timestamp) to hbase:backup.
+   * @param startCode start code (must not be null)
+   * @param backupRoot root directory path to backup
+   * @throws IOException exception
+   */
+  public void writeBackupStartCode(Long startCode, String backupRoot) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("write backup start code to hbase:backup " + startCode);
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Put put = BackupSystemTableHelper.createPutForStartCode(startCode.toString(), backupRoot);
+      table.put(put);
+    }
+  }
+
+  /**
+   * Get the Region Servers log information after the last log roll from hbase:backup.
+   * @param backupRoot root directory path to backup
+   * @return RS log info
+   * @throws IOException exception
+   */
+  public HashMap<String, Long> readRegionServerLastLogRollResult(String backupRoot)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("read region server last roll log result to hbase:backup");
+    }
+
+    Scan scan = BackupSystemTableHelper.createScanForReadRegionServerLastLogRollResult(backupRoot);
+
+    try (Table table = connection.getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+      Result res = null;
+      HashMap<String, Long> rsTimestampMap = new HashMap<String, Long>();
+      while ((res = scanner.next()) != null) {
+        res.advance();
+        Cell cell = res.current();
+        byte[] row = CellUtil.cloneRow(cell);
+        String server =
+            BackupSystemTableHelper.getServerNameForReadRegionServerLastLogRollResult(row);
+        byte[] data = CellUtil.cloneValue(cell);
+        rsTimestampMap.put(server, Long.parseLong(new String(data)));
+      }
+      return rsTimestampMap;
+    }
+  }
+
+  /**
+   * Writes Region Server last roll log result (timestamp) to hbase:backup table
+   * @param server - Region Server name
+   * @param ts - last log timestamp
+   * @param backupRoot root directory path to backup
+   * @throws IOException exception
+   */
+  public void writeRegionServerLastLogRollResult(String server, Long ts, String backupRoot)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("write region server last roll log result to hbase:backup");
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Put put =
+          BackupSystemTableHelper.createPutForRegionServerLastLogRollResult(server, ts, backupRoot);
+      table.put(put);
+    }
+  }
+
+  /**
+   * Get backup session history (in descending order by time)
+   * @param onlyCompleted true, if only successfully completed sessions are requested
+   * @return list of BackupInfo records
+   * @throws IOException exception
+   */
+  public ArrayList<BackupInfo> getBackupHistory(boolean onlyCompleted) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("get backup history from hbase:backup");
+    }
+    ArrayList<BackupInfo> list;
+    BackupState state = onlyCompleted ? BackupState.COMPLETE : BackupState.ANY;
+    list = getBackupContexts(state);
+    return BackupClientUtil.sortHistoryListDesc(list);
+  }
+
+  /**
+   * Get all backup history
+   * @return list of backup info
+   * @throws IOException
+   */
+  public List<BackupInfo> getBackupHistory() throws IOException {
+    return getBackupHistory(false);
+  }
+
+  /**
+   * Get first n backup history records
+   * @param n - number of records
+   * @return list of records
+   * @throws IOException
+   */
+  public List<BackupInfo> getHistory(int n) throws IOException {
+
+    List<BackupInfo> history = getBackupHistory();
+    if (history.size() <= n) return history;
+    List<BackupInfo> list = new ArrayList<BackupInfo>();
+    for (int i = 0; i < n; i++) {
+      list.add(history.get(i));
+    }
+    return list;
+
+  }
+  
+  /**
+   * Get backup history records filtered by a list of filters.
+   * @param n - max number of records
+   * @param filters - list of filters
+   * @return backup records
+   * @throws IOException
+   */
+  public List<BackupInfo> getBackupHistory(int n, BackupInfo.Filter... filters) throws IOException {
+    if (filters.length == 0) return getHistory(n);
+
+    List<BackupInfo> history = getBackupHistory();
+    List<BackupInfo> result = new ArrayList<BackupInfo>();
+    for (BackupInfo bi : history) {
+      if (result.size() == n) break;
+      boolean passed = true;
+      for (int i = 0; i < filters.length; i++) {
+        if (!filters[i].apply(bi)) {
+          passed = false;
+          break;
+        }
+      }
+      if (passed) {
+        result.add(bi);
+      }
+    }
+    return result;
+
+  }
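+  // Illustrative (hypothetical) use of the filter variant above: keep only
+  // completed sessions, at most 10, newest first.
+  //   List<BackupInfo> done = getBackupHistory(10, new BackupInfo.Filter() {
+  //     @Override
+  //     public boolean apply(BackupInfo info) {
+  //       return info.getState() == BackupState.COMPLETE;
+  //     }
+  //   });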
+
+  /**
+   * Get history for backup destination
+   * @param backupRoot - backup destination
+   * @return List of backup info
+   * @throws IOException
+   */
+  public List<BackupInfo> getBackupHistory(String backupRoot) throws IOException {
+    ArrayList<BackupInfo> history = getBackupHistory(false);
+    for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) {
+      BackupInfo info = iterator.next();
+      if (!backupRoot.equals(info.getTargetRootDir())) {
+        iterator.remove();
+      }
+    }
+    return history;
+  }
+  
+  /**
+   * Get history for a table
+   * @param name - table name
+   * @return history for a table
+   * @throws IOException
+   */
+  public List<BackupInfo> getBackupHistoryForTable(TableName name) throws IOException {
+    List<BackupInfo> history = getBackupHistory();
+    List<BackupInfo> tableHistory = new ArrayList<BackupInfo>();
+    for (BackupInfo info : history) {
+      List<TableName> tables = info.getTableNames();
+      if (tables.contains(name)) {
+        tableHistory.add(info);
+      }
+    }
+    return tableHistory;
+  }
+
+  /**
+   * Get backup history grouped by table, for tables in the given set and backups under the
+   * given root.
+   * @param set - set of table names
+   * @param backupRoot - backup destination root
+   * @return map of table name to its backup history
+   * @throws IOException exception
+   */
+  public Map<TableName, ArrayList<BackupInfo>>
+      getBackupHistoryForTableSet(Set<TableName> set, String backupRoot) throws IOException {
+    List<BackupInfo> history = getBackupHistory(backupRoot);
+    Map<TableName, ArrayList<BackupInfo>> tableHistoryMap =
+        new HashMap<TableName, ArrayList<BackupInfo>>();
+    for (Iterator<BackupInfo> iterator = history.iterator(); iterator.hasNext();) {
+      BackupInfo info = iterator.next();
+      if (!backupRoot.equals(info.getTargetRootDir())) {
+        continue;
+      }
+      List<TableName> tables = info.getTableNames();
+      for (TableName tableName: tables) {      
+        if (set.contains(tableName)) {
+          ArrayList<BackupInfo> list = tableHistoryMap.get(tableName);
+          if (list == null) {
+            list = new ArrayList<BackupInfo>();
+            tableHistoryMap.put(tableName, list);
+          }
+          list.add(info);
+        }
+      }
+    }
+    return tableHistoryMap;
+  }
+  
+  /**
+   * Get all backup sessions with a given state
+   * @param status the backup state to match; BackupState.ANY matches all states
+   * @return list of matching backup contexts
+   * @throws IOException exception
+   */
+  public ArrayList<BackupInfo> getBackupContexts(BackupState status) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("get backup contexts from hbase:backup");
+    }
+
+    Scan scan = BackupSystemTableHelper.createScanForBackupHistory();
+    ArrayList<BackupInfo> list = new ArrayList<BackupInfo>();
+
+    try (Table table = connection.getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+      Result res = null;
+      while ((res = scanner.next()) != null) {
+        res.advance();
+        BackupInfo context = BackupSystemTableHelper.cellToBackupInfo(res.current());
+        if (status != BackupState.ANY && context.getState() != status) {
+          continue;
+        }
+        list.add(context);
+      }
+      return list;
+    }
+  }
+
+  /**
+   * Write the current timestamps for each regionserver to hbase:backup after a successful full or
+   * incremental backup. The saved timestamp is of the last log file that was backed up already.
+   * @param tables tables
+   * @param newTimestamps timestamps
+   * @param backupRoot root directory path to backup
+   * @throws IOException exception
+   */
+  public void writeRegionServerLogTimestamp(Set<TableName> tables,
+      HashMap<String, Long> newTimestamps, String backupRoot) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("write RS log time stamps to hbase:backup for tables ["
+          + StringUtils.join(tables, ",") + "]");
+    }
+    List<Put> puts = new ArrayList<Put>();
+    for (TableName table : tables) {
+      byte[] smapData = toTableServerTimestampProto(table, newTimestamps).toByteArray();
+      Put put =
+          BackupSystemTableHelper.createPutForWriteRegionServerLogTimestamp(table, smapData,
+            backupRoot);
+      puts.add(put);
+    }
+    try (Table table = connection.getTable(tableName)) {
+      table.put(puts);
+    }
+  }
+
+  /**
+   * Read the timestamp for each region server log after the last successful backup. Each table
+   * has its own set of timestamps. The info is stored for each table as a concatenated string
+   * of rs->timestamp
+   * @param backupRoot root directory path to backup
+   * @return the timestamp for each region server. key: tableName value:
+   *         RegionServer,PreviousTimeStamp
+   * @throws IOException exception
+   */
+  public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap(String backupRoot)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("read RS log ts from hbase:backup for root=" + backupRoot);
+    }
+
+    HashMap<TableName, HashMap<String, Long>> tableTimestampMap =
+        new HashMap<TableName, HashMap<String, Long>>();
+
+    Scan scan = BackupSystemTableHelper.createScanForReadLogTimestampMap(backupRoot);
+    try (Table table = connection.getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+      Result res = null;
+      while ((res = scanner.next()) != null) {
+        res.advance();
+        Cell cell = res.current();
+        byte[] row = CellUtil.cloneRow(cell);
+        String tabName = BackupSystemTableHelper.getTableNameForReadLogTimestampMap(row);
+        TableName tn = TableName.valueOf(tabName);
+        byte[] data = CellUtil.cloneValue(cell);
+        if (data == null || data.length == 0) {
+          throw new IOException("Data of last backup data from hbase:backup "
+              + "is empty. Create a backup first.");
+        }
+        HashMap<String, Long> lastBackup =
+            fromTableServerTimestampProto(BackupProtos.TableServerTimestamp.parseFrom(data));
+        tableTimestampMap.put(tn, lastBackup);
+      }
+      return tableTimestampMap;
+    }
+  }
+
+  private BackupProtos.TableServerTimestamp toTableServerTimestampProto(TableName table,
+      Map<String, Long> map) {
+    BackupProtos.TableServerTimestamp.Builder tstBuilder =
+        BackupProtos.TableServerTimestamp.newBuilder();
+    tstBuilder.setTable(ProtobufUtil.toProtoTableName(table));
+
+    for (Entry<String, Long> entry : map.entrySet()) {
+      BackupProtos.ServerTimestamp.Builder builder = BackupProtos.ServerTimestamp.newBuilder();
+      builder.setServer(entry.getKey());
+      builder.setTimestamp(entry.getValue());
+      tstBuilder.addServerTimestamp(builder.build());
+    }
+
+    return tstBuilder.build();
+  }
+
+  private HashMap<String, Long> fromTableServerTimestampProto(
+      BackupProtos.TableServerTimestamp proto) {
+    HashMap<String, Long> map = new HashMap<String, Long>();
+    List<BackupProtos.ServerTimestamp> list = proto.getServerTimestampList();
+    for (BackupProtos.ServerTimestamp st : list) {
+      map.put(st.getServer(), st.getTimestamp());
+    }
+    return map;
+  }
+
+  /**
+   * Return the current tables covered by incremental backup.
+   * @param backupRoot root directory path to backup
+   * @return set of tableNames
+   * @throws IOException exception
+   */
+  public Set<TableName> getIncrementalBackupTableSet(String backupRoot) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("get incr backup table set from hbase:backup");
+    }
+    TreeSet<TableName> set = new TreeSet<>();
+
+    try (Table table = connection.getTable(tableName)) {
+      Get get = BackupSystemTableHelper.createGetForIncrBackupTableSet(backupRoot);
+      Result res = table.get(get);
+      if (res.isEmpty()) {
+        return set;
+      }
+      List<Cell> cells = res.listCells();
+      for (Cell cell : cells) {
+        // qualifier = table name - we use table names as qualifiers
+        set.add(TableName.valueOf(CellUtil.cloneQualifier(cell)));
+      }
+      return set;
+    }
+  }
+
+  /**
+   * Add tables to global incremental backup set
+   * @param tables - set of tables
+   * @param backupRoot root directory path to backup
+   * @throws IOException exception
+   */
+  public void addIncrementalBackupTableSet(Set<TableName> tables, String backupRoot)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Add incremental backup table set to hbase:backup. ROOT=" + backupRoot
+          + " tables [" + StringUtils.join(tables, " ") + "]");
+      for (TableName table : tables) {
+        LOG.debug(table);
+      }
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Put put = BackupSystemTableHelper.createPutForIncrBackupTableSet(tables, backupRoot);
+      table.put(put);
+    }
+  }
+
+  /**
+   * Removes incremental backup set
+   * @param backupRoot backup root
+   */
+
+  public void deleteIncrementalBackupTableSet(String backupRoot) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Delete incremental backup table set to hbase:backup. ROOT=" + backupRoot);
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Delete delete = BackupSystemTableHelper.createDeleteForIncrBackupTableSet(backupRoot);
+      table.delete(delete);
+    }
+  }
+
+  /**
+   * Register WAL files as eligible for deletion
+   * @param files files
+   * @param backupId backup id
+   * @param backupRoot root directory path to backup
+   * @throws IOException exception
+   */
+  public void addWALFiles(List<String> files, String backupId, String backupRoot)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("add WAL files to hbase:backup: " + backupId + " " + backupRoot + " files ["
+          + StringUtils.join(files, ",") + "]");
+      for (String f : files) {
+        LOG.debug("add :" + f);
+      }
+    }
+    try (Table table = connection.getTable(tableName)) {
+      List<Put> puts =
+          BackupSystemTableHelper.createPutsForAddWALFiles(files, backupId, backupRoot);
+      table.put(puts);
+    }
+  }
+
+  /**
+   * Creates an iterator over the WAL files registered in hbase:backup
+   * @param backupRoot root directory path to backup
+   * @return iterator over WAL file items
+   */
+  public Iterator<WALItem> getWALFilesIterator(String backupRoot) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("get WAL files from hbase:backup");
+    }
+    final Table table = connection.getTable(tableName);
+    Scan scan = BackupSystemTableHelper.createScanForGetWALs(backupRoot);
+    final ResultScanner scanner = table.getScanner(scan);
+    final Iterator<Result> it = scanner.iterator();
+    return new Iterator<WALItem>() {
+
+      @Override
+      public boolean hasNext() {
+        boolean next = it.hasNext();
+        if (!next) {
+          // close all
+          try {
+            scanner.close();
+            table.close();
+          } catch (IOException e) {
+            LOG.error("Close WAL Iterator", e);
+          }
+        }
+        return next;
+      }
+
+      @Override
+      public WALItem next() {
+        Result next = it.next();
+        List<Cell> cells = next.listCells();
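+        // Cells are read by fixed position: 0 = backup id, 1 = WAL file path, 2 = backup root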
+        byte[] buf = cells.get(0).getValueArray();
+        int len = cells.get(0).getValueLength();
+        int offset = cells.get(0).getValueOffset();
+        String backupId = new String(buf, offset, len);
+        buf = cells.get(1).getValueArray();
+        len = cells.get(1).getValueLength();
+        offset = cells.get(1).getValueOffset();
+        String walFile = new String(buf, offset, len);
+        buf = cells.get(2).getValueArray();
+        len = cells.get(2).getValueLength();
+        offset = cells.get(2).getValueOffset();
+        String backupRoot = new String(buf, offset, len);
+        return new WALItem(backupId, walFile, backupRoot);
+      }
+
+      @Override
+      public void remove() {
+        // not implemented
+        throw new RuntimeException("remove is not supported");
+      }
+    };
+
+  }
+
+  /**
+   * Check if a WAL file is eligible for deletion. Future: support all backup destinations.
+   * @param file file
+   * @return true if the WAL file has already been backed up
+   * @throws IOException exception
+   */
+  public boolean isWALFileDeletable(String file) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Check if WAL file has been already backed up in hbase:backup " + file);
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Get get = BackupSystemTableHelper.createGetForCheckWALFile(file);
+      Result res = table.get(get);
+      return !res.isEmpty();
+    }
+  }
+
+  /**
+   * Checks if we have at least one backup session in hbase:backup. This API is used by
+   * BackupLogCleaner.
+   * @return true if at least one session exists in the hbase:backup table
+   * @throws IOException exception
+   */
+  public boolean hasBackupSessions() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Has backup sessions from hbase:backup");
+    }
+    boolean result = false;
+    Scan scan = BackupSystemTableHelper.createScanForBackupHistory();
+    scan.setCaching(1);
+    try (Table table = connection.getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+      if (scanner.next() != null) {
+        result = true;
+      }
+      return result;
+    }
+  }
+
+  /**
+   * BACKUP SETS
+   */
+
+  /**
+   * Get backup set list
+   * @return backup set list
+   * @throws IOException
+   */
+  public List<String> listBackupSets() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(" Backup set list");
+    }
+    List<String> list = new ArrayList<String>();
+    Scan scan = BackupSystemTableHelper.createScanForBackupSetList();
+    scan.setMaxVersions(1);
+    try (Table table = connection.getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+      Result res = null;
+      while ((res = scanner.next()) != null) {
+        res.advance();
+        list.add(BackupSystemTableHelper.cellKeyToBackupSetName(res.current()));
+      }
+      return list;
+    }
+  }
+
+  /**
+   * Get backup set description (list of tables)
+   * @param name - set's name
+   * @return list of tables in a backup set
+   * @throws IOException
+   */
+  public List<TableName> describeBackupSet(String name) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(" Backup set describe: " + name);
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Get get = BackupSystemTableHelper.createGetForBackupSet(name);
+      Result res = table.get(get);
+      if (res.isEmpty()) {
+        return null;
+      }
+      res.advance();
+      String[] tables = BackupSystemTableHelper.cellValueToBackupSet(res.current());
+      return toList(tables);
+    }
+  }
+
+  private List<TableName> toList(String[] tables) {
+    List<TableName> list = new ArrayList<TableName>(tables.length);
+    for (String name : tables) {
+      list.add(TableName.valueOf(name));
+    }
+    return list;
+  }
+
+  /**
+   * Add tables to a backup set
+   * @param name - set name
+   * @param newTables - list of tables to add
+   * @throws IOException
+   */
+  public void addToBackupSet(String name, String[] newTables) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Backup set add: " + name + " tables [" + StringUtils.join(newTables, " ") + "]");
+    }
+    try (Table table = connection.getTable(tableName)) {
+      String[] union = null;
+      Get get = BackupSystemTableHelper.createGetForBackupSet(name);
+      Result res = table.get(get);
+      if (res.isEmpty()) {
+        union = newTables;
+      } else {
+        res.advance();
+        String[] tables = BackupSystemTableHelper.cellValueToBackupSet(res.current());
+        union = merge(tables, newTables);
+      }
+      Put put = BackupSystemTableHelper.createPutForBackupSet(name, union);
+      table.put(put);
+    }
+  }
+
+  private String[] merge(String[] tables, String[] newTables) {
+    List<String> list = new ArrayList<String>();
+    // Add all from tables
+    for (String t : tables) {
+      list.add(t);
+    }
+    for (String nt : newTables) {
+      if (list.contains(nt)) continue;
+      list.add(nt);
+    }
+    String[] arr = new String[list.size()];
+    list.toArray(arr);
+    return arr;
+  }
+
+  /**
+   * Remove tables from a backup set
+   * @param name - set name
+   * @param toRemove - list of tables to remove
+   * @throws IOException
+   */
+  public void removeFromBackupSet(String name, String[] toRemove) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Backup set remove from: " + name + " tables [" + StringUtils.join(toRemove, " ")
+          + "]");
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Get get = BackupSystemTableHelper.createGetForBackupSet(name);
+      Result res = table.get(get);
+      if (res.isEmpty()) {
+        LOG.warn("Backup set '" + name + "' not found.");
+        return;
+      }
+      res.advance();
+      String[] tables = BackupSystemTableHelper.cellValueToBackupSet(res.current());
+      String[] disjoint = disjoin(tables, toRemove);
+      if (disjoint.length > 0 && disjoint.length != tables.length) {
+        Put put = BackupSystemTableHelper.createPutForBackupSet(name, disjoint);
+        table.put(put);
+      } else if (disjoint.length == tables.length) {
+        LOG.warn("Backup set '" + name + "' does not contain tables ["
+            + StringUtils.join(toRemove, " ") + "]");
+      } else {
+        // Every table was removed - delete the now-empty backup set
+        LOG.info("Backup set '" + name + "' is empty. Deleting.");
+        deleteBackupSet(name);
+      }
+    }
+  }
+
+  private String[] disjoin(String[] tables, String[] toRemove) {
+    List<String> list = new ArrayList<String>();
+    // Start with all tables, then remove the requested ones
+    for (String t : tables) {
+      list.add(t);
+    }
+    for (String nt : toRemove) {
+      list.remove(nt);
+    }
+    String[] arr = new String[list.size()];
+    list.toArray(arr);
+    return arr;
+  }
+
+  /**
+   * Delete backup set
+   * @param name set's name
+   * @throws IOException
+   */
+  public void deleteBackupSet(String name) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Backup set delete: " + name);
+    }
+    try (Table table = connection.getTable(tableName)) {
+      Delete del = BackupSystemTableHelper.createDeleteForBackupSet(name);
+      table.delete(del);
+    }
+  }
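+
+  /*
+   * Illustrative end-to-end use of the backup set API above (hypothetical caller
+   * code; the set and table names are made up for exposition):
+   *
+   *   BackupSystemTable sysTable = ...;
+   *   sysTable.addToBackupSet("weekly", new String[] { "ns1:t1", "ns1:t2" });
+   *   List<TableName> members = sysTable.describeBackupSet("weekly"); // [ns1:t1, ns1:t2]
+   *   sysTable.removeFromBackupSet("weekly", new String[] { "ns1:t2" });
+   *   sysTable.deleteBackupSet("weekly");
+   */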
+
+  /**
+   * Get backup system table descriptor
+   * @return descriptor
+   */
+  public static HTableDescriptor getSystemTableDescriptor() {
+    HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+    HColumnDescriptor colSessionsDesc = new HColumnDescriptor(SESSIONS_FAMILY);
+    colSessionsDesc.setMaxVersions(1);
+    // Time to keep backup sessions (secs)
+    Configuration config = HBaseConfiguration.create();
+    int ttl = config.getInt(HConstants.BACKUP_SYSTEM_TTL_KEY, HConstants.BACKUP_SYSTEM_TTL_DEFAULT);
+    colSessionsDesc.setTimeToLive(ttl);
+    tableDesc.addFamily(colSessionsDesc);
+    HColumnDescriptor colMetaDesc = new HColumnDescriptor(META_FAMILY);
+    tableDesc.addFamily(colMetaDesc);
+    return tableDesc;
+  }
+
+  public static String getTableNameAsString() {
+    return tableName.getNameAsString();
+  }
+
+  public static TableName getTableName() {
+    return tableName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b14e2ab1/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java
new file mode 100644
index 0000000..37f29f8
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/BackupSystemTableHelper.java
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.BackupInfo;
+import org.apache.hadoop.hbase.backup.util.BackupClientUtil;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A collection of helper methods used by BackupSystemTable.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class BackupSystemTableHelper {
+
+  /**
+   * hbase:backup table schema (variable-length components are joined with the NULL
+   * (U+0000) delimiter where noted):
+   * 1. Backup sessions: rowkey = "session:" + backupId; value = serialized BackupInfo
+   * 2. Backup start code: rowkey = "startcode:" + backupRoot; value = startcode
+   * 3. Incremental backup set: rowkey = "incrbackupset:" + backupRoot; value = [list of tables]
+   * 4. Table-RS-timestamp map: rowkey = "trslm:" + backupRoot + NULL + tableName;
+   *    value = map[RS -> last WAL timestamp]
+   * 5. RS - WAL ts map: rowkey = "rslogts:" + backupRoot + NULL + server;
+   *    value = last WAL timestamp
+   * 6. WALs recorded: rowkey = "wals:" + unique WAL file name;
+   *    value = backupId, full WAL file name and backup root
+   * 7. Backup sets: rowkey = "backupset:" + setName; value = comma-separated list of tables
+   */
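+
+  // Illustrative rowkeys under the schema above (values are hypothetical):
+  //   session:backup_1474039769114
+  //   startcode:hdfs://namenode:8020/backup
+  //   incrbackupset:hdfs://namenode:8020/backup
+  //   backupset:weekly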
+
+  private final static String BACKUP_INFO_PREFIX = "session:";
+  private final static String START_CODE_ROW = "startcode:";
+  private final static String INCR_BACKUP_SET = "incrbackupset:";
+  private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
+  private final static String RS_LOG_TS_PREFIX = "rslogts:";
+  private final static String WALS_PREFIX = "wals:";
+  private final static String SET_KEY_PREFIX = "backupset:";
+
+  private final static byte[] EMPTY_VALUE = new byte[] {};
+
+  // Safe delimiter in a string
+  private final static String NULL = "\u0000";
+
+  private BackupSystemTableHelper() {
+    throw new AssertionError("Instantiating utility class...");
+  }
+
+  /**
+   * Creates Put operation for a given backup context object
+   * @param context backup context
+   * @return put operation
+   * @throws IOException exception
+   */
+  static Put createPutForBackupContext(BackupInfo context) throws IOException {
+    Put put = new Put(rowkey(BACKUP_INFO_PREFIX, context.getBackupId()));
+    put.addColumn(BackupSystemTable.SESSIONS_FAMILY, "context".getBytes(), context.toByteArray());
+    return put;
+  }
+
+  /**
+   * Creates Get operation for a given backup id
+   * @param backupId - backup's ID
+   * @return get operation
+   * @throws IOException exception
+   */
+  static Get createGetForBackupContext(String backupId) throws IOException {
+    Get get = new Get(rowkey(BACKUP_INFO_PREFIX, backupId));
+    get.addFamily(BackupSystemTable.SESSIONS_FAMILY);
+    get.setMaxVersions(1);
+    return get;
+  }
+
+  /**
+   * Creates Delete operation for a given backup id
+   * @param backupId - backup's ID
+   * @return delete operation
+   */
+  public static Delete createDeleteForBackupInfo(String backupId) {
+    Delete del = new Delete(rowkey(BACKUP_INFO_PREFIX, backupId));
+    del.addFamily(BackupSystemTable.SESSIONS_FAMILY);
+    return del;
+  }
+
+  /**
+   * Converts Result to BackupContext
+   * @param res - HBase result
+   * @return backup context instance
+   * @throws IOException exception
+   */
+  static BackupInfo resultToBackupInfo(Result res) throws IOException {
+    res.advance();
+    Cell cell = res.current();
+    return cellToBackupInfo(cell);
+  }
+
+  /**
+   * Creates Get operation to retrieve start code from hbase:backup
+   * @param rootPath - backup root path
+   * @return get operation
+   * @throws IOException exception
+   */
+  static Get createGetForStartCode(String rootPath) throws IOException {
+    Get get = new Get(rowkey(START_CODE_ROW, rootPath));
+    get.addFamily(BackupSystemTable.META_FAMILY);
+    get.setMaxVersions(1);
+    return get;
+  }
+
+  /**
+   * Creates Put operation to store start code to hbase:backup
+   * @param startCode - start code value
+   * @param rootPath - backup root path
+   * @return put operation
+   */
+  static Put createPutForStartCode(String startCode, String rootPath) {
+    Put put = new Put(rowkey(START_CODE_ROW, rootPath));
+    put.addColumn(BackupSystemTable.META_FAMILY, "startcode".getBytes(), startCode.getBytes());
+    return put;
+  }
+
+  /**
+   * Creates Get to retrieve incremental backup table set from hbase:backup
+   * @param backupRoot - backup destination root
+   * @return get operation
+   * @throws IOException exception
+   */
+  static Get createGetForIncrBackupTableSet(String backupRoot) throws IOException {
+    Get get = new Get(rowkey(INCR_BACKUP_SET, backupRoot));
+    get.addFamily(BackupSystemTable.META_FAMILY);
+    get.setMaxVersions(1);
+    return get;
+  }
+
+  /**
+   * Creates Put to store incremental backup table set
+   * @param tables - set of tables to include in incremental backups
+   * @param backupRoot - backup destination root
+   * @return put operation
+   */
+  static Put createPutForIncrBackupTableSet(Set<TableName> tables, String backupRoot) {
+    Put put = new Put(rowkey(INCR_BACKUP_SET, backupRoot));
+    for (TableName table : tables) {
+      put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes(table.getNameAsString()),
+        EMPTY_VALUE);
+    }
+    return put;
+  }
+
+  /**
+   * Creates Delete for incremental backup table set
+   * @param backupRoot backup root
+   * @return delete operation
+   */
+  static Delete createDeleteForIncrBackupTableSet(String backupRoot) {
+    Delete delete = new Delete(rowkey(INCR_BACKUP_SET, backupRoot));
+    delete.addFamily(BackupSystemTable.META_FAMILY);
+    return delete;
+  }
+
+  /**
+   * Creates Scan operation to load backup history
+   * @return scan operation
+   */
+  static Scan createScanForBackupHistory() {
+    Scan scan = new Scan();
+    byte[] startRow = BACKUP_INFO_PREFIX.getBytes();
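+    // Prefix scan: the stop row is the prefix with its last byte incremented, i.e.
+    // the smallest rowkey strictly greater than every rowkey starting with the
+    // prefix. The same idiom is used by the other createScanFor* methods below.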
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.SESSIONS_FAMILY);
+    scan.setMaxVersions(1);
+    return scan;
+  }
+
+  /**
+   * Converts cell to backup context instance.
+   * @param current - cell
+   * @return backup context instance
+   * @throws IOException exception
+   */
+  static BackupInfo cellToBackupInfo(Cell current) throws IOException {
+    byte[] data = CellUtil.cloneValue(current);
+    return BackupInfo.fromByteArray(data);
+  }
+
+  /**
+   * Creates Put to write RS last roll log timestamp map
+   * @param table - table name
+   * @param smap - serialized map of RS to last WAL timestamp
+   * @param backupRoot - backup destination root
+   * @return put operation
+   */
+  static Put createPutForWriteRegionServerLogTimestamp(TableName table, byte[] smap,
+      String backupRoot) {
+    Put put = new Put(rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, table.getNameAsString()));
+    put.addColumn(BackupSystemTable.META_FAMILY, "log-roll-map".getBytes(), smap);
+    return put;
+  }
+
+  /**
+   * Creates Scan to load the table -> {RS -> ts} map of maps
+   * @param backupRoot - backup destination root
+   * @return scan operation
+   */
+  static Scan createScanForReadLogTimestampMap(String backupRoot) {
+    Scan scan = new Scan();
+    byte[] startRow = rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot);
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.META_FAMILY);
+
+    return scan;
+  }
+
+  /**
+   * Get table name from rowkey
+   * @param cloneRow rowkey
+   * @return table name
+   */
+  static String getTableNameForReadLogTimestampMap(byte[] cloneRow) {
+    String s = new String(cloneRow);
+    int index = s.lastIndexOf(NULL);
+    return s.substring(index + 1);
+  }
+
+  /**
+   * Creates Put to store RS last log roll result
+   * @param server - server name
+   * @param timestamp - log roll result (timestamp)
+   * @param backupRoot - backup destination root
+   * @return put operation
+   */
+  static Put createPutForRegionServerLastLogRollResult(String server, Long timestamp,
+      String backupRoot) {
+    Put put = new Put(rowkey(RS_LOG_TS_PREFIX, backupRoot, NULL, server));
+    put.addColumn(BackupSystemTable.META_FAMILY, "rs-log-ts".getBytes(), timestamp.toString()
+        .getBytes());
+    return put;
+  }
+
+  /**
+   * Creates Scan operation to load last RS log roll results
+   * @param backupRoot - backup destination root
+   * @return scan operation
+   */
+  static Scan createScanForReadRegionServerLastLogRollResult(String backupRoot) {
+    Scan scan = new Scan();
+    byte[] startRow = rowkey(RS_LOG_TS_PREFIX, backupRoot);
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.META_FAMILY);
+    scan.setMaxVersions(1);
+
+    return scan;
+  }
+
+  /**
+   * Get server's name from rowkey
+   * @param row - rowkey
+   * @return server's name
+   */
+  static String getServerNameForReadRegionServerLastLogRollResult(byte[] row) {
+    String s = new String(row);
+    int index = s.lastIndexOf(NULL);
+    return s.substring(index + 1);
+  }
+
+  /**
+   * Creates put list for list of WAL files
+   * @param files - list of WAL file paths
+   * @param backupId - backup id
+   * @param backupRoot - backup destination root
+   * @return put list
+   * @throws IOException exception
+   */
+  public static List<Put> createPutsForAddWALFiles(List<String> files, String backupId,
+      String backupRoot) throws IOException {
+
+    List<Put> puts = new ArrayList<Put>();
+    for (String file : files) {
+      Put put = new Put(rowkey(WALS_PREFIX, BackupClientUtil.getUniqueWALFileNamePart(file)));
+      put.addColumn(BackupSystemTable.META_FAMILY, "backupId".getBytes(), backupId.getBytes());
+      put.addColumn(BackupSystemTable.META_FAMILY, "file".getBytes(), file.getBytes());
+      put.addColumn(BackupSystemTable.META_FAMILY, "root".getBytes(), backupRoot.getBytes());
+      puts.add(put);
+    }
+    return puts;
+  }
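+
+  // Note: the three qualifiers written above (backupId, file, root) sort in that
+  // order, which is the cell order the WAL iterator in BackupSystemTable relies on
+  // when it rebuilds WALItem objects from a Result.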
+
+  /**
+   * Creates Scan operation to load WALs.
+   * TODO: support for backupRoot (the parameter is currently unused)
+   * @param backupRoot - path to backup destination
+   * @return scan operation
+   */
+  public static Scan createScanForGetWALs(String backupRoot) {
+    Scan scan = new Scan();
+    byte[] startRow = WALS_PREFIX.getBytes();
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.META_FAMILY);
+    return scan;
+  }
+
+  /**
+   * Creates Get operation for a given WAL file name.
+   * TODO: support for backup destination
+   * @param file - WAL file name
+   * @return get operation
+   * @throws IOException exception
+   */
+  public static Get createGetForCheckWALFile(String file) throws IOException {
+    Get get = new Get(rowkey(WALS_PREFIX, BackupClientUtil.getUniqueWALFileNamePart(file)));
+    // add backup root column
+    get.addFamily(BackupSystemTable.META_FAMILY);
+    return get;
+  }
+
+  /**
+   * Creates Scan operation to load backup set list
+   * @return scan operation
+   */
+  static Scan createScanForBackupSetList() {
+    Scan scan = new Scan();
+    byte[] startRow = SET_KEY_PREFIX.getBytes();
+    byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
+    stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
+    scan.setStartRow(startRow);
+    scan.setStopRow(stopRow);
+    scan.addFamily(BackupSystemTable.META_FAMILY);
+    return scan;
+  }
+
+  /**
+   * Creates Get operation to load backup set content
+   * @param name - backup set's name
+   * @return get operation
+   */
+  static Get createGetForBackupSet(String name) {
+    Get get = new Get(rowkey(SET_KEY_PREFIX, name));
+    get.addFamily(BackupSystemTable.META_FAMILY);
+    return get;
+  }
+
+  /**
+   * Creates Delete operation to delete backup set content
+   * @param name - backup set's name
+   * @return delete operation
+   */
+  static Delete createDeleteForBackupSet(String name) {
+    Delete del = new Delete(rowkey(SET_KEY_PREFIX, name));
+    del.addFamily(BackupSystemTable.META_FAMILY);
+    return del;
+  }
+
+  /**
+   * Creates Put operation to update backup set content
+   * @param name - backup set's name
+   * @param tables - list of tables
+   * @return put operation
+   */
+  static Put createPutForBackupSet(String name, String[] tables) {
+    Put put = new Put(rowkey(SET_KEY_PREFIX, name));
+    byte[] value = convertToByteArray(tables);
+    put.addColumn(BackupSystemTable.META_FAMILY, "tables".getBytes(), value);
+    return put;
+  }
+
+  private static byte[] convertToByteArray(String[] tables) {
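+    // Comma is a safe delimiter: HBase table names may not contain commas, so
+    // cellValueToBackupSet can split the value back without ambiguity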
+    return StringUtils.join(tables, ",").getBytes();
+  }
+
+  /**
+   * Converts cell to backup set list.
+   * @param current - cell
+   * @return backup set
+   * @throws IOException
+   */
+  static String[] cellValueToBackupSet(Cell current) throws IOException {
+    byte[] data = CellUtil.cloneValue(current);
+    if (data != null && data.length > 0) {
+      return new String(data).split(",");
+    } else {
+      return new String[0];
+    }
+  }
+
+  /**
+   * Converts cell key to backup set name.
+   * @param current - cell
+   * @return backup set name
+   * @throws IOException
+   */
+  static String cellKeyToBackupSetName(Cell current) throws IOException {
+    byte[] data = CellUtil.cloneRow(current);
+    return new String(data).substring(SET_KEY_PREFIX.length());
+  }
+
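+  /**
+   * Builds a rowkey by concatenating a prefix with additional components. Callers
+   * that combine variable-length components separate them with the NULL delimiter,
+   * e.g. rowkey(TABLE_RS_LOG_MAP_PREFIX, backupRoot, NULL, tableName).
+   */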
+  static byte[] rowkey(String s, String... other) {
+    StringBuilder sb = new StringBuilder(s);
+    for (String ss : other) {
+      sb.append(ss);
+    }
+    return sb.toString().getBytes();
+  }
+
+}