Posted to commits@hbase.apache.org by ac...@apache.org on 2019/05/08 23:39:40 UTC
[hbase] branch branch-1.3 updated: HBASE-22330 Backport HBASE-20724 (Sometimes some compacted storefiles are still opened after region failover) to branch-1
This is an automated email from the ASF dual-hosted git repository.
achouhan pushed a commit to branch branch-1.3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1.3 by this push:
new 074ebb8 HBASE-22330 Backport HBASE-20724 (Sometimes some compacted storefiles are still opened after region failover) to branch-1
074ebb8 is described below
commit 074ebb856675d8bdd389e62774e2572e84d835a3
Author: Abhishek Singh Chouhan <ac...@apache.org>
AuthorDate: Tue May 7 14:53:12 2019 -0700
HBASE-22330 Backport HBASE-20724 (Sometimes some compacted storefiles are still opened after region failover) to branch-1
---
.../apache/hadoop/hbase/protobuf/ProtobufUtil.java | 31 ++
.../hbase/protobuf/generated/HFileProtos.java | 543 ++++++++++++++++++++-
hbase-protocol/src/main/protobuf/HFile.proto | 4 +
.../regionserver/AbstractMultiFileWriter.java | 12 +-
.../apache/hadoop/hbase/regionserver/HStore.java | 61 +--
.../hadoop/hbase/regionserver/StoreFile.java | 92 +++-
.../compactions/DateTieredCompactor.java | 2 +-
.../regionserver/compactions/DefaultCompactor.java | 2 +-
.../regionserver/compactions/StripeCompactor.java | 2 +-
.../TestCleanupCompactedFileAfterFailover.java | 193 ++++++++
.../TestCleanupCompactedFileOnRegionClose.java | 49 --
.../regionserver/compactions/TestCompactor.java | 8 +
12 files changed, 882 insertions(+), 117 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 98ee13e..5abc1b7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -34,11 +34,14 @@ import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
@@ -119,6 +122,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
@@ -3430,4 +3434,31 @@ public final class ProtobufUtil {
return new TimeRange(minStamp, maxStamp);
}
+ public static byte[] toCompactionEventTrackerBytes(Set<String> storeFiles) {
+ HFileProtos.CompactionEventTracker.Builder builder =
+ HFileProtos.CompactionEventTracker.newBuilder();
+ for (String sf : storeFiles) {
+ builder.addCompactedStoreFile(ByteString.copyFromUtf8(sf));
+ }
+ return ProtobufUtil.prependPBMagic(builder.build().toByteArray());
+ }
+
+ public static Set<String> toCompactedStoreFiles(byte[] bytes) throws IOException {
+ if (bytes != null && ProtobufUtil.isPBMagicPrefix(bytes)) {
+ int pbLen = ProtobufUtil.lengthOfPBMagic();
+ HFileProtos.CompactionEventTracker.Builder builder =
+ HFileProtos.CompactionEventTracker.newBuilder();
+ ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
+ HFileProtos.CompactionEventTracker compactionEventTracker = builder.build();
+ List<ByteString> compactedStoreFiles = compactionEventTracker.getCompactedStoreFileList();
+ if (compactedStoreFiles != null && compactedStoreFiles.size() != 0) {
+ Set<String> compactedStoreFileSet = new HashSet<>();
+ for (ByteString sf : compactedStoreFiles) {
+ compactedStoreFileSet.add(sf.toStringUtf8());
+ }
+ return compactedStoreFileSet;
+ }
+ }
+ return Collections.emptySet();
+ }
}
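For orientation, the two helpers added above serialize a set of storefile names into a PB-magic-prefixed byte array and parse it back on the read path. The following standalone sketch (class name and sample file names are made up for illustration) shows the expected round trip, assuming the hbase-client jar built from this branch is on the classpath.

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.hbase.protobuf.ProtobufUtil;

public class CompactionEventTrackerRoundTrip {
  public static void main(String[] args) throws Exception {
    // Names of store files that were compacted away to produce a new file.
    Set<String> compacted = new HashSet<String>();
    compacted.add("a1b2c3d4e5f6");
    compacted.add("0987fedcba12");

    // Serialized with the PB magic prefix, exactly as the writer stores it in FileInfo.
    byte[] bytes = ProtobufUtil.toCompactionEventTrackerBytes(compacted);

    // The read path parses it back; a null or non-PB payload yields an empty set.
    Set<String> parsed = ProtobufUtil.toCompactedStoreFiles(bytes);
    System.out.println("round-tripped: " + parsed.equals(compacted));
  }
}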
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HFileProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HFileProtos.java
index 5b6f2f4..7f146b7 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HFileProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HFileProtos.java
@@ -8,6 +8,503 @@ public final class HFileProtos {
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistry registry) {
}
+ public interface CompactionEventTrackerOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // repeated bytes compacted_store_file = 1;
+ /**
+ * <code>repeated bytes compacted_store_file = 1;</code>
+ */
+ java.util.List<com.google.protobuf.ByteString> getCompactedStoreFileList();
+ /**
+ * <code>repeated bytes compacted_store_file = 1;</code>
+ */
+ int getCompactedStoreFileCount();
+ /**
+ * <code>repeated bytes compacted_store_file = 1;</code>
+ */
+ com.google.protobuf.ByteString getCompactedStoreFile(int index);
+ }
+ /**
+ * Protobuf type {@code hbase.pb.CompactionEventTracker}
+ */
+ public static final class CompactionEventTracker extends
+ com.google.protobuf.GeneratedMessage
+ implements CompactionEventTrackerOrBuilder {
+ // Use CompactionEventTracker.newBuilder() to construct.
+ private CompactionEventTracker(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private CompactionEventTracker(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final CompactionEventTracker defaultInstance;
+ public static CompactionEventTracker getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public CompactionEventTracker getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final com.google.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private CompactionEventTracker(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 10: {
+ if (!((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
+ compactedStoreFile_ = new java.util.ArrayList<com.google.protobuf.ByteString>();
+ mutable_bitField0_ |= 0x00000001;
+ }
+ compactedStoreFile_.add(input.readBytes());
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
+ compactedStoreFile_ = java.util.Collections.unmodifiableList(compactedStoreFile_);
+ }
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.protobuf.generated.HFileProtos.internal_static_hbase_pb_CompactionEventTracker_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.protobuf.generated.HFileProtos.internal_static_hbase_pb_CompactionEventTracker_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.class, org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.Builder.class);
+ }
+
+ public static com.google.protobuf.Parser<CompactionEventTracker> PARSER =
+ new com.google.protobuf.AbstractParser<CompactionEventTracker>() {
+ public CompactionEventTracker parsePartialFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return new CompactionEventTracker(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public com.google.protobuf.Parser<CompactionEventTracker> getParserForType() {
+ return PARSER;
+ }
+
+ // repeated bytes compacted_store_file = 1;
+ public static final int COMPACTED_STORE_FILE_FIELD_NUMBER = 1;
+ private java.util.List<com.google.protobuf.ByteString> compactedStoreFile_;
+ /**
+ * <code>repeated bytes compacted_store_file = 1;</code>
+ */
+ public java.util.List<com.google.protobuf.ByteString>
+ getCompactedStoreFileList() {
+ return compactedStoreFile_;
+ }
+ /**
+ * <code>repeated bytes compacted_store_file = 1;</code>
+ */
+ public int getCompactedStoreFileCount() {
+ return compactedStoreFile_.size();
+ }
+ /**
+ * <code>repeated bytes compacted_store_file = 1;</code>
+ */
+ public com.google.protobuf.ByteString getCompactedStoreFile(int index) {
+ return compactedStoreFile_.get(index);
+ }
+
+ private void initFields() {
+ compactedStoreFile_ = java.util.Collections.emptyList();
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ for (int i = 0; i < compactedStoreFile_.size(); i++) {
+ output.writeBytes(1, compactedStoreFile_.get(i));
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ {
+ int dataSize = 0;
+ for (int i = 0; i < compactedStoreFile_.size(); i++) {
+ dataSize += com.google.protobuf.CodedOutputStream
+ .computeBytesSizeNoTag(compactedStoreFile_.get(i));
+ }
+ size += dataSize;
+ size += 1 * getCompactedStoreFileList().size();
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker)) {
+ return super.equals(obj);
+ }
+ org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker other = (org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker) obj;
+
+ boolean result = true;
+ result = result && getCompactedStoreFileList()
+ .equals(other.getCompactedStoreFileList());
+ result = result &&
+ getUnknownFields().equals(other.getUnknownFields());
+ return result;
+ }
+
+ private int memoizedHashCode = 0;
+ @java.lang.Override
+ public int hashCode() {
+ if (memoizedHashCode != 0) {
+ return memoizedHashCode;
+ }
+ int hash = 41;
+ hash = (19 * hash) + getDescriptorForType().hashCode();
+ if (getCompactedStoreFileCount() > 0) {
+ hash = (37 * hash) + COMPACTED_STORE_FILE_FIELD_NUMBER;
+ hash = (53 * hash) + getCompactedStoreFileList().hashCode();
+ }
+ hash = (29 * hash) + getUnknownFields().hashCode();
+ memoizedHashCode = hash;
+ return hash;
+ }
+
+ public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code hbase.pb.CompactionEventTracker}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTrackerOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hadoop.hbase.protobuf.generated.HFileProtos.internal_static_hbase_pb_CompactionEventTracker_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hadoop.hbase.protobuf.generated.HFileProtos.internal_static_hbase_pb_CompactionEventTracker_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.class, org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.Builder.class);
+ }
+
+ // Construct using org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ compactedStoreFile_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.hadoop.hbase.protobuf.generated.HFileProtos.internal_static_hbase_pb_CompactionEventTracker_descriptor;
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker getDefaultInstanceForType() {
+ return org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.getDefaultInstance();
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker build() {
+ org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker buildPartial() {
+ org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker result = new org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker(this);
+ int from_bitField0_ = bitField0_;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ compactedStoreFile_ = java.util.Collections.unmodifiableList(compactedStoreFile_);
+ bitField0_ = (bitField0_ & ~0x00000001);
+ }
+ result.compactedStoreFile_ = compactedStoreFile_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker) {
+ return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker other) {
+ if (other == org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.getDefaultInstance()) return this;
+ if (!other.compactedStoreFile_.isEmpty()) {
+ if (compactedStoreFile_.isEmpty()) {
+ compactedStoreFile_ = other.compactedStoreFile_;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ } else {
+ ensureCompactedStoreFileIsMutable();
+ compactedStoreFile_.addAll(other.compactedStoreFile_);
+ }
+ onChanged();
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // repeated bytes compacted_store_file = 1;
+ private java.util.List<com.google.protobuf.ByteString> compactedStoreFile_ = java.util.Collections.emptyList();
+ private void ensureCompactedStoreFileIsMutable() {
+ if (!((bitField0_ & 0x00000001) == 0x00000001)) {
+ compactedStoreFile_ = new java.util.ArrayList<com.google.protobuf.ByteString>(compactedStoreFile_);
+ bitField0_ |= 0x00000001;
+ }
+ }
+ /**
+ * <code>repeated bytes compacted_store_file = 1;</code>
+ */
+ public java.util.List<com.google.protobuf.ByteString>
+ getCompactedStoreFileList() {
+ return java.util.Collections.unmodifiableList(compactedStoreFile_);
+ }
+ /**
+ * <code>repeated bytes compacted_store_file = 1;</code>
+ */
+ public int getCompactedStoreFileCount() {
+ return compactedStoreFile_.size();
+ }
+ /**
+ * <code>repeated bytes compacted_store_file = 1;</code>
+ */
+ public com.google.protobuf.ByteString getCompactedStoreFile(int index) {
+ return compactedStoreFile_.get(index);
+ }
+ /**
+ * <code>repeated bytes compacted_store_file = 1;</code>
+ */
+ public Builder setCompactedStoreFile(
+ int index, com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureCompactedStoreFileIsMutable();
+ compactedStoreFile_.set(index, value);
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>repeated bytes compacted_store_file = 1;</code>
+ */
+ public Builder addCompactedStoreFile(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureCompactedStoreFileIsMutable();
+ compactedStoreFile_.add(value);
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>repeated bytes compacted_store_file = 1;</code>
+ */
+ public Builder addAllCompactedStoreFile(
+ java.lang.Iterable<? extends com.google.protobuf.ByteString> values) {
+ ensureCompactedStoreFileIsMutable();
+ super.addAll(values, compactedStoreFile_);
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>repeated bytes compacted_store_file = 1;</code>
+ */
+ public Builder clearCompactedStoreFile() {
+ compactedStoreFile_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000001);
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:hbase.pb.CompactionEventTracker)
+ }
+
+ static {
+ defaultInstance = new CompactionEventTracker(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:hbase.pb.CompactionEventTracker)
+ }
+
public interface FileInfoProtoOrBuilder
extends com.google.protobuf.MessageOrBuilder {
@@ -2338,6 +2835,11 @@ public final class HFileProtos {
}
private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_hbase_pb_CompactionEventTracker_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_hbase_pb_CompactionEventTracker_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
internal_static_hbase_pb_FileInfoProto_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
@@ -2356,35 +2858,42 @@ public final class HFileProtos {
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n\013HFile.proto\022\010hbase.pb\032\013HBase.proto\"<\n\r" +
- "FileInfoProto\022+\n\tmap_entry\030\001 \003(\0132\030.hbase" +
- ".pb.BytesBytesPair\"\221\003\n\020FileTrailerProto\022" +
- "\030\n\020file_info_offset\030\001 \001(\004\022 \n\030load_on_ope" +
- "n_data_offset\030\002 \001(\004\022$\n\034uncompressed_data" +
- "_index_size\030\003 \001(\004\022 \n\030total_uncompressed_" +
- "bytes\030\004 \001(\004\022\030\n\020data_index_count\030\005 \001(\r\022\030\n" +
- "\020meta_index_count\030\006 \001(\r\022\023\n\013entry_count\030\007" +
- " \001(\004\022\035\n\025num_data_index_levels\030\010 \001(\r\022\037\n\027f" +
- "irst_data_block_offset\030\t \001(\004\022\036\n\026last_dat",
- "a_block_offset\030\n \001(\004\022\035\n\025comparator_class" +
- "_name\030\013 \001(\t\022\031\n\021compression_codec\030\014 \001(\r\022\026" +
- "\n\016encryption_key\030\r \001(\014BA\n*org.apache.had" +
- "oop.hbase.protobuf.generatedB\013HFileProto" +
- "sH\001\210\001\001\240\001\001"
+ "\n\013HFile.proto\022\010hbase.pb\032\013HBase.proto\"6\n\026" +
+ "CompactionEventTracker\022\034\n\024compacted_stor" +
+ "e_file\030\001 \003(\014\"<\n\rFileInfoProto\022+\n\tmap_ent" +
+ "ry\030\001 \003(\0132\030.hbase.pb.BytesBytesPair\"\221\003\n\020F" +
+ "ileTrailerProto\022\030\n\020file_info_offset\030\001 \001(" +
+ "\004\022 \n\030load_on_open_data_offset\030\002 \001(\004\022$\n\034u" +
+ "ncompressed_data_index_size\030\003 \001(\004\022 \n\030tot" +
+ "al_uncompressed_bytes\030\004 \001(\004\022\030\n\020data_inde" +
+ "x_count\030\005 \001(\r\022\030\n\020meta_index_count\030\006 \001(\r\022" +
+ "\023\n\013entry_count\030\007 \001(\004\022\035\n\025num_data_index_l",
+ "evels\030\010 \001(\r\022\037\n\027first_data_block_offset\030\t" +
+ " \001(\004\022\036\n\026last_data_block_offset\030\n \001(\004\022\035\n\025" +
+ "comparator_class_name\030\013 \001(\t\022\031\n\021compressi" +
+ "on_codec\030\014 \001(\r\022\026\n\016encryption_key\030\r \001(\014BA" +
+ "\n*org.apache.hadoop.hbase.protobuf.gener" +
+ "atedB\013HFileProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
- internal_static_hbase_pb_FileInfoProto_descriptor =
+ internal_static_hbase_pb_CompactionEventTracker_descriptor =
getDescriptor().getMessageTypes().get(0);
+ internal_static_hbase_pb_CompactionEventTracker_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_hbase_pb_CompactionEventTracker_descriptor,
+ new java.lang.String[] { "CompactedStoreFile", });
+ internal_static_hbase_pb_FileInfoProto_descriptor =
+ getDescriptor().getMessageTypes().get(1);
internal_static_hbase_pb_FileInfoProto_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_FileInfoProto_descriptor,
new java.lang.String[] { "MapEntry", });
internal_static_hbase_pb_FileTrailerProto_descriptor =
- getDescriptor().getMessageTypes().get(1);
+ getDescriptor().getMessageTypes().get(2);
internal_static_hbase_pb_FileTrailerProto_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_FileTrailerProto_descriptor,
diff --git a/hbase-protocol/src/main/protobuf/HFile.proto b/hbase-protocol/src/main/protobuf/HFile.proto
index 5c5e4f3..6988331 100644
--- a/hbase-protocol/src/main/protobuf/HFile.proto
+++ b/hbase-protocol/src/main/protobuf/HFile.proto
@@ -26,6 +26,10 @@ option optimize_for = SPEED;
import "HBase.proto";
+message CompactionEventTracker {
+ repeated bytes compacted_store_file = 1;
+}
+
// Map of name/values
message FileInfoProto {
repeated BytesBytesPair map_entry = 1;
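As a quick sanity check of the new message definition, the generated HFileProtos.CompactionEventTracker class shown earlier can be exercised directly. A minimal sketch follows (the demo class name and sample storefile names are made up):

import com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;

public class CompactionEventTrackerMessageDemo {
  public static void main(String[] args) throws Exception {
    // Build the message from two sample storefile names.
    HFileProtos.CompactionEventTracker tracker = HFileProtos.CompactionEventTracker.newBuilder()
        .addCompactedStoreFile(ByteString.copyFromUtf8("a1b2c3"))
        .addCompactedStoreFile(ByteString.copyFromUtf8("d4e5f6"))
        .build();

    // Round-trip through the wire format and read the repeated field back.
    byte[] wire = tracker.toByteArray();
    HFileProtos.CompactionEventTracker parsed =
        HFileProtos.CompactionEventTracker.parseFrom(wire);
    System.out.println(parsed.getCompactedStoreFileCount() + " compacted store files recorded");
  }
}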
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
index 4987c59..eade5ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -65,18 +66,23 @@ public abstract class AbstractMultiFileWriter implements CellSink {
* comments in HBASE-15400 for more details.
*/
public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
+ return commitWriters(maxSeqId, majorCompaction, Collections.<StoreFile>emptySet());
+ }
+
+ public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
+ Collection<StoreFile> storeFiles) throws IOException {
preCommitWriters();
Collection<StoreFile.Writer> writers = this.writers();
if (LOG.isDebugEnabled()) {
- LOG.debug("Commit " + writers.size() + " writers, maxSeqId=" + maxSeqId
- + ", majorCompaction=" + majorCompaction);
+ LOG.debug("Commit " + writers.size() + " writers, maxSeqId=" + maxSeqId + ", majorCompaction="
+ + majorCompaction);
}
List<Path> paths = new ArrayList<Path>();
for (Writer writer : writers) {
if (writer == null) {
continue;
}
- writer.appendMetadata(maxSeqId, majorCompaction);
+ writer.appendMetadata(maxSeqId, majorCompaction, storeFiles);
preCloseWriter(writer);
paths.add(writer.getPath());
writer.close();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index b5bb92a..42930cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -553,6 +553,7 @@ public class HStore implements Store {
totalValidStoreFile++;
}
+ Set<String> compactedStoreFiles = new HashSet<>();
ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
IOException ioe = null;
try {
@@ -565,6 +566,7 @@ public class HStore implements Store {
LOG.debug("loaded " + storeFile.toStringDetailed());
}
results.add(storeFile);
+ compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
}
} catch (InterruptedException e) {
if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
@@ -589,6 +591,21 @@ public class HStore implements Store {
throw ioe;
}
+ // Remove the compacted files from the result
+ List<StoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
+ for (StoreFile storeFile : results) {
+ if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
+ LOG.warn("Clearing the compacted storefile " + storeFile + " from this store");
+ storeFile.getReader().close(true);
+ filesToRemove.add(storeFile);
+ }
+ }
+ results.removeAll(filesToRemove);
+ if (!filesToRemove.isEmpty() && this.isPrimaryReplicaStore()) {
+ LOG.debug("Moving the files " + filesToRemove + " to archive");
+ this.fs.removeStoreFiles(this.getColumnFamilyName(), filesToRemove);
+ }
+
return results;
}
@@ -874,7 +891,7 @@ public class HStore implements Store {
storeEngine.getStoreFileManager().clearCompactedFiles();
// clear the compacted files
if (compactedfiles != null && !compactedfiles.isEmpty()) {
- removeCompactedfiles(compactedfiles, true);
+ removeCompactedfiles(compactedfiles);
}
if (!result.isEmpty()) {
// initialize the thread pool for closing store files in parallel.
@@ -1082,7 +1099,8 @@ public class HStore implements Store {
.withMaxKeyCount(maxKeyCount)
.withFavoredNodes(favoredNodes)
.withFileContext(hFileContext)
- .withShouldDropCacheBehind(shouldDropBehind);
+ .withShouldDropCacheBehind(shouldDropBehind)
+ .withCompactedFiles(this.getCompactedfiles());
if (trt != null) {
builder.withTimeRangeTracker(trt);
}
@@ -2731,11 +2749,6 @@ public class HStore implements Store {
@Override
public synchronized void closeAndArchiveCompactedFiles() throws IOException {
- closeAndArchiveCompactedFiles(false);
- }
-
- @VisibleForTesting
- public synchronized void closeAndArchiveCompactedFiles(boolean storeClosing) throws IOException {
// ensure other threads do not attempt to archive the same files on close()
archiveLock.lock();
try {
@@ -2756,7 +2769,7 @@ public class HStore implements Store {
lock.readLock().unlock();
}
if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
- removeCompactedfiles(copyCompactedfiles, storeClosing);
+ removeCompactedfiles(copyCompactedfiles);
}
} finally {
archiveLock.unlock();
@@ -2768,7 +2781,7 @@ public class HStore implements Store {
* @param compactedfiles The compacted files in this store that are not active in reads
* @throws IOException
*/
- private void removeCompactedfiles(Collection<StoreFile> compactedfiles, boolean storeClosing)
+ private void removeCompactedfiles(Collection<StoreFile> compactedfiles)
throws IOException {
final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
for (final StoreFile file : compactedfiles) {
@@ -2776,16 +2789,6 @@ public class HStore implements Store {
try {
StoreFile.Reader r = file.getReader();
- //Compacted files in the list should always be marked compacted away. In the event
- //they're contradicting in order to guarantee data consistency
- //should we choose one and ignore the other?
- if (storeClosing && r != null && !r.isCompactedAway()) {
- String msg =
- "Region closing but StoreFile is in compacted list but not compacted away: " +
- file.getPath();
- throw new IllegalStateException(msg);
- }
-
if (r == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("The file " + file + " was closed but still not archived.");
@@ -2793,13 +2796,7 @@ public class HStore implements Store {
filesToRemove.add(file);
}
- //If store is closing we're ignoring any references to keep things consistent
- //and remove compacted storefiles from the region directory
- if (r != null && file.isCompactedAway() && (!r.isReferencedInReads() || storeClosing)) {
- if (storeClosing && r.isReferencedInReads()) {
- LOG.warn("Region closing but StoreFile still has references: file=" +
- file.getPath() + ", refCount=" + r.getRefCount());
- }
+ if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
// Even if deleting fails we need not bother as any new scanners won't be
// able to use the compacted file as the status is already compactedAway
if (LOG.isTraceEnabled()) {
@@ -2815,16 +2812,8 @@ public class HStore implements Store {
+ ", refCount=" + r.getRefCount() + ", skipping for now.");
}
} catch (Exception e) {
- String msg = "Exception while trying to close the compacted store file " +
- file.getPath();
- if (storeClosing) {
- msg = "Store is closing. " + msg;
- }
- LOG.error(msg, e);
- //if we get an exception let caller know so it can abort the server
- if (storeClosing) {
- throw new IOException(msg, e);
- }
+ LOG.error(
+ "Exception while trying to close the compacted store file " + file.getPath().getName());
}
}
}
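The HStore changes above do two things at region open: each loaded storefile contributes the names it recorded under COMPACTION_EVENT_KEY, and any loaded file whose own name appears in that set is closed and, on the primary replica, moved to the archive. The following simplified, standalone sketch illustrates only that filtering step; StoreFileLike and its methods are hypothetical stand-ins for illustration, not HBase API.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for StoreFile, for illustration only.
interface StoreFileLike {
  String name();                      // the HFile name of this storefile
  Set<String> compactedStoreFiles();  // names read back from COMPACTION_EVENT_KEY
}

public class CompactedFileFilter {
  // Drop every loaded storefile whose name is recorded as compacted by another file.
  static List<StoreFileLike> dropCompacted(List<StoreFileLike> loaded) {
    Set<String> compacted = new HashSet<String>();
    for (StoreFileLike sf : loaded) {
      compacted.addAll(sf.compactedStoreFiles());
    }
    List<StoreFileLike> live = new ArrayList<StoreFileLike>();
    for (StoreFileLike sf : loaded) {
      if (compacted.contains(sf.name())) {
        // In HStore these files are closed and archived on the primary replica store.
        continue;
      }
      live.add(sf);
    }
    return live;
  }
}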
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index a7f55f3..b9201e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -26,7 +26,9 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -54,6 +56,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
@@ -114,6 +117,11 @@ public class StoreFile {
/** Key for timestamp of earliest-put in metadata*/
public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
+ /**
+ * Key in FileInfo for the compaction event, which records the compacted storefiles
+ */
+ public static final byte[] COMPACTION_EVENT_KEY = Bytes.toBytes("COMPACTION_EVENT_KEY");
+
private final StoreFileInfo fileInfo;
private final FileSystem fs;
@@ -167,6 +175,9 @@ public class StoreFile {
// It's set whenever you get a Reader.
private boolean excludeFromMinorCompaction = false;
+ // This file was produced by compacting these store files
+ private final Set<String> compactedStoreFiles = new HashSet<>();
+
/** Meta key set when store file is a result of a bulk load */
public static final byte[] BULKLOAD_TASK_KEY =
Bytes.toBytes("BULKLOAD_SOURCE_TASK");
@@ -507,6 +518,14 @@ public class StoreFile {
"proceeding without", e);
this.reader.timeRange = null;
}
+
+ try {
+ byte[] data = metadataMap.get(COMPACTION_EVENT_KEY);
+ this.compactedStoreFiles.addAll(ProtobufUtil.toCompactedStoreFiles(data));
+ } catch (IOException e) {
+ LOG.error("Error reading compacted storefiles from meta data", e);
+ }
+
// initialize so we can reuse them after reader closed.
firstKey = reader.getFirstKey();
lastKey = reader.getLastKey();
@@ -619,6 +638,7 @@ public class StoreFile {
private HFileContext fileContext;
private TimeRangeTracker trt;
private boolean shouldDropCacheBehind;
+ private Collection<StoreFile> compactedFiles = Collections.emptySet();
public WriterBuilder(Configuration conf, CacheConfig cacheConf,
FileSystem fs) {
@@ -702,6 +722,11 @@ public class StoreFile {
return this;
}
+ public WriterBuilder withCompactedFiles(Collection<StoreFile> compactedFiles) {
+ this.compactedFiles = compactedFiles;
+ return this;
+ }
+
/**
* Create a store file writer. Client is responsible for closing file when
* done. If metadata, add BEFORE closing using
@@ -733,7 +758,7 @@ public class StoreFile {
}
return new Writer(fs, filePath,
conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, fileContext,
- shouldDropCacheBehind, trt);
+ shouldDropCacheBehind, trt, compactedFiles);
}
}
@@ -793,6 +818,10 @@ public class StoreFile {
return null;
}
+ Set<String> getCompactedStoreFiles() {
+ return Collections.unmodifiableSet(this.compactedStoreFiles);
+ }
+
/**
* A StoreFile writer. Use this to read/write HBase Store Files. It is package
* local because it is an implementation detail of the HBase regionserver.
@@ -820,6 +849,7 @@ public class StoreFile {
final TimeRangeTracker timeRangeTracker;
protected HFile.Writer writer;
+ private final Collection<StoreFile> compactedFiles;
/**
* Creates an HFile.Writer that also write helpful meta data.
@@ -842,7 +872,7 @@ public class StoreFile {
InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean shouldDropCacheBehind)
throws IOException {
this(fs, path, conf, cacheConf, comparator, bloomType, maxKeys, favoredNodes, fileContext,
- shouldDropCacheBehind, null);
+ shouldDropCacheBehind, null, Collections.<StoreFile>emptySet());
}
/**
@@ -858,6 +888,7 @@ public class StoreFile {
* @param fileContext - The HFile context
* @param shouldDropCacheBehind Drop pages written to page cache after writing the store file.
* @param trt Ready-made timetracker to use.
+ * @param compactedFiles The compacted files which have not yet been archived
* @throws IOException problem writing to FS
*/
private Writer(FileSystem fs, Path path,
@@ -865,8 +896,11 @@ public class StoreFile {
CacheConfig cacheConf,
final KVComparator comparator, BloomType bloomType, long maxKeys,
InetSocketAddress[] favoredNodes, HFileContext fileContext,
- boolean shouldDropCacheBehind, final TimeRangeTracker trt)
- throws IOException {
+ boolean shouldDropCacheBehind, final TimeRangeTracker trt,
+ Collection<StoreFile> compactedFiles)
+ throws IOException {
+ this.compactedFiles =
+ (compactedFiles == null ? Collections.<StoreFile> emptySet() : compactedFiles);
// If passed a TimeRangeTracker, use it. Set timeRangeTrackerSet so we don't destroy it.
// TODO: put the state of the TRT on the TRT; i.e. make a read-only version (TimeRange) when
// it no longer writable.
@@ -911,21 +945,61 @@ public class StoreFile {
}
/**
- * Writes meta data.
- * Call before {@link #close()} since its written as meta data to this file.
+ * Writes meta data. Call before {@link #close()} since it's written as meta data to this file.
* @param maxSequenceId Maximum sequence id.
* @param majorCompaction True if this file is product of a major compaction
* @throws IOException problem writing to FS
*/
public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
- throws IOException {
+ throws IOException {
+ appendMetadata(maxSequenceId, majorCompaction, Collections.<StoreFile> emptySet());
+ }
+
+ /**
+ * Writes meta data. Call before {@link #close()} since it's written as meta data to this file.
+ * @param maxSequenceId Maximum sequence id.
+ * @param majorCompaction True if this file is product of a major compaction
+ * @param storeFiles The store files that were compacted to produce this new file
+ * @throws IOException problem writing to FS
+ */
+ public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
+ final Collection<StoreFile> storeFiles) throws IOException {
writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
- writer.appendFileInfo(MAJOR_COMPACTION_KEY,
- Bytes.toBytes(majorCompaction));
+ writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
+ writer.appendFileInfo(COMPACTION_EVENT_KEY, toCompactionEventTrackerBytes(storeFiles));
appendTrackedTimestampsToMetadata();
}
/**
+ * Used when writing {@link StoreFile#COMPACTION_EVENT_KEY} to the new file's file info. The
+ * names of the compacted store files are needed. If a compacted store file was itself the
+ * result of a compaction, its own compacted files that have not yet been archived are needed
+ * as well, but compacted files need not be added recursively. If files A, B and C were
+ * compacted to new file D, and file D was compacted to new file E, then A, B, C and D are
+ * written to file E's compacted files. So if file E is compacted to new file F, E is added to
+ * F's compacted files first, followed by E's compacted files A, B, C and D. D's compacted files
+ * need not be added again, since they are already contained in E's. See HBASE-20724 for more
+ * details.
+ * @param storeFiles The store files that were compacted to produce this new file
+ * @return bytes of CompactionEventTracker
+ */
+ private byte[] toCompactionEventTrackerBytes(Collection<StoreFile> storeFiles) {
+ Set<String> notArchivedCompactedStoreFiles = new HashSet<>();
+ for (StoreFile sf : this.compactedFiles) {
+ notArchivedCompactedStoreFiles.add(sf.getPath().getName());
+ }
+ Set<String> compactedStoreFiles = new HashSet<>();
+ for (StoreFile storeFile : storeFiles) {
+ compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
+ for (String csf : storeFile.getCompactedStoreFiles()) {
+ if (notArchivedCompactedStoreFiles.contains(csf)) {
+ compactedStoreFiles.add(csf);
+ }
+ }
+ }
+ return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
+ }
+
+ /**
* Add TimestampRange and earliest put timestamp to Metadata
*/
public void appendTrackedTimestampsToMetadata() throws IOException {
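The recursion-avoidance rule described in the javadoc above can be worked through with plain sets of file names. This standalone sketch is illustrative only, assumes none of the earlier inputs have been archived yet, and uses no HBase types.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class CompactionEventWorkedExample {
  public static void main(String[] args) {
    // D was produced by compacting A, B and C, so D records {A, B, C}.
    Set<String> dRecords = new HashSet<String>(Arrays.asList("A", "B", "C"));

    // E is produced by compacting D: record D itself plus D's (not yet archived) records.
    Set<String> eRecords = new HashSet<String>(dRecords);
    eRecords.add("D");                          // E records {A, B, C, D}

    // F is produced by compacting E: record E plus E's records. One level is enough,
    // because D's records are already contained in E's; no recursive walk is needed.
    Set<String> fRecords = new HashSet<String>(eRecords);
    fRecords.add("E");                          // F records {A, B, C, D, E}

    System.out.println("F records: " + fRecords);
  }
}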
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
index b1203c5c..e5f5b81 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -74,6 +74,6 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
@Override
protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
CompactionRequest request) throws IOException {
- return writer.commitWriters(fd.maxSeqId, request.isAllFiles());
+ return writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 9759d2b..bea8b1e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -87,7 +87,7 @@ public class DefaultCompactor extends Compactor<Writer> {
protected List<Path> commitWriter(Writer writer, FileDetails fd,
CompactionRequest request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
- writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
+ writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
writer.close();
return newFiles;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 5e796ad..526b32a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -126,7 +126,7 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
@Override
protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
CompactionRequest request) throws IOException {
- List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor());
+ List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles());
assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
return newFiles;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java
new file mode 100644
index 0000000..9925c04
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({LargeTests.class})
+public class TestCleanupCompactedFileAfterFailover {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestCleanupCompactedFileAfterFailover.class);
+
+ private static HBaseTestingUtility TEST_UTIL;
+ private static Admin admin;
+ private static Table table;
+
+ private static TableName TABLE_NAME = TableName.valueOf("TestCleanupCompactedFileAfterFailover");
+ private static byte[] ROW = Bytes.toBytes("row");
+ private static byte[] FAMILY = Bytes.toBytes("cf");
+ private static byte[] QUALIFIER = Bytes.toBytes("cq");
+ private static byte[] VALUE = Bytes.toBytes("value");
+ private static final int RS_NUMBER = 5;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ TEST_UTIL = new HBaseTestingUtility();
+ // Set the scanner lease to 20min, so the scanner can't be closed by RegionServer
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 1200000);
+ TEST_UTIL.getConfiguration()
+ .setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
+ TEST_UTIL.getConfiguration().set("dfs.blocksize", "64000");
+ TEST_UTIL.getConfiguration().set("dfs.namenode.fs-limits.min-block-size", "1024");
+ TEST_UTIL.getConfiguration().set(TimeToLiveHFileCleaner.TTL_CONF_KEY, "0");
+ TEST_UTIL.startMiniCluster(RS_NUMBER);
+ admin = TEST_UTIL.getHBaseAdmin();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void before() throws Exception {
+ HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
+ htd.addFamily(new HColumnDescriptor(FAMILY));
+ admin.createTable(htd);
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
+ }
+
+ @After
+ public void after() throws Exception {
+ admin.disableTable(TABLE_NAME);
+ admin.deleteTable(TABLE_NAME);
+ }
+
+ @Test
+ public void testCleanupAfterFailoverWithCompactOnce() throws Exception {
+ testCleanupAfterFailover(1);
+ }
+
+ @Test
+ public void testCleanupAfterFailoverWithCompactTwice() throws Exception {
+ testCleanupAfterFailover(2);
+ }
+
+ @Test
+ public void testCleanupAfterFailoverWithCompactThreeTimes() throws Exception {
+ testCleanupAfterFailover(3);
+ }
+
+ private void testCleanupAfterFailover(int compactNum) throws Exception {
+ HRegionServer rsServedTable = null;
+ List<Region> regions = new ArrayList<>();
+ for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
+ .getLiveRegionServerThreads()) {
+ HRegionServer rs = rsThread.getRegionServer();
+ if (rs.getOnlineTables().contains(TABLE_NAME)) {
+ regions.addAll(rs.getOnlineRegions(TABLE_NAME));
+ rsServedTable = rs;
+ }
+ }
+ assertNotNull(rsServedTable);
+ assertEquals("Table should only have one region", 1, regions.size());
+ HRegion region = (HRegion)regions.get(0);
+ HStore store = (HStore)region.getStore(FAMILY);
+
+ writeDataAndFlush(3, region);
+ assertEquals(3, store.getStorefilesCount());
+
+ // Open a scanner and do not close it, so the storefiles remain referenced
+ store.getScanner(new Scan(), null, Long.MAX_VALUE);
+
+ region.compact(true);
+ assertEquals(1, store.getStorefilesCount());
+ // The compacted files should not be archived as they are still referenced by the user scanner
+ assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());
+
+ for (int i = 1; i < compactNum; i++) {
+ // Compact again
+ region.compact(true);
+ assertEquals(1, store.getStorefilesCount());
+ store.closeAndArchiveCompactedFiles();
+ // Compacted storefiles should still number 3, as the newly compacted storefile was archived
+ assertEquals(3, store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());
+ }
+
+ int walNum = ((FSHLog)(rsServedTable.getWAL(null))).getNumLogFiles();
+ // Roll WAL
+ rsServedTable.walRoller.requestRollAll();
+ // Flush again
+ region.flush(true);
+ // The WAL which contains the compaction event marker should be archived
+ assertEquals("The old WAL should be archived", walNum,
+ ((FSHLog)(rsServedTable.getWAL(null))).getNumLogFiles());
+
+ rsServedTable.kill();
+ // Sleep to wait for failover
+ Thread.sleep(3000);
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+
+ regions.clear();
+ for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster()
+ .getLiveRegionServerThreads()) {
+ HRegionServer rs = rsThread.getRegionServer();
+ if (rs != rsServedTable && rs.getOnlineTables().contains(TABLE_NAME)) {
+ regions.addAll(rs.getOnlineRegions(TABLE_NAME));
+ }
+ }
+ assertEquals("Table should only have one region", 1, regions.size());
+ region = (HRegion)regions.get(0);
+ store = (HStore)region.getStore(FAMILY);
+ // The compacted storefiles should be cleaned up, leaving only 1 storefile
+ assertEquals(1, store.getStorefilesCount());
+ }
+
+ private void writeDataAndFlush(int fileNum, HRegion region) throws Exception {
+ for (int i = 0; i < fileNum; i++) {
+ for (int j = 0; j < 100; j++) {
+ table.put(new Put(concat(ROW, j)).addColumn(FAMILY, QUALIFIER, concat(VALUE, j)));
+ }
+ region.flush(true);
+ }
+ }
+
+ private byte[] concat(byte[] base, int index) {
+ return Bytes.toBytes(Bytes.toString(base) + "-" + index);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java
index 2300002..f54c636 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java
@@ -23,18 +23,13 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -145,48 +140,4 @@ public class TestCleanupCompactedFileOnRegionClose {
assertNull(((HStore)region.getStore(familyNameBytes)).getStoreEngine().getStoreFileManager()
.getCompactedfiles());
}
-
- @Test
- public void testIOExceptionThrownOnClose() throws Exception {
- byte[] filler = new byte[128000];
- TableName tableName = TableName.valueOf("testIOExceptionThrownOnClose");
- String familyName = "f";
- byte[] familyNameBytes = Bytes.toBytes(familyName);
- util.createTable(tableName, familyName);
-
- Table table = util.getConnection().getTable(tableName);
-
- HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
- Region region = rs.getOnlineRegions(tableName).get(0);
-
- int refSFCount = 4;
- for (int i = 0; i < refSFCount; i++) {
- for (int j = 0; j < refSFCount; j++) {
- Put put = new Put(Bytes.toBytes(j));
- put.addColumn(familyNameBytes, Bytes.toBytes(i), filler);
- table.put(put);
- }
- util.flush(tableName);
- }
- assertEquals(refSFCount, region.getStoreFileList(new byte[][]{familyNameBytes}).size());
-
- HStore store = (HStore)region.getStore(familyNameBytes);
- StoreFile hsf = region.getStore(familyNameBytes).getStorefiles().iterator().next();
- long readPt = region.getReadpoint(IsolationLevel.READ_COMMITTED);
- StoreFileScanner preadScanner = hsf.getReader().getStoreFileScanner(
- false, true, false, readPt);
- preadScanner.seek(KeyValue.LOWESTKEY);
-
- //Major compact to produce compacted storefiles that need to be cleaned up
- util.compact(tableName, true);
- assertNotNull(preadScanner.next());
- store.closeAndArchiveCompactedFiles(true);
-
- try {
- assertNotNull(preadScanner.next());
- fail("Expected IOException");
- }catch (IOException ex) {
- ex.printStackTrace();
- }
- }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
index 6ec4cd4..37d3652 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyCollection;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -110,6 +111,13 @@ public class TestCompactor {
return null;
}
}).when(writer).appendMetadata(any(long.class), any(boolean.class));
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ realWriter.hasMetadata = true;
+ return null;
+ }
+ }).when(writer).appendMetadata(any(long.class), any(boolean.class), anyCollection());
doAnswer(new Answer<Path>() {
@Override
public Path answer(InvocationOnMock invocation) throws Throwable {