You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ac...@apache.org on 2019/05/08 23:39:40 UTC

[hbase] branch branch-1.3 updated: HBASE-22330 Backport HBASE-20724 (Sometimes some compacted storefiles are still opened after region failover) to branch-1

This is an automated email from the ASF dual-hosted git repository.

achouhan pushed a commit to branch branch-1.3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1.3 by this push:
     new 074ebb8  HBASE-22330 Backport HBASE-20724 (Sometimes some compacted storefiles are still opened after region failover) to branch-1
074ebb8 is described below

commit 074ebb856675d8bdd389e62774e2572e84d835a3
Author: Abhishek Singh Chouhan <ac...@apache.org>
AuthorDate: Tue May 7 14:53:12 2019 -0700

    HBASE-22330 Backport HBASE-20724 (Sometimes some compacted storefiles are still opened after region failover) to branch-1
---
 .../apache/hadoop/hbase/protobuf/ProtobufUtil.java |  31 ++
 .../hbase/protobuf/generated/HFileProtos.java      | 543 ++++++++++++++++++++-
 hbase-protocol/src/main/protobuf/HFile.proto       |   4 +
 .../regionserver/AbstractMultiFileWriter.java      |  12 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   |  61 +--
 .../hadoop/hbase/regionserver/StoreFile.java       |  92 +++-
 .../compactions/DateTieredCompactor.java           |   2 +-
 .../regionserver/compactions/DefaultCompactor.java |   2 +-
 .../regionserver/compactions/StripeCompactor.java  |   2 +-
 .../TestCleanupCompactedFileAfterFailover.java     | 193 ++++++++
 .../TestCleanupCompactedFileOnRegionClose.java     |  49 --
 .../regionserver/compactions/TestCompactor.java    |   8 +
 12 files changed, 882 insertions(+), 117 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 98ee13e..5abc1b7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -34,11 +34,14 @@ import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableSet;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
@@ -119,6 +122,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
@@ -3430,4 +3434,31 @@ public final class ProtobufUtil {
     return new TimeRange(minStamp, maxStamp);
   }
 
