You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/09/14 12:54:21 UTC
[2/3] hbase git commit: HBASE-18453 CompactionRequest should not be
exposed to user directly
http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 6edf006..86a24ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -17,6 +17,59 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
+import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.text.ParseException;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Optional;
+import java.util.RandomAccess;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -90,6 +143,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.Write
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@@ -97,7 +151,6 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Optional;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
@@ -143,58 +196,6 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
-import java.io.EOFException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.lang.reflect.Constructor;
-import java.nio.ByteBuffer;
-import java.text.ParseException;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.RandomAccess;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.LongAdder;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
-
-import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
-import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
-
@SuppressWarnings("deprecation")
@InterfaceAudience.Private
public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
@@ -254,9 +255,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// - the thread that owns the lock (allow reentrancy)
// - reference count of (reentrant) locks held by the thread
// - the row itself
- private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
+ new ConcurrentHashMap<>();
- protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
+ protected final Map<byte[], HStore> stores =
+ new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
// TODO: account for each registered handler in HeapSize computation
private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap();
@@ -513,7 +516,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/** A result object from prepare flush cache stage */
@VisibleForTesting
static class PrepareFlushResult {
- final FlushResult result; // indicating a failure result from prepare
+ final FlushResultImpl result; // indicating a failure result from prepare
final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
final TreeMap<byte[], List<Path>> committedFiles;
final TreeMap<byte[], MemstoreSize> storeFlushableSize;
@@ -523,7 +526,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
final MemstoreSize totalFlushableSize;
/** Constructs an early exit case */
- PrepareFlushResult(FlushResult result, long flushSeqId) {
+ PrepareFlushResult(FlushResultImpl result, long flushSeqId) {
this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, new MemstoreSize());
}
@@ -538,7 +541,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
private PrepareFlushResult(
- FlushResult result,
+ FlushResultImpl result,
TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
TreeMap<byte[], List<Path>> committedFiles,
TreeMap<byte[], MemstoreSize> storeFlushableSize, long startTime, long flushSeqId,
@@ -616,7 +619,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
final long rowProcessorTimeout;
// Last flush time for each Store. Useful when we are flushing for each column
- private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap<HStore, Long> lastStoreFlushTimeMap = new ConcurrentHashMap<>();
final RegionServerServices rsServices;
private RegionServerAccounting rsAccounting;
@@ -802,7 +805,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.disallowWritesInRecovering =
conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING,
HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG);
- configurationManager = Optional.absent();
+ configurationManager = Optional.empty();
// disable stats tracking system tables, but check the config for everything else
this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals(
@@ -902,22 +905,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long maxSeqId = initializeStores(reporter, status);
this.mvcc.advanceTo(maxSeqId);
if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
- List<Store> stores = this.getStores(); // update the stores that we are replaying
+ Collection<HStore> stores = this.stores.values();
try {
- for (Store store : stores) {
- ((HStore) store).startReplayingFromWAL();
- }
+ // update the stores that we are replaying
+ stores.forEach(HStore::startReplayingFromWAL);
// Recover any edits if available.
maxSeqId = Math.max(maxSeqId,
- replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
+ replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
// Make sure mvcc is up to max.
this.mvcc.advanceTo(maxSeqId);
} finally {
- for (Store store : stores) { // update the stores that we are done replaying
- ((HStore)store).stopReplayingFromWAL();
- }
+ // update the stores that we are done replaying
+ stores.forEach(HStore::startReplayingFromWAL);
}
-
}
this.lastReplayedOpenRegionSeqId = maxSeqId;
@@ -947,7 +947,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.flushPolicy = FlushPolicyFactory.create(this, conf);
long lastFlushTime = EnvironmentEdgeManager.currentTime();
- for (Store store: stores.values()) {
+ for (HStore store: stores.values()) {
this.lastStoreFlushTimeMap.put(store, lastFlushTime);
}
@@ -988,10 +988,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return Highest sequenceId found out in a Store.
* @throws IOException
*/
- private long initializeStores(final CancelableProgressable reporter, MonitoredTask status)
- throws IOException {
+ private long initializeStores(CancelableProgressable reporter, MonitoredTask status)
+ throws IOException {
// Load in all the HStores.
-
long maxSeqId = -1;
// initialized to -1 so that we pick up MemstoreTS from column families
long maxMemstoreTS = -1;
@@ -1050,11 +1049,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!allStoresOpened) {
// something went wrong, close all opened stores
LOG.error("Could not initialize all stores for the region=" + this);
- for (Store store : this.stores.values()) {
+ for (HStore store : this.stores.values()) {
try {
store.close();
} catch (IOException e) {
- LOG.warn(e.getMessage());
+ LOG.warn("close store failed", e);
}
}
}
@@ -1079,11 +1078,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
private NavigableMap<byte[], List<Path>> getStoreFiles() {
NavigableMap<byte[], List<Path>> allStoreFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- for (Store store: getStores()) {
+ for (HStore store : stores.values()) {
Collection<StoreFile> storeFiles = store.getStorefiles();
- if (storeFiles == null) continue;
+ if (storeFiles == null) {
+ continue;
+ }
List<Path> storeFileNames = new ArrayList<>();
- for (StoreFile storeFile: storeFiles) {
+ for (StoreFile storeFile : storeFiles) {
storeFileNames.add(storeFile.getPath());
}
allStoreFiles.put(store.getColumnFamilyDescriptor().getName(), storeFileNames);
@@ -1121,10 +1122,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return True if this region has references.
*/
public boolean hasReferences() {
- for (Store store : this.stores.values()) {
- if (store.hasReferences()) return true;
- }
- return false;
+ return stores.values().stream().anyMatch(HStore::hasReferences);
}
public void blockUpdates() {
@@ -1137,19 +1135,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public HDFSBlocksDistribution getHDFSBlocksDistribution() {
- HDFSBlocksDistribution hdfsBlocksDistribution =
- new HDFSBlocksDistribution();
- synchronized (this.stores) {
- for (Store store : this.stores.values()) {
- Collection<StoreFile> storeFiles = store.getStorefiles();
- if (storeFiles == null) continue;
- for (StoreFile sf : storeFiles) {
- HDFSBlocksDistribution storeFileBlocksDistribution =
- sf.getHDFSBlockDistribution();
- hdfsBlocksDistribution.add(storeFileBlocksDistribution);
- }
- }
- }
+ HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
+ stores.values().stream().filter(s -> s.getStorefiles() != null)
+ .flatMap(s -> s.getStorefiles().stream()).map(StoreFile::getHDFSBlockDistribution)
+ .forEachOrdered(hdfsBlocksDistribution::add);
return hdfsBlocksDistribution;
}
@@ -1161,8 +1150,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return The HDFS blocks distribution for the given region.
* @throws IOException
*/
- public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
- final TableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException {
+ public static HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf,
+ TableDescriptor tableDescriptor, HRegionInfo regionInfo) throws IOException {
Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName());
return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath);
}
@@ -1176,9 +1165,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return The HDFS blocks distribution for the given region.
* @throws IOException
*/
- public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf,
- final TableDescriptor tableDescriptor, final HRegionInfo regionInfo, Path tablePath)
- throws IOException {
+ public static HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf,
+ TableDescriptor tableDescriptor, HRegionInfo regionInfo, Path tablePath) throws IOException {
HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
FileSystem fs = tablePath.getFileSystem(conf);
@@ -1407,9 +1395,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
new Throwable("LOGGING: REMOVE"));
// REMOVE BELOW!!!!
LOG.info("DEBUG LIST ALL FILES");
- for (Store store: this.stores.values()) {
+ for (HStore store : this.stores.values()) {
LOG.info("store " + store.getColumnFamilyName());
- for (StoreFile sf: store.getStorefiles()) {
+ for (StoreFile sf : store.getStorefiles()) {
LOG.info(sf.toStringDetailed());
}
}
@@ -1667,7 +1655,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
new ExecutorCompletionService<>(storeCloserThreadPool);
// close each store in parallel
- for (final Store store : stores.values()) {
+ for (HStore store : stores.values()) {
MemstoreSize flushableSize = store.getSizeToFlush();
if (!(abort || flushableSize.getDataSize() == 0 || writestate.readOnly)) {
if (getRegionServerServices() != null) {
@@ -1740,11 +1728,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
private long getMemstoreHeapSize() {
- long size = 0;
- for (Store s : this.stores.values()) {
- size += s.getSizeOfMemStore().getHeapSize();
- }
- return size;
+ return stores.values().stream().mapToLong(s -> s.getSizeOfMemStore().getHeapSize()).sum();
}
@Override
@@ -1902,17 +1886,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public long getOldestHfileTs(boolean majorCompactionOnly) throws IOException {
long result = Long.MAX_VALUE;
- for (Store store : getStores()) {
+ for (HStore store : stores.values()) {
Collection<StoreFile> storeFiles = store.getStorefiles();
- if (storeFiles == null) continue;
+ if (storeFiles == null) {
+ continue;
+ }
for (StoreFile file : storeFiles) {
StoreFileReader sfReader = file.getReader();
- if (sfReader == null) continue;
+ if (sfReader == null) {
+ continue;
+ }
HFile.Reader reader = sfReader.getHFileReader();
- if (reader == null) continue;
+ if (reader == null) {
+ continue;
+ }
if (majorCompactionOnly) {
byte[] val = reader.loadFileInfo().get(StoreFile.MAJOR_COMPACTION_KEY);
- if (val == null || !Bytes.toBoolean(val)) continue;
+ if (val == null || !Bytes.toBoolean(val)) {
+ continue;
+ }
}
result = Math.min(result, reader.getFileContext().getFileCreateTime());
}
@@ -1942,20 +1934,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// These methods are meant to be called periodically by the HRegionServer for
// upkeep.
//////////////////////////////////////////////////////////////////////////////
-
- /** @return returns size of largest HStore. */
+ /**
+ * @return returns size of largest HStore.
+ */
public long getLargestHStoreSize() {
- long size = 0;
- for (Store h : stores.values()) {
- long storeSize = h.getSize();
- if (storeSize > size) {
- size = storeSize;
- }
- }
- return size;
+ return stores.values().stream().mapToLong(HStore::getSize).max().orElse(0L);
}
- /*
+ /**
* Do preparation for pending compaction.
* @throws IOException
*/
@@ -1964,19 +1950,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public void triggerMajorCompaction() throws IOException {
- for (Store s : getStores()) {
- s.triggerMajorCompaction();
- }
+ stores.values().forEach(HStore::triggerMajorCompaction);
}
@Override
- public void compact(final boolean majorCompaction) throws IOException {
+ public void compact(boolean majorCompaction) throws IOException {
if (majorCompaction) {
triggerMajorCompaction();
}
- for (Store s : getStores()) {
- CompactionContext compaction = s.requestCompaction();
- if (compaction != null) {
+ for (HStore s : stores.values()) {
+ Optional<CompactionContext> compaction = s.requestCompaction();
+ if (compaction.isPresent()) {
ThroughputController controller = null;
if (rsServices != null) {
controller = CompactionThroughputControllerFactory.create(rsServices, conf);
@@ -1984,43 +1968,41 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (controller == null) {
controller = NoLimitThroughputController.INSTANCE;
}
- compact(compaction, s, controller, null);
+ compact(compaction.get(), s, controller, null);
}
}
}
/**
- * This is a helper function that compact all the stores synchronously
+ * This is a helper function that compact all the stores synchronously.
+ * <p>
* It is used by utilities and testing
- *
- * @throws IOException e
*/
+ @VisibleForTesting
public void compactStores() throws IOException {
- for (Store s : getStores()) {
- CompactionContext compaction = s.requestCompaction();
- if (compaction != null) {
- compact(compaction, s, NoLimitThroughputController.INSTANCE, null);
+ for (HStore s : stores.values()) {
+ Optional<CompactionContext> compaction = s.requestCompaction();
+ if (compaction.isPresent()) {
+ compact(compaction.get(), s, NoLimitThroughputController.INSTANCE, null);
}
}
}
/**
- * This is a helper function that compact the given store
+ * This is a helper function that compact the given store.
+ * <p>
* It is used by utilities and testing
- *
- * @throws IOException e
*/
@VisibleForTesting
- void compactStore(byte[] family, ThroughputController throughputController)
- throws IOException {
- Store s = getStore(family);
- CompactionContext compaction = s.requestCompaction();
- if (compaction != null) {
- compact(compaction, s, throughputController, null);
+ void compactStore(byte[] family, ThroughputController throughputController) throws IOException {
+ HStore s = getStore(family);
+ Optional<CompactionContext> compaction = s.requestCompaction();
+ if (compaction.isPresent()) {
+ compact(compaction.get(), s, throughputController, null);
}
}
- /*
+ /**
* Called by compaction thread and after region is opened to compact the
* HStores if necessary.
*
@@ -2035,12 +2017,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param throughputController
* @return whether the compaction completed
*/
- public boolean compact(CompactionContext compaction, Store store,
+ public boolean compact(CompactionContext compaction, HStore store,
ThroughputController throughputController) throws IOException {
return compact(compaction, store, throughputController, null);
}
- public boolean compact(CompactionContext compaction, Store store,
+ public boolean compact(CompactionContext compaction, HStore store,
ThroughputController throughputController, User user) throws IOException {
assert compaction != null && compaction.hasSelection();
assert !compaction.getRequest().getFiles().isEmpty();
@@ -2214,7 +2196,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* because a Snapshot was not properly persisted. The region is put in closing mode, and the
* caller MUST abort after this.
*/
- public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
+ public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
throws IOException {
// fail-fast instead of waiting on the lock
if (this.closing.get()) {
@@ -2261,10 +2243,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
try {
- Collection<Store> specificStoresToFlush =
+ Collection<HStore> specificStoresToFlush =
forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
- FlushResult fs = internalFlushcache(specificStoresToFlush,
- status, writeFlushRequestWalMarker);
+ FlushResultImpl fs =
+ internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker);
if (coprocessorHost != null) {
status.setStatus("Running post-flush coprocessor hooks");
@@ -2297,7 +2279,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* that you always flush all stores). Otherwise the method will always
* returns true which will make a lot of flush requests.
*/
- boolean shouldFlushStore(Store store) {
+ boolean shouldFlushStore(HStore store) {
long earliest = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(),
store.getColumnFamilyDescriptor().getName()) - 1;
if (earliest > 0 && earliest + flushPerChanges < mvcc.getReadPoint()) {
@@ -2349,7 +2331,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
//since we didn't flush in the recent past, flush now if certain conditions
//are met. Return true on first such memstore hit.
- for (Store s : getStores()) {
+ for (Store s : stores.values()) {
if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) {
// we have an old enough edit in the memstore, flush
whyFlush.append(s.toString() + " has an old edit so flush to free WALs");
@@ -2361,39 +2343,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Flushing all stores.
- *
* @see #internalFlushcache(Collection, MonitoredTask, boolean)
*/
- private FlushResult internalFlushcache(MonitoredTask status)
- throws IOException {
+ private FlushResult internalFlushcache(MonitoredTask status) throws IOException {
return internalFlushcache(stores.values(), status, false);
}
/**
* Flushing given stores.
- *
* @see #internalFlushcache(WAL, long, Collection, MonitoredTask, boolean)
*/
- private FlushResult internalFlushcache(final Collection<Store> storesToFlush,
- MonitoredTask status, boolean writeFlushWalMarker) throws IOException {
- return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush,
- status, writeFlushWalMarker);
+ private FlushResultImpl internalFlushcache(Collection<HStore> storesToFlush, MonitoredTask status,
+ boolean writeFlushWalMarker) throws IOException {
+ return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush, status,
+ writeFlushWalMarker);
}
/**
- * Flush the memstore. Flushing the memstore is a little tricky. We have a lot
- * of updates in the memstore, all of which have also been written to the wal.
- * We need to write those updates in the memstore out to disk, while being
- * able to process reads/writes as much as possible during the flush
- * operation.
+ * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
+ * memstore, all of which have also been written to the wal. We need to write those updates in the
+ * memstore out to disk, while being able to process reads/writes as much as possible during the
+ * flush operation.
* <p>
- * This method may block for some time. Every time you call it, we up the
- * regions sequence id even if we don't flush; i.e. the returned region id
- * will be at least one larger than the last edit applied to this region. The
- * returned id does not refer to an actual edit. The returned id can be used
- * for say installing a bulk loaded file just ahead of the last hfile that was
- * the result of this flush, etc.
- *
+ * This method may block for some time. Every time you call it, we up the regions sequence id even
+ * if we don't flush; i.e. the returned region id will be at least one larger than the last edit
+ * applied to this region. The returned id does not refer to an actual edit. The returned id can
+ * be used for say installing a bulk loaded file just ahead of the last hfile that was the result
+ * of this flush, etc.
* @param wal Null if we're NOT to go via wal.
* @param myseqid The seqid to use if <code>wal</code> is null writing out flush file.
* @param storesToFlush The list of stores to flush.
@@ -2401,9 +2377,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @throws IOException general io exceptions
* @throws DroppedSnapshotException Thrown when replay of WAL is required.
*/
- protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
- final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
- throws IOException {
+ protected FlushResultImpl internalFlushcache(WAL wal, long myseqid, Collection<HStore> storesToFlush,
+ MonitoredTask status, boolean writeFlushWalMarker) throws IOException {
PrepareFlushResult result
= internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
if (result.result == null) {
@@ -2415,9 +2390,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DLS_DEAD_LOCAL_STORE",
justification="FindBugs seems confused about trxId")
- protected PrepareFlushResult internalPrepareFlushCache(final WAL wal, final long myseqid,
- final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
- throws IOException {
+ protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
+ Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
+ throws IOException {
if (this.rsServices != null && this.rsServices.isAborted()) {
// Don't flush when server aborting, it's unsafe
throw new IOException("Aborting flush because server is aborted...");
@@ -2439,11 +2414,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// edits in the WAL sub-system. Up the sequence number so the resulting flush id is for
// sure just beyond the last appended region edit and not associated with any edit
// (useful as marker when bulk loading, etc.).
- FlushResult flushResult = null;
if (wal != null) {
writeEntry = mvcc.begin();
long flushOpSeqId = writeEntry.getWriteNumber();
- flushResult = new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
+ FlushResultImpl flushResult = new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
flushOpSeqId, "Nothing to flush",
writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
mvcc.completeAndWait(writeEntry);
@@ -2479,9 +2453,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
MemstoreSize totalSizeOfFlushableStores = new MemstoreSize();
Map<byte[], Long> flushedFamilyNamesToSeq = new HashMap<>();
- for (Store store: storesToFlush) {
+ for (HStore store : storesToFlush) {
flushedFamilyNamesToSeq.put(store.getColumnFamilyDescriptor().getName(),
- ((HStore) store).preFlushSeqIDEstimation());
+ store.preFlushSeqIDEstimation());
}
TreeMap<byte[], StoreFlushContext> storeFlushCtxs = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -2517,7 +2491,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
flushedSeqId = flushOpSeqId = myseqid;
}
- for (Store s : storesToFlush) {
+ for (HStore s : storesToFlush) {
MemstoreSize flushableSize = s.getSizeToFlush();
totalSizeOfFlushableStores.incMemstoreSize(flushableSize);
storeFlushCtxs.put(s.getColumnFamilyDescriptor().getName(), s.createFlushContext(flushOpSeqId));
@@ -2555,7 +2529,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Utility method broken out of internalPrepareFlushCache so that method is smaller.
*/
- private void logFatLineOnFlush(final Collection<Store> storesToFlush, final long sequenceId) {
+ private void logFatLineOnFlush(Collection<HStore> storesToFlush, long sequenceId) {
if (!LOG.isInfoEnabled()) {
return;
}
@@ -2563,7 +2537,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
StringBuilder perCfExtras = null;
if (!isAllFamilies(storesToFlush)) {
perCfExtras = new StringBuilder();
- for (Store store: storesToFlush) {
+ for (HStore store: storesToFlush) {
perCfExtras.append("; ").append(store.getColumnFamilyName());
perCfExtras.append("=")
.append(StringUtils.byteDesc(store.getSizeToFlush().getDataSize()));
@@ -2611,7 +2585,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* @return True if passed Set is all families in the region.
*/
- private boolean isAllFamilies(final Collection<Store> families) {
+ private boolean isAllFamilies(Collection<HStore> families) {
return families == null || this.stores.size() == families.size();
}
@@ -2639,11 +2613,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
justification="Intentional; notify is about completed flush")
- protected FlushResult internalFlushCacheAndCommit(
- final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
- final Collection<Store> storesToFlush)
- throws IOException {
-
+ protected FlushResultImpl internalFlushCacheAndCommit(WAL wal, MonitoredTask status,
+ PrepareFlushResult prepareResult, Collection<HStore> storesToFlush) throws IOException {
// prepare flush context is carried via PrepareFlushResult
TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs;
TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles;
@@ -2673,7 +2644,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Switch snapshot (in memstore) -> new hfile (thus causing
// all the store scanners to reset/reseek).
- Iterator<Store> it = storesToFlush.iterator();
+ Iterator<HStore> it = storesToFlush.iterator();
// stores.values() and storeFlushCtxs have same order
for (StoreFlushContext flush : storeFlushCtxs.values()) {
boolean needsCompaction = flush.commit(status);
@@ -2746,7 +2717,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// Record latest flush time
- for (Store store: storesToFlush) {
+ for (HStore store: storesToFlush) {
this.lastStoreFlushTimeMap.put(store, startTime);
}
@@ -4002,34 +3973,34 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- /*
+ /**
* @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be
- * set; when set we will run operations that make sense in the increment/append scenario but
- * that do not make sense otherwise.
- * @see #applyToMemstore(Store, Cell, long)
+ * set; when set we will run operations that make sense in the increment/append scenario
+ * but that do not make sense otherwise.
+ * @see #applyToMemstore(HStore, Cell, long)
*/
- private void applyToMemstore(final Store store, final List<Cell> cells, final boolean delta,
+ private void applyToMemstore(HStore store, List<Cell> cells, boolean delta,
MemstoreSize memstoreSize) throws IOException {
// Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
boolean upsert = delta && store.getColumnFamilyDescriptor().getMaxVersions() == 1;
if (upsert) {
- ((HStore) store).upsert(cells, getSmallestReadPoint(), memstoreSize);
+ store.upsert(cells, getSmallestReadPoint(), memstoreSize);
} else {
- ((HStore) store).add(cells, memstoreSize);
+ store.add(cells, memstoreSize);
}
}
- /*
- * @see #applyToMemstore(Store, List, boolean, boolean, long)
+ /**
+ * @see #applyToMemstore(HStore, List, boolean, boolean, long)
*/
- private void applyToMemstore(final Store store, final Cell cell, MemstoreSize memstoreSize)
- throws IOException {
+ private void applyToMemstore(HStore store, Cell cell, MemstoreSize memstoreSize)
+ throws IOException {
// Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!!
if (store == null) {
checkFamily(CellUtil.cloneFamily(cell));
// Unreachable because checkFamily will throw exception
}
- ((HStore) store).add(cell, memstoreSize);
+ store.add(cell, memstoreSize);
}
@Override
@@ -4368,7 +4339,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Figure which store the edit is meant for.
if (store == null || !CellUtil.matchingFamily(cell,
store.getColumnFamilyDescriptor().getName())) {
- store = getHStore(cell);
+ store = getStore(cell);
}
if (store == null) {
// This should never happen. Perhaps schema was changed between
@@ -4497,7 +4468,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation(Operation.REPLAY_EVENT);
try {
- HStore store = this.getHStore(compaction.getFamilyName().toByteArray());
+ HStore store = this.getStore(compaction.getFamilyName().toByteArray());
if (store == null) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Found Compaction WAL edit for deleted family:"
@@ -4567,10 +4538,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
long flushSeqId = flush.getFlushSequenceNumber();
- HashSet<Store> storesToFlush = new HashSet<>();
+ HashSet<HStore> storesToFlush = new HashSet<>();
for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
byte[] family = storeFlush.getFamilyName().toByteArray();
- Store store = getStore(family);
+ HStore store = getStore(family);
if (store == null) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received a flush start marker from primary, but the family is not found. Ignoring"
@@ -4807,7 +4778,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
throws IOException {
for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
byte[] family = storeFlush.getFamilyName().toByteArray();
- Store store = getStore(family);
+ HStore store = getStore(family);
if (store == null) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received a flush commit marker from primary, but the family is not found."
@@ -4843,7 +4814,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* if the memstore edits have seqNums smaller than the given seq id
* @throws IOException
*/
- private MemstoreSize dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException {
+ private MemstoreSize dropMemstoreContentsForSeqId(long seqId, HStore store) throws IOException {
MemstoreSize totalFreedSize = new MemstoreSize();
this.updatesLock.writeLock().lock();
try {
@@ -4857,7 +4828,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Prepare flush (take a snapshot) and then abort (drop the snapshot)
if (store == null) {
- for (Store s : stores.values()) {
+ for (HStore s : stores.values()) {
totalFreedSize.incMemstoreSize(doDropStoreMemstoreContentsForSeqId(s, currentSeqId));
}
} else {
@@ -4874,7 +4845,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return totalFreedSize;
}
- private MemstoreSize doDropStoreMemstoreContentsForSeqId(Store s, long currentSeqId)
+ private MemstoreSize doDropStoreMemstoreContentsForSeqId(HStore s, long currentSeqId)
throws IOException {
MemstoreSize flushableSize = s.getSizeToFlush();
this.decrMemstoreSize(flushableSize);
@@ -4965,7 +4936,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (StoreDescriptor storeDescriptor : regionEvent.getStoresList()) {
// stores of primary may be different now
byte[] family = storeDescriptor.getFamilyName().toByteArray();
- Store store = getStore(family);
+ HStore store = getStore(family);
if (store == null) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received a region open marker from primary, but the family is not found. "
@@ -5081,7 +5052,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
// stores of primary may be different now
family = storeDescriptor.getFamilyName().toByteArray();
- HStore store = getHStore(family);
+ HStore store = getStore(family);
if (store == null) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received a bulk load marker from primary, but the family is not found. "
@@ -5119,9 +5090,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (writestate.flushing) {
boolean canDrop = true;
if (prepareFlushResult.storeFlushCtxs != null) {
- for (Entry<byte[], StoreFlushContext> entry
- : prepareFlushResult.storeFlushCtxs.entrySet()) {
- Store store = getStore(entry.getKey());
+ for (Entry<byte[], StoreFlushContext> entry : prepareFlushResult.storeFlushCtxs
+ .entrySet()) {
+ HStore store = getStore(entry.getKey());
if (store == null) {
continue;
}
@@ -5164,9 +5135,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
startRegionOperation(); // obtain region close lock
try {
- Map<Store, Long> map = new HashMap<>();
+ Map<HStore, Long> map = new HashMap<>();
synchronized (writestate) {
- for (Store store : getStores()) {
+ for (HStore store : stores.values()) {
// TODO: some stores might see new data from flush, while others do not which
// MIGHT break atomic edits across column families.
long maxSeqIdBefore = store.getMaxSequenceId();
@@ -5207,10 +5178,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
dropPrepareFlushIfPossible();
// advance the mvcc read point so that the new flushed files are visible.
- // either greater than flush seq number or they were already picked up via flush.
- for (Store s : getStores()) {
- mvcc.advanceTo(s.getMaxMemstoreTS());
- }
+ // either greater than flush seq number or they were already picked up via flush.
+ for (HStore s : stores.values()) {
+ mvcc.advanceTo(s.getMaxMemstoreTS());
+ }
// smallestSeqIdInStores is the seqId that we have a corresponding hfile for. We can safely
@@ -5222,7 +5193,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
if (!map.isEmpty()) {
- for (Map.Entry<Store, Long> entry : map.entrySet()) {
+ for (Map.Entry<HStore, Long> entry : map.entrySet()) {
// Drop the memstore contents if they are now smaller than the latest seen flushed file
totalFreedDataSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey())
.getDataSize();
@@ -5242,13 +5213,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private void logRegionFiles() {
if (LOG.isTraceEnabled()) {
LOG.trace(getRegionInfo().getEncodedName() + " : Store files for region: ");
- for (Store s : stores.values()) {
- Collection<StoreFile> storeFiles = s.getStorefiles();
- if (storeFiles == null) continue;
- for (StoreFile sf : storeFiles) {
- LOG.trace(getRegionInfo().getEncodedName() + " : " + sf);
- }
- }
+ stores.values().stream().filter(s -> s.getStorefiles() != null)
+ .flatMap(s -> s.getStorefiles().stream())
+ .forEachOrdered(sf -> LOG.trace(getRegionInfo().getEncodedName() + " : " + sf));
}
}
@@ -5272,17 +5239,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
+ " does not match this region: " + this.getRegionInfo());
}
- /*
+ /**
* Used by tests
* @param s Store to add edit too.
* @param cell Cell to add.
* @param memstoreSize
*/
- protected void restoreEdit(final HStore s, final Cell cell, MemstoreSize memstoreSize) {
+ @VisibleForTesting
+ protected void restoreEdit(HStore s, Cell cell, MemstoreSize memstoreSize) {
s.add(cell, memstoreSize);
}
- /*
+ /**
* @param fs
* @param p File to check.
* @return True if file was zero-length (and if so, we'll delete it in here).
@@ -5291,7 +5259,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
throws IOException {
FileStatus stat = fs.getFileStatus(p);
- if (stat.getLen() > 0) return false;
+ if (stat.getLen() > 0) {
+ return false;
+ }
LOG.warn("File " + p + " is zero-length, deleting.");
fs.delete(p, false);
return true;
@@ -5311,49 +5281,39 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
@Override
- public Store getStore(final byte[] column) {
- return getHStore(column);
- }
-
- public HStore getHStore(final byte[] column) {
- return (HStore) this.stores.get(column);
+ public HStore getStore(byte[] column) {
+ return this.stores.get(column);
}
/**
- * Return HStore instance. Does not do any copy: as the number of store is limited, we
- * iterate on the list.
+ * Return HStore instance. Does not do any copy: as the number of store is limited, we iterate on
+ * the list.
*/
- private HStore getHStore(Cell cell) {
- for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
- if (CellUtil.matchingFamily(cell, famStore.getKey(), 0, famStore.getKey().length)) {
- return (HStore) famStore.getValue();
- }
- }
-
- return null;
+ private HStore getStore(Cell cell) {
+ return stores.entrySet().stream().filter(e -> CellUtil.matchingFamily(cell, e.getKey()))
+ .map(e -> e.getValue()).findFirst().orElse(null);
}
@Override
- public List<Store> getStores() {
- List<Store> list = new ArrayList<>(stores.size());
- list.addAll(stores.values());
- return list;
+ public List<HStore> getStores() {
+ return new ArrayList<>(stores.values());
}
@Override
- public List<String> getStoreFileList(final byte [][] columns)
- throws IllegalArgumentException {
+ public List<String> getStoreFileList(byte[][] columns) throws IllegalArgumentException {
List<String> storeFileNames = new ArrayList<>();
- synchronized(closeLock) {
- for(byte[] column : columns) {
- Store store = this.stores.get(column);
+ synchronized (closeLock) {
+ for (byte[] column : columns) {
+ HStore store = this.stores.get(column);
if (store == null) {
- throw new IllegalArgumentException("No column family : " +
- new String(column) + " available");
+ throw new IllegalArgumentException(
+ "No column family : " + new String(column) + " available");
}
Collection<StoreFile> storeFiles = store.getStorefiles();
- if (storeFiles == null) continue;
- for (StoreFile storeFile: storeFiles) {
+ if (storeFiles == null) {
+ continue;
+ }
+ for (StoreFile storeFile : storeFiles) {
storeFileNames.add(storeFile.getPath().toString());
}
@@ -5368,7 +5328,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
//////////////////////////////////////////////////////////////////////////////
/** Make sure this is a valid row for the HRegion */
- void checkRow(final byte [] row, String op) throws IOException {
+ void checkRow(byte[] row, String op) throws IOException {
if (!rowIsInRange(getRegionInfo(), row)) {
throw new WrongRegionException("Requested row out of range for " +
op + " on HRegion " + this + ", startKey='" +
@@ -5637,7 +5597,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
byte[] familyName = p.getFirst();
String path = p.getSecond();
- HStore store = getHStore(familyName);
+ HStore store = getStore(familyName);
if (store == null) {
IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException(
"No such column family " + Bytes.toStringBinary(familyName));
@@ -5697,7 +5657,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (Pair<byte[], String> p : familyPaths) {
byte[] familyName = p.getFirst();
String path = p.getSecond();
- HStore store = getHStore(familyName);
+ HStore store = getStore(familyName);
if (!familyWithFinalPath.containsKey(familyName)) {
familyWithFinalPath.put(familyName, new ArrayList<>());
}
@@ -5737,7 +5697,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (Pair<Path, Path> p : entry.getValue()) {
String path = p.getFirst().toString();
Path commitedStoreFile = p.getSecond();
- HStore store = getHStore(familyName);
+ HStore store = getStore(familyName);
try {
store.bulkLoadHFile(familyName, path, commitedStoreFile);
// Note the size of the store file
@@ -5912,7 +5872,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
- Store store = stores.get(entry.getKey());
+ HStore store = stores.get(entry.getKey());
KeyValueScanner scanner;
try {
scanner = store.getScanner(scan, entry.getValue(), this.readPt);
@@ -7145,7 +7105,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// If no WAL, need to stamp it here.
CellUtil.setSequenceId(cell, sequenceId);
}
- applyToMemstore(getHStore(cell), cell, memstoreSize);
+ applyToMemstore(getStore(cell), cell, memstoreSize);
}
}
@@ -7296,7 +7256,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return returnResults? cpResult: null;
}
Durability effectiveDurability = getEffectiveDurability(mutation.getDurability());
- Map<Store, List<Cell>> forMemStore = new HashMap<>(mutation.getFamilyCellMap().size());
+ Map<HStore, List<Cell>> forMemStore = new HashMap<>(mutation.getFamilyCellMap().size());
// Reckon Cells to apply to WAL -- in returned walEdit -- and what to add to memstore and
// what to return back to the client (in 'forMemStore' and 'results' respectively).
WALEdit walEdit = reckonDeltas(op, mutation, effectiveDurability, forMemStore, results);
@@ -7311,7 +7271,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
updateSequenceId(forMemStore.values(), writeEntry.getWriteNumber());
}
// Now write to MemStore. Do it a column family at a time.
- for (Map.Entry<Store, List<Cell>> e : forMemStore.entrySet()) {
+ for (Map.Entry<HStore, List<Cell>> e : forMemStore.entrySet()) {
applyToMemstore(e.getKey(), e.getValue(), true, memstoreSize);
}
mvcc.completeAndWait(writeEntry);
@@ -7419,18 +7379,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param forMemStore Fill in here what to apply to the MemStore (by Store).
* @return A WALEdit to apply to WAL or null if we are to skip the WAL.
*/
- private WALEdit reckonDeltas(final Operation op, final Mutation mutation,
- final Durability effectiveDurability, final Map<Store, List<Cell>> forMemStore,
- final List<Cell> results)
- throws IOException {
+ private WALEdit reckonDeltas(Operation op, Mutation mutation, Durability effectiveDurability,
+ Map<HStore, List<Cell>> forMemStore, List<Cell> results) throws IOException {
WALEdit walEdit = null;
long now = EnvironmentEdgeManager.currentTime();
final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;
// Process a Store/family at a time.
for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {
- final byte [] columnFamilyName = entry.getKey();
+ final byte[] columnFamilyName = entry.getKey();
List<Cell> deltas = entry.getValue();
- Store store = this.stores.get(columnFamilyName);
+ HStore store = this.stores.get(columnFamilyName);
// Reckon for the Store what to apply to WAL and MemStore.
List<Cell> toApply =
reckonDeltasByStore(store, op, mutation, effectiveDurability, now, deltas, results);
@@ -7462,11 +7420,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return Resulting Cells after <code>deltas</code> have been applied to current
* values. Side effect is our filling out of the <code>results</code> List.
*/
- private List<Cell> reckonDeltasByStore(final Store store, final Operation op,
- final Mutation mutation, final Durability effectiveDurability, final long now,
- final List<Cell> deltas, final List<Cell> results)
- throws IOException {
- byte [] columnFamily = store.getColumnFamilyDescriptor().getName();
+ private List<Cell> reckonDeltasByStore(HStore store, Operation op, Mutation mutation,
+ Durability effectiveDurability, long now, List<Cell> deltas, List<Cell> results)
+ throws IOException {
+ byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
List<Cell> toApply = new ArrayList<>(deltas.size());
// Get previous values for all columns in this family.
List<Cell> currentValues = get(mutation, store, deltas,
@@ -7576,9 +7533,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param coordinates Cells from <code>mutation</code> used as coordinates applied to Get.
* @return Return list of Cells found.
*/
- private List<Cell> get(final Mutation mutation, final Store store,
- final List<Cell> coordinates, final IsolationLevel isolation, final TimeRange tr)
- throws IOException {
+ private List<Cell> get(Mutation mutation, HStore store, List<Cell> coordinates,
+ IsolationLevel isolation, TimeRange tr) throws IOException {
// Sort the cells so that they match the order that they appear in the Get results. Otherwise,
// we won't be able to find the existing values if the cells are not specified in order by the
// client since cells are in an array list.
@@ -7653,12 +7609,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public long heapSize() {
- long heapSize = DEEP_OVERHEAD;
- for (Store store : this.stores.values()) {
- heapSize += store.heapSize();
- }
// this does not take into account row locks, recent flushes, mvcc entries, and more
- return heapSize;
+ return DEEP_OVERHEAD + stores.values().stream().mapToLong(HStore::heapSize).sum();
}
@Override
@@ -7813,14 +7765,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return The priority that this region should have in the compaction queue
*/
public int getCompactPriority() {
- int count = Integer.MAX_VALUE;
- for (Store store : stores.values()) {
- count = Math.min(count, store.getCompactPriority());
- }
- return count;
+ return stores.values().stream().mapToInt(HStore::getCompactPriority).min()
+ .orElse(Store.NO_PRIORITY);
}
-
/** @return the coprocessor host */
@Override
public RegionCoprocessorHost getCoprocessorHost() {
@@ -7881,11 +7829,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// The unit for snapshot is a region. So, all stores for this region must be
// prepared for snapshot operation before proceeding.
if (op == Operation.SNAPSHOT) {
- for (Store store : stores.values()) {
- if (store instanceof HStore) {
- ((HStore)store).preSnapshotOperation();
- }
- }
+ stores.values().forEach(HStore::preSnapshotOperation);
}
try {
if (coprocessorHost != null) {
@@ -7905,11 +7849,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public void closeRegionOperation(Operation operation) throws IOException {
if (operation == Operation.SNAPSHOT) {
- for (Store store: stores.values()) {
- if (store instanceof HStore) {
- ((HStore)store).postSnapshotOperation();
- }
- }
+ stores.values().forEach(HStore::postSnapshotOperation);
}
lock.readLock().unlock();
if (coprocessorHost != null) {
@@ -8142,9 +8082,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public void registerChildren(ConfigurationManager manager) {
configurationManager = Optional.of(manager);
- for (Store s : this.stores.values()) {
- configurationManager.get().registerObserver(s);
- }
+ stores.values().forEach(manager::registerObserver);
}
/**
@@ -8152,9 +8090,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
@Override
public void deregisterChildren(ConfigurationManager manager) {
- for (Store s : this.stores.values()) {
- configurationManager.get().deregisterObserver(s);
- }
+ stores.values().forEach(configurationManager.get()::deregisterObserver);
}
@Override
@@ -8175,7 +8111,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
buf.append(getRegionInfo().isMetaRegion() ? " meta region " : " ");
buf.append(getRegionInfo().isMetaTable() ? " meta table " : " ");
buf.append("stores: ");
- for (Store s : getStores()) {
+ for (HStore s : stores.values()) {
buf.append(s.getColumnFamilyDescriptor().getNameAsString());
buf.append(" size: ");
buf.append(s.getSizeOfMemStore().getDataSize());
@@ -8188,4 +8124,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
throw new RuntimeException(buf.toString());
}
}
+
+ @Override
+ public void requestCompaction(String why, int priority, CompactionLifeCycleTracker tracker,
+ User user) throws IOException {
+ ((HRegionServer) rsServices).compactSplitThread.requestCompaction(this, why, priority, tracker,
+ user);
+ }
+
+ @Override
+ public void requestCompaction(byte[] family, String why, int priority,
+ CompactionLifeCycleTracker tracker, User user) throws IOException {
+ ((HRegionServer) rsServices).compactSplitThread.requestCompaction(this,
+ Preconditions.checkNotNull(stores.get(family)), why, priority, tracker, user);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 6bbff36..62987c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -56,8 +56,8 @@ import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.servlet.http.HttpServlet;
-import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -127,6 +127,7 @@ import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
@@ -140,6 +141,9 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
@@ -210,10 +214,6 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
-
import sun.misc.Signal;
import sun.misc.SignalHandler;
@@ -1686,7 +1686,7 @@ public class HRegionServer extends HasThread implements
int totalStaticBloomSizeKB = 0;
long totalCompactingKVs = 0;
long currentCompactedKVs = 0;
- List<Store> storeList = r.getStores();
+ List<? extends Store> storeList = r.getStores();
stores += storeList.size();
for (Store store : storeList) {
storefiles += store.getStorefilesCount();
@@ -1772,27 +1772,32 @@ public class HRegionServer extends HasThread implements
@Override
protected void chore() {
for (Region r : this.instance.onlineRegions.values()) {
- if (r == null)
+ if (r == null) {
continue;
- for (Store s : r.getStores()) {
+ }
+ HRegion hr = (HRegion) r;
+ for (HStore s : hr.stores.values()) {
try {
long multiplier = s.getCompactionCheckMultiplier();
assert multiplier > 0;
- if (iteration % multiplier != 0) continue;
+ if (iteration % multiplier != 0) {
+ continue;
+ }
if (s.needsCompaction()) {
// Queue a compaction. Will recognize if major is needed.
- this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
- + " requests compaction");
+ this.instance.compactSplitThread.requestSystemCompaction(hr, s,
+ getName() + " requests compaction");
} else if (s.isMajorCompaction()) {
s.triggerMajorCompaction();
- if (majorCompactPriority == DEFAULT_PRIORITY
- || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
- this.instance.compactSplitThread.requestCompaction(r, s, getName()
- + " requests major compaction; use default priority", null);
+ if (majorCompactPriority == DEFAULT_PRIORITY ||
+ majorCompactPriority > hr.getCompactPriority()) {
+ this.instance.compactSplitThread.requestCompaction(hr, s,
+ getName() + " requests major compaction; use default priority", Store.NO_PRIORITY,
+ CompactionLifeCycleTracker.DUMMY, null);
} else {
- this.instance.compactSplitThread.requestCompaction(r, s, getName()
- + " requests major compaction; use configured priority",
- this.majorCompactPriority, null, null);
+ this.instance.compactSplitThread.requestCompaction(hr, s,
+ getName() + " requests major compaction; use configured priority",
+ this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null);
}
}
} catch (IOException e) {
@@ -2146,15 +2151,14 @@ public class HRegionServer extends HasThread implements
@Override
public void postOpenDeployTasks(final PostOpenDeployContext context)
throws KeeperException, IOException {
- Region r = context.getRegion();
+ HRegion r = (HRegion) context.getRegion();
long masterSystemTime = context.getMasterSystemTime();
- Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion");
rpcServices.checkOpen();
LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString());
// Do checks to see if we need to compact (references or too many files)
- for (Store s : r.getStores()) {
+ for (HStore s : r.stores.values()) {
if (s.hasReferences() || s.needsCompaction()) {
- this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
+ this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
}
}
long openSeqNum = r.getOpenSeqNum();
@@ -2863,11 +2867,6 @@ public class HRegionServer extends HasThread implements
return serverName;
}
- @Override
- public CompactionRequestor getCompactionRequester() {
- return this.compactSplitThread;
- }
-
public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){
return this.rsHost;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index f011c18..daad241 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -30,6 +30,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
@@ -52,13 +53,12 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.FailedArchiveException;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.io.compress.Compression;
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
@@ -82,8 +83,6 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -92,14 +91,16 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
+import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableCollection;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
/**
* A Store holds a column family in a Region. Its a memstore and a set of zero
@@ -477,7 +478,7 @@ public class HStore implements Store {
/**
* @param tabledir {@link Path} to where the table is being stored
* @param hri {@link HRegionInfo} for the region.
- * @param family {@link HColumnDescriptor} describing the column family
+ * @param family {@link ColumnFamilyDescriptor} describing the column family
* @return Path to family/Store home directory.
*/
@Deprecated
@@ -489,7 +490,7 @@ public class HStore implements Store {
/**
* @param tabledir {@link Path} to where the table is being stored
* @param encodedName Encoded region name.
- * @param family {@link HColumnDescriptor} describing the column family
+ * @param family {@link ColumnFamilyDescriptor} describing the column family
* @return Path to family/Store home directory.
*/
@Deprecated
@@ -1386,15 +1387,14 @@ public class HStore implements Store {
}
}
- private List<StoreFile> moveCompatedFilesIntoPlace(
- final CompactionRequest cr, List<Path> newFiles, User user) throws IOException {
+ private List<StoreFile> moveCompatedFilesIntoPlace(CompactionRequest cr, List<Path> newFiles,
+ User user) throws IOException {
List<StoreFile> sfs = new ArrayList<>(newFiles.size());
for (Path newFile : newFiles) {
assert newFile != null;
- final StoreFile sf = moveFileIntoPlace(newFile);
+ StoreFile sf = moveFileIntoPlace(newFile);
if (this.getCoprocessorHost() != null) {
- final Store thisStore = this;
- getCoprocessorHost().postCompact(thisStore, sf, cr, user);
+ getCoprocessorHost().postCompact(this, sf, cr.getTracker(), user);
}
assert sf != null;
sfs.add(sf);
@@ -1636,23 +1636,12 @@ public class HStore implements Store {
}
@Override
- public CompactionContext requestCompaction() throws IOException {
- return requestCompaction(Store.NO_PRIORITY, null);
- }
-
- @Override
- public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
- throws IOException {
- return requestCompaction(priority, baseRequest, null);
- }
- @Override
- public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
- User user) throws IOException {
+ public Optional<CompactionContext> requestCompaction(int priority,
+ CompactionLifeCycleTracker tracker, User user) throws IOException {
// don't even select for compaction if writes are disabled
if (!this.areWritesEnabled()) {
- return null;
+ return Optional.empty();
}
-
// Before we do compaction, try to get rid of unneeded files to simplify things.
removeUnneededFiles();
@@ -1666,7 +1655,7 @@ public class HStore implements Store {
final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
boolean override = false;
override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc,
- baseRequest, user);
+ tracker, user);
if (override) {
// Coprocessor is overriding normal file selection.
compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
@@ -1695,21 +1684,13 @@ public class HStore implements Store {
}
if (this.getCoprocessorHost() != null) {
this.getCoprocessorHost().postCompactSelection(
- this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest, user);
- }
-
- // Selected files; see if we have a compaction with some custom base request.
- if (baseRequest != null) {
- // Update the request with what the system thinks the request should be;
- // its up to the request if it wants to listen.
- compaction.forceSelect(
- baseRequest.combineWith(compaction.getRequest()));
+ this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker, user);
}
// Finally, we have the resulting files list. Check if we have any files at all.
request = compaction.getRequest();
- final Collection<StoreFile> selectedFiles = request.getFiles();
+ Collection<StoreFile> selectedFiles = request.getFiles();
if (selectedFiles.isEmpty()) {
- return null;
+ return Optional.empty();
}
addToCompactingFiles(selectedFiles);
@@ -1721,6 +1702,7 @@ public class HStore implements Store {
// Set priority, either override value supplied by caller or from store.
request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
+ request.setTracker(tracker);
}
} finally {
this.lock.readLock().unlock();
@@ -1730,7 +1712,7 @@ public class HStore implements Store {
+ ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
+ (request.isAllFiles() ? " (all files)" : ""));
this.region.reportCompactionRequestStart(request.isMajor());
- return compaction;
+ return Optional.of(compaction);
}
/** Adds the files to compacting files. filesCompacting must be locked. */
http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 020142d..8fa686c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -20,11 +20,8 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.util.StringUtils.humanReadableInt;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
-import java.lang.management.MemoryType;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
@@ -50,6 +47,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
@@ -448,8 +446,8 @@ class MemStoreFlusher implements FlushRequester {
"store files; delaying flush up to " + this.blockingWaitTime + "ms");
if (!this.server.compactSplitThread.requestSplit(region)) {
try {
- this.server.compactSplitThread.requestSystemCompaction(
- region, Thread.currentThread().getName());
+ this.server.compactSplitThread.requestSystemCompaction((HRegion) region,
+ Thread.currentThread().getName());
} catch (IOException e) {
e = e instanceof RemoteException ?
((RemoteException)e).unwrapRemoteException() : e;
@@ -503,8 +501,8 @@ class MemStoreFlusher implements FlushRequester {
if (shouldSplit) {
this.server.compactSplitThread.requestSplit(region);
} else if (shouldCompact) {
- server.compactSplitThread.requestSystemCompaction(
- region, Thread.currentThread().getName());
+ server.compactSplitThread.requestSystemCompaction((HRegion) region,
+ Thread.currentThread().getName());
}
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 2611f69..e30ed8e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -761,7 +761,7 @@ class MetricsRegionServerWrapperImpl
tempCheckAndMutateChecksFailed += r.getCheckAndMutateChecksFailed();
tempCheckAndMutateChecksPassed += r.getCheckAndMutateChecksPassed();
tempBlockedRequestsCount += r.getBlockedRequestsCount();
- List<Store> storeList = r.getStores();
+ List<? extends Store> storeList = r.getStores();
tempNumStores += storeList.size();
for (Store store : storeList) {
tempNumStoreFiles += store.getStorefilesCount();
http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
index 667b46c..dc7d3cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java
@@ -95,7 +95,7 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
@Override
public long getNumStores() {
- Map<byte[],Store> stores = this.region.stores;
+ Map<byte[], HStore> stores = this.region.stores;
if (stores == null) {
return 0;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 02662c4..61c725b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.regionserver.Leases.Lease;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
@@ -1538,7 +1539,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
try {
checkOpen();
requestCount.increment();
- Region region = getRegion(request.getRegion());
+ HRegion region = (HRegion) getRegion(request.getRegion());
// Quota support is enabled, the requesting user is not system/super user
// and a quota policy is enforced that disables compactions.
if (QuotaUtil.isQuotaEnabled(getConfiguration()) &&
@@ -1552,7 +1553,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString());
boolean major = false;
byte [] family = null;
- Store store = null;
+ HStore store = null;
if (request.hasFamily()) {
family = request.getFamily().toByteArray();
store = region.getStore(family);
@@ -1579,12 +1580,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
+ region.getRegionInfo().getRegionNameAsString() + familyLogMsg);
}
String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
- if(family != null) {
- regionServer.compactSplitThread.requestCompaction(region, store, log,
- Store.PRIORITY_USER, null, RpcServer.getRequestUser());
+ if (family != null) {
+ regionServer.compactSplitThread.requestCompaction(region, store, log, Store.PRIORITY_USER,
+ CompactionLifeCycleTracker.DUMMY, RpcServer.getRequestUser());
} else {
- regionServer.compactSplitThread.requestCompaction(region, log,
- Store.PRIORITY_USER, null, RpcServer.getRequestUser());
+ regionServer.compactSplitThread.requestCompaction(region, log, Store.PRIORITY_USER,
+ CompactionLifeCycleTracker.DUMMY, RpcServer.getRequestUser());
}
return CompactRegionResponse.newBuilder().build();
} catch (IOException ie) {
@@ -1606,7 +1607,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
try {
checkOpen();
requestCount.increment();
- Region region = getRegion(request.getRegion());
+ HRegion region = (HRegion) getRegion(request.getRegion());
LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString());
boolean shouldFlush = true;
if (request.hasIfOlderThanTs()) {
@@ -1617,8 +1618,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
request.getWriteFlushWalMarker() : false;
// Go behind the curtain so we can manage writing of the flush WAL marker
- HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl)
- ((HRegion)region).flushcache(true, writeFlushWalMarker);
+ HRegion.FlushResultImpl flushResult = region.flushcache(true, writeFlushWalMarker);
boolean compactionNeeded = flushResult.isCompactionNeeded();
if (compactionNeeded) {
regionServer.compactSplitThread.requestSystemCompaction(region,