You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/05/17 23:07:51 UTC
svn commit: r1483992 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/wal/
Author: tedyu
Date: Fri May 17 21:07:51 2013
New Revision: 1483992
URL: http://svn.apache.org/r1483992
Log:
HBASE-7210 Backport HBASE-6059 to 0.94 (Ted Yu)
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java?rev=1483992&r1=1483991&r2=1483992&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java Fri May 17 21:07:51 2013
@@ -191,7 +191,10 @@ class Compactor extends Configured {
boolean hasMore;
do {
hasMore = scanner.next(kvs, compactionKVMax);
- if (writer == null && !kvs.isEmpty()) {
+ // Create the writer even if there are no kvs (an empty store file is also ok),
+ // because we need to record the max seq id for the store file, see
+ // HBASE-6059
+ if (writer == null) {
writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true,
maxMVCCReadpoint >= smallestReadPoint);
}
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1483992&r1=1483991&r2=1483992&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri May 17 21:07:51 2013
@@ -572,22 +572,13 @@ public class HRegion implements HeapSize
cleanupTmpDir();
// Load in all the HStores.
- // Get minimum of the maxSeqId across all the store.
//
// Context: During replay we want to ensure that we do not lose any data. So, we
// have to be conservative in how we replay logs. For each store, we calculate
- // the maxSeqId up to which the store was flushed. But, since different stores
- // could have a different maxSeqId, we choose the
- // minimum across all the stores.
- // This could potentially result in duplication of data for stores that are ahead
- // of others. ColumnTrackers in the ScanQueryMatchers do the de-duplication, so we
- // do not have to worry.
- // TODO: If there is a store that was never flushed in a long time, we could replay
- // a lot of data. Currently, this is not a problem because we flush all the stores at
- // the same time. If we move to per-cf flushing, we might want to revisit this and send
- // in a vector of maxSeqIds instead of sending in a single number, which has to be the
- // min across all the max.
- long minSeqId = -1;
+ // the maxSeqId up to which the store was flushed. And, skip the edits which
+ // are equal to or lower than maxSeqId for each store.
+ Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
+ Bytes.BYTES_COMPARATOR);
long maxSeqId = -1;
// initialized to -1 so that we pick up MemstoreTS from column families
long maxMemstoreTS = -1;
@@ -617,9 +608,8 @@ public class HRegion implements HeapSize
this.stores.put(store.getColumnFamilyName().getBytes(), store);
long storeSeqId = store.getMaxSequenceId();
- if (minSeqId == -1 || storeSeqId < minSeqId) {
- minSeqId = storeSeqId;
- }
+ maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
+ storeSeqId);
if (maxSeqId == -1 || storeSeqId > maxSeqId) {
maxSeqId = storeSeqId;
}
@@ -639,7 +629,7 @@ public class HRegion implements HeapSize
mvcc.initialize(maxMemstoreTS + 1);
// Recover any edits if available.
maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
- this.regiondir, minSeqId, reporter, status));
+ this.regiondir, maxSeqIdInStores, reporter, status));
status.setStatus("Cleaning up detritus from prior splits");
// Get rid of any splits or merges that were lost in-progress. Clean out
@@ -3094,8 +3084,8 @@ public class HRegion implements HeapSize
* make sense in a this single region context only -- until we online.
*
* @param regiondir
- * @param minSeqId Any edit found in split editlogs needs to be in excess of
- * this minSeqId to be applied, else its skipped.
+ * @param maxSeqIdInStores Any edit found in split editlogs needs to be in excess of
+ * the maxSeqId of its store to be applied, else it is skipped.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
* recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
@@ -3103,12 +3093,19 @@ public class HRegion implements HeapSize
* @throws IOException
*/
protected long replayRecoveredEditsIfAny(final Path regiondir,
- final long minSeqId, final CancelableProgressable reporter,
- final MonitoredTask status)
+ Map<byte[], Long> maxSeqIdInStores,
+ final CancelableProgressable reporter, final MonitoredTask status)
throws UnsupportedEncodingException, IOException {
- long seqid = minSeqId;
+ long minSeqIdForTheRegion = -1;
+ for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
+ if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
+ minSeqIdForTheRegion = maxSeqIdInStore;
+ }
+ }
+ long seqid = minSeqIdForTheRegion;
NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
if (files == null || files.isEmpty()) return seqid;
+
for (Path edits: files) {
if (edits == null || !this.fs.exists(edits)) {
LOG.warn("Null or non-existent edits file: " + edits);
@@ -3119,16 +3116,16 @@ public class HRegion implements HeapSize
long maxSeqId = Long.MAX_VALUE;
String fileName = edits.getName();
maxSeqId = Math.abs(Long.parseLong(fileName));
- if (maxSeqId <= minSeqId) {
+ if (maxSeqId <= minSeqIdForTheRegion) {
String msg = "Maximum sequenceid for this log is " + maxSeqId
- + " and minimum sequenceid for the region is " + minSeqId
+ + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
+ ", skipped the whole file, path=" + edits;
LOG.debug(msg);
continue;
}
try {
- seqid = replayRecoveredEdits(edits, seqid, reporter);
+ seqid = replayRecoveredEdits(edits, maxSeqIdInStores, reporter);
} catch (IOException e) {
boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
if (skipErrors) {
@@ -3145,7 +3142,7 @@ public class HRegion implements HeapSize
this.rsAccounting.clearRegionReplayEditsSize(this.regionInfo.getRegionName());
}
}
- if (seqid > minSeqId) {
+ if (seqid > minSeqIdForTheRegion) {
// Then we added some edits to memory. Flush and cleanup split edit files.
internalFlushcache(null, seqid, status);
}
@@ -3162,18 +3159,17 @@ public class HRegion implements HeapSize
/*
* @param edits File of recovered edits.
- * @param minSeqId Minimum sequenceid found in a store file. Edits in log
- * must be larger than this to be replayed.
+ * @param maxSeqIdInStores Maximum sequenceid found in each store. Edits in log
+ * must be larger than this to be replayed for each store.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
* recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
* @throws IOException
*/
private long replayRecoveredEdits(final Path edits,
- final long minSeqId, final CancelableProgressable reporter)
+ Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
throws IOException {
- String msg = "Replaying edits from " + edits + "; minSequenceid=" +
- minSeqId + "; path=" + edits;
+ String msg = "Replaying edits from " + edits;
LOG.info(msg);
MonitoredTask status = TaskMonitor.get().createStatus(msg);
@@ -3181,7 +3177,7 @@ public class HRegion implements HeapSize
HLog.Reader reader = null;
try {
reader = HLog.getReader(this.fs, edits, conf);
- long currentEditSeqId = minSeqId;
+ long currentEditSeqId = -1;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
long editsCount = 0;
@@ -3240,12 +3236,6 @@ public class HRegion implements HeapSize
if (firstSeqIdInLog == -1) {
firstSeqIdInLog = key.getLogSeqNum();
}
- // Now, figure if we should skip this edit.
- if (key.getLogSeqNum() <= currentEditSeqId) {
- skippedEdits++;
- continue;
- }
- currentEditSeqId = key.getLogSeqNum();
boolean flush = false;
for (KeyValue kv: val.getKeyValues()) {
// Check this edit is for me. Also, guard against writing the special
@@ -3266,6 +3256,13 @@ public class HRegion implements HeapSize
skippedEdits++;
continue;
}
+ // Now, figure if we should skip this edit.
+ if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
+ .getName())) {
+ skippedEdits++;
+ continue;
+ }
+ currentEditSeqId = key.getLogSeqNum();
// Once we are over the limit, restoreEdit will keep returning true to
// flush -- but don't flush until we've played all the kvs that make up
// the WALEdit.
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1483992&r1=1483991&r2=1483992&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri May 17 21:07:51 2013
@@ -1864,6 +1864,10 @@ public class Store extends SchemaConfigu
LOG.warn("StoreFile " + f + " has a null Reader");
return;
}
+ if (r.getEntries() == 0) {
+ LOG.warn("StoreFile " + f + " is a empty store file");
+ return;
+ }
// TODO: Cache these keys rather than make each time?
byte [] fk = r.getFirstKey();
if (fk == null) return;
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1483992&r1=1483991&r2=1483992&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Fri May 17 21:07:51 2013
@@ -268,7 +268,13 @@ public class TestHRegion extends HBaseTe
writer.close();
}
MonitoredTask status = TaskMonitor.get().createStatus(method);
- long seqId = region.replayRecoveredEditsIfAny(regiondir, minSeqId-1, null, status);
+ Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
+ Bytes.BYTES_COMPARATOR);
+ for (Store store : region.getStores().values()) {
+ maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
+ minSeqId - 1);
+ }
+ long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
Get get = new Get(row);
Result result = region.get(get, null);
@@ -314,7 +320,13 @@ public class TestHRegion extends HBaseTe
}
long recoverSeqId = 1030;
MonitoredTask status = TaskMonitor.get().createStatus(method);
- long seqId = region.replayRecoveredEditsIfAny(regiondir, recoverSeqId-1, null, status);
+ Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
+ Bytes.BYTES_COMPARATOR);
+ for (Store store : region.getStores().values()) {
+ maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
+ recoverSeqId - 1);
+ }
+ long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
assertEquals(maxSeqId, seqId);
Get get = new Get(row);
Result result = region.get(get, null);
@@ -355,7 +367,14 @@ public class TestHRegion extends HBaseTe
recoveredEditsDir, String.format("%019d", minSeqId-1));
FSDataOutputStream dos= fs.create(recoveredEdits);
dos.close();
- long seqId = region.replayRecoveredEditsIfAny(regiondir, minSeqId, null, null);
+
+ Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
+ Bytes.BYTES_COMPARATOR);
+ for (Store store : region.getStores().values()) {
+ maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), minSeqId);
+ }
+ long seqId = region.replayRecoveredEditsIfAny(regiondir,
+ maxSeqIdInStores, null, null);
assertEquals(minSeqId, seqId);
} finally {
HRegion.closeHRegion(this.region);
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1483992&r1=1483991&r2=1483992&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Fri May 17 21:07:51 2013
@@ -183,12 +183,18 @@ public class TestStore extends TestCase
for (int i = 1; i <= storeFileNum; i++) {
// verify the expired store file.
CompactionRequest cr = this.store.requestCompaction();
- assertEquals(1, cr.getFiles().size());
- assertTrue(cr.getFiles().get(0).getReader().getMaxTimestamp() <
- (System.currentTimeMillis() - this.store.scanInfo.getTtl()));
- // Verify that the expired the store has been deleted.
+      // The first compaction selects only the expired file; later ones also select
+      // the empty file left by the previous compaction. Check every selected file.
+      assertEquals(Math.min(i, 2), cr.getFiles().size());
+      for (int j = 0; j < cr.getFiles().size(); j++) {
+        assertTrue(cr.getFiles().get(j).getReader().getMaxTimestamp() < (System
+            .currentTimeMillis() - this.store.scanInfo.getTtl()));
+      }
+ // Verify that the expired store file is compacted to an empty store file.
this.store.compact(cr);
- assertEquals(storeFileNum - i, this.store.getStorefiles().size());
+ // It is an empty store file.
+ assertEquals(0, this.store.getStorefiles().get(0).getReader()
+ .getEntries());
// Let the next store file expired.
Thread.sleep(sleepTime);
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1483992&r1=1483991&r2=1483992&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Fri May 17 21:07:51 2013
@@ -45,15 +45,24 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
@@ -91,7 +100,7 @@ public class TestWALReplay {
conf.setBoolean("dfs.support.append", true);
// The below config supported by 0.20-append and CDH3b2
conf.setInt("dfs.client.block.recovery.retries", 2);
- TEST_UTIL.startMiniDFSCluster(3);
+ TEST_UTIL.startMiniCluster(3);
Path hbaseRootDir =
TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
LOG.info("hbase.rootdir=" + hbaseRootDir);
@@ -100,7 +109,7 @@ public class TestWALReplay {
@AfterClass
public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniDFSCluster();
+ TEST_UTIL.shutdownMiniCluster();
}
@Before
@@ -132,6 +141,100 @@ public class TestWALReplay {
}
/**
+   * Checks that a deleted row stays deleted on a two-family table (see HBASE-6059).
+   * A row is written to cf1, the region is moved to another region server, the row is
+   * deleted, flushed and major compacted there, and the region is moved back to the
+   * origin server, which is then aborted. The final Get must come back empty.
+   * @throws Exception if any cluster or client operation fails
+   */
+  @Test
+  public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
+    final byte[] tableName = Bytes.toBytes("testReplayEditsAfterRegionMovedWithMultiCF");
+    byte[] family1 = Bytes.toBytes("cf1");
+    byte[] family2 = Bytes.toBytes("cf2");
+    byte[] qualifier = Bytes.toBytes("q");
+    byte[] value = Bytes.toBytes("testV");
+    byte[][] familys = { family1, family2 };
+    TEST_UTIL.createTable(tableName, familys);
+    HTable htable = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    try {
+      Put put = new Put(Bytes.toBytes("r1"));
+      put.add(family1, qualifier, value);
+      htable.put(put);
+      ResultScanner resultScanner = htable.getScanner(new Scan());
+      int count = 0;
+      while (resultScanner.next() != null) {
+        count++;
+      }
+      resultScanner.close();
+      assertEquals(1, count);
+
+      MiniHBaseCluster hbaseCluster = TEST_UTIL.getMiniHBaseCluster();
+      List<HRegion> regions = hbaseCluster.getRegions(tableName);
+      assertEquals(1, regions.size());
+
+      // move region to another regionserver
+      HRegion destRegion = regions.get(0);
+      int originServerNum = hbaseCluster.getServerWith(destRegion.getRegionName());
+      assertTrue("Please start more than 1 regionserver", hbaseCluster
+          .getRegionServerThreads().size() > 1);
+      // pick the lowest-numbered region server that is not the origin
+      int destServerNum = (originServerNum == 0) ? 1 : 0;
+      HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
+      HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
+      // move region to destination regionserver
+      moveRegionAndWait(destRegion, destServer);
+
+      // delete the row
+      Delete del = new Delete(Bytes.toBytes("r1"));
+      htable.delete(del);
+      resultScanner = htable.getScanner(new Scan());
+      count = 0;
+      while (resultScanner.next() != null) {
+        count++;
+      }
+      resultScanner.close();
+      assertEquals(0, count);
+
+      // flush the region on the destination regionserver
+      destServer.getOnlineRegion(destRegion.getRegionName()).flushcache();
+      // mark every store for major compaction, then compact the stores
+      for (Store store : destServer.getOnlineRegion(destRegion.getRegionName())
+          .getStores().values()) {
+        store.triggerMajorCompaction();
+      }
+      destServer.getOnlineRegion(destRegion.getRegionName()).compactStores();
+
+      // move region to origin regionserver
+      moveRegionAndWait(destRegion, originServer);
+      // abort the origin regionserver
+      originServer.abort("testing");
+
+      // the row was deleted before the abort, so the Get must find nothing
+      Result result = htable.get(new Get(Bytes.toBytes("r1")));
+      assertTrue("Row is deleted, but we get " + result,
+          result == null || result.isEmpty());
+    } finally {
+      htable.close();
+    }
+  }
+
+  /** Requests a move of the region to destServer, then polls every 10 ms until it is there. */
+  private void moveRegionAndWait(HRegion destRegion, HRegionServer destServer)
+      throws InterruptedException, MasterNotRunningException,
+      ZooKeeperConnectionException, IOException {
+    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+    TEST_UTIL.getHBaseAdmin().move(destRegion.getRegionInfo().getEncodedNameAsBytes(),
+        Bytes.toBytes(destServer.getServerName().getServerName()));
+    for (;;) {
+      ServerName current = master.getAssignmentManager()
+          .getRegionServerOfRegion(destRegion.getRegionInfo());
+      if (current != null && current.equals(destServer.getServerName())) return;
+      Thread.sleep(10);
+    }
+  }
+
+ /**
* Tests for hbase-2727.
* @throws Exception
* @see https://issues.apache.org/jira/browse/HBASE-2727