You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/09 16:40:11 UTC
[hbase] 13/13: HBASE-26271 Cleanup the broken store files under
data directory (#3786)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-26067
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 36b6088a26e03cba9d070479a95c5934240ca637
Author: BukrosSzabolcs <sz...@cloudera.com>
AuthorDate: Tue Nov 9 17:19:00 2021 +0100
HBASE-26271 Cleanup the broken store files under data directory (#3786)
Signed-off-by: Duo Zhang <zh...@apache.org>
Signed-off-by: Josh Elser <el...@apache.org>
Signed-off-by: Wellington Ramos Chevreuil <wc...@apache.org>
---
.../hadoop/hbase/mob/DefaultMobStoreCompactor.java | 6 +-
.../regionserver/AbstractMultiFileWriter.java | 6 +-
.../hbase/regionserver/BrokenStoreFileCleaner.java | 202 ++++++++++++++++++
.../regionserver/DateTieredMultiFileWriter.java | 2 +-
.../hbase/regionserver/HRegionFileSystem.java | 2 +-
.../hadoop/hbase/regionserver/HRegionServer.java | 27 +++
.../apache/hadoop/hbase/regionserver/HStore.java | 6 +
.../hadoop/hbase/regionserver/StoreEngine.java | 21 ++
.../hbase/regionserver/StripeMultiFileWriter.java | 2 +-
.../compactions/AbstractMultiOutputCompactor.java | 4 +-
.../hbase/regionserver/compactions/Compactor.java | 45 ++++-
.../compactions/DateTieredCompactor.java | 6 +-
.../regionserver/compactions/DefaultCompactor.java | 9 +-
.../regionserver/compactions/StripeCompactor.java | 2 +-
.../FileBasedStoreFileTracker.java | 2 +-
.../MigrationStoreFileTracker.java | 2 +-
.../storefiletracker/StoreFileTracker.java | 6 +
.../storefiletracker/StoreFileTrackerBase.java | 6 -
.../hadoop/hbase/mob/FaultyMobStoreCompactor.java | 3 +-
.../regionserver/TestBrokenStoreFileCleaner.java | 225 +++++++++++++++++++++
.../hbase/regionserver/TestCompactorMemLeak.java | 4 +-
.../storefiletracker/TestStoreFileTracker.java | 1 -
22 files changed, 555 insertions(+), 34 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 01fe000..15f0a73 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
@@ -286,7 +285,6 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
* </ol>
* @param fd File details
* @param scanner Where to read from.
- * @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
* @param throughputController The compaction throughput controller.
@@ -295,7 +293,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
* @return Whether compaction ended; false if it was interrupted for any reason.
*/
@Override
- protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
+ protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException {
long bytesWrittenProgressForLog = 0;
@@ -665,7 +663,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
@Override
- protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
+ protected List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
index f250304..82c3867 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
@@ -110,7 +110,11 @@ public abstract class AbstractMultiFileWriter implements CellSink, ShipperListen
return paths;
}
- protected abstract Collection<StoreFileWriter> writers();
+ /**
+ * Returns all writers. This is used during cleanup to avoid deleting store files that are
+ * still being written.
+ */
+ public abstract Collection<StoreFileWriter> writers();
/**
* Subclasses override this method to be called at the end of a successful sequence of append; all
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java
new file mode 100644
index 0000000..0c4807d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BrokenStoreFileCleaner.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This Chore, every time it runs, will clear the unused HFiles in the data
+ * folder.
+ */
+@InterfaceAudience.Private
+public class BrokenStoreFileCleaner extends ScheduledChore {
+ private static final Logger LOG = LoggerFactory.getLogger(BrokenStoreFileCleaner.class);
+ public static final String BROKEN_STOREFILE_CLEANER_ENABLED =
+ "hbase.region.broken.storefilecleaner.enabled";
+ public static final boolean DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED = false;
+ public static final String BROKEN_STOREFILE_CLEANER_TTL =
+ "hbase.region.broken.storefilecleaner.ttl";
+ public static final long DEFAULT_BROKEN_STOREFILE_CLEANER_TTL = 1000 * 60 * 60 * 12; //12h
+ public static final String BROKEN_STOREFILE_CLEANER_DELAY =
+ "hbase.region.broken.storefilecleaner.delay";
+ public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY = 1000 * 60 * 60 * 2; //2h
+ public static final String BROKEN_STOREFILE_CLEANER_DELAY_JITTER =
+ "hbase.region.broken.storefilecleaner.delay.jitter";
+ public static final double DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER = 0.25D;
+ public static final String BROKEN_STOREFILE_CLEANER_PERIOD =
+ "hbase.region.broken.storefilecleaner.period";
+ public static final int DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD = 1000 * 60 * 60 * 6; //6h
+
+ private HRegionServer regionServer;
+ private final AtomicBoolean enabled = new AtomicBoolean(true);
+ private long fileTtl;
+
+ public BrokenStoreFileCleaner(final int delay, final int period, final Stoppable stopper,
+ Configuration conf, HRegionServer regionServer) {
+ super("BrokenStoreFileCleaner", stopper, period, delay);
+ this.regionServer = regionServer;
+ setEnabled(
+ conf.getBoolean(BROKEN_STOREFILE_CLEANER_ENABLED, DEFAULT_BROKEN_STOREFILE_CLEANER_ENABLED));
+ fileTtl = conf.getLong(BROKEN_STOREFILE_CLEANER_TTL, DEFAULT_BROKEN_STOREFILE_CLEANER_TTL);
+ }
+
+ public boolean setEnabled(final boolean enabled) {
+ return this.enabled.getAndSet(enabled);
+ }
+
+ public boolean getEnabled() {
+ return this.enabled.get();
+ }
+
+ @Override
+ public void chore() {
+ if (getEnabled()) {
+ long start = EnvironmentEdgeManager.currentTime();
+ AtomicLong deletedFiles = new AtomicLong(0);
+ AtomicLong failedDeletes = new AtomicLong(0);
+ for (HRegion region : regionServer.getRegions()) {
+ for (HStore store : region.getStores()) {
+ //only do cleanup in stores not using tmp directories
+ if (store.getStoreEngine().requireWritingToTmpDirFirst()) {
+ continue;
+ }
+ Path storePath =
+ new Path(region.getRegionFileSystem().getRegionDir(), store.getColumnFamilyName());
+
+ try {
+ List<FileStatus> fsStoreFiles =
+ Arrays.asList(region.getRegionFileSystem().fs.listStatus(storePath));
+ fsStoreFiles.forEach(
+ file -> cleanFileIfNeeded(file, store, deletedFiles, failedDeletes));
+ } catch (IOException e) {
+ LOG.warn("Failed to list files in {}, cleanup is skipped there",storePath);
+ continue;
+ }
+ }
+ }
+ LOG.debug(
+ "BrokenStoreFileCleaner on {} run for: {}ms. It deleted {} files and tried but failed "
+ + "to delete {}",
+ regionServer.getServerName().getServerName(), EnvironmentEdgeManager.currentTime() - start,
+ deletedFiles.get(), failedDeletes.get());
+ } else {
+ LOG.trace("Broken storefile Cleaner chore disabled! Not cleaning.");
+ }
+ }
+
+ private void cleanFileIfNeeded(FileStatus file, HStore store,
+ AtomicLong deletedFiles, AtomicLong failedDeletes) {
+ if(file.isDirectory()){
+ LOG.trace("This is a Directory {}, skip cleanup", file.getPath());
+ return;
+ }
+
+ if(!validate(file.getPath())){
+ LOG.trace("Invalid file {}, skip cleanup", file.getPath());
+ return;
+ }
+
+ if(!isOldEnough(file)){
+ LOG.trace("Fresh file {}, skip cleanup", file.getPath());
+ return;
+ }
+
+ if(isActiveStorefile(file, store)){
+ LOG.trace("Actively used storefile file {}, skip cleanup", file.getPath());
+ return;
+ }
+
+ // Compacted files can still have readers and are cleaned by a separate chore, so they have to
+ // be skipped here
+ if(isCompactedFile(file, store)){
+ LOG.trace("Cleanup is done by a different chore for file {}, skip cleanup", file.getPath());
+ return;
+ }
+
+ if(isCompactionResultFile(file, store)){
+ LOG.trace("The file is the result of an ongoing compaction {}, skip cleanup", file.getPath());
+ return;
+ }
+
+ deleteFile(file, store, deletedFiles, failedDeletes);
+ }
+
+ private boolean isCompactionResultFile(FileStatus file, HStore store) {
+ return store.getStoreEngine().getCompactor().getCompactionTargets().contains(file.getPath());
+ }
+
+ // Compacted files can still have readers and are cleaned by a separate chore, so they have to
+ // be skipped here
+ private boolean isCompactedFile(FileStatus file, HStore store) {
+ return store.getStoreEngine().getStoreFileManager().getCompactedfiles().stream()
+ .anyMatch(sf -> sf.getPath().equals(file.getPath()));
+ }
+
+ private boolean isActiveStorefile(FileStatus file, HStore store) {
+ return store.getStoreEngine().getStoreFileManager().getStorefiles().stream()
+ .anyMatch(sf -> sf.getPath().equals(file.getPath()));
+ }
+
+ boolean validate(Path file) {
+ if (HFileLink.isBackReferencesDir(file) || HFileLink.isBackReferencesDir(file.getParent())) {
+ return true;
+ }
+ return StoreFileInfo.validateStoreFileName(file.getName());
+ }
+
+ boolean isOldEnough(FileStatus file){
+ return file.getModificationTime() + fileTtl < EnvironmentEdgeManager.currentTime();
+ }
+
+ private void deleteFile(FileStatus file, HStore store, AtomicLong deletedFiles,
+ AtomicLong failedDeletes) {
+ Path filePath = file.getPath();
+ LOG.debug("Removing {} from store", filePath);
+ try {
+ boolean success = store.getFileSystem().delete(filePath, false);
+ if (!success) {
+ failedDeletes.incrementAndGet();
+ LOG.warn("Attempted to delete:" + filePath
+ + ", but couldn't. Attempt to delete on next pass.");
+ }
+ else{
+ deletedFiles.incrementAndGet();
+ }
+ } catch (IOException e) {
+ e = e instanceof RemoteException ?
+ ((RemoteException)e).unwrapRemoteException() : e;
+ LOG.warn("Error while deleting: " + filePath, e);
+ }
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
index 8201cb1..1e10eb2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredMultiFileWriter.java
@@ -71,7 +71,7 @@ public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
}
@Override
- protected Collection<StoreFileWriter> writers() {
+ public Collection<StoreFileWriter> writers() {
return lowerBoundary2Writer.values();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 4bcebd9..7cd5c53 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -606,7 +606,7 @@ public class HRegionFileSystem {
writeRegionInfoFileContent(conf, fs, regionInfoFile, regionInfoContent);
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
env.getMasterConfiguration(), fs, getTableDir(), regionInfo, false);
- insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
+ insertRegionFilesIntoStoreTracker(allRegionFiles, env, regionFs);
}
return regionDir;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 4bf2d9c..02944a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -432,6 +432,8 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
*/
final ServerNonceManager nonceManager;
+ private BrokenStoreFileCleaner brokenStoreFileCleaner;
+
@InterfaceAudience.Private
CompactedHFilesDischarger compactedFileDischarger;
@@ -1831,6 +1833,9 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
if (this.slowLogTableOpsChore != null) {
choreService.scheduleChore(slowLogTableOpsChore);
}
+ if (this.brokenStoreFileCleaner != null) {
+ choreService.scheduleChore(brokenStoreFileCleaner);
+ }
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit.
@@ -1910,6 +1915,22 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
onlyMetaRefresh, this, this);
}
+
+ int brokenStoreFileCleanerPeriod = conf.getInt(
+ BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD,
+ BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_PERIOD);
+ int brokenStoreFileCleanerDelay = conf.getInt(
+ BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY,
+ BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY);
+ double brokenStoreFileCleanerDelayJitter = conf.getDouble(
+ BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY_JITTER,
+ BrokenStoreFileCleaner.DEFAULT_BROKEN_STOREFILE_CLEANER_DELAY_JITTER);
+ double jitterRate = (RandomUtils.nextDouble() - 0.5D) * brokenStoreFileCleanerDelayJitter;
+ long jitterValue = Math.round(brokenStoreFileCleanerDelay * jitterRate);
+ this.brokenStoreFileCleaner =
+ new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay + jitterValue),
+ brokenStoreFileCleanerPeriod, this, conf, this);
+
registerConfigurationObservers();
}
@@ -3484,6 +3505,11 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
return !conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
}
+ @InterfaceAudience.Private
+ public BrokenStoreFileCleaner getBrokenStoreFileCleaner(){
+ return brokenStoreFileCleaner;
+ }
+
@Override
protected void stopChores() {
shutdownChore(nonceManagerChore);
@@ -3494,5 +3520,6 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
shutdownChore(storefileRefresher);
shutdownChore(fsUtilizationChore);
shutdownChore(slowLogTableOpsChore);
+ shutdownChore(brokenStoreFileCleaner);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 4bb43eb..11effea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1157,6 +1157,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
}
}
replaceStoreFiles(filesToCompact, sfs, true);
+
+ // This step is necessary for the correctness of BrokenStoreFileCleaner. It lets the
+ // cleaner chore know that compaction is done and the file can be cleaned up if compaction
+ // has failed.
+ storeEngine.resetCompactionWriter();
+
if (cr.isMajor()) {
majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
index e6f9395..318b701 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
@@ -41,9 +41,11 @@ import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -517,4 +519,23 @@ public abstract class StoreEngine<SF extends StoreFlusher, CP extends Compaction
throw new IOException("Unable to load configured store engine '" + className + "'", e);
}
}
+
+ /**
+ * Whether the implementation of the used storefile tracker requires you to write to temp
+ * directory first, i.e, does not allow broken store files under the actual data directory.
+ */
+ public boolean requireWritingToTmpDirFirst() {
+ return storeFileTracker.requireWritingToTmpDirFirst();
+ }
+
+ /**
+ * Resets the compaction writer when the new file is committed and used as active storefile.
+ * This step is necessary for the correctness of BrokenStoreFileCleaner. It lets the
+ * cleaner chore know that compaction is done and the file can be cleaned up if compaction
+ * has failed. Currently called from {@code HStore#doCompaction}.
+ * @see HStore#doCompaction(CompactionRequestImpl, Collection, User, long, List)
+ */
+ public void resetCompactionWriter(){
+ compactor.resetWriter();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
index fc0598d..a4e943a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
@@ -58,7 +58,7 @@ public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
}
@Override
- protected Collection<StoreFileWriter> writers() {
+ public Collection<StoreFileWriter> writers() {
return existingWriters;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
index 533be17..19b7a98 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
@@ -68,7 +68,7 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
}
@Override
- protected void abortWriter(T writer) throws IOException {
+ protected void abortWriter() throws IOException {
FileSystem fs = store.getFileSystem();
for (Path leftoverFile : writer.abortWriters()) {
try {
@@ -79,5 +79,7 @@ public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWr
e);
}
}
+ //this step signals that the target file is no longer being written and can be cleaned up
+ writer = null;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 47ef0f2..0ee7d34 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -25,9 +25,12 @@ import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELET
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -37,6 +40,7 @@ import org.apache.hadoop.hbase.PrivateConstants;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
+import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
@@ -92,6 +96,8 @@ public abstract class Compactor<T extends CellSink> {
private final boolean dropCacheMajor;
private final boolean dropCacheMinor;
+ protected T writer = null;
+
//TODO: depending on Store is not good but, realistically, all compactors currently do.
Compactor(Configuration conf, HStore store) {
this.conf = conf;
@@ -324,7 +330,6 @@ public abstract class Compactor<T extends CellSink> {
// Find the smallest read point across all the Scanners.
long smallestReadPoint = getSmallestReadPoint();
- T writer = null;
boolean dropCache;
if (request.isMajor() || request.isAllFiles()) {
dropCache = this.dropCacheMajor;
@@ -348,8 +353,13 @@ public abstract class Compactor<T extends CellSink> {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
+ if (writer != null){
+ LOG.warn("Writer exists when it should not: " + getCompactionTargets().stream()
+ .map(n -> n.toString())
+ .collect(Collectors.joining(", ", "{ ", " }")));
+ }
writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor());
- finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
+ finished = performCompaction(fd, scanner, smallestReadPoint, cleanSeqId,
throughputController, request.isAllFiles(), request.getFiles().size());
if (!finished) {
throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
@@ -369,24 +379,23 @@ public abstract class Compactor<T extends CellSink> {
Closeables.close(scanner, true);
}
if (!finished && writer != null) {
- abortWriter(writer);
+ abortWriter();
}
}
assert finished : "We should have exited the method on all error paths";
assert writer != null : "Writer should be non-null if no error";
- return commitWriter(writer, fd, request);
+ return commitWriter(fd, request);
}
- protected abstract List<Path> commitWriter(T writer, FileDetails fd,
+ protected abstract List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException;
- protected abstract void abortWriter(T writer) throws IOException;
+ protected abstract void abortWriter() throws IOException;
/**
* Performs the compaction.
* @param fd FileDetails of cell sink writer
* @param scanner Where to read from.
- * @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <=
* smallestReadPoint
@@ -394,7 +403,7 @@ public abstract class Compactor<T extends CellSink> {
* @param numofFilesToCompact the number of files to compact
* @return Whether compaction ended; false if it was interrupted for some reason.
*/
- protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
+ protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException {
assert writer instanceof ShipperListener;
@@ -537,4 +546,24 @@ public abstract class Compactor<T extends CellSink> {
return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
dropDeletesFromRow, dropDeletesToRow);
}
+
+ public List<Path> getCompactionTargets(){
+ if (writer == null){
+ return Collections.emptyList();
+ }
+ synchronized (writer){
+ if (writer instanceof StoreFileWriter){
+ return Arrays.asList(((StoreFileWriter)writer).getPath());
+ }
+ return ((AbstractMultiFileWriter)writer).writers().stream().map(sfw -> sfw.getPath()).collect(
+ Collectors.toList());
+ }
+ }
+
+ /**
+ * Reset the Writer when the new storefiles were successfully added
+ */
+ public void resetWriter(){
+ writer = null;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
index fd54330..43e037c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -79,8 +79,10 @@ public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTiered
}
@Override
- protected List<Path> commitWriter(DateTieredMultiFileWriter writer, FileDetails fd,
+ protected List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException {
- return writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
+ List<Path> pathList =
+ writer.commitWriters(fd.maxSeqId, request.isAllFiles(), request.getFiles());
+ return pathList;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index afa2429..ad2384a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -63,7 +63,7 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
}
@Override
- protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
+ protected List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = Lists.newArrayList(writer.getPath());
writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
@@ -72,12 +72,19 @@ public class DefaultCompactor extends Compactor<StoreFileWriter> {
}
@Override
+ protected void abortWriter() throws IOException {
+ abortWriter(writer);
+ }
+
protected void abortWriter(StoreFileWriter writer) throws IOException {
Path leftoverFile = writer.getPath();
try {
writer.close();
} catch (IOException e) {
LOG.warn("Failed to close the writer after an unfinished compaction.", e);
+ } finally {
+ //this step signals that the target file is no longer being written and can be cleaned up
+ writer = null;
}
try {
store.getFileSystem().delete(leftoverFile, false);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 547555e..060a11b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -125,7 +125,7 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
}
@Override
- protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails fd,
+ protected List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException {
List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), request.getFiles());
assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java
index 079b59b..8d9b66e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/FileBasedStoreFileTracker.java
@@ -95,7 +95,7 @@ class FileBasedStoreFileTracker extends StoreFileTrackerBase {
}
@Override
- protected boolean requireWritingToTmpDirFirst() {
+ public boolean requireWritingToTmpDirFirst() {
return false;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java
index a6648f2..53a474d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/MigrationStoreFileTracker.java
@@ -57,7 +57,7 @@ class MigrationStoreFileTracker extends StoreFileTrackerBase {
}
@Override
- protected boolean requireWritingToTmpDirFirst() {
+ public boolean requireWritingToTmpDirFirst() {
// Returns true if either of the two StoreFileTracker returns true.
// For example, if we want to migrate from a tracker implementation which can ignore the broken
// files under data directory to a tracker implementation which can not, if we still allow
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java
index f56a0dd..aabbe8d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java
@@ -88,4 +88,10 @@ public interface StoreFileTracker {
* @param builder The table descriptor builder for the given table.
*/
TableDescriptorBuilder updateWithTrackerConfigs(TableDescriptorBuilder builder);
+
+ /**
+ * Whether the implementation of this tracker requires you to write to the temp directory first,
+ * i.e., does not allow broken store files under the actual data directory.
+ */
+ boolean requireWritingToTmpDirFirst();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
index b6de32b..db10f4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
@@ -173,12 +173,6 @@ abstract class StoreFileTrackerBase implements StoreFileTracker {
return builder.build();
}
- /**
- * Whether the implementation of this tracker requires you to write to temp directory first, i.e,
- * does not allow broken store files under the actual data directory.
- */
- protected abstract boolean requireWritingToTmpDirFirst();
-
protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;
protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
index 50530da..d178d56 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
-import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@@ -89,7 +88,7 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
}
@Override
- protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
+ protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java
new file mode 100644
index 0000000..78755a4
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBrokenStoreFileCleaner.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, RegionServerTests.class })
+public class TestBrokenStoreFileCleaner {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBrokenStoreFileCleaner.class);
+
+ private final HBaseTestingUtil testUtil = new HBaseTestingUtil();
+ private final static byte[] fam = Bytes.toBytes("cf_1");
+ private final static byte[] qual1 = Bytes.toBytes("qf_1");
+ private final static byte[] val = Bytes.toBytes("val");
+ private final static String junkFileName = "409fad9a751c4e8c86d7f32581bdc156";
+ TableName tableName;
+
+
+ @Before
+ public void setUp() throws Exception { // applies tracker + cleaner settings, then starts the cluster
+ testUtil.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL,
+ "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
+ testUtil.getConfiguration() // switch the broken store file cleaner on
+ .set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_ENABLED, "true");
+ testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_TTL, "0");
+ testUtil.getConfiguration() // large period; the tests call chore() themselves
+ .set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_PERIOD, "15000000");
+ testUtil.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_DELAY, "0");
+ testUtil.startMiniCluster(1);
+ }
+
+ @After
+ public void tearDown() throws Exception { // drops the test's table, then stops the mini cluster
+ testUtil.deleteTable(tableName); // tableName is assigned at the start of each test method
+ testUtil.shutdownMiniCluster();
+ }
+
+  @Test
+  public void testDeletingJunkFile() throws Exception {
+    // A stray file written into the store directory must be deleted by the cleaner chore.
+    tableName = TableName.valueOf(getClass().getSimpleName() + "testDeletingJunkFile");
+    // only the flushed data is needed, so close the returned Table right away
+    createTableWithData(tableName).close();
+
+    HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0);
+    ServerName sn = testUtil.getMiniHBaseCluster()
+      .getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName());
+    HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn);
+    BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner();
+
+    // create junk file
+    HStore store = region.getStore(fam);
+    Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName());
+    Path junkFilePath = new Path(cfPath, junkFileName);
+    // try-with-resources closes the stream even when the write fails
+    try (FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath)) {
+      junkFileOS.writeUTF("hello");
+    }
+
+    int storeFiles = store.getStorefilesCount();
+    assertTrue(storeFiles > 0);
+
+    // verify the file exists before the chore and is missing afterwards
+    assertTrue(store.getFileSystem().exists(junkFilePath));
+    cleaner.chore();
+    assertFalse(store.getFileSystem().exists(junkFilePath));
+
+    // verify no storefile got deleted; assertEquals takes the expected value first
+    assertEquals(storeFiles, store.getStorefilesCount());
+  }
+
+  @Test
+  public void testSkippingCompactedFiles() throws Exception {
+    // Compacted files must survive the chore, with or without a junk file next to them.
+    tableName = TableName.valueOf(getClass().getSimpleName() + "testSkippningCompactedFiles");
+    // only the flushed data is needed, so close the returned Table right away
+    createTableWithData(tableName).close();
+
+    HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0);
+    ServerName sn = testUtil.getMiniHBaseCluster()
+      .getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName());
+    HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn);
+    BrokenStoreFileCleaner cleaner = rs.getBrokenStoreFileCleaner();
+
+    // run major compaction to generate compacted files
+    region.compact(true);
+
+    // make sure there are compacted files
+    HStore store = region.getStore(fam);
+    int compactedFiles = store.getCompactedFilesCount();
+    assertTrue(compactedFiles > 0);
+
+    cleaner.chore();
+
+    // verify none of the compacted files were deleted
+    assertEquals(compactedFiles, store.getCompactedFilesCount());
+
+    // verify adding a junk file does not break anything
+    Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName());
+    Path junkFilePath = new Path(cfPath, junkFileName);
+    // try-with-resources closes the stream even when the write fails
+    try (FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath)) {
+      junkFileOS.writeUTF("hello");
+    }
+
+    assertTrue(store.getFileSystem().exists(junkFilePath));
+    // NOTE(review): no other test calls setEnabled(true) before chore(); confirm it is needed
+    cleaner.setEnabled(true);
+    cleaner.chore();
+    assertFalse(store.getFileSystem().exists(junkFilePath));
+
+    // verify compacted files are still intact
+    assertEquals(compactedFiles, store.getCompactedFilesCount());
+  }
+
+  @Test
+  public void testJunkFileTTL() throws Exception {
+    // With a 5 sec TTL a fresh junk file survives the first chore run and is gone after the wait.
+    tableName = TableName.valueOf(getClass().getSimpleName() + "testJunkFileTTL");
+    createTableWithData(tableName).close();
+
+    HRegion region = testUtil.getMiniHBaseCluster().getRegions(tableName).get(0);
+    ServerName sn = testUtil.getMiniHBaseCluster()
+      .getServerHoldingRegion(tableName, region.getRegionInfo().getRegionName());
+    HRegionServer rs = testUtil.getMiniHBaseCluster().getRegionServer(sn);
+
+    // create junk file
+    HStore store = region.getStore(fam);
+    Path cfPath = store.getRegionFileSystem().getStoreDir(store.getColumnFamilyName());
+    Path junkFilePath = new Path(cfPath, junkFileName);
+    // try-with-resources closes the stream even when the write fails
+    try (FSDataOutputStream junkFileOS = store.getFileSystem().create(junkFilePath)) {
+      junkFileOS.writeUTF("hello");
+    }
+
+    int storeFiles = store.getStorefilesCount();
+    assertTrue(storeFiles > 0);
+
+    // verify the file exists before the chore
+    assertTrue(store.getFileSystem().exists(junkFilePath));
+
+    // set a 5 sec ttl, then create a separate cleaner that is given the updated configuration
+    rs.getConfiguration().set(BrokenStoreFileCleaner.BROKEN_STOREFILE_CLEANER_TTL, "5000");
+    BrokenStoreFileCleaner cleaner = new BrokenStoreFileCleaner(15000000,
+      0, rs, rs.getConfiguration(), rs);
+    cleaner.chore();
+    // file is still present after chore run
+    assertTrue(store.getFileSystem().exists(junkFilePath));
+    Thread.sleep(5000);
+    cleaner.chore();
+    assertFalse(store.getFileSystem().exists(junkFilePath));
+
+    // verify no storefile got deleted; assertEquals takes the expected value first
+    assertEquals(storeFiles, store.getStorefilesCount());
+  }
+
+  /**
+   * Creates the table and loads rows row1-row9, row11-row19 and row21-row29, flushing after
+   * each group. The table is closed here only if an IOException is thrown; otherwise the
+   * still-open table is returned.
+   */
+  private Table createTableWithData(TableName tableName) throws IOException {
+    Table table = testUtil.createTable(tableName, fam);
+    try {
+      for (int firstRow = 1; firstRow <= 21; firstRow += 10) {
+        putRows(table, firstRow, firstRow + 9);
+        // flush them
+        testUtil.getAdmin().flush(tableName);
+      }
+    } catch (IOException e) {
+      table.close();
+      throw e;
+    }
+    return table;
+  }
+
+  /**
+   * Puts one cell (fam:qual1 = val) for every row key "row" + i with firstRow <= i < endRow.
+   */
+  private void putRows(Table table, int firstRow, int endRow) throws IOException {
+    for (int row = firstRow; row < endRow; row++) {
+      Put put = new Put(Bytes.toBytes("row" + row));
+      put.addColumn(fam, qual1, val);
+      table.put(put);
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
index e0fca1f..6a0a8ba 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactorMemLeak.java
@@ -128,13 +128,13 @@ public class TestCompactorMemLeak {
}
@Override
- protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
+ protected List<Path> commitWriter(FileDetails fd,
CompactionRequestImpl request) throws IOException {
HFileWriterImpl writerImpl = (HFileWriterImpl) writer.writer;
Cell cell = writerImpl.getLastCell();
// The cell should be backend with an KeyOnlyKeyValue.
IS_LAST_CELL_ON_HEAP.set(cell instanceof KeyOnlyKeyValue);
- return super.commitWriter(writer, fd, request);
+ return super.commitWriter(fd, request);
}
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java
index b30ca47..9818972 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/storefiletracker/TestStoreFileTracker.java
@@ -47,7 +47,6 @@ public class TestStoreFileTracker extends DefaultStoreFileTracker {
} else {
LOG.info("ctx.getRegionFileSystem() returned null. Leaving storeId null.");
}
-
}
@Override