Posted to commits@hbase.apache.org by st...@apache.org on 2012/09/28 08:27:45 UTC

svn commit: r1391326 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/master/cleaner/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/a...

Author: stack
Date: Fri Sep 28 06:27:44 2012
New Revision: 1391326

URL: http://svn.apache.org/viewvc?rev=1391326&view=rev
Log:
HBASE-6610 HFileLink: Hardlink alternative for snapshot restore

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java?rev=1391326&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java Fri Sep 28 06:27:44 2012
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.FileNotFoundException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * A FileLink is a sort of hardlink that allows access to a file through a set of alternative locations.
+ *
+ * <p><b>The Problem:</b>
+ * <ul>
+ *  <li>
+ *    HDFS doesn't support hardlinks, which makes it impossible to reference
+ *    the same data blocks using different names.
+ *  </li>
+ *  <li>
+ *    HBase stores files in one location (e.g. table/region/family/) and, when a file is no
+ *    longer needed (e.g. after compaction, region deletion, ...), moves it to an archive directory.
+ *  </li>
+ * </ul>
+ * If we want to create a reference to a file, we have to remember that it can be in its
+ * original location or in the archive folder.
+ * The FileLink class abstracts this concept: given a set of locations,
+ * it switches between them, making the operation transparent to the user.
+ * More concrete implementations of FileLink are {@link HFileLink} and {@link HLogLink}.
+ *
+ * <p><b>Back-references:</b>
+ * To help the {@link CleanerChore} to keep track of the links to a particular file,
+ * during the FileLink creation, a new file is placed inside a back-reference directory.
+ * There's one back-reference directory for each file that has links,
+ * and in the directory there's one file per link.
+ *
+ * <p>HFileLink Example
+ * <ul>
+ *  <li>
+ *      /hbase/table/region-x/cf/file-k
+ *      (Original File)
+ *  </li>
+ *  <li>
+ *      /hbase/table-cloned/region-y/cf/file-k.region-x.table
+ *     (HFileLink to the original file)
+ *  </li>
+ *  <li>
+ *      /hbase/table-2nd-cloned/region-z/cf/file-k.region-x.table
+ *      (HFileLink to the original file)
+ *  </li>
+ *  <li>
+ *      /hbase/.archive/table/region-x/.links-file-k/region-y.table-cloned
+ *      (Back-reference to the link in table-cloned)
+ *  </li>
+ *  <li>
+ *      /hbase/.archive/table/region-x/.links-file-k/region-z.table-cloned
+ *      (Back-reference to the link in table-2nd-cloned)
+ *  </li>
+ * </ul>
+ */
+@InterfaceAudience.Private
+public class FileLink {
+  private static final Log LOG = LogFactory.getLog(FileLink.class);
+
+  /** Define the Back-reference directory name prefix: .links-<hfile>/ */
+  public static final String BACK_REFERENCES_DIRECTORY_PREFIX = ".links-";
+
+  /**
+   * FileLink InputStream that handles the switch between the original path
+   * and the alternative locations, when the file is moved.
+   */
+  private static class FileLinkInputStream extends InputStream
+      implements Seekable, PositionedReadable {
+    private FSDataInputStream in = null;
+    private Path currentPath = null;
+    private long pos = 0;
+
+    private final FileLink fileLink;
+    private final int bufferSize;
+    private final FileSystem fs;
+
+    public FileLinkInputStream(final FileSystem fs, final FileLink fileLink)
+        throws IOException {
+      this(fs, fileLink, fs.getConf().getInt("io.file.buffer.size", 4096));
+    }
+
+    public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
+        throws IOException {
+      this.bufferSize = bufferSize;
+      this.fileLink = fileLink;
+      this.fs = fs;
+
+      this.in = tryOpen();
+    }
+
+    @Override
+    public int read() throws IOException {
+      int res;
+      try {
+        res = in.read();
+      } catch (FileNotFoundException e) {
+        res = tryOpen().read();
+      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
+        res = tryOpen().read();
+      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
+        res = tryOpen().read();
+      }
+      if (res >= 0) pos += 1; // res is the byte value (0-255), or -1 on EOF
+      return res;
+    }
+
+    @Override
+    public int read(byte b[]) throws IOException {
+       return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(byte b[], int off, int len) throws IOException {
+      int n;
+      try {
+        n = in.read(b, off, len);
+      } catch (FileNotFoundException e) {
+        n = tryOpen().read(b, off, len);
+      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
+        n = tryOpen().read(b, off, len);
+      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
+        n = tryOpen().read(b, off, len);
+      }
+      if (n > 0) pos += n;
+      assert(in.getPos() == pos);
+      return n;
+    }
+
+    @Override
+    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+      int n;
+      try {
+        n = in.read(position, buffer, offset, length);
+      } catch (FileNotFoundException e) {
+        n = tryOpen().read(position, buffer, offset, length);
+      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
+        n = tryOpen().read(position, buffer, offset, length);
+      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
+        n = tryOpen().read(position, buffer, offset, length);
+      }
+      return n;
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer) throws IOException {
+      readFully(position, buffer, 0, buffer.length);
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+      try {
+        in.readFully(position, buffer, offset, length);
+      } catch (FileNotFoundException e) {
+        tryOpen().readFully(position, buffer, offset, length);
+      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
+        tryOpen().readFully(position, buffer, offset, length);
+      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
+        tryOpen().readFully(position, buffer, offset, length);
+      }
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+      long skipped;
+
+      try {
+        skipped = in.skip(n);
+      } catch (FileNotFoundException e) {
+        skipped = tryOpen().skip(n);
+      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
+        skipped = tryOpen().skip(n);
+      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
+        skipped = tryOpen().skip(n);
+      }
+
+      if (skipped > 0) pos += skipped;
+      return skipped;
+    }
+
+    @Override
+    public int available() throws IOException {
+      try {
+        return in.available();
+      } catch (FileNotFoundException e) {
+        return tryOpen().available();
+      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
+        return tryOpen().available();
+      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
+        return tryOpen().available();
+      }
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      try {
+        in.seek(pos);
+      } catch (FileNotFoundException e) {
+        tryOpen().seek(pos);
+      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
+        tryOpen().seek(pos);
+      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
+        tryOpen().seek(pos);
+      }
+      this.pos = pos;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return pos;
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      boolean res;
+      try {
+        res = in.seekToNewSource(targetPos);
+      } catch (FileNotFoundException e) {
+        res = tryOpen().seekToNewSource(targetPos);
+      } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
+        res = tryOpen().seekToNewSource(targetPos);
+      } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
+        res = tryOpen().seekToNewSource(targetPos);
+      }
+      if (res) pos = targetPos;
+      return res;
+    }
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+      throw new IOException("mark/reset not supported");
+    }
+
+    @Override
+    public boolean markSupported() {
+      return false;
+    }
+
+    /**
+     * Try to open the file from one of the available locations.
+     *
+     * @return FSDataInputStream stream of the opened file link
+     * @throws IOException on unexpected error, or file not found.
+     */
+    private FSDataInputStream tryOpen() throws IOException {
+      for (Path path: fileLink.getLocations()) {
+        if (path.equals(currentPath)) continue;
+        try {
+          in = fs.open(path, bufferSize);
+          in.seek(pos);
+          assert(in.getPos() == pos) : "Link unable to seek to the right position=" + pos;
+          if (LOG.isTraceEnabled()) {
+            if (currentPath == null) {
+              LOG.trace("link open path=" + path);
+            } else {
+              LOG.trace("link switch from path=" + currentPath + " to path=" + path);
+            }
+          }
+          currentPath = path;
+          return(in);
+        } catch (FileNotFoundException e) {
+          // Try another file location
+        }
+      }
+      throw new FileNotFoundException("Unable to open link: " + fileLink);
+    }
+  }
+
+  private Path[] locations = null;
+
+  protected FileLink() {
+    this.locations = null;
+  }
+
+  /**
+   * @param originPath Original location of the file to link
+   * @param alternativePaths Alternative locations to look for the linked file
+   */
+  public FileLink(Path originPath, Path... alternativePaths) {
+    setLocations(originPath, alternativePaths);
+  }
+
+  /**
+   * @param locations locations to look for the linked file
+   */
+  public FileLink(final Collection<Path> locations) {
+    this.locations = locations.toArray(new Path[locations.size()]);
+  }
+
+  /**
+   * @return the locations to look for the linked file.
+   */
+  public Path[] getLocations() {
+    return locations;
+  }
+
+  public String toString() {
+    StringBuilder str = new StringBuilder(getClass().getName());
+    str.append(" locations=[");
+    int i = 0;
+    for (Path location: locations) {
+      if (i++ > 0) str.append(", ");
+      str.append(location.toString());
+    }
+    str.append("]");
+    return str.toString();
+  }
+
+  /**
+   * @return the path of the first available link.
+   */
+  public Path getAvailablePath(FileSystem fs) throws IOException {
+    for (Path path: locations) {
+      if (fs.exists(path)) {
+        return path;
+      }
+    }
+    throw new FileNotFoundException("Unable to open link: " + this);
+  }
+
+  /**
+   * Get the FileStatus of the referenced file.
+   *
+   * @param fs {@link FileSystem} on which to get the file status
+   * @return FileStatus of the referenced file.
+   * @throws IOException on unexpected error.
+   */
+  public FileStatus getFileStatus(FileSystem fs) throws IOException {
+    for (Path path: locations) {
+      try {
+        return fs.getFileStatus(path);
+      } catch (FileNotFoundException e) {
+        // Try another file location
+      }
+    }
+    throw new FileNotFoundException("Unable to open link: " + this);
+  }
+
+  /**
+   * Open the FileLink for read.
+   * <p>
+   * It uses a wrapper of FSDataInputStream that is agnostic to the location
+   * of the file, even if the file switches between locations.
+   *
+   * @param fs {@link FileSystem} on which to open the FileLink
+   * @return InputStream for reading the file link.
+   * @throws IOException on unexpected error.
+   */
+  public FSDataInputStream open(final FileSystem fs) throws IOException {
+    return new FSDataInputStream(new FileLinkInputStream(fs, this));
+  }
+
+  /**
+   * Open the FileLink for read.
+   * <p>
+   * It uses a wrapper of FSDataInputStream that is agnostic to the location
+   * of the file, even if the file switches between locations.
+   *
+   * @param fs {@link FileSystem} on which to open the FileLink
+   * @param bufferSize the size of the buffer to be used.
+   * @return InputStream for reading the file link.
+   * @throws IOException on unexpected error.
+   */
+  public FSDataInputStream open(final FileSystem fs, int bufferSize) throws IOException {
+    return new FSDataInputStream(new FileLinkInputStream(fs, this, bufferSize));
+  }
+
+  /**
+   * NOTE: This method must be used only in the constructor!
+   * It fills the array of locations for the link.
+   */
+  protected void setLocations(Path originPath, Path... alternativePaths) {
+    assert this.locations == null : "Link locations already set";
+    this.locations = new Path[1 + alternativePaths.length];
+    this.locations[0] = originPath;
+    for (int i = 0; i < alternativePaths.length; i++) {
+      this.locations[i + 1] = alternativePaths[i];
+    }
+  }
+
+  /**
+   * Get the directory to store the link back references
+   *
+   * <p>To simplify the reference count process, during the FileLink creation
+   * a back-reference is added to the back-reference directory of the specified file.
+   *
+   * @param storeDir Root directory for the link reference folder
+   * @param fileName File Name with links
+   * @return Path for the link back references.
+   */
+  public static Path getBackReferencesDir(final Path storeDir, final String fileName) {
+    return new Path(storeDir, BACK_REFERENCES_DIRECTORY_PREFIX + fileName);
+  }
+
+  /**
+   * Get the referenced file name from the reference link directory path.
+   *
+   * @param dirPath Link references directory path
+   * @return Name of the file referenced
+   */
+  public static String getBackReferenceFileName(final Path dirPath) {
+    return dirPath.getName().substring(BACK_REFERENCES_DIRECTORY_PREFIX.length());
+  }
+
+  /**
+   * Checks if the specified directory path is a back reference links folder.
+   *
+   * @param dirPath Directory path to verify
+   * @return True if the specified directory is a link references folder
+   */
+  public static boolean isBackReferencesDir(final Path dirPath) {
+    if (dirPath == null) return false;
+    return dirPath.getName().startsWith(BACK_REFERENCES_DIRECTORY_PREFIX);
+  }
+}
+
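
For illustration, a minimal usage sketch of the class above (not part of the
commit; the conf, paths and data handling are hypothetical). Reads through a
FileLink keep working even if the file moves from the origin location to the
archive between reads:

    FileSystem fs = FileSystem.get(conf);
    Path origin = new Path("/hbase/table/region-x/cf/file-k");
    Path archived = new Path("/hbase/.archive/table/region-x/cf/file-k");

    // The first existing location wins; on FileNotFoundException the
    // FileLinkInputStream re-opens the next candidate and seeks to pos.
    FileLink link = new FileLink(origin, archived);
    FSDataInputStream in = link.open(fs);
    try {
      byte[] buf = new byte[8192];
      int n;
      while ((n = in.read(buf)) > 0) {
        // consume buf[0..n); a concurrent rename to the archive is transparent
      }
    } finally {
      in.close();
    }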

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java?rev=1391326&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HFileLink.java Fri Sep 28 06:27:44 2012
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * HFileLink describes a link to an hfile.
+ *
+ * An hfile can be served from a region or from the hfile archive directory as
+ * specified by the {@value HConstants#HFILE_ARCHIVE_DIRECTORY} conf property.
+ * HFileLink allows access to the referenced hfile regardless of its current location.
+ *
+ * <p>Searches for hfiles in the following order and locations:
+ * <ul>
+ *  <li>/hbase/table/region/cf/hfile</li>
+ *  <li>/hbase/archive/table/region/cf/hfile</li>
+ * </ul>
+ *
+ * The link checks the original path first; if the file is not present there,
+ * it falls back to the archived path.
+ */
+@InterfaceAudience.Private
+public class HFileLink extends FileLink {
+  private static final Log LOG = LogFactory.getLog(HFileLink.class);
+
+  /** Define the HFile Link name pattern in the form of: hfile-region-table */
+  public static final Pattern LINK_NAME_PARSER =
+    Pattern.compile("^([0-9a-f\\.]+)-([0-9a-f]+)-([a-zA-Z_0-9]+[a-zA-Z0-9_\\-\\.]*)$");
+
+  private final Path archivePath;
+  private final Path originPath;
+
+  /**
+   * @param conf {@link Configuration} from which to extract specific archive locations
+   * @param path The path of the HFile Link.
+   * @throws IOException on unexpected error.
+   */
+  public HFileLink(Configuration conf, Path path) throws IOException {
+    this(FSUtils.getRootDir(conf), HFileArchiveUtil.getArchivePath(conf), path);
+  }
+
+  /**
+   * @param rootDir Path to the root directory where hbase files are stored
+   * @param archiveDir Path to the hbase archive directory
+   * @param path The path of the HFile Link.
+   */
+  public HFileLink(final Path rootDir, final Path archiveDir, final Path path) {
+    Path hfilePath = getRelativeTablePath(path);
+    this.originPath = new Path(rootDir, hfilePath);
+    this.archivePath = new Path(archiveDir, hfilePath);
+    setLocations(originPath, archivePath);
+  }
+
+  /**
+   * @param originPath Path to the hfile in the table directory
+   * @param archivePath Path to the hfile in the archive directory
+   */
+  public HFileLink(final Path originPath, final Path archivePath) {
+    this.originPath = originPath;
+    this.archivePath = archivePath;
+    setLocations(originPath, archivePath);
+  }
+
+  /**
+   * @return the origin path of the hfile.
+   */
+  public Path getOriginPath() {
+    return this.originPath;
+  }
+
+  /**
+   * @return the path of the archived hfile.
+   */
+  public Path getArchivePath() {
+    return this.archivePath;
+  }
+
+  /**
+   * @param path Path to check.
+   * @return True if the path is a HFileLink.
+   */
+  public static boolean isHFileLink(final Path path) {
+    return isHFileLink(path.getName());
+  }
+
+  /**
+   * @param fileName File name to check.
+   * @return True if the file name is a HFileLink name.
+   */
+  public static boolean isHFileLink(String fileName) {
+    Matcher m = LINK_NAME_PARSER.matcher(fileName);
+    if (!m.matches()) return false;
+
+    return m.groupCount() > 2 && m.group(2) != null && m.group(3) != null;
+  }
+
+  /**
+   * The returned path can be the "original" file path like: /hbase/table/region/cf/hfile
+   * or a path to the archived file like: /hbase/archive/table/region/cf/hfile
+   *
+   * @param fs {@link FileSystem} on which to check the HFileLink
+   * @param path HFileLink path
+   * @return Referenced path (original path or archived path)
+   * @throws IOException on unexpected error.
+   */
+  public static Path getReferencedPath(FileSystem fs, final Path path) throws IOException {
+    return getReferencedPath(fs.getConf(), fs, path);
+  }
+
+  /**
+   * The returned path can be the "original" file path like: /hbase/table/region/cf/hfile
+   * or a path to the archived file like: /hbase/archive/table/region/cf/hfile
+   *
+   * @param fs {@link FileSystem} on which to check the HFileLink
+   * @param conf {@link Configuration} from which to extract specific archive locations
+   * @param path HFileLink path
+   * @return Referenced path (original path or archived path)
+   * @throws IOException on unexpected error.
+   */
+  public static Path getReferencedPath(final Configuration conf, final FileSystem fs,
+      final Path path) throws IOException {
+    return getReferencedPath(fs, FSUtils.getRootDir(conf),
+                             HFileArchiveUtil.getArchivePath(conf), path);
+  }
+
+  /**
+   * The returned path can be the "original" file path like: /hbase/table/region/cf/hfile
+   * or a path to the archived file like: /hbase/archive/table/region/cf/hfile
+   *
+   * @param fs {@link FileSystem} on which to check the HFileLink
+   * @param rootDir root hbase directory
+   * @param archiveDir Path to the hbase archive directory
+   * @param path HFileLink path
+   * @return Referenced path (original path or archived path)
+   * @throws IOException on unexpected error.
+   */
+  public static Path getReferencedPath(final FileSystem fs, final Path rootDir,
+      final Path archiveDir, final Path path) throws IOException {
+    Path hfilePath = getRelativeTablePath(path);
+
+    Path originPath = new Path(rootDir, hfilePath);
+    if (fs.exists(originPath)) {
+      return originPath;
+    }
+
+    return new Path(archiveDir, hfilePath);
+  }
+
+  /**
+   * Convert a HFileLink path to a table relative path.
+   * e.g. the link: /hbase/test/0123/cf/abcd-4567-testtb
+   *      becomes the table-relative path: testtb/4567/cf/abcd
+   *
+   * @param path HFileLink path
+   * @return Relative table path
+   * @throws IllegalArgumentException if it is not a valid HFileLink name
+   */
+  private static Path getRelativeTablePath(final Path path) {
+    // hfile-region-table
+    Matcher m = LINK_NAME_PARSER.matcher(path.getName());
+    if (!m.matches()) {
+      throw new IllegalArgumentException(path.getName() + " is not a valid HFileLink name!");
+    }
+
+    // Convert the HFileLink name into a real table/region/cf/hfile path.
+    String hfileName = m.group(1);
+    String regionName = m.group(2);
+    String tableName = m.group(3);
+    String familyName = path.getParent().getName();
+    return new Path(new Path(tableName, regionName), new Path(familyName, hfileName));
+  }
+
+  /**
+   * Get the HFile name of the referenced link
+   *
+   * @param fileName HFileLink file name
+   * @return the name of the referenced HFile
+   */
+  public static String getReferencedHFileName(final String fileName) {
+    Matcher m = LINK_NAME_PARSER.matcher(fileName);
+    if (!m.matches()) {
+      throw new IllegalArgumentException(fileName + " is not a valid HFileLink name!");
+    }
+    return(m.group(1));
+  }
+
+  /**
+   * Get the Region name of the referenced link
+   *
+   * @param fileName HFileLink file name
+   * @return the name of the referenced Region
+   */
+  public static String getReferencedRegionName(final String fileName) {
+    Matcher m = LINK_NAME_PARSER.matcher(fileName);
+    if (!m.matches()) {
+      throw new IllegalArgumentException(fileName + " is not a valid HFileLink name!");
+    }
+    return(m.group(2));
+  }
+
+  /**
+   * Get the Table name of the referenced link
+   *
+   * @param fileName HFileLink file name
+   * @return the name of the referenced Table
+   */
+  public static String getReferencedTableName(final String fileName) {
+    Matcher m = LINK_NAME_PARSER.matcher(fileName);
+    if (!m.matches()) {
+      throw new IllegalArgumentException(fileName + " is not a valid HFileLink name!");
+    }
+    return(m.group(3));
+  }
+
+  /**
+   * Create a new HFileLink name
+   *
+   * @param hfileRegionInfo - Linked HFile Region Info
+   * @param hfileName - Linked HFile name
+   * @return file name of the HFile Link
+   */
+  public static String createHFileLinkName(final HRegionInfo hfileRegionInfo,
+      final String hfileName) {
+    return createHFileLinkName(hfileRegionInfo.getTableNameAsString(),
+                      hfileRegionInfo.getEncodedName(), hfileName);
+  }
+
+  /**
+   * Create a new HFileLink name
+   *
+   * @param tableName - Linked HFile table name
+   * @param regionName - Linked HFile region name
+   * @param hfileName - Linked HFile name
+   * @return file name of the HFile Link
+   */
+  public static String createHFileLinkName(final String tableName,
+      final String regionName, final String hfileName) {
+    return String.format("%s-%s-%s", hfileName, regionName, tableName);
+  }
+
+  /**
+   * Create a new HFileLink
+   *
+   * <p>It also adds a back-reference file in the hfile's back-reference directory
+   * to simplify the reference counting and the cleaning process.
+   *
+   * @param conf {@link Configuration} to read for the archive directory name
+   * @param fs {@link FileSystem} on which to write the HFileLink
+   * @param dstFamilyPath - Destination path (table/region/cf/)
+   * @param hfileRegionInfo - Linked HFile Region Info
+   * @param hfileName - Linked HFile name
+   * @return true if the link was created, false if it already exists.
+   * @throws IOException on file or parent directory creation failure
+   */
+  public static boolean create(final Configuration conf, final FileSystem fs,
+      final Path dstFamilyPath, final HRegionInfo hfileRegionInfo,
+      final String hfileName) throws IOException {
+    String familyName = dstFamilyPath.getName();
+    String regionName = dstFamilyPath.getParent().getName();
+    String tableName = dstFamilyPath.getParent().getParent().getName();
+
+    String name = createHFileLinkName(hfileRegionInfo, hfileName);
+    String refName = createBackReferenceName(tableName, regionName);
+
+    // Make sure the destination directory exists
+    fs.mkdirs(dstFamilyPath);
+
+    // Make sure the FileLink reference directory exists
+    Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf,
+          hfileRegionInfo.getTableNameAsString(), hfileRegionInfo.getEncodedName(), familyName);
+    Path backRefsDir = getBackReferencesDir(archiveStoreDir, hfileName);
+    fs.mkdirs(backRefsDir);
+
+    // Create the reference for the link
+    Path backRefPath = new Path(backRefsDir, refName);
+    fs.createNewFile(backRefPath);
+    try {
+      // Create the link
+      return fs.createNewFile(new Path(dstFamilyPath, name));
+    } catch (IOException e) {
+      LOG.error("couldn't create the link=" + name + " for " + dstFamilyPath, e);
+      // Revert the reference if the link creation failed
+      fs.delete(backRefPath, false);
+      throw e;
+    }
+  }
+
+  /**
+   * Create the back reference name
+   */
+  private static String createBackReferenceName(final String tableName, final String regionName) {
+    return regionName + "." + tableName;
+  }
+
+  /**
+   * Get the full path of the HFile referenced by the back reference
+   *
+   * @param rootDir root hbase directory
+   * @param linkRefPath Link Back Reference path
+   * @return full path of the referenced hfile
+   */
+  public static Path getHFileFromBackReference(final Path rootDir, final Path linkRefPath) {
+    int separatorIndex = linkRefPath.getName().indexOf('.');
+    String linkRegionName = linkRefPath.getName().substring(0, separatorIndex);
+    String linkTableName = linkRefPath.getName().substring(separatorIndex + 1);
+    String hfileName = getBackReferenceFileName(linkRefPath.getParent());
+    Path familyPath = linkRefPath.getParent().getParent();
+    Path regionPath = familyPath.getParent();
+    Path tablePath = regionPath.getParent();
+
+    String linkName = createHFileLinkName(tablePath.getName(), regionPath.getName(), hfileName);
+    Path linkTableDir = FSUtils.getTablePath(rootDir, linkTableName);
+    Path regionDir = HRegion.getRegionDir(linkTableDir, linkRegionName);
+    return new Path(new Path(regionDir, familyPath.getName()), linkName);
+  }
+
+  /**
+   * Get the full path of the HFile referenced by the back reference
+   *
+   * @param conf {@link Configuration} to read for the archive directory name
+   * @param linkRefPath Link Back Reference path
+   * @return full path of the referenced hfile
+   * @throws IOException on unexpected error.
+   */
+  public static Path getHFileFromBackReference(final Configuration conf, final Path linkRefPath)
+      throws IOException {
+    return getHFileFromBackReference(FSUtils.getRootDir(conf), linkRefPath);
+  }
+}
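
To make the naming scheme concrete, a hedged round-trip through the helpers
above, reusing the hypothetical hfile/region/table names from the class
comment ("abcd" hfile, "4567" encoded region, "testtb" table):

    String linkName = HFileLink.createHFileLinkName("testtb", "4567", "abcd");
    // linkName is "abcd-4567-testtb", which LINK_NAME_PARSER accepts
    assert HFileLink.isHFileLink(linkName);
    assert HFileLink.getReferencedHFileName(linkName).equals("abcd");
    assert HFileLink.getReferencedRegionName(linkName).equals("4567");
    assert HFileLink.getReferencedTableName(linkName).equals("testtb");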

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1391326&r1=1391325&r2=1391326&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Fri Sep 28 06:27:44 2012
@@ -591,6 +591,39 @@ public class HFile {
   }
 
   /**
+   * @param fs A file system
+   * @param path Path to HFile
+   * @param fsdis an open checksummed stream of path's file
+   * @param fsdisNoFsChecksum an open unchecksummed stream of path's file
+   * @param size max size of the trailer.
+   * @param cacheConf Cache configuration for hfile's contents
+   * @param preferredEncodingInCache Preferred in-cache data encoding algorithm.
+   * @param closeIStream whether the stream should be closed when the reader is closed
+   * @return A version-specific HFile Reader
+   * @throws IOException if the file is invalid (a CorruptHFileException-flavored IOException)
+   */
+  public static Reader createReaderWithEncoding(
+      FileSystem fs, Path path, FSDataInputStream fsdis,
+      FSDataInputStream fsdisNoFsChecksum, long size, CacheConfig cacheConf,
+      DataBlockEncoding preferredEncodingInCache, boolean closeIStream)
+      throws IOException {
+    HFileSystem hfs = null;
+
+    // If the fs is not an instance of HFileSystem, then create an
+    // instance of HFileSystem that wraps over the specified fs.
+    // In this case, we will not be able to avoid checksumming inside
+    // the filesystem.
+    if (!(fs instanceof HFileSystem)) {
+      hfs = new HFileSystem(fs);
+    } else {
+      hfs = (HFileSystem)fs;
+    }
+    return pickReaderVersion(path, fsdis, fsdisNoFsChecksum, size,
+                             closeIStream, cacheConf,
+                             preferredEncodingInCache, hfs);
+  }
+
+  /**
    *
    * @param fs filesystem
    * @param path Path to file to read

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java?rev=1391326&r1=1391325&r2=1391326&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java Fri Sep 28 06:27:44 2012
@@ -82,7 +82,10 @@ public abstract class CleanerChore<T ext
     if (logCleaners != null) {
       for (String className : logCleaners) {
         T logCleaner = newFileCleaner(className, conf);
-        if (logCleaner != null) this.cleanersChain.add(logCleaner);
+        if (logCleaner != null) {
+          LOG.debug("initialize cleaner=" + className);
+          this.cleanersChain.add(logCleaner);
+        }
       }
     }
   }
@@ -196,7 +199,7 @@ public abstract class CleanerChore<T ext
    */
   private void checkAndDelete(Path filePath) throws IOException, IllegalArgumentException {
     if (!validate(filePath)) {
-      LOG.warn("Found a wrongly formatted file: " + filePath.getName() + "deleting it.");
+      LOG.warn("Found a wrongly formatted file: " + filePath.getName() + " deleting it.");
       if (!this.fs.delete(filePath, true)) {
         LOG.warn("Attempted to delete:" + filePath
             + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java?rev=1391326&r1=1391325&r2=1391326&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java Fri Sep 28 06:27:44 2012
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 /**
  * This Chore, every time it runs, will clear the HFiles in the hfile archive
@@ -46,6 +47,9 @@ public class HFileCleaner extends Cleane
 
   @Override
   protected boolean validate(Path file) {
+    if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
+      return true;
+    }
     return StoreFile.validateStoreFileName(file.getName());
   }
 }
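
A hedged illustration of why the extra check is needed (paths hypothetical):
back-reference directories and the files inside them do not look like store
file names, so without the test above the chore would flag them as garbage,
while HFileLink names themselves are now accepted as valid store file names:

    Path backRef = new Path("/hbase/.archive/table/region-x/cf/.links-file-k/region-y.table-cloned");
    HFileLink.isBackReferencesDir(backRef.getParent());       // true -> valid
    StoreFile.validateStoreFileName("abcd-4567-testtb");      // true: HFileLink name
    StoreFile.validateStoreFileName("region-y.table-cloned"); // false: contains '-'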

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java?rev=1391326&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java Fri Sep 28 06:27:44 2012
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
+
+/**
+ * HFileLink cleaner that determines if an hfile should be deleted.
+ * HFiles can be deleted only if there are no links to them.
+ *
+ * When an HFileLink is created, a back reference file is created in:
+ *      /hbase/archive/table/region/cf/.links-hfile/ref-region.ref-table
+ * To check if an hfile can be deleted, its back references folder must be empty.
+ */
+@InterfaceAudience.Private
+public class HFileLinkCleaner extends BaseHFileCleanerDelegate {
+  private static final Log LOG = LogFactory.getLog(HFileLinkCleaner.class);
+
+  private FileSystem fs = null;
+
+  @Override
+  public synchronized boolean isFileDeletable(Path filePath) {
+    if (this.fs == null) return false;
+
+    // HFile Link is always deletable
+    if (HFileLink.isHFileLink(filePath)) return true;
+
+    // If the file is inside a link references directory, it is a back-reference link.
+    // The back reference can be deleted only if the referenced file no longer exists.
+    Path parentDir = filePath.getParent();
+    if (HFileLink.isBackReferencesDir(parentDir)) {
+      try {
+        Path hfilePath = HFileLink.getHFileFromBackReference(getConf(), filePath);
+        return !fs.exists(hfilePath);
+      } catch (IOException e) {
+        LOG.error("Couldn't verify if the referenced file still exists, keep it just in case");
+        return false;
+      }
+    }
+
+    // An hfile is deletable only if it has no links
+    try {
+      Path backRefDir = HFileLink.getBackReferencesDir(parentDir, filePath.getName());
+      return FSUtils.listStatus(fs, backRefDir) == null;
+    } catch (IOException e) {
+      LOG.error("Couldn't get the references, not deleting file, just in case");
+      return false;
+    }
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+
+    // setup filesystem
+    try {
+      this.fs = FileSystem.get(this.getConf());
+    } catch (IOException e) {
+      LOG.error("Couldn't instantiate the file system, not deleting file, just in case");
+    }
+  }
+}
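
A configuration sketch (not part of this commit): wiring the new delegate into
the master's archive cleaner chain, assuming the chain is read from the
hbase.master.hfilecleaner.plugins property that HFileCleaner passes to
CleanerChore:

    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.master.hfilecleaner.plugins",
        "org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner");
    // CleanerChore instantiates each listed class via newFileCleaner() and
    // asks isFileDeletable() for every file found under the archive directory.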

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1391326&r1=1391325&r2=1391326&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Fri Sep 28 06:27:44 2012
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.RemoteExc
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression;
@@ -328,11 +329,29 @@ public class HStore extends SchemaConfig
    */
   public static Path getStoreHomedir(final Path tabledir,
       final String encodedName, final byte [] family) {
-    return new Path(tabledir, new Path(encodedName,
-      new Path(Bytes.toString(family))));
+    return getStoreHomedir(tabledir, encodedName, Bytes.toString(family));
   }
 
   /**
+   * @param tabledir {@link Path} to the table directory
+   * @param encodedName Encoded region name.
+   * @param family family name of this store
+   * @return Path to family/Store home directory.
+   */
+  public static Path getStoreHomedir(final Path tabledir,
+      final String encodedName, final String family) {
+    return new Path(tabledir, new Path(encodedName, new Path(family)));
+  }
+
+  /**
+   * @param parentRegionDirectory directory for the parent region
+   * @param family family name of this store
+   * @return Path to the family/Store home directory
+   */
+  public static Path getStoreHomedir(final Path parentRegionDirectory, final byte[] family) {
+    return new Path(parentRegionDirectory, new Path(Bytes.toString(family)));
+  }
+
+  /**
    * Return the directory in which this store stores its
    * StoreFiles
    */
@@ -383,9 +402,10 @@ public class HStore extends SchemaConfig
         continue;
       }
       final Path p = files[i].getPath();
-      // Check for empty file. Should never be the case but can happen
+      // Check for empty hfile. Should never be the case but can happen
       // after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
-      if (this.fs.getFileStatus(p).getLen() <= 0) {
+      // NOTE: an HFileLink is just a name, so it is itself an empty file.
+      if (!HFileLink.isHFileLink(p) && this.fs.getFileStatus(p).getLen() <= 0) {
         LOG.warn("Skipping " + p + " because its empty. HBASE-646 DATA LOSS?");
         continue;
       }
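
For reference, a sketch of how the three getStoreHomedir overloads above line
up (tableDir is hypothetical); all of them resolve to
<tabledir>/<encodedRegionName>/<family>:

    Path tableDir = new Path("/hbase/testtb");
    Path regionDir = new Path(tableDir, "4567");
    HStore.getStoreHomedir(tableDir, "4567", Bytes.toBytes("cf")); // /hbase/testtb/4567/cf
    HStore.getStoreHomedir(tableDir, "4567", "cf");                // same
    HStore.getStoreHomedir(regionDir, Bytes.toBytes("cf"));        // same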

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1391326&r1=1391325&r2=1391326&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Fri Sep 28 06:27:44 2012
@@ -37,6 +37,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +46,8 @@ import org.apache.hadoop.hbase.HDFSBlock
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
@@ -151,6 +154,9 @@ public class StoreFile extends SchemaCon
   // If this StoreFile references another, this is the other files path.
   private Path referencePath;
 
+  // If this storefile is a link to another, this is the link instance.
+  private HFileLink link;
+
   // Block cache configuration and reference.
   private final CacheConfig cacheConf;
 
@@ -245,9 +251,14 @@ public class StoreFile extends SchemaCon
     this.dataBlockEncoder =
         dataBlockEncoder == null ? NoOpDataBlockEncoder.INSTANCE
             : dataBlockEncoder;
-    if (isReference(p)) {
+
+    if (HFileLink.isHFileLink(p)) {
+      this.link = new HFileLink(conf, p);
+      LOG.debug("Store file " + p + " is a link");
+    } else if (isReference(p)) {
       this.reference = Reference.read(fs, p);
       this.referencePath = getReferredToFile(this.path);
+      LOG.debug("Store file " + p + " is a reference");
     }
 
     if (BloomFilterFactory.isGeneralBloomEnabled(conf)) {
@@ -292,6 +303,13 @@ public class StoreFile extends SchemaCon
   }
 
   /**
+   * @return <tt>true</tt> if this StoreFile is an HFileLink
+   */
+  boolean isLink() {
+    return this.link != null;
+  }
+
+  /**
    * @param p Path to check.
    * @return True if the path has format of a HStoreFile reference.
    */
@@ -476,6 +494,7 @@ public class StoreFile extends SchemaCon
       Path referencePath = getReferredToFile(p);
       return computeRefFileHDFSBlockDistribution(fs, reference, referencePath);
     } else {
+      if (HFileLink.isHFileLink(p)) p = HFileLink.getReferencedPath(fs, p);
       FileStatus status = fs.getFileStatus(p);
       long length = status.getLen();
       return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, length);
@@ -491,7 +510,12 @@ public class StoreFile extends SchemaCon
       this.hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(
         this.fs, this.reference, this.referencePath);
     } else {
-      FileStatus status = this.fs.getFileStatus(this.path);
+      FileStatus status;
+      if (isLink()) {
+        status = link.getFileStatus(fs);
+      } else {
+        status = this.fs.getFileStatus(path);
+      }
       long length = status.getLen();
       this.hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(
         this.fs, status, 0, length);
@@ -512,6 +536,10 @@ public class StoreFile extends SchemaCon
       this.reader = new HalfStoreFileReader(this.fs, this.referencePath,
           this.cacheConf, this.reference,
           dataBlockEncoder.getEncodingInCache());
+    } else if (isLink()) {
+      long size = link.getFileStatus(fs).getLen();
+      this.reader = new Reader(this.fs, this.path, link, size, this.cacheConf,
+                               dataBlockEncoder.getEncodingInCache(), true);
     } else {
       this.reader = new Reader(this.fs, this.path, this.cacheConf,
           dataBlockEncoder.getEncodingInCache());
@@ -888,6 +916,8 @@ public class StoreFile extends SchemaCon
    * @return <tt>true</tt> if the file could be a valid store file, <tt>false</tt> otherwise
    */
   public static boolean validateStoreFileName(String fileName) {
+    if (HFileLink.isHFileLink(fileName))
+      return true;
     return !fileName.contains("-");
   }
 
@@ -1277,6 +1307,23 @@ public class StoreFile extends SchemaCon
       bloomFilterType = BloomType.NONE;
     }
 
+    public Reader(FileSystem fs, Path path, HFileLink hfileLink, long size,
+        CacheConfig cacheConf, DataBlockEncoding preferredEncodingInCache,
+        boolean closeIStream) throws IOException {
+      super(path);
+
+      FSDataInputStream in = hfileLink.open(fs);
+      FSDataInputStream inNoChecksum = in;
+      if (fs instanceof HFileSystem) {
+        FileSystem noChecksumFs = ((HFileSystem)fs).getNoChecksumFs();
+        inNoChecksum = hfileLink.open(noChecksumFs);
+      }
+
+      reader = HFile.createReaderWithEncoding(fs, path, in, inNoChecksum,
+                  size, cacheConf, preferredEncodingInCache, closeIStream);
+      bloomFilterType = BloomType.NONE;
+    }
+
     /**
      * ONLY USE DEFAULT CONSTRUCTOR FOR UNIT TESTS
      */
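
To summarize the open path for a link-named store file, a hedged sketch
(conf, fs and the link path are hypothetical):

    Path p = new Path("/hbase/table-cloned/region-y/cf/abcd-4567-testtb");
    if (HFileLink.isHFileLink(p)) {
      HFileLink link = new HFileLink(conf, p);      // candidates: origin + archive
      long size = link.getFileStatus(fs).getLen();  // status of whichever exists
      FSDataInputStream in = link.open(fs);         // survives origin -> archive moves
      // The Reader constructor above hands streams like 'in' (plus a
      // no-checksum twin) to HFile.createReaderWithEncoding().
    }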

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1391326&r1=1391325&r2=1391326&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Fri Sep 28 06:27:44 2012
@@ -1112,6 +1112,25 @@ public abstract class FSUtils {
   }
 
   /**
+   * Given a particular region dir, return all the familydirs inside it
+   *
+   * @param fs A file system for the Path
+   * @param regionDir Path to a specific region directory
+   * @return List of paths to valid family directories in region dir.
+   * @throws IOException
+   */
+  public static List<Path> getFamilyDirs(final FileSystem fs, final Path regionDir) throws IOException {
+    // assumes we are in a region dir.
+    FileStatus[] fds = fs.listStatus(regionDir, new FamilyDirFilter(fs));
+    List<Path> familyDirs = new ArrayList<Path>(fds.length);
+    for (FileStatus fdfs: fds) {
+      Path fdPath = fdfs.getPath();
+      familyDirs.add(fdPath);
+    }
+    return familyDirs;
+  }
+
+  /**
    * Filter for HFiles that excludes reference files.
    */
   public static class HFileFilter implements PathFilter {
@@ -1209,7 +1228,7 @@ public abstract class FSUtils {
   
   /**
    * Calls fs.listStatus() and treats FileNotFoundException as non-fatal
-   * This would accommodate difference in various hadoop versions
+   * This accommodates differences between hadoop versions
    * 
    * @param fs file system
    * @param dir directory
@@ -1228,7 +1247,19 @@ public abstract class FSUtils {
     if (status == null || status.length < 1) return null;
     return status;
   }
-  
+
+  /**
+   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
+   * This accommodates differences between hadoop versions.
+   *
+   * @param fs file system
+   * @param dir directory
+   * @return null if dir doesn't exist or is empty, otherwise FileStatus array
+   */
+  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
+    return listStatus(fs, dir, null);
+  }
+
   /**
    * Calls fs.delete() and returns the value returned by the fs.delete()
    * 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java?rev=1391326&r1=1391325&r2=1391326&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileArchiveUtil.java Fri Sep 28 06:27:44 2012
@@ -42,6 +42,21 @@ public class HFileArchiveUtil {
   /**
    * Get the directory to archive a store directory
    * @param conf {@link Configuration} to read for the archive directory name
+   * @param tableName table name under which the store currently lives
+   * @param regionName region encoded name under which the store currently lives
+   * @param familyName name of the family in the store
+   * @return {@link Path} to the directory to archive the given store
+   */
+  public static Path getStoreArchivePath(final Configuration conf, final String tableName,
+      final String regionName, final String familyName) throws IOException {
+    Path tableArchiveDir = getTableArchivePath(conf, tableName);
+    return HStore.getStoreHomedir(tableArchiveDir, regionName, familyName);
+  }
+
+  /**
+   * Get the directory to archive a store directory
+   * @param conf {@link Configuration} to read for the archive directory name
    * @param region parent region information under which the store currently
    *          lives
    * @param family name of the family in the store
@@ -104,6 +119,19 @@ public class HFileArchiveUtil {
   }
 
   /**
+   * Get the path to the table archive directory based on the configured archive directory.
+   * <p>
+   * Assumes the table should already be archived.
+   * @param conf {@link Configuration} to read the archive directory property. Can be null
+   * @param tableName Name of the table to be archived. Cannot be null.
+   * @return {@link Path} to the archive directory for the table
+   */
+  public static Path getTableArchivePath(final Configuration conf, final String tableName)
+      throws IOException {
+    return new Path(getArchivePath(conf), tableName);
+  }
+
+  /**
    * Get the archive directory as per the configuration
    * @param conf {@link Configuration} to read the archive directory from (can be null, in which
    *          case you get the default value). Can be null.
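
A quick illustration of the new helpers above (hedged; the actual root depends
on hbase.rootdir and the configured archive directory, .archive by default):

    // e.g. <hbase.rootdir>/.archive/testtb and <hbase.rootdir>/.archive/testtb/4567/cf
    Path tableArchive = HFileArchiveUtil.getTableArchivePath(conf, "testtb");
    Path storeArchive = HFileArchiveUtil.getStoreArchivePath(conf, "testtb", "4567", "cf");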

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java?rev=1391326&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java Fri Sep 28 06:27:44 2012
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import junit.framework.TestCase;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test that FileLink switches between alternate locations
+ * when the current location moves or gets deleted.
+ */
+@Category(MediumTests.class)
+public class TestFileLink {
+  /**
+   * Test, on HDFS, that the FileLink is still readable
+   * even when the current file gets renamed.
+   */
+  @Test
+  public void testHDFSLinkReadDuringRename() throws Exception {
+    HBaseTestingUtility testUtil = new HBaseTestingUtility();
+    Configuration conf = testUtil.getConfiguration();
+    conf.setInt("dfs.blocksize", 1024 * 1024);
+    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
+
+    testUtil.startMiniDFSCluster(1);
+    MiniDFSCluster cluster = testUtil.getDFSCluster();
+    FileSystem fs = cluster.getFileSystem();
+    assertEquals("hdfs", fs.getUri().getScheme());
+
+    try {
+      testLinkReadDuringRename(fs, testUtil.getDefaultRootDirPath());
+    } finally {
+      testUtil.shutdownMiniCluster();
+    }
+  }
+
+  /**
+   * Test, on a local filesystem, that the FileLink is still readable
+   * even when the current file gets renamed.
+   */
+  @Test
+  public void testLocalLinkReadDuringRename() throws IOException {
+    HBaseTestingUtility testUtil = new HBaseTestingUtility();
+    FileSystem fs = testUtil.getTestFileSystem();
+    assertEquals("file", fs.getUri().getScheme());
+    testLinkReadDuringRename(fs, testUtil.getDataTestDir());
+  }
+
+  /**
+   * Test that link is still readable even when the current file gets renamed.
+   */
+  private void testLinkReadDuringRename(FileSystem fs, Path rootDir) throws IOException {
+    Path originalPath = new Path(rootDir, "test.file");
+    Path archivedPath = new Path(rootDir, "archived.file");
+
+    writeSomeData(fs, originalPath, 256 << 20, (byte)2);
+
+    List<Path> files = new ArrayList<Path>();
+    files.add(originalPath);
+    files.add(archivedPath);
+
+    FileLink link = new FileLink(files);
+    FSDataInputStream in = link.open(fs);
+    try {
+      byte[] data = new byte[8192];
+      long size = 0;
+
+      // Read from origin
+      int n = in.read(data);
+      dataVerify(data, n, (byte)2);
+      size += n;
+
+      // Move origin to archive
+      assertFalse(fs.exists(archivedPath));
+      fs.rename(originalPath, archivedPath);
+      assertFalse(fs.exists(originalPath));
+      assertTrue(fs.exists(archivedPath));
+
+      // Try to read to the end
+      while ((n = in.read(data)) > 0) {
+        dataVerify(data, n, (byte)2);
+        size += n;
+      }
+
+      assertEquals(256 << 20, size);
+    } finally {
+      in.close();
+      if (fs.exists(originalPath)) fs.delete(originalPath, true);
+      if (fs.exists(archivedPath)) fs.delete(archivedPath, true);
+    }
+  }
+
+  /**
+   * Test that link is still readable even when the current file gets deleted.
+   *
+   * NOTE: This test is valid only on HDFS.
+   * When a file is deleted from a local file-system, it is simply 'unlinked'.
+   * The inode, which contains the file's data, is not deleted until all
+   * processes have finished with it.
+   * In HDFS, when a read goes beyond the cached block locations,
+   * the client queries the namenode by file name,
+   * and the deleted file no longer exists (FileNotFoundException).
+   */
+  @Test
+  public void testHDFSLinkReadDuringDelete() throws Exception {
+    HBaseTestingUtility testUtil = new HBaseTestingUtility();
+    Configuration conf = testUtil.getConfiguration();
+    conf.setInt("dfs.blocksize", 1024 * 1024);
+    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
+
+    testUtil.startMiniDFSCluster(1);
+    MiniDFSCluster cluster = testUtil.getDFSCluster();
+    FileSystem fs = cluster.getFileSystem();
+    assertEquals("hdfs", fs.getUri().getScheme());
+
+    try {
+      List<Path> files = new ArrayList<Path>();
+      for (int i = 0; i < 3; i++) {
+        Path path = new Path(String.format("test-data-%d", i));
+        writeSomeData(fs, path, 1 << 20, (byte)i);
+        files.add(path);
+      }
+
+      FileLink link = new FileLink(files);
+      FSDataInputStream in = link.open(fs);
+      try {
+        byte[] data = new byte[8192];
+        int n;
+
+        // Switch to file 1
+        n = in.read(data);
+        dataVerify(data, n, (byte)0);
+        fs.delete(files.get(0), true);
+        skipBuffer(in, (byte)0);
+
+        // Switch to file 2
+        n = in.read(data);
+        dataVerify(data, n, (byte)1);
+        fs.delete(files.get(1), true);
+        skipBuffer(in, (byte)1);
+
+        // Switch to file 3
+        n = in.read(data);
+        dataVerify(data, n, (byte)2);
+        fs.delete(files.get(2), true);
+        skipBuffer(in, (byte)2);
+
+        // No more files available
+        try {
+          n = in.read(data);
+          assertTrue("expected no more data", n <= 0);
+        } catch (FileNotFoundException e) {
+          // expected: every location of the link has been deleted
+        }
+      } finally {
+        in.close();
+      }
+    } finally {
+      testUtil.shutdownMiniCluster();
+    }
+  }
+
+  /**
+   * Write 'size' bytes (in 4KB chunks, rounded up if not a multiple)
+   * with value 'v' into a new file at 'path'.
+   */
+  private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException {
+    byte[] data = new byte[4096];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = v;
+    }
+
+    FSDataOutputStream stream = fs.create(path);
+    try {
+      long written = 0;
+      while (written < size) {
+        stream.write(data, 0, data.length);
+        written += data.length;
+      }
+    } finally {
+      stream.close();
+    }
+  }
+
+  /**
+   * Verify that all bytes in 'data' have 'v' as value.
+   */
+  private static void dataVerify(byte[] data, int n, byte v) {
+    for (int i = 0; i < n; ++i) {
+      assertEquals(v, data[i]);
+    }
+  }
+
+  /**
+   * Skip forward while the stream keeps returning full buffers of value 'v'.
+   * Stops on a short read, on data with a different value (the link has
+   * switched to the next file), or on an IOException such as the
+   * FileNotFoundException expected once the current file is deleted.
+   */
+  private static void skipBuffer(FSDataInputStream in, byte v) throws IOException {
+    byte[] data = new byte[8192];
+    try {
+      int n;
+      while ((n = in.read(data)) == data.length) {
+        for (int i = 0; i < n; ++i) {
+          if (data[i] != v) return; // content changed: link moved to the next file
+        }
+      }
+    } catch (IOException e) {
+      // expected when the current file has been deleted
+    }
+  }
+}

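The tests above exercise the core FileLink contract: the stream returned by open() transparently fails over to the next listed location when the current one is moved or deleted. A minimal sketch of that read pattern; the class name and paths below are illustrative values only:

    package org.apache.hadoop.hbase.io;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FileLinkReadExample {
      public static long readLinked(FileSystem fs) throws IOException {
        List<Path> locations = new ArrayList<Path>();
        locations.add(new Path("/hbase/some-table/region/cf/hfile"));          // current
        locations.add(new Path("/hbase/.archive/some-table/region/cf/hfile")); // fallback
        FSDataInputStream in = new FileLink(locations).open(fs);
        try {
          byte[] buf = new byte[8192];
          long total = 0;
          int n;
          // If the file is renamed to the fallback location mid-read,
          // the stream switches to it and reading continues at the same offset.
          while ((n = in.read(buf)) > 0) {
            total += n;
          }
          return total;
        } finally {
          in.close();
        }
      }
    }
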
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java?rev=1391326&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java Fri Sep 28 06:27:44 2012
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileArchiveUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test the HFileLink Cleaner.
+ * HFiles that are referenced by a link cannot be deleted until the link is removed.
+ */
+@Category(SmallTests.class)
+public class TestHFileLinkCleaner {
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @Test
+  public void testHFileLinkCleaning() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(HConstants.HBASE_DIR, TEST_UTIL.getDataTestDir().toString());
+    conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
+             "org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner," +
+             "org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner");
+    Path rootDir = FSUtils.getRootDir(conf);
+    FileSystem fs = FileSystem.get(conf);
+
+    final String tableName = "test-table";
+    final String tableLinkName = "test-link";
+    final String hfileName = "1234567890";
+    final String familyName = "cf";
+
+    HRegionInfo hri = new HRegionInfo(Bytes.toBytes(tableName));
+    HRegionInfo hriLink = new HRegionInfo(Bytes.toBytes(tableLinkName));
+
+    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
+    Path archiveStoreDir = HFileArchiveUtil.getStoreArchivePath(conf,
+          tableName, hri.getEncodedName(), familyName);
+    Path archiveLinkStoreDir = HFileArchiveUtil.getStoreArchivePath(conf,
+          tableLinkName, hriLink.getEncodedName(), familyName);
+
+    // Create an hfile in the archive: <archive>/test-table/<encoded-region>/cf/<hfile>
+    Path familyPath = getFamilyDirPath(archiveDir, tableName, hri.getEncodedName(), familyName);
+    fs.mkdirs(familyPath);
+    Path hfilePath = new Path(familyPath, hfileName);
+    fs.createNewFile(hfilePath);
+
+    // Create link to hfile
+    Path familyLinkPath = getFamilyDirPath(rootDir, tableLinkName,
+                                        hriLink.getEncodedName(), familyName);
+    fs.mkdirs(familyLinkPath);
+    HFileLink.create(conf, fs, familyLinkPath, hri, hfileName);
+    Path linkBackRefDir = HFileLink.getBackReferencesDir(archiveStoreDir, hfileName);
+    assertTrue(fs.exists(linkBackRefDir));
+    FileStatus[] backRefs = fs.listStatus(linkBackRefDir);
+    assertEquals(1, backRefs.length);
+    Path linkBackRef = backRefs[0].getPath();
+
+    // Initialize cleaner
+    final long ttl = 1000;
+    conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl);
+    Server server = new DummyServer();
+    HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archiveDir);
+
+    // Link backref cannot be removed
+    Thread.sleep(ttl * 2);
+    cleaner.chore();
+    assertTrue(fs.exists(linkBackRef));
+    assertTrue(fs.exists(hfilePath));
+
+    // Link backref can be removed
+    fs.rename(new Path(rootDir, tableLinkName), new Path(archiveDir, tableLinkName));
+    Thread.sleep(ttl * 2);
+    cleaner.chore();
+    assertFalse("Link should be deleted", fs.exists(linkBackRef));
+
+    // HFile can be removed
+    Thread.sleep(ttl * 2);
+    cleaner.chore();
+    assertFalse("HFile should be deleted", fs.exists(hfilePath));
+
+    // Remove everything
+    for (int i = 0; i < 4; ++i) {
+      Thread.sleep(ttl * 2);
+      cleaner.chore();
+    }
+    assertFalse("HFile should be deleted", fs.exists(new Path(archiveDir, tableName)));
+    assertFalse("Link should be deleted", fs.exists(new Path(archiveDir, tableLinkName)));
+
+    cleaner.interrupt();
+  }
+
+  private static Path getFamilyDirPath(final Path rootDir, final String table,
+    final String region, final String family) {
+    return new Path(new Path(new Path(rootDir, table), region), family);
+  }
+
+  static class DummyServer implements Server {
+
+    @Override
+    public Configuration getConfiguration() {
+      return TEST_UTIL.getConfiguration();
+    }
+
+    @Override
+    public ZooKeeperWatcher getZooKeeper() {
+      try {
+        return new ZooKeeperWatcher(getConfiguration(), "dummy server", this);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+      return null;
+    }
+
+    @Override
+    public CatalogTracker getCatalogTracker() {
+      return null;
+    }
+
+    @Override
+    public ServerName getServerName() {
+      return new ServerName("regionserver,60020,000000");
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {}
+
+    @Override
+    public boolean isAborted() {
+      return false;
+    }
+
+    @Override
+    public void stop(String why) {}
+
+    @Override
+    public boolean isStopped() {
+      return false;
+    }
+  }
+}

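As the test wires it, HFileLinkCleaner runs in the master's HFileCleaner chain alongside the TTL cleaner, so an archived hfile is kept as long as a back-reference to it exists. A minimal sketch of that configuration, mirroring the plugin property used above; the class name is an illustrative value:

    package org.apache.hadoop.hbase.master.cleaner;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class CleanerConfigExample {
      public static Configuration configureCleaners() {
        Configuration conf = HBaseConfiguration.create();
        // Chain the TTL cleaner with the link-aware cleaner; the latter
        // refuses to delete hfiles that still have back-references.
        conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
            "org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner," +
            "org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner");
        return conf;
      }
    }
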
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java?rev=1391326&r1=1391325&r2=1391326&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java Fri Sep 28 06:27:44 2012
@@ -35,9 +35,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -52,6 +54,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
@@ -171,6 +174,40 @@ public class TestStoreFile extends HBase
     assertTrue(Bytes.equals(kv.getRow(), finalRow));
   }
 
+  public void testHFileLink() throws IOException {
+    HRegionInfo hri = new HRegionInfo(Bytes.toBytes("table-link"));
+    Path storedir = new Path(new Path(FSUtils.getRootDir(conf),
+                             new Path(hri.getTableNameAsString(), hri.getEncodedName())), "cf");
+
+    // Make a store file and write data to it.
+    StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf,
+         this.fs, 8 * 1024)
+            .withOutputDir(storedir)
+            .build();
+    Path storeFilePath = writer.getPath();
+    writeStoreFile(writer);
+    writer.close();
+
+    Path dstPath = new Path(FSUtils.getRootDir(conf), new Path("test-region", "cf"));
+    HFileLink.create(conf, this.fs, dstPath, hri, storeFilePath.getName());
+    Path linkFilePath = new Path(dstPath,
+                  HFileLink.createHFileLinkName(hri, storeFilePath.getName()));
+
+    // Try to open store file from link
+    StoreFile hsf = new StoreFile(this.fs, linkFilePath, conf, cacheConf,
+        StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
+    assertTrue(hsf.isLink());
+
+    // Now confirm that I can read from the link
+    int count = 1;
+    HFileScanner s = hsf.createReader().getScanner(false, false);
+    s.seekTo();
+    while (s.next()) {
+      count++;
+    }
+    assertEquals((LAST_CHAR - FIRST_CHAR + 1) * (LAST_CHAR - FIRST_CHAR + 1), count);
+  }
+
   private void checkHalfHFile(final StoreFile f)
   throws IOException {
     byte [] midkey = f.createReader().midkey();