+  public static byte[] toCompactionEventTrackerBytes(Set<String> storeFiles) {
+    HFileProtos.CompactionEventTracker.Builder tracker =
+        HFileProtos.CompactionEventTracker.newBuilder();
+    for (String fileName : storeFiles) {
+      tracker.addCompactedStoreFile(ByteString.copyFromUtf8(fileName)); // one entry per file
+    }
+    return ProtobufUtil.prependPBMagic(tracker.build().toByteArray()); // magic goes first
+  }
+
+  public static Set<String> toCompactedStoreFiles(byte[] bytes) throws IOException {
+    if (bytes == null || !ProtobufUtil.isPBMagicPrefix(bytes)) { // nothing decodable here
+      return Collections.emptySet();
+    }
+    int magicLen = ProtobufUtil.lengthOfPBMagic();
+    HFileProtos.CompactionEventTracker.Builder tracker =
+        HFileProtos.CompactionEventTracker.newBuilder();
+    ProtobufUtil.mergeFrom(tracker, bytes, magicLen, bytes.length - magicLen); // skip the magic
+    List<ByteString> encodedNames = tracker.build().getCompactedStoreFileList();
+    if (encodedNames == null || encodedNames.isEmpty()) {
+      return Collections.emptySet();
+    }
+    Set<String> names = new HashSet<>();
+    for (ByteString encodedName : encodedNames) {
+      names.add(encodedName.toStringUtf8());
+    }
+    return names;
+  }
 }
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HFileProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HFileProtos.java
index 5b6f2f4..7f146b7 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HFileProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HFileProtos.java
@@ -8,6 +8,503 @@ public final class HFileProtos {
   public static void registerAllExtensions(
       com.google.protobuf.ExtensionRegistry registry) {
   }
+  public interface CompactionEventTrackerOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // repeated bytes compacted_store_file = 1;
+    /**
+     * <code>repeated bytes compacted_store_file = 1;</code>
+     */
+    java.util.List<com.google.protobuf.ByteString> getCompactedStoreFileList();
+    /**
+     * <code>repeated bytes compacted_store_file = 1;</code>
+     */
+    int getCompactedStoreFileCount();
+    /**
+     * <code>repeated bytes compacted_store_file = 1;</code>
+     */
+    com.google.protobuf.ByteString getCompactedStoreFile(int index);
+  }
+  /**
+   * Protobuf type {@code hbase.pb.CompactionEventTracker}
+   */
+  public static final class CompactionEventTracker extends
+      com.google.protobuf.GeneratedMessage
+      implements CompactionEventTrackerOrBuilder {
+    // Use CompactionEventTracker.newBuilder() to construct.
+    private CompactionEventTracker(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private CompactionEventTracker(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final CompactionEventTracker defaultInstance;
+    public static CompactionEventTracker getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public CompactionEventTracker getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private CompactionEventTracker(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 10: {
+              if (!((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
+                compactedStoreFile_ = new java.util.ArrayList<com.google.protobuf.ByteString>();
+                mutable_bitField0_ |= 0x00000001;
+              }
+              compactedStoreFile_.add(input.readBytes());
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
+          compactedStoreFile_ = java.util.Collections.unmodifiableList(compactedStoreFile_);
+        }
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.hadoop.hbase.protobuf.generated.HFileProtos.internal_static_hbase_pb_CompactionEventTracker_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.hadoop.hbase.protobuf.generated.HFileProtos.internal_static_hbase_pb_CompactionEventTracker_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.class, org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<CompactionEventTracker> PARSER =
+        new com.google.protobuf.AbstractParser<CompactionEventTracker>() {
+      public CompactionEventTracker parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new CompactionEventTracker(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<CompactionEventTracker> getParserForType() {
+      return PARSER;
+    }
+
+    // repeated bytes compacted_store_file = 1;
+    public static final int COMPACTED_STORE_FILE_FIELD_NUMBER = 1;
+    private java.util.List<com.google.protobuf.ByteString> compactedStoreFile_;
+    /**
+     * <code>repeated bytes compacted_store_file = 1;</code>
+     */
+    public java.util.List<com.google.protobuf.ByteString>
+        getCompactedStoreFileList() {
+      return compactedStoreFile_;
+    }
+    /**
+     * <code>repeated bytes compacted_store_file = 1;</code>
+     */
+    public int getCompactedStoreFileCount() {
+      return compactedStoreFile_.size();
+    }
+    /**
+     * <code>repeated bytes compacted_store_file = 1;</code>
+     */
+    public com.google.protobuf.ByteString getCompactedStoreFile(int index) {
+      return compactedStoreFile_.get(index);
+    }
+
+    private void initFields() {
+      compactedStoreFile_ = java.util.Collections.emptyList();
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      for (int i = 0; i < compactedStoreFile_.size(); i++) {
+        output.writeBytes(1, compactedStoreFile_.get(i));
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      {
+        int dataSize = 0;
+        for (int i = 0; i < compactedStoreFile_.size(); i++) {
+          dataSize += com.google.protobuf.CodedOutputStream
+            .computeBytesSizeNoTag(compactedStoreFile_.get(i));
+        }
+        size += dataSize;
+        size += 1 * getCompactedStoreFileList().size();
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    @java.lang.Override
+    public boolean equals(final java.lang.Object obj) {
+      if (obj == this) {
+       return true;
+      }
+      if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker)) {
+        return super.equals(obj);
+      }
+      org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker other = (org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker) obj;
+
+      boolean result = true;
+      result = result && getCompactedStoreFileList()
+          .equals(other.getCompactedStoreFileList());
+      result = result &&
+          getUnknownFields().equals(other.getUnknownFields());
+      return result;
+    }
+
+    private int memoizedHashCode = 0;
+    @java.lang.Override
+    public int hashCode() {
+      if (memoizedHashCode != 0) {
+        return memoizedHashCode;
+      }
+      int hash = 41;
+      hash = (19 * hash) + getDescriptorForType().hashCode();
+      if (getCompactedStoreFileCount() > 0) {
+        hash = (37 * hash) + COMPACTED_STORE_FILE_FIELD_NUMBER;
+        hash = (53 * hash) + getCompactedStoreFileList().hashCode();
+      }
+      hash = (29 * hash) + getUnknownFields().hashCode();
+      memoizedHashCode = hash;
+      return hash;
+    }
+
+    public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code hbase.pb.CompactionEventTracker}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTrackerOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.hadoop.hbase.protobuf.generated.HFileProtos.internal_static_hbase_pb_CompactionEventTracker_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.hadoop.hbase.protobuf.generated.HFileProtos.internal_static_hbase_pb_CompactionEventTracker_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.class, org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.Builder.class);
+      }
+
+      // Construct using org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        compactedStoreFile_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.hadoop.hbase.protobuf.generated.HFileProtos.internal_static_hbase_pb_CompactionEventTracker_descriptor;
+      }
+
+      public org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker getDefaultInstanceForType() {
+        return org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.getDefaultInstance();
+      }
+
+      public org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker build() {
+        org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker buildPartial() {
+        org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker result = new org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker(this);
+        int from_bitField0_ = bitField0_;
+        if (((bitField0_ & 0x00000001) == 0x00000001)) {
+          compactedStoreFile_ = java.util.Collections.unmodifiableList(compactedStoreFile_);
+          bitField0_ = (bitField0_ & ~0x00000001);
+        }
+        result.compactedStoreFile_ = compactedStoreFile_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker) {
+          return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker other) {
+        if (other == org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.getDefaultInstance()) return this;
+        if (!other.compactedStoreFile_.isEmpty()) {
+          if (compactedStoreFile_.isEmpty()) {
+            compactedStoreFile_ = other.compactedStoreFile_;
+            bitField0_ = (bitField0_ & ~0x00000001);
+          } else {
+            ensureCompactedStoreFileIsMutable();
+            compactedStoreFile_.addAll(other.compactedStoreFile_);
+          }
+          onChanged();
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = (org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker) e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // repeated bytes compacted_store_file = 1;
+      private java.util.List<com.google.protobuf.ByteString> compactedStoreFile_ = java.util.Collections.emptyList();
+      private void ensureCompactedStoreFileIsMutable() {
+        if (!((bitField0_ & 0x00000001) == 0x00000001)) {
+          compactedStoreFile_ = new java.util.ArrayList<com.google.protobuf.ByteString>(compactedStoreFile_);
+          bitField0_ |= 0x00000001;
+         }
+      }
+      /**
+       * <code>repeated bytes compacted_store_file = 1;</code>
+       */
+      public java.util.List<com.google.protobuf.ByteString>
+          getCompactedStoreFileList() {
+        return java.util.Collections.unmodifiableList(compactedStoreFile_);
+      }
+      /**
+       * <code>repeated bytes compacted_store_file = 1;</code>
+       */
+      public int getCompactedStoreFileCount() {
+        return compactedStoreFile_.size();
+      }
+      /**
+       * <code>repeated bytes compacted_store_file = 1;</code>
+       */
+      public com.google.protobuf.ByteString getCompactedStoreFile(int index) {
+        return compactedStoreFile_.get(index);
+      }
+      /**
+       * <code>repeated bytes compacted_store_file = 1;</code>
+       */
+      public Builder setCompactedStoreFile(
+          int index, com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureCompactedStoreFileIsMutable();
+        compactedStoreFile_.set(index, value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes compacted_store_file = 1;</code>
+       */
+      public Builder addCompactedStoreFile(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureCompactedStoreFileIsMutable();
+        compactedStoreFile_.add(value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes compacted_store_file = 1;</code>
+       */
+      public Builder addAllCompactedStoreFile(
+          java.lang.Iterable<? extends com.google.protobuf.ByteString> values) {
+        ensureCompactedStoreFileIsMutable();
+        super.addAll(values, compactedStoreFile_);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes compacted_store_file = 1;</code>
+       */
+      public Builder clearCompactedStoreFile() {
+        compactedStoreFile_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000001);
+        onChanged();
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:hbase.pb.CompactionEventTracker)
+    }
+
+    static {
+      defaultInstance = new CompactionEventTracker(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:hbase.pb.CompactionEventTracker)
+  }
+
   public interface FileInfoProtoOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
@@ -2338,6 +2835,11 @@ public final class HFileProtos {
   }
 
   private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_hbase_pb_CompactionEventTracker_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_hbase_pb_CompactionEventTracker_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
     internal_static_hbase_pb_FileInfoProto_descriptor;
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
@@ -2356,35 +2858,42 @@ public final class HFileProtos {
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n\013HFile.proto\022\010hbase.pb\032\013HBase.proto\"<\n\r" +
-      "FileInfoProto\022+\n\tmap_entry\030\001 \003(\0132\030.hbase" +
-      ".pb.BytesBytesPair\"\221\003\n\020FileTrailerProto\022" +
-      "\030\n\020file_info_offset\030\001 \001(\004\022 \n\030load_on_ope" +
-      "n_data_offset\030\002 \001(\004\022$\n\034uncompressed_data" +
-      "_index_size\030\003 \001(\004\022 \n\030total_uncompressed_" +
-      "bytes\030\004 \001(\004\022\030\n\020data_index_count\030\005 \001(\r\022\030\n" +
-      "\020meta_index_count\030\006 \001(\r\022\023\n\013entry_count\030\007" +
-      " \001(\004\022\035\n\025num_data_index_levels\030\010 \001(\r\022\037\n\027f" +
-      "irst_data_block_offset\030\t \001(\004\022\036\n\026last_dat",
-      "a_block_offset\030\n \001(\004\022\035\n\025comparator_class" +
-      "_name\030\013 \001(\t\022\031\n\021compression_codec\030\014 \001(\r\022\026" +
-      "\n\016encryption_key\030\r \001(\014BA\n*org.apache.had" +
-      "oop.hbase.protobuf.generatedB\013HFileProto" +
-      "sH\001\210\001\001\240\001\001"
+      "\n\013HFile.proto\022\010hbase.pb\032\013HBase.proto\"6\n\026" +
+      "CompactionEventTracker\022\034\n\024compacted_stor" +
+      "e_file\030\001 \003(\014\"<\n\rFileInfoProto\022+\n\tmap_ent" +
+      "ry\030\001 \003(\0132\030.hbase.pb.BytesBytesPair\"\221\003\n\020F" +
+      "ileTrailerProto\022\030\n\020file_info_offset\030\001 \001(" +
+      "\004\022 \n\030load_on_open_data_offset\030\002 \001(\004\022$\n\034u" +
+      "ncompressed_data_index_size\030\003 \001(\004\022 \n\030tot" +
+      "al_uncompressed_bytes\030\004 \001(\004\022\030\n\020data_inde" +
+      "x_count\030\005 \001(\r\022\030\n\020meta_index_count\030\006 \001(\r\022" +
+      "\023\n\013entry_count\030\007 \001(\004\022\035\n\025num_data_index_l",
+      "evels\030\010 \001(\r\022\037\n\027first_data_block_offset\030\t" +
+      " \001(\004\022\036\n\026last_data_block_offset\030\n \001(\004\022\035\n\025" +
+      "comparator_class_name\030\013 \001(\t\022\031\n\021compressi" +
+      "on_codec\030\014 \001(\r\022\026\n\016encryption_key\030\r \001(\014BA" +
+      "\n*org.apache.hadoop.hbase.protobuf.gener" +
+      "atedB\013HFileProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
         public com.google.protobuf.ExtensionRegistry assignDescriptors(
             com.google.protobuf.Descriptors.FileDescriptor root) {
           descriptor = root;
-          internal_static_hbase_pb_FileInfoProto_descriptor =
+          internal_static_hbase_pb_CompactionEventTracker_descriptor =
             getDescriptor().getMessageTypes().get(0);
+          internal_static_hbase_pb_CompactionEventTracker_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_hbase_pb_CompactionEventTracker_descriptor,
+              new java.lang.String[] { "CompactedStoreFile", });
+          internal_static_hbase_pb_FileInfoProto_descriptor =
+            getDescriptor().getMessageTypes().get(1);
           internal_static_hbase_pb_FileInfoProto_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_FileInfoProto_descriptor,
               new java.lang.String[] { "MapEntry", });
           internal_static_hbase_pb_FileTrailerProto_descriptor =
-            getDescriptor().getMessageTypes().get(1);
+            getDescriptor().getMessageTypes().get(2);
           internal_static_hbase_pb_FileTrailerProto_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_FileTrailerProto_descriptor,
diff --git a/hbase-protocol/src/main/protobuf/HFile.proto b/hbase-protocol/src/main/protobuf/HFile.proto
index 5c5e4f3..6988331 100644
--- a/hbase-protocol/src/main/protobuf/HFile.proto
+++ b/hbase-protocol/src/main/protobuf/HFile.proto
@@ -26,6 +26,10 @@ option optimize_for = SPEED;
 
 import "HBase.proto";
 
+message CompactionEventTracker {
+  repeated bytes compacted_store_file = 1;
+}
+
 // Map of name/values
 message FileInfoProto {
   repeated BytesBytesPair map_entry = 1;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
index 4987c59..eade5ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -65,18 +66,23 @@ public abstract class AbstractMultiFileWriter implements CellSink {
    * comments in HBASE-15400 for more details.
    */
   public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
+    return commitWriters(maxSeqId, majorCompaction, Collections.<StoreFile>emptySet());
+  }
+
+  public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
+      Collection<StoreFile> storeFiles) throws IOException {
     preCommitWriters();
     Collection<StoreFile.Writer> writers = this.writers();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Commit " + writers.size() + " writers, maxSeqId=" + maxSeqId
-          + ", majorCompaction=" + majorCompaction);
+      LOG.debug("Commit " + writers.size() + " writers, maxSeqId=" + maxSeqId + ", majorCompaction="
+          + majorCompaction);
     }
     List<Path> paths = new ArrayList<Path>();
     for (Writer writer : writers) {
       if (writer == null) {
         continue;
       }
-      writer.appendMetadata(maxSeqId, majorCompaction);
+      writer.appendMetadata(maxSeqId, majorCompaction, storeFiles);
       preCloseWriter(writer);
       paths.add(writer.getPath());
       writer.close();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index b5bb92a..42930cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -553,6 +553,7 @@ public class HStore implements Store {
       totalValidStoreFile++;
     }
 
+    Set<String> compactedStoreFiles = new HashSet<>();
     ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
     IOException ioe = null;
     try {
@@ -565,6 +566,7 @@ public class HStore implements Store {
               LOG.debug("loaded " + storeFile.toStringDetailed());
             }
             results.add(storeFile);
+            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
           }
         } catch (InterruptedException e) {
           if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
@@ -589,6 +591,21 @@ public class HStore implements Store {
       throw ioe;
     }
 
+    // Remove the compacted files from result
+    List<StoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
+    for (StoreFile storeFile : results) {
+      if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
+        LOG.warn("Clearing the compacted storefile " + storeFile + " from this store");
+        storeFile.getReader().close(true);
+        filesToRemove.add(storeFile);
+      }
+    }
+    results.removeAll(filesToRemove);
+    if (!filesToRemove.isEmpty() && this.isPrimaryReplicaStore()) {
+      LOG.debug("Moving the files " + filesToRemove + " to archive");
+      this.fs.removeStoreFiles(this.getColumnFamilyName(), filesToRemove);
+    }
+
     return results;
   }
 
@@ -874,7 +891,7 @@ public class HStore implements Store {
           storeEngine.getStoreFileManager().clearCompactedFiles();
       // clear the compacted files
       if (compactedfiles != null && !compactedfiles.isEmpty()) {
-        removeCompactedfiles(compactedfiles, true);
+        removeCompactedfiles(compactedfiles);
       }
       if (!result.isEmpty()) {
         // initialize the thread pool for closing store files in parallel.
@@ -1082,7 +1099,8 @@ public class HStore implements Store {
             .withMaxKeyCount(maxKeyCount)
             .withFavoredNodes(favoredNodes)
             .withFileContext(hFileContext)
-            .withShouldDropCacheBehind(shouldDropBehind);
+            .withShouldDropCacheBehind(shouldDropBehind)
+            .withCompactedFiles(this.getCompactedfiles());
     if (trt != null) {
       builder.withTimeRangeTracker(trt);
     }
@@ -2731,11 +2749,6 @@ public class HStore implements Store {
 
   @Override
   public synchronized void closeAndArchiveCompactedFiles() throws IOException {
-    closeAndArchiveCompactedFiles(false);
-  }
-
-  @VisibleForTesting
-  public synchronized void closeAndArchiveCompactedFiles(boolean storeClosing) throws IOException {
     // ensure other threads do not attempt to archive the same files on close()
     archiveLock.lock();
     try {
@@ -2756,7 +2769,7 @@ public class HStore implements Store {
         lock.readLock().unlock();
       }
       if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
-        removeCompactedfiles(copyCompactedfiles, storeClosing);
+        removeCompactedfiles(copyCompactedfiles);
       }
     } finally {
       archiveLock.unlock();
@@ -2768,7 +2781,7 @@ public class HStore implements Store {
    * @param compactedfiles The compacted files in this store that are not active in reads
    * @throws IOException
    */
-  private void removeCompactedfiles(Collection<StoreFile> compactedfiles, boolean storeClosing)
+  private void removeCompactedfiles(Collection<StoreFile> compactedfiles)
       throws IOException {
     final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
     for (final StoreFile file : compactedfiles) {
@@ -2776,16 +2789,6 @@ public class HStore implements Store {
         try {
           StoreFile.Reader r = file.getReader();
 
-          //Compacted files in the list should always be marked compacted away. In the event
-          //they're contradicting in order to guarantee data consistency
-          //should we choose one and ignore the other?
-          if (storeClosing && r != null && !r.isCompactedAway()) {
-            String msg =
-                "Region closing but StoreFile is in compacted list but not compacted away: " +
-                    file.getPath();
-            throw new IllegalStateException(msg);
-          }
-
           if (r == null) {
             if (LOG.isDebugEnabled()) {
               LOG.debug("The file " + file + " was closed but still not archived.");
@@ -2793,13 +2796,7 @@ public class HStore implements Store {
             filesToRemove.add(file);
           }
 
-          //If store is closing we're ignoring any references to keep things consistent
-          //and remove compacted storefiles from the region directory
-          if (r != null && file.isCompactedAway() && (!r.isReferencedInReads() || storeClosing)) {
-            if (storeClosing && r.isReferencedInReads()) {
-              LOG.warn("Region closing but StoreFile still has references: file=" +
-                  file.getPath() + ", refCount=" + r.getRefCount());
-            }
+          if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
             // Even if deleting fails we need not bother as any new scanners won't be
             // able to use the compacted file as the status is already compactedAway
             if (LOG.isTraceEnabled()) {
@@ -2815,16 +2812,8 @@ public class HStore implements Store {
                 + ", refCount=" + r.getRefCount() + ", skipping for now.");
           }
         } catch (Exception e) {
-          String msg = "Exception while trying to close the compacted store file " +
-              file.getPath();
-          if (storeClosing) {
-            msg = "Store is closing. " + msg;
-          }
-          LOG.error(msg, e);
-          //if we get an exception let caller know so it can abort the server
-          if (storeClosing) {
-            throw new IOException(msg, e);
-          }
+          LOG.error("Exception while trying to close the compacted store file "
+              + file.getPath().getName(), e);
         }
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index a7f55f3..b9201e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -26,7 +26,9 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -54,6 +56,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
@@ -114,6 +117,11 @@ public class StoreFile {
   /** Key for timestamp of earliest-put in metadata*/
   public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
 
+  /**
+   * Key for compaction event which contains the compacted storefiles in FileInfo
+   */
+  public static final byte[] COMPACTION_EVENT_KEY = Bytes.toBytes("COMPACTION_EVENT_KEY");
+
   private final StoreFileInfo fileInfo;
   private final FileSystem fs;
 
@@ -167,6 +175,9 @@ public class StoreFile {
   // It's set whenever you get a Reader.
   private boolean excludeFromMinorCompaction = false;
 
+  // This file was product of these compacted store files
+  private final Set<String> compactedStoreFiles = new HashSet<>();
+
   /** Meta key set when store file is a result of a bulk load */
   public static final byte[] BULKLOAD_TASK_KEY =
     Bytes.toBytes("BULKLOAD_SOURCE_TASK");
@@ -507,6 +518,14 @@ public class StoreFile {
           "proceeding without", e);
       this.reader.timeRange = null;
     }
+
+    try {
+      byte[] data = metadataMap.get(COMPACTION_EVENT_KEY);
+      this.compactedStoreFiles.addAll(ProtobufUtil.toCompactedStoreFiles(data));
+    } catch (IOException e) {
+      LOG.error("Error reading compacted storefiles from meta data", e);
+    }
+
     // initialize so we can reuse them after reader closed.
     firstKey = reader.getFirstKey();
     lastKey = reader.getLastKey();
@@ -619,6 +638,7 @@ public class StoreFile {
     private HFileContext fileContext;
     private TimeRangeTracker trt;
     private boolean shouldDropCacheBehind;
+    private Collection<StoreFile> compactedFiles = Collections.emptySet();
 
     public WriterBuilder(Configuration conf, CacheConfig cacheConf,
         FileSystem fs) {
@@ -702,6 +722,11 @@ public class StoreFile {
       return this;
     }
 
+    public WriterBuilder withCompactedFiles(Collection<StoreFile> compactedFiles) {
+      this.compactedFiles = compactedFiles;
+      return this;
+    }
+
     /**
      * Create a store file writer. Client is responsible for closing file when
      * done. If metadata, add BEFORE closing using
@@ -733,7 +758,7 @@ public class StoreFile {
       }
       return new Writer(fs, filePath,
           conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext,
-          shouldDropCacheBehind, trt);
+          shouldDropCacheBehind, trt, compactedFiles);
     }
   }
 
@@ -793,6 +818,10 @@ public class StoreFile {
     return null;
   }
 
+  Set<String> getCompactedStoreFiles() {
+    return Collections.unmodifiableSet(this.compactedStoreFiles);
+  }
+
   /**
    * A StoreFile writer.  Use this to read/write HBase Store Files. It is package
    * local because it is an implementation detail of the HBase regionserver.
@@ -820,6 +849,7 @@ public class StoreFile {
     final TimeRangeTracker timeRangeTracker;
 
     protected HFile.Writer writer;
+    private final Collection<StoreFile> compactedFiles;
 
     /**
      * Creates an HFile.Writer that also write helpful meta data.
@@ -842,7 +872,7 @@ public class StoreFile {
         InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean shouldDropCacheBehind)
             throws IOException {
       this(fs, path, conf, cacheConf, comparator, bloomType, maxKeys, favoredNodes, fileContext,
-          shouldDropCacheBehind, null);
+          shouldDropCacheBehind, null, Collections.<StoreFile>emptySet());
     }
 
     /**
@@ -858,6 +888,7 @@ public class StoreFile {
      * @param fileContext - The HFile context
      * @param shouldDropCacheBehind Drop pages written to page cache after writing the store file.
      * @param trt Ready-made timetracker to use.
+     * @param compactedFiles Compacted files which are not yet archived
      * @throws IOException problem writing to FS
      */
     private Writer(FileSystem fs, Path path,
@@ -865,8 +896,11 @@ public class StoreFile {
         CacheConfig cacheConf,
         final KVComparator comparator, BloomType bloomType, long maxKeys,
         InetSocketAddress[] favoredNodes, HFileContext fileContext,
-        boolean shouldDropCacheBehind, final TimeRangeTracker trt)
-            throws IOException {
+        boolean shouldDropCacheBehind, final TimeRangeTracker trt,
+        Collection<StoreFile> compactedFiles)
+        throws IOException {
+      this.compactedFiles =
+          (compactedFiles == null ? Collections.<StoreFile> emptySet() : compactedFiles);
       // If passed a TimeRangeTracker, use it. Set timeRangeTrackerSet so we don't destroy it.
       // TODO: put the state of the TRT on the TRT; i.e. make a read-only version (TimeRange) when
       // it no longer writable.
@@ -911,21 +945,61 @@ public class StoreFile {
     }
 
     /**
-     * Writes meta data.
-     * Call before {@link #close()} since its written as meta data to this file.
+     * Writes meta data. Call before {@link #close()} since it's written as meta data to this file.
      * @param maxSequenceId Maximum sequence id.
      * @param majorCompaction True if this file is product of a major compaction
      * @throws IOException problem writing to FS
      */
     public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
-    throws IOException {
+        throws IOException {
+      appendMetadata(maxSequenceId, majorCompaction, Collections.<StoreFile> emptySet());
+    }
+
+    /**
+     * Writes meta data. Call before {@link #close()} since it's written as meta data to this file.
+     * @param maxSequenceId Maximum sequence id.
+     * @param majorCompaction True if this file is product of a major compaction
+     * @param storeFiles The compacted store files to generate this new file
+     * @throws IOException problem writing to FS
+     */
+    public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
+        final Collection<StoreFile> storeFiles) throws IOException {
       writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
-      writer.appendFileInfo(MAJOR_COMPACTION_KEY,
-          Bytes.toBytes(majorCompaction));
+      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
+      writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
       appendTrackedTimestampsToMetadata();
     }
 
     /**
+     * Builds the value written under {@link StoreFile#COMPACTION_EVENT_KEY} in the new file's
+     * file info. The names of the compacted store files are recorded. If a compacted store file
+     * is itself the result of a compaction, its compacted files that are still not archived are
+     * recorded too; there is no need to add them recursively. If files A, B, C are compacted to
+     * new file D, and D is compacted to new file E, then A, B, C, D are written to E's compacted
+     * files. When E is later compacted to new file F, E is added to F's compacted files first,
+     * then E's compacted files A, B, C, D. D's own compacted files need not be added again, as
+     * they are already in E's compacted files. See HBASE-20724 for more details.
+     * @param storeFiles The compacted store files to generate this new file
+     * @return bytes of CompactionEventTracker
+     */
+    private byte[] toCompactionEventTrackerBytes(Collection<StoreFile> storeFiles) {
+      Set<String> notArchivedCompactedStoreFiles = new HashSet<>();
+      for (StoreFile sf : this.compactedFiles) {
+        notArchivedCompactedStoreFiles.add(sf.getPath().getName());
+      }
+      Set<String> compactedStoreFiles = new HashSet<>();
+      for (StoreFile storeFile : storeFiles) {
+        compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
+        for (String csf : storeFile.getCompactedStoreFiles()) {
+          if (notArchivedCompactedStoreFiles.contains(csf)) {
+            compactedStoreFiles.add(csf);
+          }
+        }
+      }
+      return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
+    }
+
+    /**
      * Add TimestampRange and earliest put timestamp to Metadata
      */
     public void appendTrackedTimestampsToMetadata() throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
index b1203c5c..e5f5b81 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -74,6 +74,6 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
   @Override
   protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
       CompactionRequest request) throws IOException {
-    return writer.commitWriters(fd.maxSeqId, request.isAllFiles());
+    return writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 9759d2b..bea8b1e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -87,7 +87,7 @@ public class DefaultCompactor extends Compactor<Writer> {
   protected List<Path> commitWriter(Writer writer, FileDetails fd,
       CompactionRequest request) throws IOException {
     List<Path> newFiles = Lists.newArrayList(writer.getPath());
-    writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
+    writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
     writer.close();
     return newFiles;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 5e796ad..526b32a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -126,7 +126,7 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
   @Override
   protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
       CompactionRequest request) throws IOException {
-    List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor());
+    List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles());
     assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
     return newFiles;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java
new file mode 100644
index 0000000..9925c04
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({LargeTests.class})
+public class TestCleanupCompactedFileAfterFailover {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestCleanupCompactedFileAfterFailover.class);
+
+  private static HBaseTestingUtility TEST_UTIL;
+  private static Admin admin;
+  private static Table table;
+
+  private static TableName TABLE_NAME = TableName.valueOf("TestCleanupCompactedFileAfterFailover");
+  private static byte[] ROW = Bytes.toBytes("row");
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+  private static byte[] QUALIFIER = Bytes.toBytes("cq");
+  private static byte[] VALUE = Bytes.toBytes("value");
+  private static final int RS_NUMBER = 5;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    // Set the scanner lease to 20min, so the scanner can't be closed by RegionServer
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1200000);
+    TEST_UTIL.getConfiguration()
+        .setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
+    TEST_UTIL.getConfiguration().set("dfs.blocksize", "64000");
+    TEST_UTIL.getConfiguration().set("dfs.namenode.fs-limits.min-block-size", "1024");
+    TEST_UTIL.getConfiguration().set(TimeToLiveHFileCleaner.TTL_CONF_KEY, "0");
+    TEST_UTIL.startMiniCluster(RS_NUMBER);
+    admin = TEST_UTIL.getHBaseAdmin();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void before() throws Exception {
+    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
+    htd.addFamily(new HColumnDescriptor(FAMILY));
+    admin.createTable(htd);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
+  }
+
+  @After
+  public void after() throws Exception {
+    admin.disableTable(TABLE_NAME);
+    admin.deleteTable(TABLE_NAME);
+  }
+
+  @Test
+  public void testCleanupAfterFailoverWithCompactOnce() throws Exception {
+    testCleanupAfterFailover(1);
+  }
+
+  @Test
+  public void testCleanupAfterFailoverWithCompactTwice() throws Exception {
+    testCleanupAfterFailover(2);
+  }
+
+  @Test
+  public void testCleanupAfterFailoverWithCompactThreeTimes() throws Exception {
+    testCleanupAfterFailover(3);
+  }
+
+  private void testCleanupAfterFailover(int compactNum) throws Exception {
+    HRegionServer rsServedTable = null;
+    List<Region> regions = new ArrayList<>();
+    for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
+        .getLiveRegionServerThreads()) {
+      HRegionServer rs = rsThread.getRegionServer();
+      if (rs.getOnlineTables().contains(TABLE_NAME)) {
+        regions.addAll(rs.getOnlineRegions(TABLE_NAME));
+        rsServedTable = rs;
+      }
+    }
+    assertNotNull(rsServedTable);
+    assertEquals("Table should only have one region", 1, regions.size());
+    HRegion region = (HRegion)regions.get(0);
+    HStore store = (HStore)region.getStore(FAMILY);
+
+    writeDataAndFlush(3, region);
+    assertEquals(3, store.getStorefilesCount());
+
+    // Open a scanner and not close, then the storefile will be referenced
+    store.getScanner(new Scan(), null, Long.MAX_VALUE);
+
+    region.compact(true);
+    assertEquals(1, store.getStorefilesCount());
+    // The compacted file should not be archived as there are references by user scanner
+    assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());
+
+    for (int i = 1; i < compactNum; i++) {
+      // Compact again
+      region.compact(true);
+      assertEquals(1, store.getStorefilesCount());
+      store.closeAndArchiveCompactedFiles();
+      // Compacted storefiles count is still 3 as the newly compacted storefile was archived
+      assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());
+    }
+
+    int walNum = ((FSHLog)(rsServedTable.getWAL(null))).getNumLogFiles();
+    // Roll WAL
+    rsServedTable.walRoller.requestRollAll();
+    // Flush again
+    region.flush(true);
+    // The WAL which contains compaction event marker should be archived
+    assertEquals("The old WAL should be archived", walNum,
+      ((FSHLog)(rsServedTable.getWAL(null))).getNumLogFiles());
+
+    rsServedTable.kill();
+    // Sleep to wait failover
+    Thread.sleep(3000);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+
+    regions.clear();
+    for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
+        .getLiveRegionServerThreads()) {
+      HRegionServer rs = rsThread.getRegionServer();
+      if (rs != rsServedTable && rs.getOnlineTables().contains(TABLE_NAME)) {
+        regions.addAll(rs.getOnlineRegions(TABLE_NAME));
+      }
+    }
+    assertEquals("Table should only have one region", 1, regions.size());
+    region = (HRegion)regions.get(0);
+    store = (HStore)region.getStore(FAMILY);
+    // The compacted storefile should be cleaned and only have 1 storefile
+    assertEquals(1, store.getStorefilesCount());
+  }
+
+  private void writeDataAndFlush(int fileNum, HRegion region) throws Exception {
+    for (int i = 0; i < fileNum; i++) {
+      for (int j = 0; j < 100; j++) {
+        table.put(new Put(concat(ROW, j)).addColumn(FAMILY, QUALIFIER, concat(VALUE, j)));
+      }
+      region.flush(true);
+    }
+  }
+
+  private byte[] concat(byte[] base, int index) {
+    return Bytes.toBytes(Bytes.toString(base) + "-" + index);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java
index 2300002..f54c636 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java
@@ -23,18 +23,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
 import java.util.Collection;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -145,48 +140,4 @@ public class TestCleanupCompactedFileOnRegionClose {
     assertNull(((HStore)region.getStore(familyNameBytes)).getStoreEngine().getStoreFileManager()
         .getCompactedfiles());
   }
-
-  @Test
-  public void testIOExceptionThrownOnClose() throws Exception {
-    byte[] filler = new byte[128000];
-    TableName tableName = TableName.valueOf("testIOExceptionThrownOnClose");
-    String familyName = "f";
-    byte[] familyNameBytes = Bytes.toBytes(familyName);
-    util.createTable(tableName, familyName);
-
-    Table table = util.getConnection().getTable(tableName);
-
-    HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
-    Region region = rs.getOnlineRegions(tableName).get(0);
-
-    int refSFCount = 4;
-    for (int i = 0; i < refSFCount; i++) {
-      for (int j = 0; j < refSFCount; j++) {
-        Put put = new Put(Bytes.toBytes(j));
-        put.addColumn(familyNameBytes, Bytes.toBytes(i), filler);
-        table.put(put);
-      }
-      util.flush(tableName);
-    }
-    assertEquals(refSFCount, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
-
-    HStore store = (HStore)region.getStore(familyNameBytes);
-    StoreFile hsf = region.getStore(familyNameBytes).getStorefiles().iterator().next();
-    long readPt = region.getReadpoint(IsolationLevel.READ_COMMITTED);
-    StoreFileScanner preadScanner = hsf.getReader().getStoreFileScanner(
-        false, true, false, readPt);
-    preadScanner.seek(KeyValue.LOWESTKEY);
-
-    //Major compact to produce compacted storefiles that need to be cleaned up
-    util.compact(tableName, true);
-    assertNotNull(preadScanner.next());
-    store.closeAndArchiveCompactedFiles(true);
-
-    try {
-      assertNotNull(preadScanner.next());
-      fail("Expected IOException");
-    }catch (IOException ex) {
-      ex.printStackTrace();
-    }
-  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
index 6ec4cd4..37d3652 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -110,6 +111,13 @@ public class TestCompactor {
           return null;
         }
       }).when(writer).appendMetadata(any(long.class), any(boolean.class));
+      doAnswer(new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          realWriter.hasMetadata = true;
+          return null;
+        }
+      }).when(writer).appendMetadata(any(long.class), any(boolean.class), anyCollection());
       doAnswer(new Answer<Path>() {
         @Override
         public Path answer(InvocationOnMock invocation) throws Throwable {