You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2013/04/25 19:49:01 UTC
svn commit: r1475870 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/wal/
Author: sershe
Date: Thu Apr 25 17:49:01 2013
New Revision: 1475870
URL: http://svn.apache.org/r1475870
Log:
HBASE-8024 Make Store flush algorithm pluggable
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java?rev=1475870&r1=1475869&r2=1475870&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java Thu Apr 25 17:49:01 2013
@@ -37,13 +37,17 @@ import org.apache.hadoop.hbase.regionser
*/
@InterfaceAudience.Private
public class DefaultStoreEngine extends StoreEngine<
- RatioBasedCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
+ DefaultStoreFlusher, RatioBasedCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
+ public static final String DEFAULT_STORE_FLUSHER_CLASS_KEY =
+ "hbase.hstore.defaultengine.storeflusher.class";
public static final String DEFAULT_COMPACTOR_CLASS_KEY =
"hbase.hstore.defaultengine.compactor.class";
public static final String DEFAULT_COMPACTION_POLICY_CLASS_KEY =
"hbase.hstore.defaultengine.compactionpolicy.class";
+ private static final Class<? extends DefaultStoreFlusher>
+ DEFAULT_STORE_FLUSHER_CLASS = DefaultStoreFlusher.class;
private static final Class<? extends DefaultCompactor>
DEFAULT_COMPACTOR_CLASS = DefaultCompactor.class;
private static final Class<? extends RatioBasedCompactionPolicy>
@@ -69,8 +73,17 @@ public class DefaultStoreEngine extends
} catch (Exception e) {
throw new IOException("Unable to load configured compaction policy '" + className + "'", e);
}
+ className = conf.get(
+ DEFAULT_STORE_FLUSHER_CLASS_KEY, DEFAULT_STORE_FLUSHER_CLASS.getName());
+ try {
+ storeFlusher = ReflectionUtils.instantiateWithCustomCtor(className,
+ new Class[] { Configuration.class, Store.class }, new Object[] { conf, store });
+ } catch (Exception e) {
+ throw new IOException("Unable to load configured store flusher '" + className + "'", e);
+ }
}
+
@Override
public CompactionContext createCompaction() {
return new DefaultCompactionContext();
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java?rev=1475870&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java Thu Apr 25 17:49:01 2013
@@ -0,0 +1,93 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Default implementation of StoreFlusher: writes the whole memstore snapshot into a single
+ * tmp store file.
+ */
+public class DefaultStoreFlusher extends StoreFlusher {
+ private static final Log LOG = LogFactory.getLog(DefaultStoreFlusher.class);
+ /** Lets only one thread at a time create, fill and close a flush file through this flusher. */
+ private final Object flushLock = new Object();
+
+ /**
+ * @param conf Configuration, passed through to {@link StoreFlusher}.
+ * @param store Store whose memstore snapshots this flusher writes out.
+ */
+ public DefaultStoreFlusher(Configuration conf, Store store) {
+ super(conf, store);
+ }
+
+ /**
+ * Writes the snapshot into one tmp store file.
+ * @return A list holding the single tmp file written; empty if the snapshot has no entries or
+ * a coprocessor hook returned a null scanner to skip normal processing.
+ */
+ @Override
+ public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushId,
+ TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize,
+ MonitoredTask status) throws IOException {
+ ArrayList<Path> result = new ArrayList<Path>();
+ // Don't flush if there are no entries. isEmpty() rather than size() == 0: size() need not
+ // be constant-time for concurrent sorted sets (ConcurrentSkipListSet walks every element).
+ if (snapshot.isEmpty()) {
+ return result;
+ }
+
+ // Use a store scanner to find which rows to flush.
+ long smallestReadPoint = store.getSmallestReadPoint();
+ KeyValueScanner memstoreScanner =
+ new CollectionBackedScanner(snapshot, store.getComparator());
+ // A coprocessor may supply its own scanner; otherwise build the default store scanner.
+ InternalScanner scanner = preCreateCoprocScanner(memstoreScanner);
+ if (scanner == null) {
+ scanner = createStoreScanner(smallestReadPoint, memstoreScanner);
+ }
+ scanner = postCreateCoprocScanner(scanner);
+ if (scanner == null) {
+ return result; // NULL scanner returned from coprocessor hooks means skip normal processing
+ }
+
+ StoreFile.Writer writer;
+ long flushed = 0;
+ try {
+ // TODO: We can fail in the below block before we complete adding this flush to
+ // list of store files. Add cleanup of anything put on filesystem if we fail.
+ synchronized (flushLock) {
+ status.setStatus("Flushing " + store + ": creating writer");
+ // Write the map out to the disk
+ writer = store.createWriterInTmp(
+ snapshot.size(), store.getFamily().getCompression(), false, true);
+ writer.setTimeRangeTracker(snapshotTimeRangeTracker);
+ try {
+ flushed = performFlush(scanner, writer, smallestReadPoint);
+ } finally {
+ // Always append the sequence-id metadata and close the file, even if writing failed.
+ finalizeWriter(writer, cacheFlushId, status);
+ }
+ }
+ } finally {
+ // Report the flushed size and release the scanner whether or not the flush succeeded.
+ flushedSize.set(flushed);
+ scanner.close();
+ }
+ LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
+ + StringUtils.humanReadableInt(flushed) +", into tmp file " + writer.getPath());
+ result.add(writer.getPath());
+ return result;
+ }
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1475870&r1=1475869&r2=1475870&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Apr 25 17:49:01 2013
@@ -1410,7 +1410,7 @@ public class HRegion implements HeapSize
this.updatesLock.writeLock().lock();
long flushsize = this.memstoreSize.get();
status.setStatus("Preparing to flush by snapshotting stores");
- List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
+ List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
long flushSeqId = -1L;
try {
// Record the mvcc for all transactions in progress.
@@ -1430,12 +1430,12 @@ public class HRegion implements HeapSize
}
for (Store s : stores.values()) {
- storeFlushers.add(s.getStoreFlusher(flushSeqId));
+ storeFlushCtxs.add(s.createFlushContext(flushSeqId));
}
// prepare flush (take a snapshot)
- for (StoreFlusher flusher : storeFlushers) {
- flusher.prepare();
+ for (StoreFlushContext flush : storeFlushCtxs) {
+ flush.prepare();
}
} finally {
this.updatesLock.writeLock().unlock();
@@ -1472,19 +1472,19 @@ public class HRegion implements HeapSize
// just-made new flush store file. The new flushed file is still in the
// tmp directory.
- for (StoreFlusher flusher : storeFlushers) {
- flusher.flushCache(status);
+ for (StoreFlushContext flush : storeFlushCtxs) {
+ flush.flushCache(status);
}
// Switch snapshot (in memstore) -> new hfile (thus causing
// all the store scanners to reset/reseek).
- for (StoreFlusher flusher : storeFlushers) {
- boolean needsCompaction = flusher.commit(status);
+ for (StoreFlushContext flush : storeFlushCtxs) {
+ boolean needsCompaction = flush.commit(status);
if (needsCompaction) {
compactionRequested = true;
}
}
- storeFlushers.clear();
+ storeFlushCtxs.clear();
// Set down the memstore size by amount of flush.
this.addAndGetGlobalMemstoreSize(-flushsize);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1475870&r1=1475869&r2=1475870&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Thu Apr 25 17:49:01 2013
@@ -124,7 +124,6 @@ public class HStore implements Store {
static int closeCheckInterval = 0;
private volatile long storeSize = 0L;
private volatile long totalUncompressedBytes = 0L;
- private final Object flushLock = new Object();
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final boolean verifyBulkLoads;
@@ -146,14 +145,14 @@ public class HStore implements Store {
// Comparing KeyValues
private final KeyValue.KVComparator comparator;
- final StoreEngine<?, ?, ?> storeEngine;
+ final StoreEngine<?, ?, ?, ?> storeEngine;
private static final AtomicBoolean offPeakCompactionTracker = new AtomicBoolean();
private final OffPeakHours offPeakHours;
private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
- private static int flush_retries_number;
- private static int pauseTime;
+ private int flushRetriesNumber;
+ private int pauseTime;
private long blockingFileCount;
@@ -223,17 +222,13 @@ public class HStore implements Store {
this.checksumType = getChecksumType(conf);
// initilize bytes per checksum
this.bytesPerChecksum = getBytesPerChecksum(conf);
- // Create a compaction manager.
- if (HStore.flush_retries_number == 0) {
- HStore.flush_retries_number = conf.getInt(
- "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
- HStore.pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE,
- HConstants.DEFAULT_HBASE_SERVER_PAUSE);
- if (HStore.flush_retries_number <= 0) {
- throw new IllegalArgumentException(
- "hbase.hstore.flush.retries.number must be > 0, not "
- + HStore.flush_retries_number);
- }
+ flushRetriesNumber = conf.getInt(
+ "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
+ pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE, HConstants.DEFAULT_HBASE_SERVER_PAUSE);
+ if (flushRetriesNumber <= 0) {
+ throw new IllegalArgumentException(
+ "hbase.hstore.flush.retries.number must be > 0, not "
+ + flushRetriesNumber);
}
}
@@ -645,10 +640,10 @@ public class HStore implements Store {
* @param snapshotTimeRangeTracker
* @param flushedSize The number of bytes flushed
* @param status
- * @return Path The path name of the tmp file to which the store was flushed
+ * @return The path name of the tmp file to which the store was flushed
* @throws IOException
*/
- protected Path flushCache(final long logCacheFlushId,
+ protected List<Path> flushCache(final long logCacheFlushId,
SortedSet<KeyValue> snapshot,
TimeRangeTracker snapshotTimeRangeTracker,
AtomicLong flushedSize,
@@ -658,20 +653,21 @@ public class HStore implements Store {
// 'snapshot', the next time flush comes around.
// Retry after catching exception when flushing, otherwise server will abort
// itself
+ StoreFlusher flusher = storeEngine.getStoreFlusher();
IOException lastException = null;
- for (int i = 0; i < HStore.flush_retries_number; i++) {
+ for (int i = 0; i < flushRetriesNumber; i++) {
try {
- Path pathName = internalFlushCache(snapshot, logCacheFlushId,
- snapshotTimeRangeTracker, flushedSize, status);
+ List<Path> pathNames = flusher.flushSnapshot(
+ snapshot, logCacheFlushId, snapshotTimeRangeTracker, flushedSize, status);
+ Path lastPathName = null;
try {
- // Path name is null if there is no entry to flush
- if (pathName != null) {
+ for (Path pathName : pathNames) {
+ lastPathName = pathName;
validateStoreFile(pathName);
}
- return pathName;
+ return pathNames;
} catch (Exception e) {
- LOG.warn("Failed validating store file " + pathName
- + ", retring num=" + i, e);
+ LOG.warn("Failed validating store file " + lastPathName + ", retrying num=" + i, e);
if (e instanceof IOException) {
lastException = (IOException) e;
} else {
@@ -696,109 +692,6 @@ public class HStore implements Store {
}
/*
- * @param cache
- * @param logCacheFlushId
- * @param snapshotTimeRangeTracker
- * @param flushedSize The number of bytes flushed
- * @return Path The path name of the tmp file to which the store was flushed
- * @throws IOException
- */
- private Path internalFlushCache(final SortedSet<KeyValue> set,
- final long logCacheFlushId,
- TimeRangeTracker snapshotTimeRangeTracker,
- AtomicLong flushedSize,
- MonitoredTask status)
- throws IOException {
- StoreFile.Writer writer;
- // Find the smallest read point across all the Scanners.
- long smallestReadPoint = region.getSmallestReadPoint();
- long flushed = 0;
- Path pathName;
- // Don't flush if there are no entries.
- if (set.size() == 0) {
- return null;
- }
- // Use a store scanner to find which rows to flush.
- // Note that we need to retain deletes, hence
- // treat this as a minor compaction.
- InternalScanner scanner = null;
- KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator);
- if (this.getCoprocessorHost() != null) {
- scanner = this.getCoprocessorHost().preFlushScannerOpen(this, memstoreScanner);
- }
- if (scanner == null) {
- Scan scan = new Scan();
- scan.setMaxVersions(scanInfo.getMaxVersions());
- scanner = new StoreScanner(this, scanInfo, scan,
- Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
- smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
- }
- if (this.getCoprocessorHost() != null) {
- InternalScanner cpScanner =
- this.getCoprocessorHost().preFlush(this, scanner);
- // NULL scanner returned from coprocessor hooks means skip normal processing
- if (cpScanner == null) {
- return null;
- }
- scanner = cpScanner;
- }
- try {
- int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
- // TODO: We can fail in the below block before we complete adding this
- // flush to list of store files. Add cleanup of anything put on filesystem
- // if we fail.
- synchronized (flushLock) {
- status.setStatus("Flushing " + this + ": creating writer");
- // A. Write the map out to the disk
- writer = createWriterInTmp(set.size());
- writer.setTimeRangeTracker(snapshotTimeRangeTracker);
- pathName = writer.getPath();
- try {
- List<KeyValue> kvs = new ArrayList<KeyValue>();
- boolean hasMore;
- do {
- hasMore = scanner.next(kvs, compactionKVMax);
- if (!kvs.isEmpty()) {
- for (KeyValue kv : kvs) {
- // If we know that this KV is going to be included always, then let us
- // set its memstoreTS to 0. This will help us save space when writing to
- // disk.
- if (kv.getMemstoreTS() <= smallestReadPoint) {
- // let us not change the original KV. It could be in the memstore
- // changing its memstoreTS could affect other threads/scanners.
- kv = kv.shallowCopy();
- kv.setMemstoreTS(0);
- }
- writer.append(kv);
- flushed += this.memstore.heapSizeChange(kv, true);
- }
- kvs.clear();
- }
- } while (hasMore);
- } finally {
- // Write out the log sequence number that corresponds to this output
- // hfile. Also write current time in metadata as minFlushTime.
- // The hfile is current up to and including logCacheFlushId.
- status.setStatus("Flushing " + this + ": appending metadata");
- writer.appendMetadata(logCacheFlushId, false);
- status.setStatus("Flushing " + this + ": closing flushed file");
- writer.close();
- }
- }
- } finally {
- flushedSize.set(flushed);
- scanner.close();
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("Flushed " +
- ", sequenceid=" + logCacheFlushId +
- ", memsize=" + StringUtils.humanReadableInt(flushed) +
- ", into tmp file " + pathName);
- }
- return pathName;
- }
-
- /*
* @param path The pathname of the tmp file into which the store was flushed
* @param logCacheFlushId
* @return StoreFile created.
@@ -872,17 +765,18 @@ public class HStore implements Store {
/*
* Change storeFiles adding into place the Reader produced by this new flush.
- * @param sf
- * @param set That was used to make the passed file <code>p</code>.
+ * @param sfs Store files
+ * @param set That was used to make the passed file.
* @throws IOException
* @return Whether compaction is required.
*/
- private boolean updateStorefiles(final StoreFile sf,
- final SortedSet<KeyValue> set)
- throws IOException {
+ private boolean updateStorefiles(
+ final List<StoreFile> sfs, final SortedSet<KeyValue> set) throws IOException {
this.lock.writeLock().lock();
try {
- this.storeEngine.getStoreFileManager().insertNewFile(sf);
+ for (StoreFile sf : sfs) {
+ this.storeEngine.getStoreFileManager().insertNewFile(sf);
+ }
this.memstore.clearSnapshot(set);
} finally {
// We need the lock, as long as we are updating the storeFiles
@@ -1747,22 +1641,20 @@ public class HStore implements Store {
}
}
- public StoreFlusher getStoreFlusher(long cacheFlushId) {
+ /**
+ * Creates the context through which HRegion prepares, flushes and commits this store's
+ * memstore for one flush.
+ * @param cacheFlushId Sequence id of the region flush this context belongs to.
+ */
+ public StoreFlushContext createFlushContext(long cacheFlushId) {
return new StoreFlusherImpl(cacheFlushId);
}
- private class StoreFlusherImpl implements StoreFlusher {
+ private class StoreFlusherImpl implements StoreFlushContext {
- private long cacheFlushId;
+ private long cacheFlushSeqNum;
private SortedSet<KeyValue> snapshot;
- private StoreFile storeFile;
- private Path storeFilePath;
+ private List<Path> tempFiles;
private TimeRangeTracker snapshotTimeRangeTracker;
- private AtomicLong flushedSize;
+ private final AtomicLong flushedSize = new AtomicLong();
- private StoreFlusherImpl(long cacheFlushId) {
- this.cacheFlushId = cacheFlushId;
- this.flushedSize = new AtomicLong();
+ /**
+ * @param cacheFlushSeqNum Log cache flush sequence number; passed on to flushCache and
+ * commitFile for this flush.
+ */
+ private StoreFlusherImpl(long cacheFlushSeqNum) {
+ this.cacheFlushSeqNum = cacheFlushSeqNum;
}
@Override
@@ -1774,24 +1666,43 @@ public class HStore implements Store {
+ /**
+ * Writes the prepared snapshot out via HStore#flushCache, keeping the resulting tmp file
+ * paths for commit().
+ */
@Override
public void flushCache(MonitoredTask status) throws IOException {
- storeFilePath = HStore.this.flushCache(
- cacheFlushId, snapshot, snapshotTimeRangeTracker, flushedSize, status);
+ tempFiles = HStore.this.flushCache(
+ cacheFlushSeqNum, snapshot, snapshotTimeRangeTracker, flushedSize, status);
}
+ /**
+ * Commits each flushed tmp file with HStore#commitFile, runs the coprocessor postFlush hook
+ * for each, then installs them and clears the memstore snapshot under the store write lock.
+ * If committing any file fails, the files already committed are deleted (halting the process
+ * if such a deletion fails) and an IOException is thrown.
+ * @return false if there is nothing to commit; otherwise whether compaction is required.
+ */
@Override
public boolean commit(MonitoredTask status) throws IOException {
- if (storeFilePath == null) {
+ if (this.tempFiles == null || this.tempFiles.isEmpty()) {
return false;
}
- storeFile = HStore.this.commitFile(storeFilePath, cacheFlushId,
- snapshotTimeRangeTracker, flushedSize, status);
- if (HStore.this.getCoprocessorHost() != null) {
- HStore.this.getCoprocessorHost().postFlush(HStore.this, storeFile);
+ List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
+ for (Path storeFilePath : tempFiles) {
+ try {
+ storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum,
+ snapshotTimeRangeTracker, flushedSize, status));
+ } catch (IOException ex) {
+ LOG.error("Failed to commit store file " + storeFilePath, ex);
+ // Try to delete the files we have committed before.
+ for (StoreFile sf : storeFiles) {
+ Path pathToDelete = sf.getPath();
+ try {
+ sf.deleteReader();
+ } catch (IOException deleteEx) {
+ // Log deleteEx, the failure that forces the halt; ex was already logged above.
+ LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, deleteEx);
+ Runtime.getRuntime().halt(1);
+ }
+ }
+ throw new IOException("Failed to commit the flush", ex);
+ }
}
- // Add new file to store files. Clear snapshot too while we have
- // the Store write lock.
- return HStore.this.updateStorefiles(storeFile, snapshot);
+ if (HStore.this.getCoprocessorHost() != null) {
+ for (StoreFile sf : storeFiles) {
+ HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
+ }
+ }
+ // Add new file to store files. Clear snapshot too while we have the Store write lock.
+ return HStore.this.updateStorefiles(storeFiles, snapshot);
}
}
@@ -1807,8 +1718,8 @@ public class HStore implements Store {
}
public static final long FIXED_OVERHEAD =
- ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
- + (2 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
+ ClassSize.align(ClassSize.OBJECT + (15 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
+ + (4 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1475870&r1=1475869&r2=1475870&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Thu Apr 25 17:49:01 2013
@@ -958,7 +958,7 @@ public class MemStore implements HeapSiz
* @param notpresent True if the kv was NOT present in the set.
* @return Size
*/
- long heapSizeChange(final KeyValue kv, final boolean notpresent) {
+ static long heapSizeChange(final KeyValue kv, final boolean notpresent) {
return notpresent ?
ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
0;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1475870&r1=1475869&r2=1475870&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Apr 25 17:49:01 2013
@@ -183,7 +183,7 @@ public interface Store extends HeapSize,
public int getCompactPriority();
- public StoreFlusher getStoreFlusher(long cacheFlushId);
+ public StoreFlushContext createFlushContext(long cacheFlushId);
// Split oriented methods
@@ -320,4 +320,4 @@ public interface Store extends HeapSize,
* @return Whether this store has too many store files.
*/
public boolean hasTooManyStoreFiles();
-}
\ No newline at end of file
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java?rev=1475870&r1=1475869&r2=1475870&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java Thu Apr 25 17:49:01 2013
@@ -35,8 +35,9 @@ import org.apache.hadoop.hbase.util.Refl
* they are tied together and replaced together via StoreEngine-s.
*/
@InterfaceAudience.Private
-public abstract class StoreEngine<
- CP extends CompactionPolicy, C extends Compactor, SFM extends StoreFileManager> {
+public abstract class StoreEngine<SF extends StoreFlusher,
+ CP extends CompactionPolicy, C extends Compactor, SFM extends StoreFileManager> {
+ protected SF storeFlusher;
protected CP compactionPolicy;
protected C compactor;
protected SFM storeFileManager;
@@ -47,7 +48,7 @@ public abstract class StoreEngine<
*/
public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";
- private static final Class<? extends StoreEngine<?, ?, ?>>
+ private static final Class<? extends StoreEngine<?, ?, ?, ?>>
DEFAULT_STORE_ENGINE_CLASS = DefaultStoreEngine.class;
/**
@@ -72,6 +73,13 @@ public abstract class StoreEngine<
}
/**
+ * Returns the store flusher populated by createComponents; HStore uses it to turn memstore
+ * snapshots into store files.
+ *
+ * @return Store flusher to use.
+ */
+ public StoreFlusher getStoreFlusher() {
+ return this.storeFlusher;
+ }
+
+ /**
* Creates an instance of a compaction context specific to this engine.
* Doesn't actually select or start a compaction. See CompactionContext class comment.
* @return New CompactionContext object.
@@ -86,9 +94,11 @@ public abstract class StoreEngine<
private void createComponentsOnce(
Configuration conf, Store store, KVComparator kvComparator) throws IOException {
- assert compactor == null && compactionPolicy == null && storeFileManager == null;
+ // No component may exist yet: create() calls this right after constructing the engine.
+ assert compactor == null && compactionPolicy == null
+ && storeFileManager == null && storeFlusher == null;
createComponents(conf, store, kvComparator);
- assert compactor != null && compactionPolicy != null && storeFileManager != null;
+ // createComponents must have populated every component, including the store flusher.
+ assert compactor != null && compactionPolicy != null
+ && storeFileManager != null && storeFlusher != null;
}
/**
@@ -99,11 +109,11 @@ public abstract class StoreEngine<
* @param kvComparator KVComparator for storeFileManager.
* @return StoreEngine to use.
*/
- public static StoreEngine<?, ?, ?> create(
+ public static StoreEngine<?, ?, ?, ?> create(
Store store, Configuration conf, KVComparator kvComparator) throws IOException {
String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
try {
- StoreEngine<?,?,?> se = ReflectionUtils.instantiateWithCustomCtor(
+ StoreEngine<?,?,?,?> se = ReflectionUtils.instantiateWithCustomCtor(
className, new Class[] { }, new Object[] { });
se.createComponentsOnce(conf, store, kvComparator);
return se;
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java?rev=1475870&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java Thu Apr 25 17:49:01 2013
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+
+/**
+ * A package protected interface for a store flush.
+ * A store flush context carries the state required to prepare/flush/commit the store's cache.
+ * HRegion drives the phases in order: prepare, then flushCache, then commit.
+ */
+@InterfaceAudience.Private
+interface StoreFlushContext {
+
+ /**
+ * Prepare for a store flush (create snapshot)
+ *
+ * Requires pausing writes.
+ *
+ * A very short operation.
+ */
+ void prepare();
+
+ /**
+ * Flush the cache (create the new store file(s))
+ *
+ * A lengthy operation which doesn't require locking out any function
+ * of the store.
+ *
+ * @param status Task that represents the flush operation and may be updated with status.
+ * @throws IOException in case the flush fails
+ */
+ void flushCache(MonitoredTask status) throws IOException;
+
+ /**
+ * Commit the flush - add the flushed store file(s) to the store and clear the
+ * memstore snapshot.
+ *
+ * Requires pausing scans.
+ *
+ * A very short operation
+ *
+ * @param status Task that represents the flush operation and may be updated with status.
+ * @return Whether a compaction should be requested for the store after this flush.
+ * @throws IOException in case committing the flushed files fails
+ */
+ boolean commit(MonitoredTask status) throws IOException;
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=1475870&r1=1475869&r2=1475870&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Thu Apr 25 17:49:01 2013
@@ -19,47 +19,125 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
/**
- * A package protected interface for a store flushing.
- * A store flusher carries the state required to prepare/flush/commit the
- * store's cache.
+ * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one).
+ * Custom implementation can be provided.
*/
@InterfaceAudience.Private
-interface StoreFlusher {
+abstract class StoreFlusher {
+ protected Configuration conf;
+ protected Store store;
+
+ public StoreFlusher(Configuration conf, Store store) {
+ this.conf = conf;
+ this.store = store;
+ }
/**
- * Prepare for a store flush (create snapshot)
- *
- * Requires pausing writes.
- *
- * A very short operation.
+ * Turns a snapshot of memstore into a set of store files.
+ * @param snapshot Memstore snapshot.
+ * @param cacheFlushSeqNum Log cache flush sequence number.
+ * @param snapshotTimeRangeTracker Time range tracker from the memstore
+ * pertaining to the snapshot.
+ * @param flushedSize Out parameter for the size of the KVs flushed.
+ * @param status Task that represents the flush operation and may be updated with status.
+ * @return List of files written. Can be empty; must not be null.
*/
- void prepare();
+ public abstract List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushSeqNum,
+ TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status)
+ throws IOException;
+
+ protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum,
+ MonitoredTask status) throws IOException {
+ // Write out the log sequence number that corresponds to this output
+ // hfile. Also write current time in metadata as minFlushTime.
+ // The hfile is current up to and including cacheFlushSeqNum.
+ status.setStatus("Flushing " + store + ": appending metadata");
+ writer.appendMetadata(cacheFlushSeqNum, false);
+ status.setStatus("Flushing " + store + ": closing flushed file");
+ writer.close();
+ }
+
+ /** Calls coprocessor to create a flush scanner based on memstore scanner */
+ protected InternalScanner preCreateCoprocScanner(
+ KeyValueScanner memstoreScanner) throws IOException {
+ if (store.getCoprocessorHost() != null) {
+ return store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner);
+ }
+ return null;
+ }
+
+ /** Creates the default flush scanner based on memstore scanner */
+ protected InternalScanner createStoreScanner(long smallestReadPoint,
+ KeyValueScanner memstoreScanner) throws IOException {
+ Scan scan = new Scan();
+ scan.setMaxVersions(store.getScanInfo().getMaxVersions());
+ return new StoreScanner(store, store.getScanInfo(), scan,
+ Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
+ smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
+ }
/**
- * Flush the cache (create the new store file)
- *
- * A length operation which doesn't require locking out any function
- * of the store.
- *
- * @throws IOException in case the flush fails
+ * Calls coprocessor to create a scanner based on default flush scanner
+ * @return new or default scanner; if null, flush should not proceed.
*/
- void flushCache(MonitoredTask status) throws IOException;
+ protected InternalScanner postCreateCoprocScanner(InternalScanner scanner)
+ throws IOException {
+ if (store.getCoprocessorHost() != null) {
+ return store.getCoprocessorHost().preFlush(store, scanner);
+ }
+ return scanner;
+ }
/**
- * Commit the flush - add the store file to the store and clear the
- * memstore snapshot.
- *
- * Requires pausing scans.
- *
- * A very short operation
- *
- * @return
- * @throws IOException
- */
- boolean commit(MonitoredTask status) throws IOException;
+ * Performs memstore flush, writing data from scanner into sink.
+ * @param scanner Scanner to get data from.
+ * @param sink Sink to write data to. Could be StoreFile.Writer.
+ * @param smallestReadPoint Smallest read point used for the flush.
+ * @return Bytes flushed.
+ */
+ protected long performFlush(InternalScanner scanner,
+ Compactor.CellSink sink, long smallestReadPoint) throws IOException {
+ int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ boolean hasMore;
+ long flushed = 0;
+ do {
+ hasMore = scanner.next(kvs, compactionKVMax);
+ if (!kvs.isEmpty()) {
+ for (KeyValue kv : kvs) {
+ // If we know that this KV is going to be included always, then let us
+ // set its memstoreTS to 0. This will help us save space when writing to
+ // disk.
+ if (kv.getMemstoreTS() <= smallestReadPoint) {
+ // let us not change the original KV. It could be in the memstore
+ // changing its memstoreTS could affect other threads/scanners.
+ kv = kv.shallowCopy();
+ kv.setMemstoreTS(0);
+ }
+ sink.append(kv);
+ flushed += MemStore.heapSizeChange(kv, true);
+ }
+ kvs.clear();
+ }
+ } while (hasMore);
+ return flushed;
+ }
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java?rev=1475870&r1=1475869&r2=1475870&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java Thu Apr 25 17:49:01 2013
@@ -32,6 +32,12 @@ import org.mockito.Mockito;
@Category(SmallTests.class)
public class TestDefaultStoreEngine {
+ public static class DummyStoreFlusher extends DefaultStoreFlusher {
+ public DummyStoreFlusher(Configuration conf, Store store) {
+ super(conf, store);
+ }
+ }
+
public static class DummyCompactor extends DefaultCompactor {
public DummyCompactor(Configuration conf, Store store) {
super(conf, store);
@@ -45,15 +51,18 @@ public class TestDefaultStoreEngine {
}
@Test
- public void testCustomPolicyAndCompactor() throws Exception {
+ public void testCustomParts() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.set(DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY, DummyCompactor.class.getName());
conf.set(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
DummyCompactionPolicy.class.getName());
+ conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
+ DummyStoreFlusher.class.getName());
Store mockStore = Mockito.mock(Store.class);
- StoreEngine<?, ?, ?> se = StoreEngine.create(mockStore, conf, new KVComparator());
+ StoreEngine<?, ?, ?, ?> se = StoreEngine.create(mockStore, conf, new KVComparator());
Assert.assertTrue(se instanceof DefaultStoreEngine);
Assert.assertTrue(se.getCompactionPolicy() instanceof DummyCompactionPolicy);
+ Assert.assertTrue(se.getStoreFlusher() instanceof DummyStoreFlusher);
Assert.assertTrue(se.getCompactor() instanceof DummyCompactor);
}
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1475870&r1=1475869&r2=1475870&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Thu Apr 25 17:49:01 2013
@@ -732,10 +732,10 @@ public class TestStore extends TestCase
private static void flushStore(HStore store, long id) throws IOException {
- StoreFlusher storeFlusher = store.getStoreFlusher(id);
- storeFlusher.prepare();
- storeFlusher.flushCache(Mockito.mock(MonitoredTask.class));
- storeFlusher.commit(Mockito.mock(MonitoredTask.class));
+ StoreFlushContext storeFlushCtx = store.createFlushContext(id);
+ storeFlushCtx.prepare();
+ storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
+ storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1475870&r1=1475869&r2=1475870&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Thu Apr 25 17:49:01 2013
@@ -61,10 +61,11 @@ import org.apache.hadoop.hbase.exception
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
@@ -548,6 +549,29 @@ public class TestWALReplay {
assertEquals(result.size(), result1b.size());
}
+
+ // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
+ // Only throws exception if throwExceptionWhenFlushing is set true.
+ public static class CustomStoreFlusher extends DefaultStoreFlusher {
+ // Switch between throw and not throw exception in flush
+ static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
+
+ public CustomStoreFlusher(Configuration conf, Store store) {
+ super(conf, store);
+ }
+ @Override
+ public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushId,
+ TimeRangeTracker snapshotTimeRangeTracker, AtomicLong flushedSize, MonitoredTask status)
+ throws IOException {
+ if (throwExceptionWhenFlushing.get()) {
+ throw new IOException("Simulated exception by tests");
+ }
+ return super.flushSnapshot(snapshot, cacheFlushId, snapshotTimeRangeTracker,
+ flushedSize, status);
+ }
+
+ };
+
/**
* Test that we could recover the data correctly after aborting flush. In the
* test, first we abort flush after writing some data, then writing more data
@@ -568,28 +592,12 @@ public class TestWALReplay {
// of the families during the load of edits so its seqid is not same as
// others to test we do right thing when different seqids.
HLog wal = createWAL(this.conf);
- final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
Mockito.doReturn(false).when(rsServices).isAborted();
- HRegion region = new HRegion(basedir, wal, this.fs, this.conf, hri, htd,
- rsServices) {
- @Override
- protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
- return new HStore(this, family, conf) {
- @Override
- protected Path flushCache(final long logCacheFlushId,
- SortedSet<KeyValue> snapshot,
- TimeRangeTracker snapshotTimeRangeTracker,
- AtomicLong flushedSize, MonitoredTask status) throws IOException {
- if (throwExceptionWhenFlushing.get()) {
- throw new IOException("Simulated exception by tests");
- }
- return super.flushCache(logCacheFlushId, snapshot,
- snapshotTimeRangeTracker, flushedSize, status);
- }
- };
- }
- };
+ Configuration customConf = new Configuration(this.conf);
+ customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
+ CustomStoreFlusher.class.getName());
+ HRegion region = new HRegion(basedir, wal, this.fs, customConf, hri, htd, rsServices);
long seqid = region.initialize();
// HRegionServer usually does this. It knows the largest seqid across all
// regions.
@@ -610,7 +618,7 @@ public class TestWALReplay {
assertEquals(writtenRowCount, getScannedCount(scanner));
// Let us flush the region
- throwExceptionWhenFlushing.set(true);
+ CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
try {
region.flushcache();
fail("Injected exception hasn't been thrown");
@@ -630,7 +638,7 @@ public class TestWALReplay {
}
writtenRowCount += moreRow;
// call flush again
- throwExceptionWhenFlushing.set(false);
+ CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
try {
region.flushcache();
} catch (IOException t) {