You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2016/02/26 18:03:48 UTC
[1/2] hbase git commit: HBASE-15205 Do not find the replication scope
for every WAL#append() (Ram)
Repository: hbase
Updated Branches:
refs/heads/master 538815d82 -> 8f2bd0601
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index 567e09d..e9bb468 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -208,13 +210,17 @@ public class TestWALLockup {
HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
final HRegion region = initHRegion(tableName, null, null, dodgyWAL);
byte [] bytes = Bytes.toBytes(getName());
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ scopes.put(COLUMN_FAMILY_BYTES, 0);
try {
// First get something into memstore. Make a Put and then pull the Cell out of it. Will
// manage append and sync carefully in below to manufacture hang. We keep adding same
// edit. WAL subsystem doesn't care.
Put put = new Put(bytes);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
- WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName());
+ WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName(),
+ scopes);
WALEdit edit = new WALEdit();
CellScanner CellScanner = put.cellScanner();
assertTrue(CellScanner.advance());
@@ -228,7 +234,7 @@ public class TestWALLockup {
LOG.info("SET throwing of exception on append");
dodgyWAL.throwException = true;
// This append provokes a WAL roll request
- dodgyWAL.append(htd, region.getRegionInfo(), key, edit, true);
+ dodgyWAL.append(region.getRegionInfo(), key, edit, true);
boolean exception = false;
try {
dodgyWAL.sync();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index fd6d535..c60b225 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -28,7 +28,9 @@ import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.NavigableMap;
import java.util.Set;
+import java.util.TreeMap;
import java.util.UUID;
import org.apache.commons.lang.mutable.MutableBoolean;
@@ -152,12 +154,9 @@ public class TestFSHLog {
}
}
- protected void addEdits(WAL log,
- HRegionInfo hri,
- HTableDescriptor htd,
- int times,
- MultiVersionConcurrencyControl mvcc)
- throws IOException {
+ protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times,
+ MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes)
+ throws IOException {
final byte[] row = Bytes.toBytes("row");
for (int i = 0; i < times; i++) {
long timestamp = System.currentTimeMillis();
@@ -165,8 +164,8 @@ public class TestFSHLog {
cols.add(new KeyValue(row, row, row, timestamp, row));
WALKey key = new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(),
WALKey.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
- HConstants.NO_NONCE, mvcc);
- log.append(htd, hri, key, cols, true);
+ HConstants.NO_NONCE, mvcc, scopes);
+ log.append(hri, key, cols, true);
}
log.sync();
}
@@ -261,11 +260,21 @@ public class TestFSHLog {
new HRegionInfo(t2.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
// add edits and roll the wal
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+ NavigableMap<byte[], Integer> scopes1 = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : t1.getFamiliesKeys()) {
+ scopes1.put(fam, 0);
+ }
+ NavigableMap<byte[], Integer> scopes2 = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : t2.getFamiliesKeys()) {
+ scopes2.put(fam, 0);
+ }
try {
- addEdits(wal, hri1, t1, 2, mvcc);
+ addEdits(wal, hri1, t1, 2, mvcc, scopes1);
wal.rollWriter();
// add some more edits and roll the wal. This would reach the log number threshold
- addEdits(wal, hri1, t1, 2, mvcc);
+ addEdits(wal, hri1, t1, 2, mvcc, scopes1);
wal.rollWriter();
// with above rollWriter call, the max logs limit is reached.
assertTrue(wal.getNumRolledLogFiles() == 2);
@@ -276,7 +285,7 @@ public class TestFSHLog {
assertEquals(1, regionsToFlush.length);
assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
// insert edits in second region
- addEdits(wal, hri2, t2, 2, mvcc);
+ addEdits(wal, hri2, t2, 2, mvcc, scopes2);
// get the regions to flush, it should still read region1.
regionsToFlush = wal.findRegionsToForceFlush();
assertEquals(regionsToFlush.length, 1);
@@ -293,12 +302,12 @@ public class TestFSHLog {
// no wal should remain now.
assertEquals(0, wal.getNumRolledLogFiles());
// add edits both to region 1 and region 2, and roll.
- addEdits(wal, hri1, t1, 2, mvcc);
- addEdits(wal, hri2, t2, 2, mvcc);
+ addEdits(wal, hri1, t1, 2, mvcc, scopes1);
+ addEdits(wal, hri2, t2, 2, mvcc, scopes2);
wal.rollWriter();
// add edits and roll the writer, to reach the max logs limit.
assertEquals(1, wal.getNumRolledLogFiles());
- addEdits(wal, hri1, t1, 2, mvcc);
+ addEdits(wal, hri1, t1, 2, mvcc, scopes1);
wal.rollWriter();
// it should return two regions to flush, as the oldest wal file has entries
// for both regions.
@@ -310,7 +319,7 @@ public class TestFSHLog {
wal.rollWriter(true);
assertEquals(0, wal.getNumRolledLogFiles());
// Add an edit to region1, and roll the wal.
- addEdits(wal, hri1, t1, 2, mvcc);
+ addEdits(wal, hri1, t1, 2, mvcc, scopes1);
// tests partial flush: roll on a partial flush, and ensure that wal is not archived.
wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
wal.rollWriter();
@@ -360,6 +369,11 @@ public class TestFSHLog {
HBaseTestingUtility.closeRegionAndWAL(r);
final int countPerFamily = 10;
final MutableBoolean goslow = new MutableBoolean(false);
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
// subclass and doctor a method.
FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDefaultRootDirPath(),
testName, conf) {
@@ -403,9 +417,9 @@ public class TestFSHLog {
for (int i = 0; i < countPerFamily; i++) {
final HRegionInfo info = region.getRegionInfo();
final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC());
- wal.append(htd, info, logkey, edits, true);
- }
+ System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes);
+ wal.append(info, logkey, edits, true);
+ }
region.flush(true);
// FlushResult.flushSequenceId is not visible here so go get the current sequence id.
long currentSequenceId = region.getReadPoint(null);
@@ -439,11 +453,16 @@ public class TestFSHLog {
syncRunnerIndexField.set(ringBufferEventHandler, Integer.MAX_VALUE - 1);
HTableDescriptor htd =
new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
HRegionInfo hri =
new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
for (int i = 0; i < 10; i++) {
- addEdits(log, hri, htd, 1, mvcc);
+ addEdits(log, hri, htd, 1, mvcc, scopes);
}
} finally {
log.close();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
index 9dccffe..c05e7f0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -199,8 +202,13 @@ public class TestLogRollAbort {
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor("column"));
- log.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis(), mvcc), kvs, true);
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
+ log.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
// Send the data to HDFS datanodes and close the HDFS writer
log.sync();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
index 0c68fc1..9ab7b7d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertFalse;
import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -139,8 +141,13 @@ public class TestLogRollingNoCluster {
edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY));
final HRegionInfo hri = HRegionInfo.FIRST_META_REGIONINFO;
final HTableDescriptor htd = TEST_UTIL.getMetaTableDescriptor();
- final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
- TableName.META_TABLE_NAME, now, mvcc), edit, true);
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
+ final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(),
+ TableName.META_TABLE_NAME, now, mvcc, scopes), edit, true);
wal.sync(txid);
}
String msg = getName() + " finished";
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
index a2c387b..b6bb7a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.util.ArrayList;
import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -98,9 +100,13 @@ public class TestWALActionsListener {
edit.add(kv);
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(SOME_BYTES));
htd.addFamily(new HColumnDescriptor(b));
-
- final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
- TableName.valueOf(b), 0), edit, true);
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
+ final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(),
+ TableName.valueOf(b), 0, scopes), edit, true);
wal.sync(txid);
if (i == 10) {
wal.registerWALActionsListener(laterobserver);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index dbc06ff..3e894d7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -37,7 +37,9 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
+import java.util.NavigableMap;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -308,9 +310,14 @@ public class TestWALReplay {
// Add 1k to each family.
final int countPerFamily = 1000;
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
for (HColumnDescriptor hcd: htd.getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee,
- wal1, htd, mvcc);
+ wal1, htd, mvcc, scopes);
}
wal1.shutdown();
runWALSplit(this.conf);
@@ -319,7 +326,7 @@ public class TestWALReplay {
// Add 1k to each family.
for (HColumnDescriptor hcd: htd.getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
- ee, wal2, htd, mvcc);
+ ee, wal2, htd, mvcc, scopes);
}
wal2.shutdown();
runWALSplit(this.conf);
@@ -800,9 +807,14 @@ public class TestWALReplay {
// Add 1k to each family.
final int countPerFamily = 1000;
Set<byte[]> familyNames = new HashSet<byte[]>();
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
for (HColumnDescriptor hcd: htd.getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
- ee, wal, htd, mvcc);
+ ee, wal, htd, mvcc, scopes);
familyNames.add(hcd.getName());
}
@@ -815,13 +827,15 @@ public class TestWALReplay {
long now = ee.currentTime();
edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
now, rowName));
- wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
+ wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
+ true);
// Delete the c family to verify deletes make it over.
edit = new WALEdit();
now = ee.currentTime();
edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
- wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
+ wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
+ true);
// Sync.
wal.sync();
@@ -1046,12 +1060,16 @@ public class TestWALReplay {
deleteDir(basedir);
final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ for (byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
HBaseTestingUtility.closeRegionAndWAL(region);
final byte[] family = htd.getColumnFamilies()[0].getName();
final byte[] rowName = tableName.getName();
- FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1);
- FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2);
+ FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes);
+ FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes);
Path largeFile = new Path(logDir, "wal-1");
Path smallFile = new Path(logDir, "wal-2");
@@ -1154,8 +1172,8 @@ public class TestWALReplay {
}
private WALKey createWALKey(final TableName tableName, final HRegionInfo hri,
- final MultiVersionConcurrencyControl mvcc) {
- return new WALKey(hri.getEncodedNameAsBytes(), tableName, 999, mvcc);
+ final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) {
+ return new WALKey(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes);
}
private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee,
@@ -1169,19 +1187,20 @@ public class TestWALReplay {
private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
- int index) throws IOException {
+ int index, NavigableMap<byte[], Integer> scopes) throws IOException {
FSWALEntry entry =
- new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc), createWALEdit(
- rowName, family, ee, index), htd, hri, true);
+ new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit(
+ rowName, family, ee, index), hri, true);
entry.stampRegionSequenceId();
return entry;
}
private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
- final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException {
+ final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc,
+ NavigableMap<byte[], Integer> scopes) throws IOException {
for (int j = 0; j < count; j++) {
- wal.append(htd, hri, createWALKey(tableName, hri, mvcc),
+ wal.append(hri, createWALKey(tableName, hri, mvcc, scopes),
createWALEdit(rowName, family, ee, j), true);
}
wal.sync();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index e52a600..a50bbc5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -18,6 +18,9 @@
*/
package org.apache.hadoop.hbase.replication;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -63,6 +66,7 @@ public class TestReplicationBase {
protected static Table htable1;
protected static Table htable2;
+ protected static NavigableMap<byte[], Integer> scopes;
protected static HBaseTestingUtility utility1;
protected static HBaseTestingUtility utility2;
@@ -140,6 +144,11 @@ public class TestReplicationBase {
table.addFamily(fam);
fam = new HColumnDescriptor(noRepfamName);
table.addFamily(fam);
+ scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(HColumnDescriptor f : table.getColumnFamilies()) {
+ scopes.put(f.getName(), f.getScope());
+ }
Connection connection1 = ConnectionFactory.createConnection(conf1);
Connection connection2 = ConnectionFactory.createConnection(conf2);
try (Admin admin1 = connection1.getAdmin()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index ab97238..97ccd33 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -26,6 +26,8 @@ import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -658,7 +660,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
HRegionInfo hri = new HRegionInfo(htable1.getName(),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
- Replication.scopeWALEdits(htable1.getTableDescriptor(), new WALKey(), edit,
+ Replication.scopeWALEdits(new WALKey(), edit,
htable1.getConfiguration(), null);
}
@@ -767,7 +769,10 @@ public class TestReplicationSmallTests extends TestReplicationBase {
HRegion region = utility1.getMiniHBaseCluster().getRegions(tableName).get(0);
HRegionInfo hri = region.getRegionInfo();
-
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ for (byte[] fam : htable1.getTableDescriptor().getFamiliesKeys()) {
+ scopes.put(fam, 1);
+ }
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
int index = utility1.getMiniHBaseCluster().getServerWith(hri.getRegionName());
WAL wal = utility1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo());
@@ -778,8 +783,8 @@ public class TestReplicationSmallTests extends TestReplicationBase {
long now = EnvironmentEdgeManager.currentTime();
edit.add(new KeyValue(rowName, famName, qualifier,
now, value));
- WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc);
- wal.append(htable1.getTableDescriptor(), hri, walKey, edit, true);
+ WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes);
+ wal.append(hri, walKey, edit, true);
wal.sync();
Get get = new Get(rowName);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index 22c421d..c906d6a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -58,19 +58,19 @@ public class TestReplicationWALEntryFilters {
// meta
WALKey key1 = new WALKey( HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
- TableName.META_TABLE_NAME);
+ TableName.META_TABLE_NAME, null);
Entry metaEntry = new Entry(key1, null);
assertNull(filter.filter(metaEntry));
// ns table
- WALKey key2 = new WALKey(new byte[] {}, TableName.NAMESPACE_TABLE_NAME);
+ WALKey key2 = new WALKey(new byte[] {}, TableName.NAMESPACE_TABLE_NAME, null);
Entry nsEntry = new Entry(key2, null);
assertNull(filter.filter(nsEntry));
// user table
- WALKey key3 = new WALKey(new byte[] {}, TableName.valueOf("foo"));
+ WALKey key3 = new WALKey(new byte[] {}, TableName.valueOf("foo"), null);
Entry userEntry = new Entry(key3, null);
assertEquals(userEntry, filter.filter(userEntry));
@@ -80,33 +80,30 @@ public class TestReplicationWALEntryFilters {
public void testScopeWALEntryFilter() {
ScopeWALEntryFilter filter = new ScopeWALEntryFilter();
- Entry userEntry = createEntry(a, b);
- Entry userEntryA = createEntry(a);
- Entry userEntryB = createEntry(b);
- Entry userEntryEmpty = createEntry();
+ Entry userEntry = createEntry(null, a, b);
+ Entry userEntryA = createEntry(null, a);
+ Entry userEntryB = createEntry(null, b);
+ Entry userEntryEmpty = createEntry(null);
// no scopes
assertEquals(null, filter.filter(userEntry));
// empty scopes
TreeMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
- userEntry = createEntry(a, b);
- userEntry.getKey().setScopes(scopes);
+ userEntry = createEntry(scopes, a, b);
assertEquals(null, filter.filter(userEntry));
// different scope
scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(c, HConstants.REPLICATION_SCOPE_GLOBAL);
- userEntry = createEntry(a, b);
- userEntry.getKey().setScopes(scopes);
+ userEntry = createEntry(scopes, a, b);
// all kvs should be filtered
assertEquals(userEntryEmpty, filter.filter(userEntry));
// local scope
scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
- userEntry = createEntry(a, b);
- userEntry.getKey().setScopes(scopes);
+ userEntry = createEntry(scopes, a, b);
assertEquals(userEntryEmpty, filter.filter(userEntry));
scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
assertEquals(userEntryEmpty, filter.filter(userEntry));
@@ -114,8 +111,7 @@ public class TestReplicationWALEntryFilters {
// only scope a
scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL);
- userEntry = createEntry(a, b);
- userEntry.getKey().setScopes(scopes);
+ userEntry = createEntry(scopes, a, b);
assertEquals(userEntryA, filter.filter(userEntry));
scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
assertEquals(userEntryA, filter.filter(userEntry));
@@ -123,8 +119,7 @@ public class TestReplicationWALEntryFilters {
// only scope b
scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
- userEntry = createEntry(a, b);
- userEntry.getKey().setScopes(scopes);
+ userEntry = createEntry(scopes, a, b);
assertEquals(userEntryB, filter.filter(userEntry));
scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
assertEquals(userEntryB, filter.filter(userEntry));
@@ -132,8 +127,7 @@ public class TestReplicationWALEntryFilters {
// scope a and b
scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
- userEntry = createEntry(a, b);
- userEntry.getKey().setScopes(scopes);
+ userEntry = createEntry(scopes, a, b);
assertEquals(userEntryB, filter.filter(userEntry));
scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
assertEquals(userEntryB, filter.filter(userEntry));
@@ -155,16 +149,16 @@ public class TestReplicationWALEntryFilters {
@Test
public void testChainWALEntryFilter() {
- Entry userEntry = createEntry(a, b, c);
+ Entry userEntry = createEntry(null, a, b, c);
ChainWALEntryFilter filter = new ChainWALEntryFilter(passFilter);
- assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+ assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
filter = new ChainWALEntryFilter(passFilter, passFilter);
- assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+ assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
filter = new ChainWALEntryFilter(passFilter, passFilter, passFilter);
- assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+ assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
filter = new ChainWALEntryFilter(nullFilter);
assertEquals(null, filter.filter(userEntry));
@@ -189,7 +183,7 @@ public class TestReplicationWALEntryFilters {
new ChainWALEntryFilter(passFilter),
new ChainWALEntryFilter(passFilter)),
new ChainWALEntryFilter(passFilter));
- assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+ assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
filter =
@@ -206,19 +200,19 @@ public class TestReplicationWALEntryFilters {
ReplicationPeer peer = mock(ReplicationPeer.class);
when(peer.getTableCFs()).thenReturn(null);
- Entry userEntry = createEntry(a, b, c);
+ Entry userEntry = createEntry(null, a, b, c);
TableCfWALEntryFilter filter = new TableCfWALEntryFilter(peer);
- assertEquals(createEntry(a,b,c), filter.filter(userEntry));
+ assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
// empty map
- userEntry = createEntry(a, b, c);
+ userEntry = createEntry(null, a, b, c);
Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
when(peer.getTableCFs()).thenReturn(tableCfs);
filter = new TableCfWALEntryFilter(peer);
assertEquals(null, filter.filter(userEntry));
// table bar
- userEntry = createEntry(a, b, c);
+ userEntry = createEntry(null, a, b, c);
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("bar"), null);
when(peer.getTableCFs()).thenReturn(tableCfs);
@@ -226,24 +220,24 @@ public class TestReplicationWALEntryFilters {
assertEquals(null, filter.filter(userEntry));
// table foo:a
- userEntry = createEntry(a, b, c);
+ userEntry = createEntry(null, a, b, c);
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
when(peer.getTableCFs()).thenReturn(tableCfs);
filter = new TableCfWALEntryFilter(peer);
- assertEquals(createEntry(a), filter.filter(userEntry));
+ assertEquals(createEntry(null, a), filter.filter(userEntry));
// table foo:a,c
- userEntry = createEntry(a, b, c, d);
+ userEntry = createEntry(null, a, b, c, d);
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
when(peer.getTableCFs()).thenReturn(tableCfs);
filter = new TableCfWALEntryFilter(peer);
- assertEquals(createEntry(a,c), filter.filter(userEntry));
+ assertEquals(createEntry(null, a,c), filter.filter(userEntry));
}
- private Entry createEntry(byte[]... kvs) {
- WALKey key1 = new WALKey(new byte[] {}, TableName.valueOf("foo"));
+ private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
+ WALKey key1 = new WALKey(new byte[] {}, TableName.valueOf("foo"), scopes);
WALEdit edit1 = new WALEdit();
for (byte[] kv : kvs) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index f042a8d..fb8cfa0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -131,6 +132,7 @@ public class TestReplicationSourceManager {
private static CountDownLatch latch;
private static List<String> files = new ArrayList<String>();
+ private static NavigableMap<byte[], Integer> scopes;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -177,6 +179,11 @@ public class TestReplicationSourceManager {
col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
htd.addFamily(col);
+ scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
hri = new HRegionInfo(htd.getTableName(), r1, r2);
}
@@ -214,15 +221,20 @@ public class TestReplicationSourceManager {
manager.init();
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame"));
htd.addFamily(new HColumnDescriptor(f1));
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
// Testing normal log rolling every 20
for(long i = 1; i < 101; i++) {
if(i > 1 && i % 20 == 0) {
wal.rollWriter();
}
LOG.info(i);
- final long txid = wal.append(htd,
+ final long txid = wal.append(
hri,
- new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
+ new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
edit,
true);
wal.sync(txid);
@@ -236,8 +248,8 @@ public class TestReplicationSourceManager {
LOG.info(baseline + " and " + time);
for (int i = 0; i < 3; i++) {
- wal.append(htd, hri,
- new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
+ wal.append(hri,
+ new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
edit,
true);
}
@@ -254,8 +266,8 @@ public class TestReplicationSourceManager {
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
"1", 0, false, false);
- wal.append(htd, hri,
- new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
+ wal.append(hri,
+ new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
edit,
true);
wal.sync();
@@ -427,33 +439,35 @@ public class TestReplicationSourceManager {
@Test
public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
- // 1. Create wal key
- WALKey logKey = new WALKey();
- // 2. Get the bulk load wal edit event
- WALEdit logEdit = getBulkLoadWALEdit();
+ NavigableMap<byte[], Integer> scope = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ // 1. Get the bulk load wal edit event
+ WALEdit logEdit = getBulkLoadWALEdit(scope);
+ // 2. Create wal key
+ WALKey logKey = new WALKey(scope);
// 3. Get the scopes for the key
- Replication.scopeWALEdits(htd, logKey, logEdit, conf, manager);
+ Replication.scopeWALEdits(logKey, logEdit, conf, manager);
// 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
- assertNull("No bulk load entries scope should be added if bulk load replication is diabled.",
- logKey.getScopes());
+ assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
+ logKey.getReplicationScopes());
}
@Test
public void testBulkLoadWALEdits() throws Exception {
- // 1. Create wal key
- WALKey logKey = new WALKey();
- // 2. Get the bulk load wal edit event
- WALEdit logEdit = getBulkLoadWALEdit();
+ // 1. Get the bulk load wal edit event
+ NavigableMap<byte[], Integer> scope = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ WALEdit logEdit = getBulkLoadWALEdit(scope);
+ // 2. Create wal key
+ WALKey logKey = new WALKey(scope);
// 3. Enable bulk load hfile replication
Configuration bulkLoadConf = HBaseConfiguration.create(conf);
bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
// 4. Get the scopes for the key
- Replication.scopeWALEdits(htd, logKey, logEdit, bulkLoadConf, manager);
+ Replication.scopeWALEdits(logKey, logEdit, bulkLoadConf, manager);
- NavigableMap<byte[], Integer> scopes = logKey.getScopes();
+ NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
// Assert family with replication scope global is present in the key scopes
assertTrue("This family scope is set to global, should be part of replication key scopes.",
scopes.containsKey(f1));
@@ -462,17 +476,16 @@ public class TestReplicationSourceManager {
scopes.containsKey(f2));
}
- private WALEdit getBulkLoadWALEdit() {
+ private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
// 1. Create store files for the families
Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
List<Path> p = new ArrayList<>(1);
p.add(new Path(Bytes.toString(f1)));
storeFiles.put(f1, p);
-
+ scope.put(f1, 1);
p = new ArrayList<>(1);
p.add(new Path(Bytes.toString(f2)));
storeFiles.put(f2, p);
-
// 2. Create bulk load descriptor
BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
ByteStringer.wrap(hri.getEncodedNameAsBytes()), storeFiles, 1);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
index 2ad34ea..3ef658f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
@@ -28,6 +28,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
@@ -75,6 +77,7 @@ public class TestReplicationWALReaderManager {
private static final HRegionInfo info = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
private static final HTableDescriptor htd = new HTableDescriptor(tableName);
+ private static NavigableMap<byte[], Integer> scopes;
private WAL log;
private ReplicationWALReaderManager logManager;
@@ -123,6 +126,11 @@ public class TestReplicationWALReaderManager {
cluster = TEST_UTIL.getDFSCluster();
fs = cluster.getFileSystem();
+ scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
}
@AfterClass
@@ -204,9 +212,8 @@ public class TestReplicationWALReaderManager {
}
private void appendToLogPlus(int count) throws IOException {
- final long txid = log.append(htd, info,
- new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc),
- getWALEdits(count), true);
+ final long txid = log.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true);
log.sync(txid);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java
index 6eac388..79b94cf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
// imports for things that haven't moved yet
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
@@ -60,12 +59,12 @@ public class FaultyFSLog extends FSHLog {
}
@Override
- public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
- boolean inMemstore) throws IOException {
+ public long append(HRegionInfo info, WALKey key,
+ WALEdit edits, boolean inMemstore) throws IOException {
if (this.ft == FailureType.APPEND) {
throw new IOException("append");
}
- return super.append(htd, info, key, edits, inMemstore);
+ return super.append(info, key, edits, inMemstore);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
index 89c63a6..9b6ac54 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
@@ -25,8 +25,10 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.HashSet;
+import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@@ -151,23 +153,25 @@ public class TestDefaultWALProvider {
protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd,
- int times) throws IOException {
+ int times, NavigableMap<byte[], Integer> scopes) throws IOException {
final byte[] row = Bytes.toBytes("row");
for (int i = 0; i < times; i++) {
long timestamp = System.currentTimeMillis();
WALEdit cols = new WALEdit();
cols.add(new KeyValue(row, row, row, timestamp, row));
- log.append(htd, hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp),
- cols, true);
+ log.append(hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp, scopes),
+ cols, true);
}
log.sync();
}
/**
* used by TestDefaultWALProviderWithHLogKey
+ * @param scopes
*/
- WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp) {
- return new WALKey(info, tableName, timestamp, mvcc);
+ WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp,
+ NavigableMap<byte[], Integer> scopes) {
+ return new WALKey(info, tableName, timestamp, mvcc, scopes);
}
/**
@@ -191,6 +195,16 @@ public class TestDefaultWALProvider {
final HTableDescriptor htd2 =
new HTableDescriptor(TableName.valueOf("testLogCleaning2"))
.addFamily(new HColumnDescriptor("row"));
+ NavigableMap<byte[], Integer> scopes1 = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes1.put(fam, 0);
+ }
+ NavigableMap<byte[], Integer> scopes2 = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd2.getFamiliesKeys()) {
+ scopes2.put(fam, 0);
+ }
final Configuration localConf = new Configuration(conf);
localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
@@ -205,26 +219,26 @@ public class TestDefaultWALProvider {
// Add a single edit and make sure that rolling won't remove the file
// Before HBASE-3198 it used to delete it
- addEdits(log, hri, htd, 1);
+ addEdits(log, hri, htd, 1, scopes1);
log.rollWriter();
assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(log));
// See if there's anything wrong with more than 1 edit
- addEdits(log, hri, htd, 2);
+ addEdits(log, hri, htd, 2, scopes1);
log.rollWriter();
assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
// Now mix edits from 2 regions, still no flushing
- addEdits(log, hri, htd, 1);
- addEdits(log, hri2, htd2, 1);
- addEdits(log, hri, htd, 1);
- addEdits(log, hri2, htd2, 1);
+ addEdits(log, hri, htd, 1, scopes1);
+ addEdits(log, hri2, htd2, 1, scopes2);
+ addEdits(log, hri, htd, 1, scopes1);
+ addEdits(log, hri2, htd2, 1, scopes2);
log.rollWriter();
assertEquals(3, DefaultWALProvider.getNumRolledLogFiles(log));
// Flush the first region, we expect to see the first two files getting
// archived. We need to append something or writer won't be rolled.
- addEdits(log, hri2, htd2, 1);
+ addEdits(log, hri2, htd2, 1, scopes2);
log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
log.completeCacheFlush(hri.getEncodedNameAsBytes());
log.rollWriter();
@@ -233,7 +247,7 @@ public class TestDefaultWALProvider {
// Flush the second region, which removes all the remaining output files
// since the oldest was completely flushed and the two others only contain
// flush information
- addEdits(log, hri2, htd2, 1);
+ addEdits(log, hri2, htd2, 1, scopes2);
log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getFamiliesKeys());
log.completeCacheFlush(hri2.getEncodedNameAsBytes());
log.rollWriter();
@@ -264,6 +278,16 @@ public class TestDefaultWALProvider {
new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
HTableDescriptor table2 =
new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row"));
+ NavigableMap<byte[], Integer> scopes1 = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : table1.getFamiliesKeys()) {
+ scopes1.put(fam, 0);
+ }
+ NavigableMap<byte[], Integer> scopes2 = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : table2.getFamiliesKeys()) {
+ scopes2.put(fam, 0);
+ }
final Configuration localConf = new Configuration(conf);
localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
@@ -281,31 +305,31 @@ public class TestDefaultWALProvider {
hri2.setSplit(false);
// variables to mock region sequenceIds.
// start with the testing logic: insert a waledit, and roll writer
- addEdits(wal, hri1, table1, 1);
+ addEdits(wal, hri1, table1, 1, scopes1);
wal.rollWriter();
// assert that the wal is rolled
assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(wal));
// add edits in the second wal file, and roll writer.
- addEdits(wal, hri1, table1, 1);
+ addEdits(wal, hri1, table1, 1, scopes1);
wal.rollWriter();
// assert that the wal is rolled
assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
// add a waledit to table1, and flush the region.
- addEdits(wal, hri1, table1, 3);
+ addEdits(wal, hri1, table1, 3, scopes1);
flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getFamiliesKeys());
// roll log; all old logs should be archived.
wal.rollWriter();
assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
// add an edit to table2, and roll writer
- addEdits(wal, hri2, table2, 1);
+ addEdits(wal, hri2, table2, 1, scopes2);
wal.rollWriter();
assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(wal));
// add edits for table1, and roll writer
- addEdits(wal, hri1, table1, 2);
+ addEdits(wal, hri1, table1, 2, scopes1);
wal.rollWriter();
assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
// add edits for table2, and flush hri1.
- addEdits(wal, hri2, table2, 2);
+ addEdits(wal, hri2, table2, 2, scopes2);
flushRegion(wal, hri1.getEncodedNameAsBytes(), table2.getFamiliesKeys());
// the log : region-sequenceId map is
// log1: region2 (unflushed)
@@ -315,7 +339,7 @@ public class TestDefaultWALProvider {
wal.rollWriter();
assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
// flush region2, and all logs should be archived.
- addEdits(wal, hri2, table2, 2);
+ addEdits(wal, hri2, table2, 2, scopes2);
flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getFamiliesKeys());
wal.rollWriter();
assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java
index 1885d87..ef92768 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.wal;
+import java.util.NavigableMap;
+
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -28,7 +30,8 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@Category({RegionServerTests.class, LargeTests.class})
public class TestDefaultWALProviderWithHLogKey extends TestDefaultWALProvider {
@Override
- WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp) {
- return new HLogKey(info, tableName, timestamp, mvcc);
+ WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp,
+ final NavigableMap<byte[], Integer> scopes) {
+ return new HLogKey(info, tableName, timestamp, mvcc, scopes);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
index 079e0cb..caa0a45 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
@@ -79,6 +81,11 @@ public class TestSecureWAL {
TableName tableName = TableName.valueOf("TestSecureWAL");
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(tableName.getName()));
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
HRegionInfo regioninfo = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
final int total = 10;
@@ -95,8 +102,8 @@ public class TestSecureWAL {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
- wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis()), kvs, true);
+ wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis(), scopes), kvs, true);
}
wal.sync();
final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 747977a..0eef3b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -30,6 +30,8 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -181,6 +183,11 @@ public class TestWALFactory {
}
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor("column"));
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
// Add edits for three regions.
for (int ii = 0; ii < howmany; ii++) {
@@ -196,8 +203,8 @@ public class TestWALFactory {
System.currentTimeMillis(), column));
LOG.info("Region " + i + ": " + edit);
WALKey walKey = new WALKey(infos[i].getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis(), mvcc);
- log.append(htd, infos[i], walKey, edit, true);
+ System.currentTimeMillis(), mvcc, scopes);
+ log.append(infos[i], walKey, edit, true);
walKey.getWriteEntry();
}
log.sync();
@@ -249,13 +256,18 @@ public class TestWALFactory {
null,null, false);
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(tableName.getName()));
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
final WAL wal = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
- wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis(), mvcc), kvs, true);
+ wal.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
// Now call sync and try reading. Opening a Reader before you sync just
// gives you EOFE.
@@ -273,8 +285,8 @@ public class TestWALFactory {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
- wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis(), mvcc), kvs, true);
+ wal.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
wal.sync();
reader = wals.createReader(fs, walPath);
@@ -295,8 +307,8 @@ public class TestWALFactory {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
- wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis(), mvcc), kvs, true);
+ wal.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
// Now I should have written out lots of blocks. Sync then read.
wal.sync();
@@ -370,12 +382,17 @@ public class TestWALFactory {
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(tableName.getName()));
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
- wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis()), kvs, true);
+ wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis(), scopes), kvs, true);
}
// Now call sync to send the data to HDFS datanodes
wal.sync();
@@ -485,6 +502,11 @@ public class TestWALFactory {
final HTableDescriptor htd =
new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
"column"));
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
final byte [] row = Bytes.toBytes("row");
WAL.Reader reader = null;
try {
@@ -503,9 +525,9 @@ public class TestWALFactory {
row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
final WAL log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
- final long txid = log.append(htd, info,
+ final long txid = log.append(info,
new WALKey(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
- mvcc),
+ mvcc, scopes),
cols, true);
log.sync(txid);
log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getFamiliesKeys());
@@ -545,6 +567,11 @@ public class TestWALFactory {
final HTableDescriptor htd =
new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
"column"));
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
final byte [] row = Bytes.toBytes("row");
WAL.Reader reader = null;
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
@@ -561,9 +588,9 @@ public class TestWALFactory {
HRegionInfo hri = new HRegionInfo(htd.getTableName(),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
final WAL log = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
- final long txid = log.append(htd, hri,
+ final long txid = log.append(hri,
new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
- mvcc),
+ mvcc, scopes),
cols, true);
log.sync(txid);
log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
@@ -607,7 +634,11 @@ public class TestWALFactory {
long timestamp = System.currentTimeMillis();
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor("column"));
-
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
HRegionInfo hri = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
final WAL log = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
@@ -617,8 +648,8 @@ public class TestWALFactory {
cols.add(new KeyValue(row, Bytes.toBytes("column"),
Bytes.toBytes(Integer.toString(i)),
timestamp, new byte[]{(byte) (i + '0')}));
- log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis(), mvcc), cols, true);
+ log.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis(), mvcc, scopes), cols, true);
}
log.sync();
assertEquals(COL_COUNT, visitor.increments);
@@ -627,8 +658,8 @@ public class TestWALFactory {
cols.add(new KeyValue(row, Bytes.toBytes("column"),
Bytes.toBytes(Integer.toString(11)),
timestamp, new byte[]{(byte) (11 + '0')}));
- log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis(), mvcc), cols, true);
+ log.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis(), mvcc, scopes), cols, true);
log.sync();
assertEquals(COL_COUNT, visitor.increments);
}
@@ -722,8 +753,9 @@ public class TestWALFactory {
}
@Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
- //To change body of implemented methods use File | Settings | File Templates.
+ public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
+ // To change body of implemented methods use File | Settings | File
+ // Templates.
increments++;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
index 9ae98c6..beac9e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.LogFactory;
@@ -96,6 +98,11 @@ public class TestWALReaderOnSecureWAL {
TableName tableName = TableName.valueOf(tblName);
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(tableName.getName()));
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
HRegionInfo regioninfo = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
final int total = 10;
@@ -109,8 +116,8 @@ public class TestWALReaderOnSecureWAL {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
- wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
- System.currentTimeMillis(), mvcc), kvs, true);
+ wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
wal.sync();
final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
index e138174..4a15d3c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
@@ -23,8 +23,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
@@ -128,6 +130,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
private final int syncInterval;
private final HTableDescriptor htd;
private final Sampler loopSampler;
+ private final NavigableMap<byte[], Integer> scopes;
WALPutBenchmark(final HRegion region, final HTableDescriptor htd,
final long numIterations, final boolean noSync, final int syncInterval,
@@ -138,6 +141,11 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
this.numFamilies = htd.getColumnFamilies().length;
this.region = region;
this.htd = htd;
+ scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
if (spanReceivers == null || spanReceivers.isEmpty()) {
loopSampler = Sampler.NEVER;
@@ -180,8 +188,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
HRegionInfo hri = region.getRegionInfo();
final WALKey logkey =
- new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc);
- wal.append(htd, hri, logkey, walEdit, true);
+ new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes);
+ wal.append(hri, logkey, walEdit, true);
if (!this.noSync) {
if (++lastSync >= this.syncInterval) {
wal.sync();
@@ -498,8 +506,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
private int appends = 0;
@Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
- WALEdit logEdit) {
+ public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
this.appends++;
if (this.appends % whenToRoll == 0) {
LOG.info("Rolling after " + appends + " edits");
[2/2] hbase git commit: HBASE-15205 Do not find the replication scope
for every WAL#append() (Ram)
Posted by ra...@apache.org.
HBASE-15205 Do not find the replication scope for every WAL#append() (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8f2bd060
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8f2bd060
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8f2bd060
Branch: refs/heads/master
Commit: 8f2bd06019869a1738bcfd66066737cdb7802ca8
Parents: 538815d
Author: ramkrishna <ra...@gmail.com>
Authored: Fri Feb 26 22:30:55 2016 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Fri Feb 26 22:30:55 2016 +0530
----------------------------------------------------------------------
.../hbase/protobuf/ReplicationProtbufUtil.java | 2 +-
.../hadoop/hbase/regionserver/HRegion.java | 89 +++++++++-----
.../hadoop/hbase/regionserver/HStore.java | 2 +-
.../hadoop/hbase/regionserver/wal/FSHLog.java | 13 +-
.../hbase/regionserver/wal/FSWALEntry.java | 10 +-
.../hadoop/hbase/regionserver/wal/HLogKey.java | 48 ++++++--
.../regionserver/wal/WALActionsListener.java | 8 +-
.../hadoop/hbase/regionserver/wal/WALUtil.java | 57 +++++----
.../hbase/replication/ScopeWALEntryFilter.java | 2 +-
.../replication/regionserver/Replication.java | 70 +++--------
.../hadoop/hbase/wal/DisabledWALProvider.java | 4 +-
.../java/org/apache/hadoop/hbase/wal/WAL.java | 9 +-
.../org/apache/hadoop/hbase/wal/WALKey.java | 121 +++++++++++++++----
.../apache/hadoop/hbase/wal/WALSplitter.java | 2 +-
.../org/apache/hadoop/hbase/TestIOFencing.java | 3 +-
.../hbase/coprocessor/TestWALObserver.java | 48 +++++---
.../hbase/mapreduce/TestHLogRecordReader.java | 7 +-
.../hbase/mapreduce/TestImportExport.java | 16 +--
.../hbase/mapreduce/TestWALRecordReader.java | 20 +--
.../master/TestDistributedLogSplitting.java | 9 +-
.../hadoop/hbase/regionserver/TestBulkLoad.java | 17 +--
.../hadoop/hbase/regionserver/TestHRegion.java | 16 +--
.../regionserver/TestHRegionReplayEvents.java | 6 +-
.../regionserver/TestHRegionServerBulkLoad.java | 3 +-
.../hbase/regionserver/TestWALLockup.java | 10 +-
.../hbase/regionserver/wal/TestFSHLog.java | 57 ++++++---
.../regionserver/wal/TestLogRollAbort.java | 12 +-
.../wal/TestLogRollingNoCluster.java | 11 +-
.../wal/TestWALActionsListener.java | 12 +-
.../hbase/regionserver/wal/TestWALReplay.java | 47 ++++---
.../hbase/replication/TestReplicationBase.java | 9 ++
.../replication/TestReplicationSmallTests.java | 13 +-
.../TestReplicationWALEntryFilters.java | 62 +++++-----
.../TestReplicationSourceManager.java | 57 +++++----
.../TestReplicationWALReaderManager.java | 13 +-
.../apache/hadoop/hbase/wal/FaultyFSLog.java | 7 +-
.../hbase/wal/TestDefaultWALProvider.java | 64 +++++++---
.../wal/TestDefaultWALProviderWithHLogKey.java | 7 +-
.../apache/hadoop/hbase/wal/TestSecureWAL.java | 11 +-
.../apache/hadoop/hbase/wal/TestWALFactory.java | 74 ++++++++----
.../hbase/wal/TestWALReaderOnSecureWAL.java | 11 +-
.../hbase/wal/WALPerformanceEvaluation.java | 15 ++-
42 files changed, 685 insertions(+), 389 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 91185af..8cb2237 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -134,7 +134,7 @@ public class ReplicationProtbufUtil {
keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
}
WALEdit edit = entry.getEdit();
- NavigableMap<byte[], Integer> scopes = key.getScopes();
+ NavigableMap<byte[], Integer> scopes = key.getReplicationScopes();
if (scopes != null && !scopes.isEmpty()) {
for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
scopeBuilder.setFamily(ByteStringer.wrap(scope.getKey()));
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b70a4c3..406850e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -17,19 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Closeables;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import com.google.protobuf.TextFormat;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import java.io.EOFException;
import java.io.FileNotFoundException;
@@ -195,6 +183,20 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.TextFormat;
+
@SuppressWarnings("deprecation")
@InterfaceAudience.Private
public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
@@ -583,6 +585,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final MetricsRegionWrapperImpl metricsRegionWrapper;
private final Durability durability;
private final boolean regionStatsEnabled;
+ // Stores the replication scope of those column families of the table
+ // that have a non-default scope
+ private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
/**
* HRegion constructor. This constructor should only be used for testing and
@@ -661,6 +667,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
this.htableDescriptor = htd;
+ Set<byte[]> families = this.htableDescriptor.getFamiliesKeys();
+ for (byte[] family : families) {
+ if (!replicationScope.containsKey(family)) {
+ int scope = htd.getFamily(family).getScope();
+ // Only store those families that have a NON-DEFAULT scope
+ if (scope != REPLICATION_SCOPE_LOCAL) {
+ // Do a copy before storing it here.
+ replicationScope.put(Bytes.copy(family), scope);
+ }
+ }
+ }
this.rsServices = rsServices;
this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
setHTableSpecificConf();
@@ -971,7 +988,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
getRegionServerServices().getServerName(), storeFiles);
- WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc, mvcc);
+ WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionOpenDesc,
+ mvcc);
}
private void writeRegionCloseMarker(WAL wal) throws IOException {
@@ -979,7 +997,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
getRegionServerServices().getServerName(), storeFiles);
- WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc, mvcc);
+ WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc,
+ mvcc);
// Store SeqId in HDFS when a region closes
// checking region folder exists is due to many tests which delete the table folder while a
@@ -2285,7 +2304,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
// No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH
- WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc);
+ WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
+ mvcc);
}
// Prepare flush (take a snapshot)
@@ -2334,7 +2354,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
- WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false,
+ WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
mvcc);
} catch (Throwable t) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
@@ -2379,7 +2399,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
getRegionInfo(), -1, new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR));
try {
- WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc);
+ WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
+ mvcc);
return true;
} catch (IOException e) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
@@ -2449,7 +2470,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// write flush marker to WAL. If fail, we should throw DroppedSnapshotException
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
- WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc);
+ WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
+ mvcc);
}
} catch (Throwable t) {
// An exception here means that the snapshot was not persisted.
@@ -2462,7 +2484,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
- WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc);
+ WALUtil.writeFlushMarker(wal, this.replicationScope, getRegionInfo(), desc, false, mvcc);
} catch (Throwable ex) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "failed writing ABORT_FLUSH marker to WAL", ex);
@@ -3139,13 +3161,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!replay) {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
- mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
+ mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc,
+ this.getReplicationScope());
}
// TODO: Use the doAppend methods below... complicated by the replay stuff above.
try {
- long txid =
- this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
+ long txid = this.wal.append(this.getRegionInfo(), walKey,
+ walEdit, true);
if (txid != 0) sync(txid, durability);
writeEntry = walKey.getWriteEntry();
} catch (IOException ioe) {
@@ -3271,8 +3294,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!replay) throw new IOException("Multiple nonces per batch and not in replay");
WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), now, mutation.getClusterIds(),
- currentNonceGroup, currentNonce, mvcc);
- this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
+ currentNonceGroup, currentNonce, mvcc, this.getReplicationScope());
+ this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
// Complete the mvcc transaction started down in append else it will block others
this.mvcc.complete(walKey.getWriteEntry());
}
@@ -5389,7 +5412,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
this.getRegionInfo().getTable(),
ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
- WALUtil.writeBulkLoadMarkerAndSync(this.wal, getTableDesc(), getRegionInfo(),
+ WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
loadDescriptor, mvcc);
} catch (IOException ioe) {
if (this.rsServices != null) {
@@ -6319,6 +6342,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return r.openHRegion(reporter);
}
+ @VisibleForTesting
+ public NavigableMap<byte[], Integer> getReplicationScope() {
+ return this.replicationScope;
+ }
/**
* Useful when reopening a closed region (normally for unit tests)
@@ -7069,10 +7096,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// here instead of WALKey directly to support legacy coprocessors.
WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,
- nonceGroup, nonce, mvcc);
+ nonceGroup, nonce, mvcc, this.getReplicationScope());
try {
long txid =
- this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
+ this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
// Call sync on our edit.
if (txid != 0) sync(txid, durability);
writeEntry = walKey.getWriteEntry();
@@ -7362,7 +7389,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 46 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+ 47 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(14 * Bytes.SIZEOF_LONG) +
5 * Bytes.SIZEOF_BOOLEAN);
@@ -7385,7 +7412,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
(2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
MultiVersionConcurrencyControl.FIXED_SIZE // mvcc
- + ClassSize.TREEMAP // maxSeqIdInStores
+ + 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScope
+ 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
+ ClassSize.STORE_SERVICES // store services
;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 22f99e9..7c71baf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1307,7 +1307,7 @@ public class HStore implements Store {
// Fix reaching into Region to get the maxWaitForSeqId.
// Does this method belong in Region altogether given it is making so many references up there?
// Could be Region#writeCompactionMarker(compactionDescriptor);
- WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getTableDesc(),
+ WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 09da8fc..f3f869c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.util.Bytes;
@@ -1083,8 +1082,8 @@ public class FSHLog implements WAL {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
justification="Will never be null")
@Override
- public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
- final WALEdit edits, final boolean inMemstore) throws IOException {
+ public long append(final HRegionInfo hri,
+ final WALKey key, final WALEdit edits, final boolean inMemstore) throws IOException {
if (this.closed) throw new IOException("Cannot append; log is closed");
// Make a trace scope for the append. It is closed on other side of the ring buffer by the
// single consuming thread. Don't have to worry about it.
@@ -1100,7 +1099,7 @@ public class FSHLog implements WAL {
// Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the
// edit with its edit/sequence id.
// TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
- entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore);
+ entry = new FSWALEntry(sequence, key, edits, hri, inMemstore);
truck.loadPayload(entry, scope.detach());
} finally {
this.disruptor.getRingBuffer().publish(sequence);
@@ -1878,14 +1877,12 @@ public class FSHLog implements WAL {
entry.getEdit())) {
if (entry.getEdit().isReplay()) {
// Set replication scope null so that this won't be replicated
- entry.getKey().setScopes(null);
+ entry.getKey().serializeReplicationScope(false);
}
}
if (!listeners.isEmpty()) {
for (WALActionsListener i: listeners) {
- // TODO: Why does listener take a table description and CPs take a regioninfo? Fix.
- i.visitLogEntryBeforeWrite(entry.getHTableDescriptor(), entry.getKey(),
- entry.getEdit());
+ i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 86a3c3d..06318f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -26,7 +26,6 @@ import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
@@ -51,15 +50,13 @@ class FSWALEntry extends Entry {
// they are only in memory and held here while passing over the ring buffer.
private final transient long sequence;
private final transient boolean inMemstore;
- private final transient HTableDescriptor htd;
private final transient HRegionInfo hri;
private final Set<byte[]> familyNames;
FSWALEntry(final long sequence, final WALKey key, final WALEdit edit,
- final HTableDescriptor htd, final HRegionInfo hri, final boolean inMemstore) {
+ final HRegionInfo hri, final boolean inMemstore) {
super(key, edit);
this.inMemstore = inMemstore;
- this.htd = htd;
this.hri = hri;
this.sequence = sequence;
if (inMemstore) {
@@ -71,6 +68,7 @@ class FSWALEntry extends Entry {
Set<byte[]> familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
for (Cell cell : cells) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+ // TODO: Avoid this clone?
familySet.add(CellUtil.cloneFamily(cell));
}
}
@@ -89,10 +87,6 @@ class FSWALEntry extends Entry {
return this.inMemstore;
}
- HTableDescriptor getHTableDescriptor() {
- return this.htd;
- }
-
HRegionInfo getHRegionInfo() {
return this.hri;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index 7c40323..d7bf4a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -24,6 +24,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.NavigableMap;
import java.util.UUID;
import org.apache.commons.logging.Log;
@@ -67,7 +68,7 @@ public class HLogKey extends WALKey implements Writable {
}
public HLogKey(final byte[] encodedRegionName, final TableName tablename) {
- super(encodedRegionName, tablename);
+ super(encodedRegionName, tablename, null);
}
@VisibleForTesting
@@ -75,11 +76,15 @@ public class HLogKey extends WALKey implements Writable {
super(encodedRegionName, tablename, now);
}
- public HLogKey(final byte[] encodedRegionName,
- final TableName tablename,
- final long now,
- final MultiVersionConcurrencyControl mvcc) {
- super(encodedRegionName, tablename, now, mvcc);
+ @VisibleForTesting
+ public HLogKey(final byte[] encodedRegionName, final TableName tablename, final long now,
+ final NavigableMap<byte[], Integer> replicationScope) {
+ super(encodedRegionName, tablename, now, replicationScope);
+ }
+
+ public HLogKey(final byte[] encodedRegionName, final TableName tablename, final long now,
+ final MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> scopes) {
+ super(encodedRegionName, tablename, now, mvcc, scopes);
}
/**
@@ -111,6 +116,35 @@ public class HLogKey extends WALKey implements Writable {
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
+ * <p>Used by log splitting and snapshots.
+ *
+ * @param encodedRegionName Encoded name of the region as returned by
+ * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+ * @param tablename - name of table
+ * @param logSeqNum - log sequence number
+ * @param now Time at which this edit was written.
+ * @param clusterIds the clusters that have consumed the change(used in Replication)
+ * @param nonceGroup the noncegroup
+ * @param nonce the nonce
+ * @param replicationScope the replication scopes of the region's non-default-scope column families
+ */
+ public HLogKey(
+ final byte[] encodedRegionName,
+ final TableName tablename,
+ long logSeqNum,
+ final long now,
+ List<UUID> clusterIds,
+ long nonceGroup,
+ long nonce,
+ MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> replicationScope) {
+ super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc,
+ replicationScope);
+ }
+
+ /**
+ * Create the log key for writing to somewhere.
+ * We maintain the tablename mainly for debugging purposes.
+ * A regionName is always a sub-table object.
*
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
@@ -192,7 +226,7 @@ public class HLogKey extends WALKey implements Writable {
// encodes the length of encodedRegionName.
// If < 0 we just read the version and the next vint is the length.
// @see Bytes#readByteArray(DataInput)
- setScopes(null); // writable HLogKey does not contain scopes
+ serializeReplicationScope(false); // writable HLogKey does not contain scopes
int len = WritableUtils.readVInt(in);
byte[] tablenameBytes = null;
if (len < 0) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
index db98083..a6452e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -85,7 +84,6 @@ public interface WALActionsListener {
);
/**
- * @param htd
* @param logKey
* @param logEdit TODO: Retire this in favor of
* {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)} It only exists to get
@@ -93,8 +91,7 @@ public interface WALActionsListener {
* <code>htd</code>.
* @throws IOException If failed to parse the WALEdit
*/
- void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
- throws IOException;
+ void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException;
/**
* For notification post append to the writer. Used by metrics system at least.
@@ -135,8 +132,7 @@ public interface WALActionsListener {
public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey, WALEdit logEdit) {}
@Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
- throws IOException {
+ public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index f268422..197144d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -20,11 +20,11 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
+import java.util.NavigableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -58,10 +58,11 @@ public class WALUtil {
* <p>This write is for internal use only. Not for external client consumption.
* @param mvcc Used by WAL to get sequence Id for the waledit.
*/
- public static WALKey writeCompactionMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
- final CompactionDescriptor c, MultiVersionConcurrencyControl mvcc)
+ public static WALKey writeCompactionMarker(WAL wal,
+ NavigableMap<byte[], Integer> replicationScope, HRegionInfo hri, final CompactionDescriptor c,
+ MultiVersionConcurrencyControl mvcc)
throws IOException {
- WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createCompaction(hri, c), mvcc);
+ WALKey walKey = writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
}
@@ -73,11 +74,11 @@ public class WALUtil {
*
* <p>This write is for internal use only. Not for external client consumption.
*/
- public static WALKey writeFlushMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
- final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
- throws IOException {
- WALKey walKey =
- doFullAppendTransaction(wal, htd, hri, WALEdit.createFlushWALEdit(hri, f), mvcc, sync);
+ public static WALKey writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
+ HRegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
+ throws IOException {
+ WALKey walKey = doFullAppendTransaction(wal, replicationScope, hri,
+ WALEdit.createFlushWALEdit(hri, f), mvcc, sync);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
}
@@ -88,10 +89,12 @@ public class WALUtil {
* Write a region open marker indicating that the region is opened.
* This write is for internal use only. Not for external client consumption.
*/
- public static WALKey writeRegionEventMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
+ public static WALKey writeRegionEventMarker(WAL wal,
+ NavigableMap<byte[], Integer> replicationScope, HRegionInfo hri,
final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc)
throws IOException {
- WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc);
+ WALKey walKey = writeMarker(wal, replicationScope, hri,
+ WALEdit.createRegionEventWALEdit(hri, r), mvcc);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
}
@@ -102,28 +105,30 @@ public class WALUtil {
* Write a log marker that a bulk load has succeeded and is about to be committed.
* This write is for internal use only. Not for external client consumption.
* @param wal The log to write into.
- * @param htd A description of the table that we are bulk loading into.
+ * @param replicationScope The replication scope of the families in the HRegion
* @param hri A description of the region in the table that we are bulk loading into.
* @param desc A protocol buffers based description of the client's bulk loading request
* @return walKey with sequenceid filled out for this bulk load marker
* @throws IOException We will throw an IOException if we can not append to the HLog.
*/
- public static WALKey writeBulkLoadMarkerAndSync(final WAL wal, final HTableDescriptor htd,
- final HRegionInfo hri, final WALProtos.BulkLoadDescriptor desc,
- final MultiVersionConcurrencyControl mvcc)
- throws IOException {
- WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc);
+ public static WALKey writeBulkLoadMarkerAndSync(final WAL wal,
+ final NavigableMap<byte[], Integer> replicationScope, final HRegionInfo hri,
+ final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
+ throws IOException {
+ WALKey walKey = writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc),
+ mvcc);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
}
return walKey;
}
- private static WALKey writeMarker(final WAL wal, final HTableDescriptor htd,
- final HRegionInfo hri, final WALEdit edit, final MultiVersionConcurrencyControl mvcc)
+ private static WALKey writeMarker(final WAL wal,
+ final NavigableMap<byte[], Integer> replicationScope, final HRegionInfo hri,
+ final WALEdit edit, final MultiVersionConcurrencyControl mvcc)
throws IOException {
// If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
- return doFullAppendTransaction(wal, htd, hri, edit, mvcc, true);
+ return doFullAppendTransaction(wal, replicationScope, hri, edit, mvcc, true);
}
/**
@@ -134,16 +139,16 @@ public class WALUtil {
* <p>This write is for internal use only. Not for external client consumption.
* @return WALKey that was added to the WAL.
*/
- public static WALKey doFullAppendTransaction(final WAL wal, final HTableDescriptor htd,
- final HRegionInfo hri, final WALEdit edit, final MultiVersionConcurrencyControl mvcc,
- final boolean sync)
+ public static WALKey doFullAppendTransaction(final WAL wal,
+ final NavigableMap<byte[], Integer> replicationScope, final HRegionInfo hri,
+ final WALEdit edit, final MultiVersionConcurrencyControl mvcc, final boolean sync)
throws IOException {
// TODO: Pass in current time to use?
- WALKey walKey =
- new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), System.currentTimeMillis(), mvcc);
+ WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(),
+ System.currentTimeMillis(), mvcc, replicationScope);
long trx = MultiVersionConcurrencyControl.NONE;
try {
- trx = wal.append(htd, hri, walKey, edit, false);
+ trx = wal.append(hri, walKey, edit, false);
if (sync) {
wal.sync(trx);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index f97ec15..28a83dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -44,7 +44,7 @@ public class ScopeWALEntryFilter implements WALEntryFilter {
@Override
public Entry filter(Entry entry) {
- NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
+ NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
if (scopes == null || scopes.isEmpty()) {
return null;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index a5d2446..bb4a5a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -20,13 +20,10 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
-import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -43,7 +40,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
@@ -61,7 +57,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.zookeeper.KeeperException;
@@ -257,72 +252,47 @@ public class Replication extends WALActionsListener.Base implements
}
@Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
- throws IOException {
- scopeWALEdits(htd, logKey, logEdit, this.conf, this.getReplicationManager());
+ public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
+ scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager());
}
/**
* Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
* compaction WAL edits and if the scope is local.
- * @param htd Descriptor used to find the scope to use
* @param logKey Key that may get scoped according to its edits
* @param logEdit Edits used to lookup the scopes
* @param replicationManager Manager used to add bulk load events hfile references
* @throws IOException If failed to parse the WALEdit
*/
- public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey, WALEdit logEdit,
- Configuration conf, ReplicationSourceManager replicationManager) throws IOException {
- NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
- byte[] family;
+ public static void scopeWALEdits(WALKey logKey,
+ WALEdit logEdit, Configuration conf, ReplicationSourceManager replicationManager)
+ throws IOException {
boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
+ byte[] family;
+ boolean foundOtherEdits = false;
for (Cell cell : logEdit.getCells()) {
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
- scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell);
+ try {
+ BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+ for (StoreDescriptor s : bld.getStoresList()) {
+ family = s.getFamilyName().toByteArray();
+ addHFileRefsToQueue(replicationManager, logKey.getTablename(), family, s);
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to get bulk load events information from the wal file.", e);
+ throw e;
+ }
} else {
// Skip the flush/compaction/region events
continue;
}
} else {
- family = CellUtil.cloneFamily(cell);
- // Unexpected, has a tendency to happen in unit tests
- assert htd.getFamily(family) != null;
-
- if (!scopes.containsKey(family)) {
- int scope = htd.getFamily(family).getScope();
- if (scope != REPLICATION_SCOPE_LOCAL) {
- scopes.put(family, scope);
- }
- }
+ foundOtherEdits = true;
}
}
- if (!scopes.isEmpty() && !logEdit.isReplay()) {
- logKey.setScopes(scopes);
- }
- }
-
- private static void scopeBulkLoadEdits(HTableDescriptor htd,
- ReplicationSourceManager replicationManager, NavigableMap<byte[], Integer> scopes,
- TableName tableName, Cell cell) throws IOException {
- byte[] family;
- try {
- BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
- for (StoreDescriptor s : bld.getStoresList()) {
- family = s.getFamilyName().toByteArray();
- if (!scopes.containsKey(family)) {
- int scope = htd.getFamily(family).getScope();
- if (scope != REPLICATION_SCOPE_LOCAL) {
- scopes.put(family, scope);
- addHFileRefsToQueue(replicationManager, tableName, family, s);
- }
- } else {
- addHFileRefsToQueue(replicationManager, tableName, family, s);
- }
- }
- } catch (IOException e) {
- LOG.error("Failed to get bulk load events information from the wal file.", e);
- throw e;
+ if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
+ logKey.serializeReplicationScope(false);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 0c41e77..c3d4b2c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.util.FSUtils;
// imports for things that haven't moved from regionserver.wal yet.
@@ -154,8 +153,7 @@ class DisabledWALProvider implements WALProvider {
}
@Override
- public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
- boolean inMemstore) {
+ public long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) {
if (!this.listeners.isEmpty()) {
final long start = System.nanoTime();
long len = 0;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index d2b336e..0b83528 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -25,7 +25,6 @@ import java.util.Set;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
// imports we use from yet-to-be-moved regionsever.wal
@@ -106,21 +105,17 @@ public interface WAL {
* completes BUT on return this edit must have its region edit/sequence id assigned
* else it messes up our unification of mvcc and sequenceid. On return <code>key</code> will
* have the region edit/sequence id filled in.
- * @param info
+ * @param info the regioninfo associated with append
* @param key Modified by this call; we add to it this edits region edit/sequence id.
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
* sequence id that is after all currently appended edits.
- * @param htd used to give scope for replication TODO refactor out in favor of table name and
- * info
* @param inMemstore Always true except for case where we are writing a compaction completion
* record into the WAL; in this case the entry is just so we can finish an unfinished compaction
* -- it is not an edit for memstore.
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
* in it.
*/
- long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
- boolean inMemstore)
- throws IOException;
+ long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException;
/**
* Sync what we have in the WAL.
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 09096fe..86fdfbd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -193,7 +193,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
@InterfaceAudience.Private
protected List<UUID> clusterIds;
- private NavigableMap<byte[], Integer> scopes;
+ private NavigableMap<byte[], Integer> replicationScope;
private long nonceGroup = HConstants.NO_NONCE;
private long nonce = HConstants.NO_NONCE;
@@ -210,7 +210,12 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
public WALKey() {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
- new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null);
+ new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
+ }
+
+ public WALKey(final NavigableMap<byte[], Integer> replicationScope) {
+ init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
+ new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope);
}
@VisibleForTesting
@@ -220,15 +225,16 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
List<UUID> clusterIds = new ArrayList<UUID>();
clusterIds.add(clusterId);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
- HConstants.NO_NONCE, HConstants.NO_NONCE, null);
+ HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
}
/**
* @deprecated Remove. Useless.
*/
@Deprecated // REMOVE
- public WALKey(final byte[] encodedRegionName, final TableName tablename) {
- this(encodedRegionName, tablename, System.currentTimeMillis());
+ public WALKey(final byte[] encodedRegionName, final TableName tablename,
+ final NavigableMap<byte[], Integer> replicationScope) {
+ this(encodedRegionName, tablename, System.currentTimeMillis(), replicationScope);
}
// TODO: Fix being able to pass in sequenceid.
@@ -240,7 +246,20 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
- null);
+ null, null);
+ }
+
+ // TODO: Fix being able to pass in sequenceid.
+ public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now,
+ final NavigableMap<byte[], Integer> replicationScope) {
+ init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
+ HConstants.NO_NONCE, null, replicationScope);
+ }
+
+ public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now,
+ MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
+ init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
+ HConstants.NO_NONCE, mvcc, replicationScope);
}
public WALKey(final byte[] encodedRegionName,
@@ -254,7 +273,33 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
- mvcc);
+ mvcc, null);
+ }
+
+ /**
+ * Create the log key for writing to somewhere.
+ * We maintain the tablename mainly for debugging purposes.
+ * A regionName is always a sub-table object.
+ * <p>Used by log splitting and snapshots.
+ *
+ * @param encodedRegionName Encoded name of the region as returned by
+ * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+ * @param tablename - name of table
+ * @param logSeqNum - log sequence number
+ * @param now Time at which this edit was written.
+ * @param clusterIds the clusters that have consumed the change (used in Replication)
+ * @param nonceGroup the nonceGroup
+ * @param nonce the nonce
+ * @param mvcc the mvcc associated with the WALKey
+ * @param replicationScope the non-default replication scope
+ * associated with the region's column families
+ */
+ // TODO: Fix being able to pass in sequenceid.
+ public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
+ final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
+ MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
+ init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc,
+ replicationScope);
}
/**
@@ -279,7 +324,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
long nonceGroup,
long nonce,
MultiVersionConcurrencyControl mvcc) {
- init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
+ init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, null);
}
/**
@@ -289,7 +334,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
*
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
- * @param tablename
+ * @param tablename the tablename
* @param now Time at which this edit was written.
* @param clusterIds the clusters that have consumed the change(used in Replication)
* @param nonceGroup
@@ -299,7 +344,31 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
public WALKey(final byte[] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup,
final long nonce, final MultiVersionConcurrencyControl mvcc) {
- init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc);
+ init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
+ null);
+ }
+
+ /**
+ * Create the log key for writing to somewhere.
+ * We maintain the tablename mainly for debugging purposes.
+ * A regionName is always a sub-table object.
+ *
+ * @param encodedRegionName Encoded name of the region as returned by
+ * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
+ * @param tablename the tablename
+ * @param now Time at which this edit was written.
+ * @param clusterIds the clusters that have consumed the change (used in Replication)
+ * @param nonceGroup the nonceGroup
+ * @param nonce the nonce
+ * @param mvcc mvcc control used to generate sequence numbers and control read/write points
+ * @param replicationScope the non-default replication scope of the column families
+ */
+ public WALKey(final byte[] encodedRegionName, final TableName tablename,
+ final long now, List<UUID> clusterIds, long nonceGroup,
+ final long nonce, final MultiVersionConcurrencyControl mvcc,
+ NavigableMap<byte[], Integer> replicationScope) {
+ init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
+ replicationScope);
}
/**
@@ -328,7 +397,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
EMPTY_UUIDS,
nonceGroup,
nonce,
- mvcc);
+ mvcc, null);
}
@InterfaceAudience.Private
@@ -339,7 +408,8 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
List<UUID> clusterIds,
long nonceGroup,
long nonce,
- MultiVersionConcurrencyControl mvcc) {
+ MultiVersionConcurrencyControl mvcc,
+ NavigableMap<byte[], Integer> replicationScope) {
this.sequenceId = logSeqNum;
this.writeTime = now;
this.clusterIds = clusterIds;
@@ -351,6 +421,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
if (logSeqNum != NO_SEQUENCE_ID) {
setSequenceId(logSeqNum);
}
+ this.replicationScope = replicationScope;
}
// For HLogKey and deserialization. DO NOT USE. See setWriteEntry below.
@@ -418,8 +489,8 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
return this.writeTime;
}
- public NavigableMap<byte[], Integer> getScopes() {
- return scopes;
+ public NavigableMap<byte[], Integer> getReplicationScopes() {
+ return replicationScope;
}
/** @return The nonce group */
@@ -432,8 +503,14 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
return nonce;
}
- public void setScopes(NavigableMap<byte[], Integer> scopes) {
- this.scopes = scopes;
+ private void setReplicationScope(NavigableMap<byte[], Integer> replicationScope) {
+ this.replicationScope = replicationScope;
+ }
+
+ public void serializeReplicationScope(boolean serialize) {
+ if (!serialize) {
+ setReplicationScope(null);
+ }
}
public void readOlderScopes(NavigableMap<byte[], Integer> scopes) {
@@ -450,7 +527,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
}
}
if (scopes.size() > 0) {
- this.scopes = scopes;
+ this.replicationScope = scopes;
}
}
}
@@ -598,8 +675,8 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
builder.addClusterIds(uuidBuilder.build());
}
- if (scopes != null) {
- for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
+ if (replicationScope != null) {
+ for (Map.Entry<byte[], Integer> e : replicationScope.entrySet()) {
ByteString family = (compressionContext == null) ? ByteStringer.wrap(e.getKey())
: compressor.compress(e.getKey(), compressionContext.familyDict);
builder.addScopes(FamilyScope.newBuilder()
@@ -638,13 +715,13 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
if (walKey.hasNonce()) {
this.nonce = walKey.getNonce();
}
- this.scopes = null;
+ this.replicationScope = null;
if (walKey.getScopesCount() > 0) {
- this.scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ this.replicationScope = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
for (FamilyScope scope : walKey.getScopesList()) {
byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
- this.scopes.put(family, scope.getScopeType().getNumber());
+ this.replicationScope.put(family, scope.getScopeType().getNumber());
}
}
setSequenceId(walKey.getLogSequenceNumber());
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 010fd37..ad5774f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -1778,7 +1778,7 @@ public class WALSplitter {
WALEdit edit = entry.getEdit();
TableName table = entry.getKey().getTablename();
// clear scopes which isn't needed for recovery
- entry.getKey().setScopes(null);
+ entry.getKey().serializeReplicationScope(false);
String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
// skip edits of non-existent tables
if (nonExistentTables != null && nonExistentTables.contains(table)) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 3aae5d5..d8363d4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -281,7 +281,8 @@ public class TestIOFencing {
CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
new Path("store_dir"));
- WALUtil.writeCompactionMarker(compactingRegion.getWAL(), table.getTableDescriptor(),
+ WALUtil.writeCompactionMarker(compactingRegion.getWAL(),
+ ((HRegion)compactingRegion).getReplicationScope(),
oldHri, compactionDescriptor, compactingRegion.getMVCC());
// Wait till flush has happened, otherwise there won't be multiple store files
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
index 75fe7a2..03760d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
@@ -29,6 +29,8 @@ import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -184,7 +186,11 @@ public class TestWALObserver {
HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
.toString(TEST_TABLE));
-
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
deleteDir(basedir);
fs.mkdirs(new Path(basedir, hri.getEncodedName()));
@@ -235,8 +241,8 @@ public class TestWALObserver {
// it's where WAL write cp should occur.
long now = EnvironmentEdgeManager.currentTime();
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
- long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
- edit, true);
+ long txid = log.append(hri,
+ new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, scopes), edit, true);
log.sync(txid);
// the edit shall have been change now by the coprocessor.
@@ -296,10 +302,15 @@ public class TestWALObserver {
assertFalse(oldApi.isPostWALWriteDeprecatedCalled());
LOG.debug("writing to WAL with non-legacy keys.");
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for (HColumnDescriptor hcd : htd.getFamilies()) {
+ scopes.put(hcd.getName(), 0);
+ }
final int countPerFamily = 5;
for (HColumnDescriptor hcd : htd.getFamilies()) {
addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
- EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
+ EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc);
}
LOG.debug("Verify that only the non-legacy CP saw edits.");
@@ -323,7 +334,7 @@ public class TestWALObserver {
final WALEdit edit = new WALEdit();
final byte[] nonce = Bytes.toBytes("1772");
edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce));
- final long txid = wal.append(htd, hri, legacyKey, edit, true);
+ final long txid = wal.append(hri, legacyKey, edit, true);
wal.sync(txid);
LOG.debug("Make sure legacy cps can see supported edits after having been skipped.");
@@ -349,7 +360,11 @@ public class TestWALObserver {
final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
final HTableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
-
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for(byte[] fam : htd.getFamiliesKeys()) {
+ scopes.put(fam, 0);
+ }
WAL log = wals.getWAL(UNSPECIFIED_REGION, null);
try {
SampleRegionWALObserver cp = getCoprocessor(log, SampleRegionWALObserver.class);
@@ -360,8 +375,8 @@ public class TestWALObserver {
assertFalse(cp.isPostWALWriteCalled());
final long now = EnvironmentEdgeManager.currentTime();
- long txid = log.append(htd, hri,
- new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc),
+ long txid = log.append(hri,
+ new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes),
new WALEdit(), true);
log.sync(txid);
@@ -400,14 +415,18 @@ public class TestWALObserver {
// Put p = creatPutWith2Families(TEST_ROW);
WALEdit edit = new WALEdit();
long now = EnvironmentEdgeManager.currentTime();
- // addFamilyMapToWALEdit(p.getFamilyMap(), edit);
final int countPerFamily = 1000;
- // for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
+ for (HColumnDescriptor hcd : htd.getFamilies()) {
+ scopes.put(hcd.getName(), 0);
+ }
for (HColumnDescriptor hcd : htd.getFamilies()) {
addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
- EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
+ EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc);
}
- wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
+ wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
+ true);
// sync to fs.
wal.sync();
@@ -527,7 +546,8 @@ public class TestWALObserver {
private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
- final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException {
+ final NavigableMap<byte[], Integer> scopes, final MultiVersionConcurrencyControl mvcc)
+ throws IOException {
String familyStr = Bytes.toString(family);
long txid = -1;
for (int j = 0; j < count; j++) {
@@ -537,7 +557,7 @@ public class TestWALObserver {
edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
// uses WALKey instead of HLogKey on purpose. will only work for tests where we don't care
// about legacy coprocessors
- txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
+ txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
ee.currentTime(), mvcc), edit, true);
}
if (-1 != txid) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
index 5fa588b..752faa6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
+
+import java.util.NavigableMap;
+
import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogKeyRecordReader;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -32,8 +35,8 @@ import org.junit.experimental.categories.Category;
public class TestHLogRecordReader extends TestWALRecordReader {
@Override
- protected WALKey getWalKey(final long time) {
- return new HLogKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
+ protected WALKey getWalKey(final long time, NavigableMap<byte[], Integer> scopes) {
+ return new HLogKey(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
index 05f9f36..094fe1c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
@@ -34,6 +34,7 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.NavigableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -656,9 +657,9 @@ public class TestImportExport {
Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
// Register the wal listener for the import table
- TableWALActionListener walListener = new TableWALActionListener(importTableName);
HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
.getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
+ TableWALActionListener walListener = new TableWALActionListener(region);
WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
wal.registerWALActionsListener(walListener);
@@ -678,7 +679,7 @@ public class TestImportExport {
region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
.getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
- walListener = new TableWALActionListener(importTableName);
+ walListener = new TableWALActionListener(region);
wal.registerWALActionsListener(walListener);
args = new String[] { importTableName, FQ_OUTPUT_DIR };
assertTrue(runImport(args));
@@ -695,16 +696,17 @@ public class TestImportExport {
*/
private static class TableWALActionListener extends WALActionsListener.Base {
- private String tableName;
+ private HRegionInfo regionInfo;
private boolean isVisited = false;
- public TableWALActionListener(String tableName) {
- this.tableName = tableName;
+ public TableWALActionListener(HRegionInfo region) {
+ this.regionInfo = region;
}
@Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
- if (tableName.equalsIgnoreCase(htd.getNameAsString()) && (!logEdit.isMetaEdit())) {
+ public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
+ if (logKey.getTablename().getNameAsString().equalsIgnoreCase(
+ this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())) {
isVisited = true;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
index a4381c8..aee2a06 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@@ -77,6 +79,8 @@ public class TestWALRecordReader {
private static HTableDescriptor htd;
private static Path logDir;
protected MultiVersionConcurrencyControl mvcc;
+ protected static NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
+ Bytes.BYTES_COMPARATOR);
private static String getName() {
return "TestWALRecordReader";
@@ -128,10 +132,10 @@ public class TestWALRecordReader {
long ts = System.currentTimeMillis();
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
- log.append(htd, info, getWalKey(ts), edit, true);
+ log.append(info, getWalKey(ts, scopes), edit, true);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
- log.append(htd, info, getWalKey(ts+1), edit, true);
+ log.append(info, getWalKey(ts+1, scopes), edit, true);
log.sync();
LOG.info("Before 1st WAL roll " + log.toString());
log.rollWriter();
@@ -142,10 +146,10 @@ public class TestWALRecordReader {
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
- log.append(htd, info, getWalKey(ts1+1), edit, true);
+ log.append(info, getWalKey(ts1+1, scopes), edit, true);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
- log.append(htd, info, getWalKey(ts1+2), edit, true);
+ log.append(info, getWalKey(ts1+2, scopes), edit, true);
log.sync();
log.shutdown();
walfactory.shutdown();
@@ -187,7 +191,7 @@ public class TestWALRecordReader {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
System.currentTimeMillis(), value));
- long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
+ long txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
log.sync(txid);
Thread.sleep(1); // make sure 2nd log gets a later timestamp
@@ -197,7 +201,7 @@ public class TestWALRecordReader {
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
System.currentTimeMillis(), value));
- txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
+ txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
log.sync(txid);
log.shutdown();
walfactory.shutdown();
@@ -236,8 +240,8 @@ public class TestWALRecordReader {
testSplit(splits.get(1));
}
- protected WALKey getWalKey(final long time) {
- return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
+ protected WALKey getWalKey(final long time, NavigableMap<byte[], Integer> scopes) {
+ return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
}
protected WALRecordReader getReader() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index c5728cf..cff8db0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -1301,9 +1301,8 @@ public class TestDistributedLogSplitting {
WALEdit e = new WALEdit();
value++;
e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
- wal.append(htd, curRegionInfo,
- new HLogKey(curRegionInfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis()),
- e, true);
+ wal.append(curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis(), null), e, true);
}
wal.sync();
wal.shutdown();
@@ -1397,7 +1396,7 @@ public class TestDistributedLogSplitting {
WALEdit e = new WALEdit();
value++;
e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
- wal.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(),
+ wal.append(curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(),
tableName, System.currentTimeMillis()), e, true);
}
wal.sync();
@@ -1609,7 +1608,7 @@ public class TestDistributedLogSplitting {
// key
byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
- log.append(htd, curRegionInfo,
+ log.append(curRegionInfo,
new HLogKey(curRegionInfo.getEncodedNameAsBytes(), fullTName,
System.currentTimeMillis()), e, true);
if (0 == i % syncEvery) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
index 3a7aff0..d0633a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
@@ -87,6 +87,7 @@ public class TestBulkLoad {
private final byte[] randomBytes = new byte[100];
private final byte[] family1 = Bytes.toBytes("family1");
private final byte[] family2 = Bytes.toBytes("family2");
+
@Rule
public TestName name = new TestName();
@@ -105,12 +106,12 @@ public class TestBulkLoad {
storeFileName = (new Path(storeFileName)).getName();
List<String> storeFileNames = new ArrayList<String>();
storeFileNames.add(storeFileName);
- when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class), any(WALKey.class),
+ when(log.append(any(HRegionInfo.class), any(WALKey.class),
argThat(bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(),
familyName, storeFileNames)),
any(boolean.class))).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
- WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
+ WALKey walKey = invocation.getArgumentAt(1, WALKey.class);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
@@ -132,11 +133,11 @@ public class TestBulkLoad {
@Test
public void shouldBulkLoadSingleFamilyHLog() throws IOException {
- when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
+ when(log.append(any(HRegionInfo.class),
any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
any(boolean.class))).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
- WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
+ WALKey walKey = invocation.getArgumentAt(1, WALKey.class);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
@@ -151,11 +152,11 @@ public class TestBulkLoad {
@Test
public void shouldBulkLoadManyFamilyHLog() throws IOException {
- when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
+ when(log.append(any(HRegionInfo.class),
any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
any(boolean.class))).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
- WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
+ WALKey walKey = invocation.getArgumentAt(1, WALKey.class);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
@@ -171,11 +172,11 @@ public class TestBulkLoad {
@Test
public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
- when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
+ when(log.append(any(HRegionInfo.class),
any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
any(boolean.class))).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
- WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
+ WALKey walKey = invocation.getArgumentAt(1, WALKey.class);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index a5574d3..ed7623c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -896,7 +896,7 @@ public class TestHRegion {
storeFiles, Lists.newArrayList(newFile),
region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
- WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
+ WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
@@ -4796,7 +4796,7 @@ public class TestHRegion {
//verify append called or not
verify(wal, expectAppend ? times(1) : never())
- .append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
+ .append((HRegionInfo)any(), (WALKey)any(),
(WALEdit)any(), Mockito.anyBoolean());
// verify sync called or not
@@ -5998,7 +5998,7 @@ public class TestHRegion {
region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
TEST_UTIL.getConfiguration(), rss, null);
- verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
+ verify(wal, times(1)).append((HRegionInfo)any(), (WALKey)any()
, editCaptor.capture(), anyBoolean());
WALEdit edit = editCaptor.getValue();
@@ -6111,7 +6111,7 @@ public class TestHRegion {
// verify that we have not appended region open event to WAL because this region is still
// recovering
- verify(wal, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
+ verify(wal, times(0)).append((HRegionInfo)any(), (WALKey)any()
, editCaptor.capture(), anyBoolean());
// not put the region out of recovering state
@@ -6119,7 +6119,7 @@ public class TestHRegion {
.prepare().process();
// now we should have put the entry
- verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
+ verify(wal, times(1)).append((HRegionInfo)any(), (WALKey)any()
, editCaptor.capture(), anyBoolean());
WALEdit edit = editCaptor.getValue();
@@ -6163,12 +6163,12 @@ public class TestHRegion {
*/
private WAL mockWAL() throws IOException {
WAL wal = mock(WAL.class);
- Mockito.when(wal.append((HTableDescriptor)Mockito.any(), (HRegionInfo)Mockito.any(),
+ Mockito.when(wal.append((HRegionInfo)Mockito.any(),
(WALKey)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())).
thenAnswer(new Answer<Long>() {
@Override
public Long answer(InvocationOnMock invocation) throws Throwable {
- WALKey key = invocation.getArgumentAt(2, WALKey.class);
+ WALKey key = invocation.getArgumentAt(1, WALKey.class);
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
key.setWriteEntry(we);
return 1L;
@@ -6206,7 +6206,7 @@ public class TestHRegion {
region.close(false);
// 2 times, one for region open, the other close region
- verify(wal, times(2)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
+ verify(wal, times(2)).append((HRegionInfo)any(), (WALKey)any(),
editCaptor.capture(), anyBoolean());
WALEdit edit = editCaptor.getAllValues().get(1);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 4d5d7d8..9183e18 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -1126,7 +1126,7 @@ public class TestHRegionReplayEvents {
// test for region open and close
secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
- verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
+ verify(walSecondary, times(0)).append((HRegionInfo)any(),
(WALKey)any(), (WALEdit)any(), anyBoolean());
// test for replay prepare flush
@@ -1140,11 +1140,11 @@ public class TestHRegionReplayEvents {
.setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
.build());
- verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
+ verify(walSecondary, times(0)).append((HRegionInfo)any(),
(WALKey)any(), (WALEdit)any(), anyBoolean());
secondaryRegion.close();
- verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
+ verify(walSecondary, times(0)).append((HRegionInfo)any(),
(WALKey)any(), (WALEdit)any(), anyBoolean());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8f2bd060/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index 87cbab7..76b4134 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@@ -411,7 +412,7 @@ public class TestHRegionServerBulkLoad {
private boolean found = false;
@Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
+ public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
for (Cell cell : logEdit.getCells()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
for (Map.Entry entry : kv.toStringMap().entrySet()) {