You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/11/07 19:10:39 UTC
svn commit: r1539743 [2/2] - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/regionserver/wal/
test/java/org/apache/hadoop/hbase/coprocessor/
test/java/org/apache/hadoop/hbase/mapreduc...
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1539743&r1=1539742&r2=1539743&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Thu Nov 7 18:10:38 2013
@@ -270,31 +270,33 @@ public class TestWALReplay {
HLog wal1 = createWAL(this.conf);
// Add 1k to each family.
final int countPerFamily = 1000;
+ final AtomicLong sequenceId = new AtomicLong(1);
for (HColumnDescriptor hcd: htd.getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee,
- wal1, htd);
+ wal1, htd, sequenceId);
}
wal1.close();
runWALSplit(this.conf);
HLog wal2 = createWAL(this.conf);
- // Up the sequenceid so that these edits are after the ones added above.
- wal2.setSequenceNumber(wal1.getSequenceNumber());
// Add 1k to each family.
for (HColumnDescriptor hcd: htd.getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
- ee, wal2, htd);
+ ee, wal2, htd, sequenceId);
}
wal2.close();
runWALSplit(this.conf);
HLog wal3 = createWAL(this.conf);
- wal3.setSequenceNumber(wal2.getSequenceNumber());
try {
- long wal3SeqId = wal3.getSequenceNumber();
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3);
long seqid = region.getOpenSeqNum();
- assertTrue(seqid > wal3SeqId);
+ // The regions opens with sequenceId as 1. With 6k edits, its sequence number reaches 6k + 1.
+ // When opened, this region would apply 6k edits, and increment the sequenceId by 1
+ assertTrue(seqid > sequenceId.get());
+ assertEquals(seqid - 1, sequenceId.get());
+ LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: "
+ + sequenceId.get());
// TODO: Scan all.
region.close();
@@ -395,8 +397,6 @@ public class TestWALReplay {
HLog wal = createWAL(this.conf);
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
long seqid = region.getOpenSeqNum();
- // HRegionServer usually does this. It knows the largest seqid across all regions.
- wal.setSequenceNumber(seqid);
boolean first = true;
for (HColumnDescriptor hcd: htd.getFamilies()) {
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
@@ -420,8 +420,6 @@ public class TestWALReplay {
HLog wal2 = createWAL(this.conf);
HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2);
long seqid2 = region2.getOpenSeqNum();
- // HRegionServer usually does this. It knows the largest seqid across all regions.
- wal2.setSequenceNumber(seqid2);
assertTrue(seqid + result.size() < seqid2);
final Result result1b = region2.get(g);
assertEquals(result.size(), result1b.size());
@@ -458,8 +456,6 @@ public class TestWALReplay {
}
};
long seqid3 = region3.initialize();
- // HRegionServer usually does this. It knows the largest seqid across all regions.
- wal3.setSequenceNumber(seqid3);
Result result3 = region3.get(g);
// Assert that count of cells is same as before crash.
assertEquals(result2.size(), result3.size());
@@ -513,8 +509,6 @@ public class TestWALReplay {
HLog wal = createWAL(this.conf);
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
long seqid = region.getOpenSeqNum();
- // HRegionServer usually does this. It knows the largest seqid across all regions.
- wal.setSequenceNumber(seqid);
for (HColumnDescriptor hcd: htd.getFamilies()) {
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
}
@@ -548,8 +542,6 @@ public class TestWALReplay {
HLog wal2 = createWAL(this.conf);
HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
long seqid2 = region2.getOpenSeqNum();
- // HRegionServer usually does this. It knows the largest seqid across all regions.
- wal2.setSequenceNumber(seqid2);
assertTrue(seqid + result.size() < seqid2);
final Result result1b = region2.get(g);
@@ -605,12 +597,8 @@ public class TestWALReplay {
Configuration customConf = new Configuration(this.conf);
customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
CustomStoreFlusher.class.getName());
- HRegion region = new HRegion(basedir, wal, this.fs, customConf, hri, htd, rsServices);
- long seqid = region.initialize();
- // HRegionServer usually does this. It knows the largest seqid across all
- // regions.
- wal.setSequenceNumber(seqid);
-
+ HRegion region =
+ HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
int writtenRowCount = 10;
List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>(
htd.getFamilies());
@@ -661,13 +649,8 @@ public class TestWALReplay {
runWALSplit(this.conf);
HLog wal2 = createWAL(this.conf);
Mockito.doReturn(false).when(rsServices).isAborted();
- HRegion region2 = new HRegion(basedir, wal2, this.fs, this.conf, hri, htd,
- rsServices);
- long seqid2 = region2.initialize();
- // HRegionServer usually does this. It knows the largest seqid across all
- // regions.
- wal2.setSequenceNumber(seqid2);
-
+ HRegion region2 =
+ HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null);
scanner = region2.getScanner(new Scan());
assertEquals(writtenRowCount, getScannedCount(scanner));
}
@@ -706,12 +689,13 @@ public class TestWALReplay {
final HLog wal = createWAL(this.conf);
final byte[] rowName = tableName.getName();
final byte[] regionName = hri.getEncodedNameAsBytes();
+ final AtomicLong sequenceId = new AtomicLong(1);
// Add 1k to each family.
final int countPerFamily = 1000;
for (HColumnDescriptor hcd: htd.getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
- ee, wal, htd);
+ ee, wal, htd, sequenceId);
}
// Add a cache flush, shouldn't have any effect
@@ -723,14 +707,14 @@ public class TestWALReplay {
long now = ee.currentTimeMillis();
edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
now, rowName));
- wal.append(hri, tableName, edit, now, htd);
+ wal.append(hri, tableName, edit, now, htd, sequenceId);
// Delete the c family to verify deletes make it over.
edit = new WALEdit();
now = ee.currentTimeMillis();
edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now,
KeyValue.Type.DeleteFamily));
- wal.append(hri, tableName, edit, now, htd);
+ wal.append(hri, tableName, edit, now, htd, sequenceId);
// Sync.
wal.sync();
@@ -767,7 +751,7 @@ public class TestWALReplay {
long seqid = region.initialize();
// We flushed during init.
assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
- assertTrue(seqid > wal.getSequenceNumber());
+ assertTrue(seqid - 1 == sequenceId.get());
Get get = new Get(rowName);
Result result = region.get(get);
@@ -800,15 +784,9 @@ public class TestWALReplay {
MockHLog wal = createMockWAL(this.conf);
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
- long seqid = region.getOpenSeqNum();
- // HRegionServer usually does this. It knows the largest seqid across all
- // regions.
- wal.setSequenceNumber(seqid);
for (HColumnDescriptor hcd : htd.getFamilies()) {
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
}
- // get the seq no after first set of entries.
- long sequenceNumber = wal.getSequenceNumber();
// Let us flush the region
// But this time completeflushcache is not yet done
@@ -816,7 +794,7 @@ public class TestWALReplay {
for (HColumnDescriptor hcd : htd.getFamilies()) {
addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
}
- long lastestSeqNumber = wal.getSequenceNumber();
+ long lastestSeqNumber = region.getSequenceId().get();
// get the current seq no
wal.doCompleteCacheFlush = true;
// allow complete cache flush with the previous seq number got after first
@@ -891,9 +869,9 @@ public class TestWALReplay {
}
}
- private void addWALEdits (final TableName tableName, final HRegionInfo hri,
- final byte [] rowName, final byte [] family,
- final int count, EnvironmentEdge ee, final HLog wal, final HTableDescriptor htd)
+ private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
+ final byte[] family, final int count, EnvironmentEdge ee, final HLog wal,
+ final HTableDescriptor htd, final AtomicLong sequenceId)
throws IOException {
String familyStr = Bytes.toString(family);
for (int j = 0; j < count; j++) {
@@ -902,7 +880,7 @@ public class TestWALReplay {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, qualifierBytes,
ee.currentTimeMillis(), columnBytes));
- wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd);
+ wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd, sequenceId);
}
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java?rev=1539743&r1=1539742&r2=1539743&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java Thu Nov 7 18:10:38 2013
@@ -52,6 +52,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
@Category(LargeTests.class)
@RunWith(Parameterized.class)
@@ -74,6 +75,7 @@ public class TestReplicationHLogReaderMa
private PathWatcher pathWatcher;
private int nbRows;
private int walEditKVs;
+ private final AtomicLong sequenceId = new AtomicLong(1);
@Parameters
public static Collection<Object[]> parameters() {
@@ -206,7 +208,7 @@ public class TestReplicationHLogReaderMa
}
private void appendToLogPlus(int count) throws IOException {
- log.append(info, tableName, getWALEdits(count), System.currentTimeMillis(), htd);
+ log.append(info, tableName, getWALEdits(count), System.currentTimeMillis(), htd, sequenceId);
}
private WALEdit getWALEdits(int count) {
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1539743&r1=1539742&r2=1539743&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Thu Nov 7 18:10:38 2013
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -188,7 +189,7 @@ public class TestReplicationSourceManage
listeners.add(replication);
HLog hlog = HLogFactory.createHLog(fs, utility.getDataTestDir(), logName,
conf, listeners, URLEncoder.encode("regionserver:60020", "UTF8"));
-
+ final AtomicLong sequenceId = new AtomicLong(1);
manager.init();
HTableDescriptor htd = new HTableDescriptor();
htd.addFamily(new HColumnDescriptor(f1));
@@ -200,7 +201,7 @@ public class TestReplicationSourceManage
LOG.info(i);
HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
- hlog.append(hri, test, edit, System.currentTimeMillis(), htd);
+ hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId);
}
// Simulate a rapid insert that's followed
@@ -211,7 +212,7 @@ public class TestReplicationSourceManage
LOG.info(baseline + " and " + time);
for (int i = 0; i < 3; i++) {
- hlog.append(hri, test, edit, System.currentTimeMillis(), htd);
+ hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId);
}
assertEquals(6, manager.getHLogs().get(slaveId).size());
@@ -221,7 +222,7 @@ public class TestReplicationSourceManage
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
"1", 0, false, false);
- hlog.append(hri, test, edit, System.currentTimeMillis(), htd);
+ hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId);
assertEquals(1, manager.getHLogs().size());