Posted to commits@hbase.apache.org by jm...@apache.org on 2013/02/13 19:10:58 UTC

svn commit: r1445786 [2/2] - in /hbase/branches/hbase-7290: hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ hbase-protocol/src/main/protobuf/ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/ hbase-server/src/main/java...

Added: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/restore/RestoreSnapshotHelper.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/restore/RestoreSnapshotHelper.java?rev=1445786&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/restore/RestoreSnapshotHelper.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/restore/RestoreSnapshotHelper.java Wed Feb 13 18:10:58 2013
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.snapshot.restore;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.TreeMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.server.snapshot.error.SnapshotExceptionSnare;
+import org.apache.hadoop.hbase.snapshot.exception.RestoreSnapshotException;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FSVisitor;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+
+/**
+ * Helper to Restore/Clone a Snapshot
+ *
+ * <p>The helper assumes that a table is already created, and by calling restore()
+ * the content present in the snapshot will be restored as the new content of the table.
+ *
+ * <p>Clone from Snapshot: If the target table is empty, the restore operation
+ * is just a "clone operation", where the only operations are:
+ * <ul>
+ *  <li>for each region in the snapshot create a new region
+ *    (note that the region will have a different name, since the encoding contains the table name)
+ *  <li>for each file in the region create a new HFileLink to point to the original file.
+ *  <li>restore the logs, if any
+ * </ul>
+ *
+ * <p>Restore from Snapshot:
+ * <ul>
+ *  <li>for each region in the table verify which are available in the snapshot and which are not
+ *    <ul>
+ *    <li>if the region is not present in the snapshot, remove it.
+ *    <li>if the region is present in the snapshot
+ *      <ul>
+ *      <li>for each file in the table region verify which are available in the snapshot
+ *        <ul>
+ *          <li>if the hfile is not present in the snapshot, remove it
+ *          <li>if the hfile is present, keep it (nothing to do)
+ *        </ul>
+ *      <li>for each file in the snapshot region but not in the table
+ *        <ul>
+ *          <li>create a new HFileLink that points to the original file
+ *        </ul>
+ *      </ul>
+ *    </ul>
+ *  <li>for each region in the snapshot not present in the current table state
+ *    <ul>
+ *    <li>create a new region and for each file in the region create a new HFileLink
+ *      (This is the same as the clone operation)
+ *    </ul>
+ *  <li>restore the logs, if any
+ * </ul>
+ */
+@InterfaceAudience.Private
+public class RestoreSnapshotHelper {
+  private static final Log LOG = LogFactory.getLog(RestoreSnapshotHelper.class);
+
+  private final Map<byte[], byte[]> regionsMap =
+        new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
+
+  private final SnapshotExceptionSnare monitor;
+
+  private final SnapshotDescription snapshotDesc;
+  private final Path snapshotDir;
+
+  private final HTableDescriptor tableDesc;
+  private final Path tableDir;
+
+  private final CatalogTracker catalogTracker;
+  private final Configuration conf;
+  private final FileSystem fs;
+
+  public RestoreSnapshotHelper(final Configuration conf, final FileSystem fs,
+      final CatalogTracker catalogTracker,
+      final SnapshotDescription snapshotDescription, final Path snapshotDir,
+      final HTableDescriptor tableDescriptor, final Path tableDir,
+      final SnapshotExceptionSnare monitor)
+  {
+    this.fs = fs;
+    this.conf = conf;
+    this.catalogTracker = catalogTracker;
+    this.snapshotDesc = snapshotDescription;
+    this.snapshotDir = snapshotDir;
+    this.tableDesc = tableDescriptor;
+    this.tableDir = tableDir;
+    this.monitor = monitor;
+  }
+
+  /**
+   * Restore table to a specified snapshot state.
+   */
+  public void restore() throws IOException {
+    long startTime = EnvironmentEdgeManager.currentTimeMillis();
+
+    LOG.debug("starting restore");
+    Set<String> snapshotRegionNames = SnapshotReferenceUtil.getSnapshotRegionNames(fs, snapshotDir);
+    if (snapshotRegionNames == null) {
+      LOG.warn("Nothing to restore. Snapshot " + snapshotDesc + " looks empty");
+      return;
+    }
+
+    // Identify which regions are still available and which are not.
+    // NOTE: we rely upon the region name as: "table name, start key, end key"
+    List<HRegionInfo> tableRegions = getTableRegions();
+    if (tableRegions != null) {
+      monitor.failOnError();
+      List<HRegionInfo> regionsToRestore = new LinkedList<HRegionInfo>();
+      List<HRegionInfo> regionsToRemove = new LinkedList<HRegionInfo>();
+
+      for (HRegionInfo regionInfo: tableRegions) {
+        String regionName = regionInfo.getEncodedName();
+        if (snapshotRegionNames.contains(regionName)) {
+          LOG.info("region to restore: " + regionName);
+          snapshotRegionNames.remove(regionName);
+          regionsToRestore.add(regionInfo);
+        } else {
+          LOG.info("region to remove: " + regionName);
+          regionsToRemove.add(regionInfo);
+        }
+      }
+
+      // Restore regions using the snapshot data
+      monitor.failOnError();
+      restoreRegions(regionsToRestore);
+
+      // Remove regions from the current table
+      monitor.failOnError();
+      ModifyRegionUtils.deleteRegions(fs, catalogTracker, regionsToRemove);
+    }
+
+    // Regions to Add: present in the snapshot but not in the current table
+    if (snapshotRegionNames.size() > 0) {
+      List<HRegionInfo> regionsToAdd = new LinkedList<HRegionInfo>();
+
+      monitor.failOnError();
+      for (String regionName: snapshotRegionNames) {
+        LOG.info("region to add: " + regionName);
+        Path regionDir = new Path(snapshotDir, regionName);
+        regionsToAdd.add(HRegion.loadDotRegionInfoFileContent(fs, regionDir));
+      }
+
+      // Create new regions cloning from the snapshot
+      monitor.failOnError();
+      cloneRegions(regionsToAdd);
+    }
+
+    // Restore WALs
+    monitor.failOnError();
+    restoreWALs();
+  }
+
+  /**
+   * Restore specified regions by restoring content to the snapshot state.
+   */
+  private void restoreRegions(final List<HRegionInfo> regions) throws IOException {
+    if (regions == null || regions.size() == 0) return;
+    for (HRegionInfo hri: regions) restoreRegion(hri);
+  }
+
+  /**
+   * Restore a region by removing the files not in the snapshot
+   * and adding the missing ones from the snapshot.
+   */
+  private void restoreRegion(HRegionInfo regionInfo) throws IOException {
+    Path snapshotRegionDir = new Path(snapshotDir, regionInfo.getEncodedName());
+    Map<String, List<String>> snapshotFiles =
+                SnapshotReferenceUtil.getRegionHFileReferences(fs, snapshotRegionDir);
+
+    Path regionDir = new Path(tableDir, regionInfo.getEncodedName());
+    String tableName = tableDesc.getNameAsString();
+
+    for (Map.Entry<String, List<String>> familyEntry: snapshotFiles.entrySet()) {
+      byte[] family = Bytes.toBytes(familyEntry.getKey());
+      Path familyDir = new Path(regionDir, familyEntry.getKey());
+      Set<String> familyFiles = getTableRegionFamilyFiles(familyDir);
+
+      List<String> hfilesToAdd = new LinkedList<String>();
+      for (String hfileName: familyEntry.getValue()) {
+        if (familyFiles.contains(hfileName)) {
+          // HFile already present
+          familyFiles.remove(hfileName);
+        } else {
+          // HFile missing
+          hfilesToAdd.add(hfileName);
+        }
+      }
+
+      // Remove hfiles not present in the snapshot
+      for (String hfileName: familyFiles) {
+        Path hfile = new Path(familyDir, hfileName);
+        LOG.trace("Removing hfile=" + hfile + " from table=" + tableName);
+        HFileArchiver.archiveStoreFile(fs, regionInfo, conf, tableDir, family, hfile);
+      }
+
+      // Restore Missing files
+      for (String hfileName: hfilesToAdd) {
+        LOG.trace("Adding HFileLink " + hfileName + " to table=" + tableName);
+        restoreStoreFile(familyDir, regionInfo, hfileName);
+      }
+    }
+  }
+
+  /**
+   * @return The set of files in the specified family directory.
+   */
+  private Set<String> getTableRegionFamilyFiles(final Path familyDir) throws IOException {
+    Set<String> familyFiles = new HashSet<String>();
+
+    FileStatus[] hfiles = FSUtils.listStatus(fs, familyDir);
+    if (hfiles == null) return familyFiles;
+
+    for (FileStatus hfileRef: hfiles) {
+      String hfileName = hfileRef.getPath().getName();
+      familyFiles.add(hfileName);
+    }
+
+    return familyFiles;
+  }
+
+  /**
+   * Clone the specified regions. For each region create a new region
+   * and create an HFileLink for each hfile.
+   */
+  private void cloneRegions(final List<HRegionInfo> regions) throws IOException {
+    if (regions == null || regions.size() == 0) return;
+
+    final Map<String, HRegionInfo> snapshotRegions =
+      new HashMap<String, HRegionInfo>(regions.size());
+
+    // clone region info (change embedded tableName with the new one)
+    HRegionInfo[] clonedRegionsInfo = new HRegionInfo[regions.size()];
+    for (int i = 0; i < clonedRegionsInfo.length; ++i) {
+      // clone the region info from the snapshot region info
+      HRegionInfo snapshotRegionInfo = regions.get(i);
+      clonedRegionsInfo[i] = cloneRegionInfo(snapshotRegionInfo);
+
+      // add the region name mapping between snapshot and cloned
+      String snapshotRegionName = snapshotRegionInfo.getEncodedName();
+      String clonedRegionName = clonedRegionsInfo[i].getEncodedName();
+      regionsMap.put(Bytes.toBytes(snapshotRegionName), Bytes.toBytes(clonedRegionName));
+      LOG.info("clone region=" + snapshotRegionName + " as " + clonedRegionName);
+
+      // Add mapping between cloned region name and snapshot region info
+      snapshotRegions.put(clonedRegionName, snapshotRegionInfo);
+    }
+
+    // create the regions on disk
+    List<HRegionInfo> clonedRegions = ModifyRegionUtils.createRegions(conf, FSUtils.getRootDir(conf),
+      tableDesc, clonedRegionsInfo, catalogTracker, new ModifyRegionUtils.RegionFillTask() {
+        public void fillRegion(final HRegion region) throws IOException {
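+          // populate the new region with HFileLinks that point back at the snapshot's store files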
+          cloneRegion(region, snapshotRegions.get(region.getRegionInfo().getEncodedName()));
+        }
+      });
+    if (clonedRegions != null && clonedRegions.size() > 0) {
+      // add regions to .META.
+      MetaEditor.addRegionsToMeta(catalogTracker, clonedRegions);
+    }
+  }
+
+  /**
+   * Clone region directory content from the snapshot info.
+   *
+   * Each region is encoded with the table name, so the cloned region will have
+   * a different region name.
+   *
+   * Instead of copying the hfiles, an HFileLink is created.
+   *
+   * @param region {@link HRegion} to populate with the cloned content
+   * @param snapshotRegionInfo snapshot region to clone the files from
+   */
+  private void cloneRegion(final HRegion region, final HRegionInfo snapshotRegionInfo)
+      throws IOException {
+    final Path snapshotRegionDir = new Path(snapshotDir, snapshotRegionInfo.getEncodedName());
+    final Path regionDir = new Path(tableDir, region.getRegionInfo().getEncodedName());
+    final String tableName = tableDesc.getNameAsString();
+    SnapshotReferenceUtil.visitRegionStoreFiles(fs, snapshotRegionDir,
+      new FSVisitor.StoreFileVisitor() {
+        public void storeFile (final String region, final String family, final String hfile)
+            throws IOException {
+          LOG.info("Adding HFileLink " + hfile + " to table=" + tableName);
+          Path familyDir = new Path(regionDir, family);
+          restoreStoreFile(familyDir, snapshotRegionInfo, hfile);
+        }
+    });
+  }
+
+  /**
+   * Create a new {@link HFileLink} to reference the store file.
+   *
+   * @param familyDir destination directory for the store file
+   * @param regionInfo destination region info for the table
+   * @param hfileName store file name (can be a Reference, HFileLink or simple HFile)
+   */
+  private void restoreStoreFile(final Path familyDir, final HRegionInfo regionInfo,
+      final String hfileName) throws IOException {
+    if (HFileLink.isHFileLink(hfileName)) {
+      HFileLink.createFromHFileLink(conf, fs, familyDir, hfileName);
+    } else {
+      HFileLink.create(conf, fs, familyDir, regionInfo, hfileName);
+    }
+  }
+
+  /**
+   * Create a new {@link HRegionInfo} from the snapshot region info.
+   * Keep the same startKey, endKey, regionId and split information but change
+   * the table name.
+   *
+   * @param snapshotRegionInfo Info for region to clone.
+   * @return the new HRegionInfo instance
+   */
+  public HRegionInfo cloneRegionInfo(final HRegionInfo snapshotRegionInfo) {
+    return new HRegionInfo(tableDesc.getName(),
+                      snapshotRegionInfo.getStartKey(), snapshotRegionInfo.getEndKey(),
+                      snapshotRegionInfo.isSplit(), snapshotRegionInfo.getRegionId());
+  }
+
+  /**
+   * Restore snapshot WALs.
+   *
+   * A global snapshot keeps a reference to the region server logs present during the snapshot.
+   * (/hbase/.snapshot/snapshotName/.logs/hostName/logName)
+   *
+   * Since each log contains data from different tables, the logs must be split to
+   * extract only the table that we are interested in.
+   */
+  private void restoreWALs() throws IOException {
+    final SnapshotLogSplitter logSplitter = new SnapshotLogSplitter(conf, fs, tableDir,
+                                Bytes.toBytes(snapshotDesc.getTable()), regionsMap);
+    try {
+      // Recover.Edits
+      SnapshotReferenceUtil.visitRecoveredEdits(fs, snapshotDir,
+          new FSVisitor.RecoveredEditsVisitor() {
+        public void recoveredEdits (final String region, final String logfile) throws IOException {
+          Path path = SnapshotReferenceUtil.getRecoveredEdits(snapshotDir, region, logfile);
+          logSplitter.splitRecoveredEdit(path);
+        }
+      });
+
+      // Region Server Logs
+      SnapshotReferenceUtil.visitLogFiles(fs, snapshotDir, new FSVisitor.LogFileVisitor() {
+        public void logFile (final String server, final String logfile) throws IOException {
+          logSplitter.splitLog(server, logfile);
+        }
+      });
+    } finally {
+      logSplitter.close();
+    }
+  }
+
+  /**
+   * @return the list of regions contained in the table
+   */
+  private List<HRegionInfo> getTableRegions() throws IOException {
+    LOG.debug("get table regions: " + tableDir);
+    FileStatus[] regionDirs = FSUtils.listStatus(fs, tableDir, new FSUtils.RegionDirFilter(fs));
+    if (regionDirs == null) return null;
+
+    List<HRegionInfo> regions = new LinkedList<HRegionInfo>();
+    for (FileStatus regionDir: regionDirs) {
+      HRegionInfo hri = HRegion.loadDotRegionInfoFileContent(fs, regionDir.getPath());
+      regions.add(hri);
+    }
+    LOG.debug("found " + regions.size() + " regions for table=" + tableDesc.getNameAsString());
+    return regions;
+  }
+
+  /**
+   * Create a new table descriptor cloning the snapshot table schema.
+   *
+   * @param snapshotTableDescriptor table descriptor of the snapshot
+   * @param tableName name of the table to create
+   * @return cloned table descriptor
+   * @throws IOException
+   */
+  public static HTableDescriptor cloneTableSchema(final HTableDescriptor snapshotTableDescriptor,
+      final byte[] tableName) throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for (HColumnDescriptor hcd: snapshotTableDescriptor.getColumnFamilies()) {
+      htd.addFamily(hcd);
+    }
+    return htd;
+  }
+}
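
For context, a rough sketch of how this helper is meant to be driven (the
surrounding snapshot-handler plumbing and variable names here are assumptions
for illustration, not part of this patch):

    // Hypothetical caller: all dependencies are supplied by the master-side
    // snapshot handler that owns the restore operation.
    void runRestore(Configuration conf, FileSystem fs, CatalogTracker ct,
        SnapshotDescription snapshot, Path snapshotDir,
        HTableDescriptor tableDesc, Path tableDir,
        SnapshotExceptionSnare monitor) throws IOException {
      RestoreSnapshotHelper helper = new RestoreSnapshotHelper(conf, fs, ct,
          snapshot, snapshotDir, tableDesc, tableDir, monitor);
      // clones missing regions, drops extra ones, re-links hfiles, splits WALs
      helper.restore();
    }

The target table is expected to already exist before restore() is called, as noted
in the class javadoc above.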

Added: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/restore/SnapshotLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/restore/SnapshotLogSplitter.java?rev=1445786&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/restore/SnapshotLogSplitter.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/restore/SnapshotLogSplitter.java Wed Feb 13 18:10:58 2013
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.snapshot.restore;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.TreeMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.io.HLogLink;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * If the snapshot has references to one or more log files,
+ * those must be split (each log contains multiple tables and regions)
+ * and must be placed in the region/recovered.edits folder.
+ * (recovered.edits files will be played on region startup)
+ *
+ * In case of Restore: the log can just be split in the recovered.edits folder.
+ * In case of Clone: each entry in the log must be modified to use the new region name.
+ * (region names are encoded with: tableName, startKey, regionIdTimeStamp)
+ *
+ * We can't reuse the normal log split code, because the HLogKey contains the
+ * table name and the region name, and in the "clone from snapshot" case both
+ * will differ from the originals and must be replaced in the recovered.edits.
+ */
+@InterfaceAudience.Private
+class SnapshotLogSplitter implements Closeable {
+  static final Log LOG = LogFactory.getLog(SnapshotLogSplitter.class);
+
+  private final class LogWriter implements Closeable {
+    private HLog.Writer writer;
+    private Path logFile;
+    private long seqId;
+
+    public LogWriter(final Configuration conf, final FileSystem fs,
+        final Path logDir, long seqId) throws IOException {
+      logFile = new Path(logDir, logFileName(seqId, true));
+      this.writer = HLogFactory.createWriter(fs, logFile, conf);
+      this.seqId = seqId;
+    }
+
+    public void close() throws IOException {
+      writer.close();
+
+      Path finalFile = new Path(logFile.getParent(), logFileName(seqId, false));
+      LOG.debug("LogWriter tmpLogFile=" + logFile + " -> logFile=" + finalFile);
+      fs.rename(logFile, finalFile);
+    }
+
+    public void append(final HLog.Entry entry) throws IOException {
+      writer.append(entry);
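+      // track the highest sequence id seen; the final file is renamed to it on close()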
+      if (seqId < entry.getKey().getLogSeqNum()) {
+        seqId = entry.getKey().getLogSeqNum();
+      }
+    }
+
+    private String logFileName(long seqId, boolean temp) {
+      String fileName = String.format("%019d", seqId);
+      if (temp) fileName += HLog.RECOVERED_LOG_TMPFILE_SUFFIX;
+      return fileName;
+    }
+  }
+
+  private final Map<byte[], LogWriter> regionLogWriters =
+      new TreeMap<byte[], LogWriter>(Bytes.BYTES_COMPARATOR);
+
+  private final Map<byte[], byte[]> regionsMap;
+  private final Configuration conf;
+  private final byte[] snapshotTableName;
+  private final byte[] tableName;
+  private final Path tableDir;
+  private final FileSystem fs;
+
+  /**
+   * @param snapshotTableName snapshot table name
+   * @param regionsMap maps original region names to the new ones.
+   */
+  public SnapshotLogSplitter(final Configuration conf, final FileSystem fs,
+      final Path tableDir, final byte[] snapshotTableName,
+      final Map<byte[], byte[]> regionsMap) {
+    this.regionsMap = regionsMap;
+    this.snapshotTableName = snapshotTableName;
+    this.tableName = Bytes.toBytes(tableDir.getName());
+    this.tableDir = tableDir;
+    this.conf = conf;
+    this.fs = fs;
+  }
+
+  public void close() throws IOException {
+    for (LogWriter writer: regionLogWriters.values()) {
+      writer.close();
+    }
+  }
+
+  public void splitLog(final String serverName, final String logfile) throws IOException {
+    LOG.debug("Restore log=" + logfile + " server=" + serverName +
+              " for snapshotTable=" + Bytes.toString(snapshotTableName) +
+              " to table=" + Bytes.toString(tableName));
+    splitLog(new HLogLink(conf, serverName, logfile).getAvailablePath(fs));
+  }
+
+  public void splitRecoveredEdit(final Path editPath) throws IOException {
+    LOG.debug("Restore recover.edits=" + editPath +
+              " for snapshotTable=" + Bytes.toString(snapshotTableName) +
+              " to table=" + Bytes.toString(tableName));
+    splitLog(editPath);
+  }
+
+  /**
+   * Split the snapshot HLog reference into the regions' recovered.edits directories.
+   *
+   * The HLogKey contains the table name and the region name,
+   * and they must be changed to the restored table and region names.
+   *
+   * @param logPath Snapshot HLog reference path
+   */
+  public void splitLog(final Path logPath) throws IOException {
+    HLog.Reader log = HLogFactory.createReader(fs, logPath, conf);
+    try {
+      HLog.Entry entry;
+      LogWriter writer = null;
+      byte[] regionName = null;
+      byte[] newRegionName = null;
+      while ((entry = log.next()) != null) {
+        HLogKey key = entry.getKey();
+
+        // We're interested only in the snapshot table that we're restoring
+        if (!Bytes.equals(key.getTablename(), snapshotTableName)) continue;
+
+        // Writer for region.
+        if (!Bytes.equals(regionName, key.getEncodedRegionName())) {
+          regionName = key.getEncodedRegionName().clone();
+
+          // Get the new region name in case of clone, or use the original one
+          newRegionName = regionsMap.get(regionName);
+          if (newRegionName == null) newRegionName = regionName;
+
+          writer = getOrCreateWriter(newRegionName, key.getLogSeqNum());
+          LOG.debug("+ regionName=" + Bytes.toString(regionName));
+        }
+
+        // Append the entry, rewriting the key to use the new table and region names
+        key = new HLogKey(newRegionName, tableName,
+                          key.getLogSeqNum(), key.getWriteTime(), key.getClusterId());
+        writer.append(new HLog.Entry(key, entry.getEdit()));
+      }
+    } catch (IOException e) {
+      LOG.warn("Something wrong during the log split", e);
+    } finally {
+      log.close();
+    }
+  }
+
+  /**
+   * Create a LogWriter for specified region if not already created.
+   */
+  private LogWriter getOrCreateWriter(final byte[] regionName, long seqId) throws IOException {
+    LogWriter writer = regionLogWriters.get(regionName);
+    if (writer == null) {
+      Path regionDir = HRegion.getRegionDir(tableDir, Bytes.toString(regionName));
+      Path dir = HLogUtil.getRegionDirRecoveredEditsDir(regionDir);
+      fs.mkdirs(dir);
+
+      writer = new LogWriter(conf, fs, dir, seqId);
+      regionLogWriters.put(regionName, writer);
+    }
+    return writer;
+  }
+}
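
A rough sketch of how the splitter is invoked (mirroring what
RestoreSnapshotHelper.restoreWALs() above does; where the parameters come from
is an assumption for illustration):

    // Hypothetical driver: split one snapshot log reference into the
    // per-region recovered.edits directories of the restored table.
    void splitOneLog(Configuration conf, FileSystem fs, Path tableDir,
        byte[] snapshotTableName, Map<byte[], byte[]> regionsMap,
        String serverName, String logfile) throws IOException {
      SnapshotLogSplitter splitter =
          new SnapshotLogSplitter(conf, fs, tableDir, snapshotTableName, regionsMap);
      try {
        splitter.splitLog(serverName, logfile);
      } finally {
        splitter.close();  // closes per-region writers and renames tmp files into place
      }
    }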

Added: hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java?rev=1445786&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java Wed Feb 13 18:10:58 2013
@@ -0,0 +1,213 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+
+/**
+ * Utility methods for interacting with the regions.
+ */
+@InterfaceAudience.Private
+public abstract class ModifyRegionUtils {
+  private static final Log LOG = LogFactory.getLog(ModifyRegionUtils.class);
+
+  private ModifyRegionUtils() {
+  }
+
+  public interface RegionFillTask {
+    public void fillRegion(final HRegion region) throws IOException;
+  }
+
+  /**
+   * Create new set of regions on the specified file-system.
+   * NOTE: you should add the regions to .META. after this operation.
+   *
+   * @param conf {@link Configuration}
+   * @param rootDir Root directory for HBase instance
+   * @param hTableDescriptor description of the table
+   * @param newRegions {@link HRegionInfo} that describes the regions to create
+   * @param catalogTracker the catalog tracker
+   * @throws IOException
+   */
+  public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
+      final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
+      final CatalogTracker catalogTracker) throws IOException {
+    return createRegions(conf, rootDir, hTableDescriptor, newRegions, catalogTracker, null);
+  }
+
+  /**
+   * Create new set of regions on the specified file-system.
+   * NOTE: you should add the regions to .META. after this operation.
+   *
+   * @param conf {@link Configuration}
+   * @param rootDir Root directory for HBase instance
+   * @param hTableDescriptor description of the table
+   * @param newRegions {@link HRegionInfo} that describes the regions to create
+   * @param catalogTracker the catalog tracker
+   * @param task {@link RegionFillTask} custom code to populate region after creation
+   * @throws IOException
+   */
+  public static List<HRegionInfo> createRegions(final Configuration conf, final Path rootDir,
+      final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
+      final CatalogTracker catalogTracker, final RegionFillTask task) throws IOException {
+    if (newRegions == null) return null;
+    int regionNumber = newRegions.length;
+    ThreadPoolExecutor regionOpenAndInitThreadPool = getRegionOpenAndInitThreadPool(conf,
+        "RegionOpenAndInitThread-" + hTableDescriptor.getNameAsString(), regionNumber);
+    CompletionService<HRegionInfo> completionService = new ExecutorCompletionService<HRegionInfo>(
+        regionOpenAndInitThreadPool);
+    List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
+    for (final HRegionInfo newRegion : newRegions) {
+      completionService.submit(new Callable<HRegionInfo>() {
+        public HRegionInfo call() throws IOException {
+          // 1. Create HRegion
+          HRegion region = HRegion.createHRegion(newRegion,
+              rootDir, conf, hTableDescriptor, null,
+              false, true);
+          try {
+            // 2. Custom user code to interact with the created region
+            if (task != null) {
+              task.fillRegion(region);
+            }
+          } finally {
+            // 3. Close the new region to flush to disk. Close log file too.
+            region.close();
+          }
+          return region.getRegionInfo();
+        }
+      });
+    }
+    try {
+      // 4. wait for all regions to finish creation
+      for (int i = 0; i < regionNumber; i++) {
+        Future<HRegionInfo> future = completionService.take();
+        HRegionInfo regionInfo = future.get();
+        regionInfos.add(regionInfo);
+      }
+    } catch (InterruptedException e) {
+      LOG.error("Caught " + e + " during region creation");
+      throw new InterruptedIOException(e.getMessage());
+    } catch (ExecutionException e) {
+      throw new IOException(e);
+    } finally {
+      regionOpenAndInitThreadPool.shutdownNow();
+    }
+    return regionInfos;
+  }
+
+  /*
+   * used by createRegions() to get the thread pool executor based on the
+   * "hbase.hregion.open.and.init.threads.max" property.
+   */
+  static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
+      final String threadNamePrefix, int regionNumber) {
+    int maxThreads = Math.min(regionNumber, conf.getInt(
+        "hbase.hregion.open.and.init.threads.max", 10));
+    ThreadPoolExecutor regionOpenAndInitThreadPool = Threads.getBoundedCachedThreadPool(
+        maxThreads, 30L, TimeUnit.SECONDS,
+        new ThreadFactory() {
+          private int count = 1;
+
+          public Thread newThread(Runnable r) {
+            Thread t = new Thread(r, threadNamePrefix + "-" + count++);
+            return t;
+          }
+        });
+    return regionOpenAndInitThreadPool;
+  }
+
+  /**
+   * Trigger immediate assignment of the regions in round-robin fashion
+   *
+   * @param assignmentManager
+   * @param regions
+   */
+  public static void assignRegions(final AssignmentManager assignmentManager,
+      final List<HRegionInfo> regions) throws IOException {
+    try {
+      assignmentManager.getRegionStates().createRegionStates(regions);
+      assignmentManager.assign(regions);
+    } catch (InterruptedException e) {
+      LOG.error("Caught " + e + " during round-robin assignment");
+      throw new InterruptedIOException(e.getMessage());
+    }
+  }
+
+  /**
+   * Remove specified regions by removing them from file-system and .META.
+   * (The regions must be offline).
+   *
+   * @param fs {@link FileSystem} on which to delete the region directory
+   * @param catalogTracker the catalog tracker
+   * @param regions list of {@link HRegionInfo} to delete.
+   */
+  public static void deleteRegions(final FileSystem fs, final CatalogTracker catalogTracker,
+      final List<HRegionInfo> regions) throws IOException {
+    if (regions != null && regions.size() > 0) {
+      for (HRegionInfo hri: regions) {
+        deleteRegion(fs, catalogTracker, hri);
+      }
+    }
+  }
+
+  /**
+   * Remove region from file-system and .META.
+   * (The region must be offline).
+   *
+   * @param fs {@link FileSystem} on which to delete the region directory
+   * @param catalogTracker the catalog tracker
+   * @param regionInfo {@link HRegionInfo} to delete.
+   */
+  public static void deleteRegion(final FileSystem fs, final CatalogTracker catalogTracker,
+      final HRegionInfo regionInfo) throws IOException {
+    // Remove region from .META.
+    MetaEditor.deleteRegion(catalogTracker, regionInfo);
+
+    // "Delete" region from FS
+    HFileArchiver.archiveRegion(fs, regionInfo);
+  }
+}
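
A minimal sketch of the createRegions() + RegionFillTask contract (the table,
split key and fill logic below are illustrative assumptions, not part of this
patch):

    // Hypothetical example: create two regions for a new table, populate each
    // one through the RegionFillTask callback, then add them to .META.
    void createAndRegisterRegions(Configuration conf, CatalogTracker ct,
        HTableDescriptor htd) throws IOException {
      byte[] table = htd.getName();
      HRegionInfo[] newRegions = new HRegionInfo[] {
        new HRegionInfo(table, HConstants.EMPTY_START_ROW, Bytes.toBytes("m")),
        new HRegionInfo(table, Bytes.toBytes("m"), HConstants.EMPTY_END_ROW),
      };
      List<HRegionInfo> created = ModifyRegionUtils.createRegions(conf,
        FSUtils.getRootDir(conf), htd, newRegions, ct,
        new ModifyRegionUtils.RegionFillTask() {
          public void fillRegion(final HRegion region) throws IOException {
            // populate the region here (RestoreSnapshotHelper uses this hook
            // to create HFileLinks back to the snapshot files)
          }
        });
      // createRegions() does not touch .META.; the caller registers the regions
      MetaEditor.addRegionsToMeta(ct, created);
    }

Parallelism of the region creation is bounded by "hbase.hregion.open.and.init.threads.max"
(default 10), capped by the number of regions being created.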

Added: hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java?rev=1445786&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java Wed Feb 13 18:10:58 2013
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.snapshot.exception.SnapshotDoesNotExistException;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.MD5Hash;
+import org.junit.*;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test clone/restore snapshots from the client
+ */
+@Category(LargeTests.class)
+public class TestRestoreSnapshotFromClient {
+  final Log LOG = LogFactory.getLog(getClass());
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private final byte[] FAMILY = Bytes.toBytes("cf");
+
+  private byte[] snapshotName0;
+  private byte[] snapshotName1;
+  private byte[] snapshotName2;
+  private int snapshot0Rows;
+  private int snapshot1Rows;
+  private byte[] tableName;
+  private HBaseAdmin admin;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
+    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
+    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
+    TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
+    TEST_UTIL.getConfiguration().setBoolean(
+        "hbase.master.enabletable.roundrobin", true);
+    TEST_UTIL.startMiniCluster(3);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Initialize the tests with a table filled with some data
+   * and two snapshots (snapshotName0, snapshotName1) of different states.
+   * The tableName, snapshotNames and the number of rows in the snapshot are initialized.
+   */
+  @Before
+  public void setup() throws Exception {
+    this.admin = TEST_UTIL.getHBaseAdmin();
+
+    long tid = System.currentTimeMillis();
+    tableName = Bytes.toBytes("testtb-" + tid);
+    snapshotName0 = Bytes.toBytes("snaptb0-" + tid);
+    snapshotName1 = Bytes.toBytes("snaptb1-" + tid);
+    snapshotName2 = Bytes.toBytes("snaptb2-" + tid);
+
+    // create Table and disable it
+    createTable(tableName, FAMILY);
+    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    loadData(table, 500, FAMILY);
+    snapshot0Rows = TEST_UTIL.countRows(table);
+    admin.disableTable(tableName);
+
+    // take a snapshot
+    admin.snapshot(snapshotName0, tableName);
+
+    // enable table and insert more data
+    admin.enableTable(tableName);
+    loadData(table, 500, FAMILY);
+    snapshot1Rows = TEST_UTIL.countRows(table);
+    admin.disableTable(tableName);
+
+    // take a snapshot of the updated table
+    admin.snapshot(snapshotName1, tableName);
+
+    // re-enable table
+    admin.enableTable(tableName);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    admin.disableTable(tableName);
+    admin.deleteTable(tableName);
+    admin.deleteSnapshot(snapshotName0);
+    admin.deleteSnapshot(snapshotName1);
+  }
+
+  @Test
+  public void testRestoreSnapshot() throws IOException {
+    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    assertEquals(snapshot1Rows, TEST_UTIL.countRows(table));
+
+    // Restore from snapshot-0
+    admin.disableTable(tableName);
+    admin.restoreSnapshot(snapshotName0);
+    admin.enableTable(tableName);
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    assertEquals(snapshot0Rows, TEST_UTIL.countRows(table));
+
+    // Restore from snapshot-1
+    admin.disableTable(tableName);
+    admin.restoreSnapshot(snapshotName1);
+    admin.enableTable(tableName);
+    table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    assertEquals(snapshot1Rows, TEST_UTIL.countRows(table));
+  }
+
+  @Test(expected=SnapshotDoesNotExistException.class)
+  public void testCloneNonExistentSnapshot() throws IOException, InterruptedException {
+    String snapshotName = "random-snapshot-" + System.currentTimeMillis();
+    String tableName = "random-table-" + System.currentTimeMillis();
+    admin.cloneSnapshot(snapshotName, tableName);
+  }
+
+  @Test
+  public void testCloneSnapshot() throws IOException, InterruptedException {
+    byte[] clonedTableName = Bytes.toBytes("clonedtb-" + System.currentTimeMillis());
+    testCloneSnapshot(clonedTableName, snapshotName0, snapshot0Rows);
+    testCloneSnapshot(clonedTableName, snapshotName1, snapshot1Rows);
+  }
+
+  private void testCloneSnapshot(final byte[] tableName, final byte[] snapshotName,
+      int snapshotRows) throws IOException, InterruptedException {
+    // create a new table from snapshot
+    admin.cloneSnapshot(snapshotName, tableName);
+    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    assertEquals(snapshotRows, TEST_UTIL.countRows(table));
+
+    admin.disableTable(tableName);
+    admin.deleteTable(tableName);
+  }
+
+  @Test
+  public void testRestoreSnapshotOfCloned() throws IOException, InterruptedException {
+    byte[] clonedTableName = Bytes.toBytes("clonedtb-" + System.currentTimeMillis());
+    admin.cloneSnapshot(snapshotName0, clonedTableName);
+    HTable table = new HTable(TEST_UTIL.getConfiguration(), clonedTableName);
+    assertEquals(snapshot0Rows, TEST_UTIL.countRows(table));
+    admin.disableTable(clonedTableName);
+    admin.snapshot(snapshotName2, clonedTableName);
+    admin.deleteTable(clonedTableName);
+
+    admin.cloneSnapshot(snapshotName2, clonedTableName);
+    table = new HTable(TEST_UTIL.getConfiguration(), clonedTableName);
+    assertEquals(snapshot0Rows, TEST_UTIL.countRows(table));
+    admin.disableTable(clonedTableName);
+    admin.deleteTable(clonedTableName);
+  }
+
+  // ==========================================================================
+  //  Helpers
+  // ==========================================================================
+  private void createTable(final byte[] tableName, final byte[]... families) throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for (byte[] family: families) {
+      HColumnDescriptor hcd = new HColumnDescriptor(family);
+      htd.addFamily(hcd);
+    }
+    byte[][] splitKeys = new byte[16][];
+    byte[] hex = Bytes.toBytes("0123456789abcdef");
+    for (int i = 0; i < 16; ++i) {
+      splitKeys[i] = new byte[] { hex[i] };
+    }
+    admin.createTable(htd, splitKeys);
+  }
+
+  public void loadData(final HTable table, int rows, byte[]... families) throws IOException {
+    byte[] qualifier = Bytes.toBytes("q");
+    table.setAutoFlush(false);
+    while (rows-- > 0) {
+      byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows));
+      byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
+      Put put = new Put(key);
+      put.setWriteToWAL(false);
+      for (byte[] family: families) {
+        put.add(family, qualifier, value);
+      }
+      table.put(put);
+    }
+    table.flushCommits();
+  }
+}

Added: hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/restore/TestSnapshotLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/restore/TestSnapshotLogSplitter.java?rev=1445786&view=auto
==============================================================================
--- hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/restore/TestSnapshotLogSplitter.java (added)
+++ hbase/branches/hbase-7290/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/restore/TestSnapshotLogSplitter.java Wed Feb 13 18:10:58 2013
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.snapshot.restore;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.*;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test snapshot log splitter
+ */
+@Category(SmallTests.class)
+public class TestSnapshotLogSplitter {
+  final Log LOG = LogFactory.getLog(getClass());
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private byte[] TEST_QUALIFIER = Bytes.toBytes("q");
+  private byte[] TEST_FAMILY = Bytes.toBytes("f");
+
+  private Configuration conf;
+  private FileSystem fs;
+  private Path logFile;
+
+  @Before
+  public void setup() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    fs = FileSystem.get(conf);
+    logFile = new Path(TEST_UTIL.getDataTestDir(), "test.log");
+    writeTestLog(logFile);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fs.delete(logFile, false);
+  }
+
+  @Test
+  public void testSplitLogs() throws IOException {
+    Map<byte[], byte[]> regionsMap = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
+    splitTestLogs(getTableName(5), regionsMap);
+  }
+
+  @Test
+  public void testSplitLogsOnDifferentTable() throws IOException {
+    byte[] tableName = getTableName(1);
+    Map<byte[], byte[]> regionsMap = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
+    for (int j = 0; j < 10; ++j) {
+      byte[] regionName = getRegionName(tableName, j);
+      byte[] newRegionName = getNewRegionName(tableName, j);
+      regionsMap.put(regionName, newRegionName);
+    }
+    splitTestLogs(tableName, regionsMap);
+  }
+
+  /*
+   * Split and verify test logs for the specified table
+   */
+  private void splitTestLogs(final byte[] tableName, final Map<byte[], byte[]> regionsMap)
+      throws IOException {
+    Path tableDir = new Path(TEST_UTIL.getDataTestDir(), Bytes.toString(tableName));
+    SnapshotLogSplitter logSplitter = new SnapshotLogSplitter(conf, fs, tableDir,
+      tableName, regionsMap);
+    try {
+      logSplitter.splitLog(logFile);
+    } finally {
+      logSplitter.close();
+    }
+    verifyRecoverEdits(tableDir, tableName, regionsMap);
+  }
+
+  /*
+   * Verify that every log in the table directory contains only the specified table and regions.
+   */
+  private void verifyRecoverEdits(final Path tableDir, final byte[] tableName,
+      final Map<byte[], byte[]> regionsMap) throws IOException {
+    for (FileStatus regionStatus: FSUtils.listStatus(fs, tableDir)) {
+      assertTrue(regionStatus.getPath().getName().startsWith(Bytes.toString(tableName)));
+      Path regionEdits = HLogUtil.getRegionDirRecoveredEditsDir(regionStatus.getPath());
+      byte[] regionName = Bytes.toBytes(regionStatus.getPath().getName());
+      assertFalse(regionsMap.containsKey(regionName));
+      for (FileStatus logStatus: FSUtils.listStatus(fs, regionEdits)) {
+        HLog.Reader reader = HLogFactory.createReader(fs, logStatus.getPath(), conf);
+        try {
+          HLog.Entry entry;
+          while ((entry = reader.next()) != null) {
+            HLogKey key = entry.getKey();
+            assertArrayEquals(tableName, key.getTablename());
+            assertArrayEquals(regionName, key.getEncodedRegionName());
+          }
+        } finally {
+          reader.close();
+        }
+      }
+    }
+  }
+
+  /*
+   * Write some entries in the log file.
+   * 7 different tables with name "testtb-%d"
+   * 10 regions per table with name "tableName-region-%d"
+   * 50 entries per region with row key "row-%d"
+   */
+  private void writeTestLog(final Path logFile) throws IOException {
+    fs.mkdirs(logFile.getParent());
+    HLog.Writer writer = HLogFactory.createWriter(fs, logFile, conf);
+    try {
+      for (int i = 0; i < 7; ++i) {
+        byte[] tableName = getTableName(i);
+        for (int j = 0; j < 10; ++j) {
+          byte[] regionName = getRegionName(tableName, j);
+          for (int k = 0; k < 50; ++k) {
+            byte[] rowkey = Bytes.toBytes("row-" + k);
+            HLogKey key = new HLogKey(regionName, tableName, (long)k,
+              System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
+            WALEdit edit = new WALEdit();
+            edit.add(new KeyValue(rowkey, TEST_FAMILY, TEST_QUALIFIER, rowkey));
+            writer.append(new HLog.Entry(key, edit));
+          }
+        }
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  private byte[] getTableName(int tableId) {
+    return Bytes.toBytes("testtb-" + tableId);
+  }
+
+  private byte[] getRegionName(final byte[] tableName, int regionId) {
+    return Bytes.toBytes(Bytes.toString(tableName) + "-region-" + regionId);
+  }
+
+  private byte[] getNewRegionName(final byte[] tableName, int regionId) {
+    return Bytes.toBytes(Bytes.toString(tableName) + "-new-region-" + regionId);
+  }
+}