You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2014/12/02 18:20:49 UTC
[10/21] hbase git commit: HBASE-12522 Backport of write-ahead-log
refactoring and follow-ons.
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index b52a258..013f0ef 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.regionserver.Leases;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -204,7 +204,7 @@ class MockRegionServerServices implements RegionServerServices {
}
@Override
- public HLog getWAL(HRegionInfo regionInfo) throws IOException {
+ public WAL getWAL(HRegionInfo regionInfo) throws IOException {
return null;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 00d263c..7fc8e7a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -44,8 +44,8 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.Test;
@@ -83,7 +83,6 @@ public class TestIOFencing {
//((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
// .getLogger().setLevel(Level.ALL);
//((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
- //((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
}
public abstract static class CompactionBlockerRegion extends HRegion {
@@ -92,7 +91,7 @@ public class TestIOFencing {
volatile CountDownLatch compactionsWaiting = new CountDownLatch(0);
@SuppressWarnings("deprecation")
- public CompactionBlockerRegion(Path tableDir, HLog log,
+ public CompactionBlockerRegion(Path tableDir, WAL log,
FileSystem fs, Configuration confParam, HRegionInfo info,
HTableDescriptor htd, RegionServerServices rsServices) {
super(tableDir, log, fs, confParam, info, htd, rsServices);
@@ -139,7 +138,7 @@ public class TestIOFencing {
*/
public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion {
- public BlockCompactionsInPrepRegion(Path tableDir, HLog log,
+ public BlockCompactionsInPrepRegion(Path tableDir, WAL log,
FileSystem fs, Configuration confParam, HRegionInfo info,
HTableDescriptor htd, RegionServerServices rsServices) {
super(tableDir, log, fs, confParam, info, htd, rsServices);
@@ -162,7 +161,7 @@ public class TestIOFencing {
* entry to go the WAL before blocking, but blocks afterwards
*/
public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion {
- public BlockCompactionsInCompletionRegion(Path tableDir, HLog log,
+ public BlockCompactionsInCompletionRegion(Path tableDir, WAL log,
FileSystem fs, Configuration confParam, HRegionInfo info,
HTableDescriptor htd, RegionServerServices rsServices) {
super(tableDir, log, fs, confParam, info, htd, rsServices);
@@ -277,7 +276,7 @@ public class TestIOFencing {
CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
new Path("store_dir"));
- HLogUtil.writeCompactionMarker(compactingRegion.getLog(), table.getTableDescriptor(),
+ WALUtil.writeCompactionMarker(compactingRegion.getWAL(), table.getTableDescriptor(),
oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE-100));
// Wait till flush has happened, otherwise there won't be multiple store files
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
index a486f4b..929f9aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
@@ -437,7 +437,7 @@ public class TestHFileArchiving {
// remove all the non-storefile named files for the region
for (int i = 0; i < storeFiles.size(); i++) {
String file = storeFiles.get(i);
- if (file.contains(HRegionFileSystem.REGION_INFO_FILE) || file.contains("hlog")) {
+ if (file.contains(HRegionFileSystem.REGION_INFO_FILE) || file.contains("wal")) {
storeFiles.remove(i--);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
index fbe33a1..1f4d865 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
@@ -54,9 +54,9 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtilsForTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -520,7 +520,7 @@ public class TestAdmin2 {
}
@Test (timeout=300000)
- public void testHLogRollWriting() throws Exception {
+ public void testWALRollWriting() throws Exception {
setUpforLogRolling();
String className = this.getClass().getName();
StringBuilder v = new StringBuilder(className);
@@ -530,7 +530,7 @@ public class TestAdmin2 {
byte[] value = Bytes.toBytes(v.toString());
HRegionServer regionServer = startAndWriteData(TableName.valueOf("TestLogRolling"), value);
LOG.info("after writing there are "
- + HLogUtilsForTests.getNumRolledLogFiles(regionServer.getWAL()) + " log files");
+ + DefaultWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files");
// flush all regions
@@ -539,8 +539,8 @@ public class TestAdmin2 {
for (HRegion r : regions) {
r.flushcache();
}
- admin.rollHLogWriter(regionServer.getServerName().getServerName());
- int count = HLogUtilsForTests.getNumRolledLogFiles(regionServer.getWAL());
+ admin.rollWALWriter(regionServer.getServerName());
+ int count = DefaultWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
LOG.info("after flushing all regions and rolling logs there are " +
count + " log files");
assertTrue(("actual count: " + count), count <= 2);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java
index 3f983ed..d7852f1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
@@ -60,6 +61,12 @@ implements WALObserver {
private boolean preWALRestoreCalled = false;
private boolean postWALRestoreCalled = false;
+ // Deprecated versions
+ private boolean preWALWriteDeprecatedCalled = false;
+ private boolean postWALWriteDeprecatedCalled = false;
+ private boolean preWALRestoreDeprecatedCalled = false;
+ private boolean postWALRestoreDeprecatedCalled = false;
+
/**
* Set values: with a table name, a column name which will be ignored, and
* a column name which will be added to WAL.
@@ -74,18 +81,32 @@ implements WALObserver {
this.addedQualifier = addq;
this.changedFamily = chf;
this.changedQualifier = chq;
+ preWALWriteCalled = false;
+ postWALWriteCalled = false;
+ preWALRestoreCalled = false;
+ postWALRestoreCalled = false;
+ preWALWriteDeprecatedCalled = false;
+ postWALWriteDeprecatedCalled = false;
+ preWALRestoreDeprecatedCalled = false;
+ postWALRestoreDeprecatedCalled = false;
}
+ @Override
+ public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env,
+ HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+ postWALWriteCalled = true;
+ }
@Override
public void postWALWrite(ObserverContext<WALCoprocessorEnvironment> env,
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
- postWALWriteCalled = true;
+ postWALWriteDeprecatedCalled = true;
+ postWALWrite(env, info, (WALKey)logKey, logEdit);
}
@Override
- public boolean preWALWrite(ObserverContext<WALCoprocessorEnvironment> env,
- HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
+ public boolean preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env,
+ HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
boolean bypass = false;
// check table name matches or not.
if (!Bytes.equals(info.getTableName(), this.tableName)) {
@@ -122,14 +143,28 @@ implements WALObserver {
return bypass;
}
+ @Override
+ public boolean preWALWrite(ObserverContext<WALCoprocessorEnvironment> env,
+ HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
+ preWALWriteDeprecatedCalled = true;
+ return preWALWrite(env, info, (WALKey)logKey, logEdit);
+ }
+
/**
* Triggered before {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is
* Restoreed.
*/
@Override
+ public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env,
+ HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+ preWALRestoreCalled = true;
+ }
+
+ @Override
public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env,
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
- preWALRestoreCalled = true;
+ preWALRestoreDeprecatedCalled = true;
+ preWALRestore(env, info, (WALKey)logKey, logEdit);
}
/**
@@ -137,9 +172,16 @@ implements WALObserver {
* Restoreed.
*/
@Override
+ public void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env,
+ HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+ postWALRestoreCalled = true;
+ }
+
+ @Override
public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> env,
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
- postWALRestoreCalled = true;
+ postWALRestoreDeprecatedCalled = true;
+ postWALRestore(env, info, (WALKey)logKey, logEdit);
}
public boolean isPreWALWriteCalled() {
@@ -161,4 +203,27 @@ implements WALObserver {
".isPostWALRestoreCalled is called.");
return postWALRestoreCalled;
}
+
+ public boolean isPreWALWriteDeprecatedCalled() {
+ return preWALWriteDeprecatedCalled;
+ }
+
+ public boolean isPostWALWriteDeprecatedCalled() {
+ return postWALWriteDeprecatedCalled;
+ }
+
+ public boolean isPreWALRestoreDeprecatedCalled() {
+ return preWALRestoreDeprecatedCalled;
+ }
+
+ public boolean isPostWALRestoreDeprecatedCalled() {
+ return postWALRestoreDeprecatedCalled;
+ }
+
+ /**
+ * This class should trigger our legacy support since it does not directly implement the
+ * newer API methods.
+ */
+ static class Legacy extends SampleRegionWALObserver {
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
index bf53518..7100ae7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -115,8 +116,6 @@ public class SimpleRegionObserver extends BaseRegionObserver {
final AtomicInteger ctPreCheckAndDelete = new AtomicInteger(0);
final AtomicInteger ctPreCheckAndDeleteAfterRowLock = new AtomicInteger(0);
final AtomicInteger ctPostCheckAndDelete = new AtomicInteger(0);
- final AtomicInteger ctPreWALRestored = new AtomicInteger(0);
- final AtomicInteger ctPostWALRestored = new AtomicInteger(0);
final AtomicInteger ctPreScannerNext = new AtomicInteger(0);
final AtomicInteger ctPostScannerNext = new AtomicInteger(0);
final AtomicInteger ctPreScannerClose = new AtomicInteger(0);
@@ -130,6 +129,8 @@ public class SimpleRegionObserver extends BaseRegionObserver {
final AtomicInteger ctPostBatchMutate = new AtomicInteger(0);
final AtomicInteger ctPreWALRestore = new AtomicInteger(0);
final AtomicInteger ctPostWALRestore = new AtomicInteger(0);
+ final AtomicInteger ctPreWALRestoreDeprecated = new AtomicInteger(0);
+ final AtomicInteger ctPostWALRestoreDeprecated = new AtomicInteger(0);
final AtomicInteger ctPreSplitBeforePONR = new AtomicInteger(0);
final AtomicInteger ctPreSplitAfterPONR = new AtomicInteger(0);
final AtomicInteger ctPreStoreFileReaderOpen = new AtomicInteger(0);
@@ -661,8 +662,8 @@ public class SimpleRegionObserver extends BaseRegionObserver {
}
@Override
- public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info,
- HLogKey logKey, WALEdit logEdit) throws IOException {
+ public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env,
+ HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
String tableName = logKey.getTablename().getNameAsString();
if (tableName.equals(TABLE_SKIPPED)) {
// skip recovery of TABLE_SKIPPED for testing purpose
@@ -673,9 +674,23 @@ public class SimpleRegionObserver extends BaseRegionObserver {
}
@Override
+ public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info,
+ HLogKey logKey, WALEdit logEdit) throws IOException {
+ preWALRestore(env, info, (WALKey)logKey, logEdit);
+ ctPreWALRestoreDeprecated.incrementAndGet();
+ }
+
+ @Override
+ public void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env,
+ HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+ ctPostWALRestore.incrementAndGet();
+ }
+
+ @Override
public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> env,
HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
- ctPostWALRestore.incrementAndGet();
+ postWALRestore(env, info, (WALKey)logKey, logEdit);
+ ctPostWALRestoreDeprecated.incrementAndGet();
}
@Override
@@ -794,13 +809,14 @@ public class SimpleRegionObserver extends BaseRegionObserver {
return ctPrePrepareDeleteTS.get() > 0;
}
- public boolean hadPreWALRestored() {
- return ctPreWALRestored.get() > 0;
+ public boolean hadPreWALRestore() {
+ return ctPreWALRestore.get() > 0;
}
- public boolean hadPostWALRestored() {
- return ctPostWALRestored.get() > 0;
+ public boolean hadPostWALRestore() {
+ return ctPostWALRestore.get() > 0;
}
+
public boolean wasScannerNextCalled() {
return ctPreScannerNext.get() > 0 && ctPostScannerNext.get() > 0;
}
@@ -939,7 +955,22 @@ public class SimpleRegionObserver extends BaseRegionObserver {
return ctPostWALRestore.get();
}
+ public int getCtPreWALRestoreDeprecated() {
+ return ctPreWALRestoreDeprecated.get();
+ }
+
+ public int getCtPostWALRestoreDeprecated() {
+ return ctPostWALRestoreDeprecated.get();
+ }
+
public boolean wasStoreFileReaderOpenCalled() {
return ctPreStoreFileReaderOpen.get() > 0 && ctPostStoreFileReaderOpen.get() > 0;
}
+
+ /**
+ * This implementation should trigger our legacy support because it does not directly
+ * implement the newer API calls.
+ */
+ public static class Legacy extends SimpleRegionObserver {
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index 310d875..31da5aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -98,8 +98,9 @@ public class TestRegionObserverInterface {
// set configure to indicate which cp should be loaded
Configuration conf = util.getConfiguration();
conf.setBoolean("hbase.master.distributed.log.replay", true);
- conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
- "org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver");
+ conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ "org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver",
+ "org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver$Legacy");
util.startMiniCluster();
cluster = util.getMiniHBaseCluster();
@@ -618,9 +619,10 @@ public class TestRegionObserverInterface {
);
verifyMethodResult(SimpleRegionObserver.class,
- new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut"},
+ new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut",
+ "getCtPreWALRestoreDeprecated", "getCtPostWALRestoreDeprecated"},
tableName,
- new Integer[] {0, 0, 1, 1});
+ new Integer[] {0, 0, 1, 1, 0, 0});
cluster.killRegionServer(rs1.getRegionServer().getServerName());
Threads.sleep(1000); // Let the kill soak in.
@@ -628,9 +630,60 @@ public class TestRegionObserverInterface {
LOG.info("All regions assigned");
verifyMethodResult(SimpleRegionObserver.class,
- new String[]{"getCtPrePut", "getCtPostPut"},
+ new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut",
+ "getCtPreWALRestoreDeprecated", "getCtPostWALRestoreDeprecated"},
tableName,
- new Integer[]{0, 0});
+ new Integer[]{1, 1, 0, 0, 0, 0});
+ } finally {
+ util.deleteTable(tableName);
+ table.close();
+ }
+ }
+
+ @Test
+ public void testLegacyRecovery() throws Exception {
+ LOG.info(TestRegionObserverInterface.class.getName() +".testLegacyRecovery");
+ TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testLegacyRecovery");
+ HTable table = util.createTable(tableName, new byte[][] {A, B, C});
+ try {
+ JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
+ ServerName sn2 = rs1.getRegionServer().getServerName();
+ String regEN = table.getRegionLocations().firstEntry().getKey().getEncodedName();
+
+ util.getHBaseAdmin().move(regEN.getBytes(), sn2.getServerName().getBytes());
+ while (!sn2.equals(table.getRegionLocations().firstEntry().getValue() )){
+ Thread.sleep(100);
+ }
+
+ Put put = new Put(ROW);
+ put.add(A, A, A);
+ put.add(B, B, B);
+ put.add(C, C, C);
+ table.put(put);
+
+ verifyMethodResult(SimpleRegionObserver.Legacy.class,
+ new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
+ "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
+ tableName,
+ new Boolean[] {false, false, true, true, true, true, false}
+ );
+
+ verifyMethodResult(SimpleRegionObserver.Legacy.class,
+ new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut",
+ "getCtPreWALRestoreDeprecated", "getCtPostWALRestoreDeprecated"},
+ tableName,
+ new Integer[] {0, 0, 1, 1, 0, 0});
+
+ cluster.killRegionServer(rs1.getRegionServer().getServerName());
+ Threads.sleep(1000); // Let the kill soak in.
+ util.waitUntilAllRegionsAssigned(tableName);
+ LOG.info("All regions assigned");
+
+ verifyMethodResult(SimpleRegionObserver.Legacy.class,
+ new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut",
+ "getCtPreWALRestoreDeprecated", "getCtPostWALRestoreDeprecated"},
+ tableName,
+ new Integer[]{1, 1, 0, 0, 1, 1});
} finally {
util.deleteTable(tableName);
table.close();
@@ -664,7 +717,9 @@ public class TestRegionObserverInterface {
util.waitUntilAllRegionsAssigned(tableName);
verifyMethodResult(SimpleRegionObserver.class, new String[] { "getCtPreWALRestore",
- "getCtPostWALRestore" }, tableName, new Integer[] { 0, 0 });
+ "getCtPostWALRestore", "getCtPreWALRestoreDeprecated", "getCtPostWALRestoreDeprecated"},
+ tableName,
+ new Integer[] {0, 0, 0, 0});
util.deleteTable(tableName);
table.close();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
index 3365a95..1f278e3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
@@ -61,7 +61,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -216,7 +216,7 @@ public class TestRegionObserverScannerOpenHook {
private static volatile CountDownLatch compactionStateChangeLatch = null;
@SuppressWarnings("deprecation")
- public CompactionCompletionNotifyingRegion(Path tableDir, HLog log,
+ public CompactionCompletionNotifyingRegion(Path tableDir, WAL log,
FileSystem fs, Configuration confParam, HRegionInfo info,
HTableDescriptor htd, RegionServerServices rsServices) {
super(tableDir, log, fs, confParam, info, htd, rsServices);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
index 1eddc8a..95a194d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
@@ -49,11 +49,14 @@ import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
@@ -63,7 +66,9 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
import org.junit.experimental.categories.Category;
/**
@@ -85,6 +90,9 @@ public class TestWALObserver {
Bytes.toBytes("v2"), Bytes.toBytes("v3"), };
private static byte[] TEST_ROW = Bytes.toBytes("testRow");
+ @Rule
+ public TestName currentTest = new TestName();
+
private Configuration conf;
private FileSystem fs;
private Path dir;
@@ -92,12 +100,13 @@ public class TestWALObserver {
private String logName;
private Path oldLogDir;
private Path logDir;
+ private WALFactory wals;
@BeforeClass
public static void setupBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
- conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
- SampleRegionWALObserver.class.getName());
+ conf.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+ SampleRegionWALObserver.class.getName(), SampleRegionWALObserver.Legacy.class.getName());
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
SampleRegionWALObserver.class.getName());
conf.setBoolean("dfs.support.append", true);
@@ -124,16 +133,25 @@ public class TestWALObserver {
this.dir = new Path(this.hbaseRootDir, TestWALObserver.class.getName());
this.oldLogDir = new Path(this.hbaseRootDir,
HConstants.HREGION_OLDLOGDIR_NAME);
- this.logDir = new Path(this.hbaseRootDir, HConstants.HREGION_LOGDIR_NAME);
+ this.logDir = new Path(this.hbaseRootDir,
+ DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName()));
this.logName = HConstants.HREGION_LOGDIR_NAME;
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
}
+ this.wals = new WALFactory(conf, null, currentTest.getMethodName());
}
@After
public void tearDown() throws Exception {
+ try {
+ wals.shutdown();
+ } catch (IOException exception) {
+ // one of our tests splits out from under our wals.
+ LOG.warn("Ignoring failure to close wal factory. " + exception.getMessage());
+ LOG.debug("details of failure to close wal factory.", exception);
+ }
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
}
@@ -144,7 +162,23 @@ public class TestWALObserver {
*/
@Test
public void testWALObserverWriteToWAL() throws Exception {
+ final WAL log = wals.getWAL(UNSPECIFIED_REGION);
+ verifyWritesSeen(log, getCoprocessor(log, SampleRegionWALObserver.class), false);
+ }
+
+ /**
+ * Test WAL write behavior with a legacy WALObserver. The coprocessor monitors a
+ * WALEdit written to the WAL, and ignores, modifies, and adds KeyValues for the
+ * WALEdit.
+ */
+ @Test
+ public void testLegacyWALObserverWriteToWAL() throws Exception {
+ final WAL log = wals.getWAL(UNSPECIFIED_REGION);
+ verifyWritesSeen(log, getCoprocessor(log, SampleRegionWALObserver.Legacy.class), true);
+ }
+ private void verifyWritesSeen(final WAL log, final SampleRegionWALObserver cp,
+ final boolean seesLegacy) throws Exception {
HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
.toString(TEST_TABLE));
@@ -154,10 +188,6 @@ public class TestWALObserver {
fs.mkdirs(new Path(basedir, hri.getEncodedName()));
final AtomicLong sequenceId = new AtomicLong(0);
- HLog log = HLogFactory.createHLog(this.fs, hbaseRootDir,
- TestWALObserver.class.getName(), this.conf);
- SampleRegionWALObserver cp = getCoprocessor(log);
-
// TEST_FAMILY[0] shall be removed from WALEdit.
// TEST_FAMILY[1] value shall be changed.
// TEST_FAMILY[2] shall be added to WALEdit, although it's not in the put.
@@ -166,6 +196,8 @@ public class TestWALObserver {
assertFalse(cp.isPreWALWriteCalled());
assertFalse(cp.isPostWALWriteCalled());
+ assertFalse(cp.isPreWALWriteDeprecatedCalled());
+ assertFalse(cp.isPostWALWriteDeprecatedCalled());
// TEST_FAMILY[2] is not in the put, however it shall be added by the tested
// coprocessor.
@@ -201,7 +233,10 @@ public class TestWALObserver {
// it's where WAL write cp should occur.
long now = EnvironmentEdgeManager.currentTime();
- log.append(hri, hri.getTable(), edit, now, htd, sequenceId);
+ // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
+ long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
+ edit, sequenceId, true, null);
+ log.sync(txid);
// the edit shall have been change now by the coprocessor.
foundFamily0 = false;
@@ -226,6 +261,83 @@ public class TestWALObserver {
assertTrue(cp.isPreWALWriteCalled());
assertTrue(cp.isPostWALWriteCalled());
+ assertEquals(seesLegacy, cp.isPreWALWriteDeprecatedCalled());
+ assertEquals(seesLegacy, cp.isPostWALWriteDeprecatedCalled());
+ }
+
+ /**
+ * Writes edits keyed by plain WALKey (via addWALEdits) and asserts that only the
+ * non-legacy observer's pre/post write hooks fire; then appends one edit keyed by
+ * HLogKey and asserts that both observers see it, with the deprecated hooks
+ * reported only by the legacy observer.
+ */
+ @Test
+ public void testNonLegacyWALKeysDoNotExplode() throws Exception {
+ TableName tableName = TableName.valueOf(TEST_TABLE);
+ final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
+ .toString(TEST_TABLE));
+ final HRegionInfo hri = new HRegionInfo(tableName, null, null);
+ final AtomicLong sequenceId = new AtomicLong(0);
+
+ fs.mkdirs(new Path(FSUtils.getTableDir(hbaseRootDir, tableName), hri.getEncodedName()));
+
+ // NOTE(review): newConf is not referenced again in this test -- confirm it can be dropped.
+ final Configuration newConf = HBaseConfiguration.create(this.conf);
+
+ final WAL wal = wals.getWAL(UNSPECIFIED_REGION);
+ final SampleRegionWALObserver newApi = getCoprocessor(wal, SampleRegionWALObserver.class);
+ newApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null);
+ final SampleRegionWALObserver oldApi = getCoprocessor(wal,
+ SampleRegionWALObserver.Legacy.class);
+ oldApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null);
+
+ LOG.debug("ensuring wal entries haven't happened before we start");
+ assertFalse(newApi.isPreWALWriteCalled());
+ assertFalse(newApi.isPostWALWriteCalled());
+ assertFalse(newApi.isPreWALWriteDeprecatedCalled());
+ assertFalse(newApi.isPostWALWriteDeprecatedCalled());
+ assertFalse(oldApi.isPreWALWriteCalled());
+ assertFalse(oldApi.isPostWALWriteCalled());
+ assertFalse(oldApi.isPreWALWriteDeprecatedCalled());
+ assertFalse(oldApi.isPostWALWriteDeprecatedCalled());
+
+ LOG.debug("writing to WAL with non-legacy keys.");
+ final int countPerFamily = 5;
+ for (HColumnDescriptor hcd : htd.getFamilies()) {
+ addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
+ EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
+ }
+
+ LOG.debug("Verify that only the non-legacy CP saw edits.");
+ assertTrue(newApi.isPreWALWriteCalled());
+ assertTrue(newApi.isPostWALWriteCalled());
+ assertFalse(newApi.isPreWALWriteDeprecatedCalled());
+ assertFalse(newApi.isPostWALWriteDeprecatedCalled());
+ // wish we could test that the log message happened :/
+ assertFalse(oldApi.isPreWALWriteCalled());
+ assertFalse(oldApi.isPostWALWriteCalled());
+ assertFalse(oldApi.isPreWALWriteDeprecatedCalled());
+ assertFalse(oldApi.isPostWALWriteDeprecatedCalled());
+
+ LOG.debug("reseting cp state.");
+ newApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null);
+ oldApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null);
+
+ LOG.debug("write a log edit that supports legacy cps.");
+ final long now = EnvironmentEdgeManager.currentTime();
+ final WALKey legacyKey = new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now);
+ final WALEdit edit = new WALEdit();
+ final byte[] nonce = Bytes.toBytes("1772");
+ edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce));
+ final long txid = wal.append(htd, hri, legacyKey, edit, sequenceId, true, null);
+ wal.sync(txid);
+
+ LOG.debug("Make sure legacy cps can see supported edits after having been skipped.");
+ assertTrue("non-legacy WALObserver didn't see pre-write.", newApi.isPreWALWriteCalled());
+ assertTrue("non-legacy WALObserver didn't see post-write.", newApi.isPostWALWriteCalled());
+ assertFalse("non-legacy WALObserver shouldn't have seen legacy pre-write.",
+ newApi.isPreWALWriteDeprecatedCalled());
+ assertFalse("non-legacy WALObserver shouldn't have seen legacy post-write.",
+ newApi.isPostWALWriteDeprecatedCalled());
+ assertTrue("legacy WALObserver didn't see pre-write.", oldApi.isPreWALWriteCalled());
+ assertTrue("legacy WALObserver didn't see post-write.", oldApi.isPostWALWriteCalled());
+ assertTrue("legacy WALObserver didn't see legacy pre-write.",
+ oldApi.isPreWALWriteDeprecatedCalled());
+ assertTrue("legacy WALObserver didn't see legacy post-write.",
+ oldApi.isPostWALWriteDeprecatedCalled());
}
/**
@@ -237,10 +349,9 @@ public class TestWALObserver {
final HTableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
final AtomicLong sequenceId = new AtomicLong(0);
- HLog log = HLogFactory.createHLog(this.fs, hbaseRootDir,
- TestWALObserver.class.getName(), this.conf);
+ WAL log = wals.getWAL(UNSPECIFIED_REGION);
try {
- SampleRegionWALObserver cp = getCoprocessor(log);
+ SampleRegionWALObserver cp = getCoprocessor(log, SampleRegionWALObserver.class);
cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null);
@@ -248,13 +359,14 @@ public class TestWALObserver {
assertFalse(cp.isPostWALWriteCalled());
final long now = EnvironmentEdgeManager.currentTime();
- log.append(hri, hri.getTable(), new WALEdit(), now, htd, sequenceId);
- log.sync();
+ long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
+ new WALEdit(), sequenceId, true, null);
+ log.sync(txid);
assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled());
assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPostWALWriteCalled());
} finally {
- log.closeAndDelete();
+ log.close();
}
}
@@ -281,8 +393,8 @@ public class TestWALObserver {
final Configuration newConf = HBaseConfiguration.create(this.conf);
- // HLog wal = new HLog(this.fs, this.dir, this.oldLogDir, this.conf);
- HLog wal = createWAL(this.conf);
+ // WAL wal = new WAL(this.fs, this.dir, this.oldLogDir, this.conf);
+ WAL wal = wals.getWAL(UNSPECIFIED_REGION);
// Put p = creatPutWith2Families(TEST_ROW);
WALEdit edit = new WALEdit();
long now = EnvironmentEdgeManager.currentTime();
@@ -290,12 +402,11 @@ public class TestWALObserver {
final int countPerFamily = 1000;
// for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
for (HColumnDescriptor hcd : htd.getFamilies()) {
- // addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
- // EnvironmentEdgeManager.getDelegate(), wal);
addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
}
- wal.append(hri, tableName, edit, now, htd, sequenceId);
+ wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId,
+ true, null);
// sync to fs.
wal.sync();
@@ -307,7 +418,8 @@ public class TestWALObserver {
LOG.info("WALSplit path == " + p);
FileSystem newFS = FileSystem.get(newConf);
// Make a new wal for new region open.
- HLog wal2 = createWAL(newConf);
+ final WALFactory wals2 = new WALFactory(conf, null, currentTest.getMethodName()+"2");
+ WAL wal2 = wals2.getWAL(UNSPECIFIED_REGION);;
HRegion region = HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir,
hri, htd, wal2, TEST_UTIL.getHBaseCluster().getRegionServer(0), null);
long seqid2 = region.getOpenSeqNum();
@@ -319,8 +431,10 @@ public class TestWALObserver {
assertNotNull(cp2);
assertTrue(cp2.isPreWALRestoreCalled());
assertTrue(cp2.isPostWALRestoreCalled());
+ assertFalse(cp2.isPreWALRestoreDeprecatedCalled());
+ assertFalse(cp2.isPostWALRestoreDeprecatedCalled());
region.close();
- wal2.closeAndDelete();
+ wals2.close();
return null;
}
});
@@ -329,19 +443,18 @@ public class TestWALObserver {
/**
* Test to see CP loaded successfully or not. There is a duplication at
* TestHLog, but the purpose of that one is to see whether the loaded CP will
- * impact existing HLog tests or not.
+ * impact existing WAL tests or not.
*/
@Test
public void testWALObserverLoaded() throws Exception {
- HLog log = HLogFactory.createHLog(fs, hbaseRootDir,
- TestWALObserver.class.getName(), conf);
- assertNotNull(getCoprocessor(log));
+ WAL log = wals.getWAL(UNSPECIFIED_REGION);
+ assertNotNull(getCoprocessor(log, SampleRegionWALObserver.class));
}
- private SampleRegionWALObserver getCoprocessor(HLog wal) throws Exception {
+ /**
+ * Looks up the coprocessor registered under {@code clazz}'s class name on the given
+ * WAL's coprocessor host and returns it cast to SampleRegionWALObserver.
+ */
+ private SampleRegionWALObserver getCoprocessor(WAL wal,
+ Class<? extends SampleRegionWALObserver> clazz) throws Exception {
WALCoprocessorHost host = wal.getCoprocessorHost();
- Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class
- .getName());
+ Coprocessor c = host.findCoprocessor(clazz.getName());
return (SampleRegionWALObserver) c;
}
@@ -399,8 +512,8 @@ public class TestWALObserver {
}
private Path runWALSplit(final Configuration c) throws IOException {
- List<Path> splits = HLogSplitter.split(
- hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c);
+ List<Path> splits = WALSplitter.split(
+ hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
// Split should generate only 1 file since there's only 1 region
assertEquals(1, splits.size());
// Make sure the file exists
@@ -409,21 +522,25 @@ public class TestWALObserver {
return splits.get(0);
}
- private HLog createWAL(final Configuration c) throws IOException {
- return HLogFactory.createHLog(FileSystem.get(c), hbaseRootDir, logName, c);
- }
+ private static final byte[] UNSPECIFIED_REGION = new byte[]{};
- private void addWALEdits(final TableName tableName, final HRegionInfo hri,
- final byte[] rowName, final byte[] family, final int count,
- EnvironmentEdge ee, final HLog wal, final HTableDescriptor htd, final AtomicLong sequenceId)
- throws IOException {
+ /**
+ * Appends {@code count} single-cell edits for {@code rowName}/{@code family} to the WAL,
+ * using the loop index as the qualifier and "family:index" as the value, then syncs once
+ * on the transaction id of the last append. No sync is issued if nothing was appended.
+ */
+ private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
+ final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
+ final HTableDescriptor htd, final AtomicLong sequenceId) throws IOException {
String familyStr = Bytes.toString(family);
+ long txid = -1;
for (int j = 0; j < count; j++) {
byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
- wal.append(hri, tableName, edit, ee.currentTime(), htd, sequenceId);
+ // uses WALKey instead of HLogKey on purpose. will only work for tests where we don't care
+ // about legacy coprocessors
+ txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
+ ee.currentTime()), edit, sequenceId, true, null);
+ }
+ if (-1 != txid) {
+ wal.sync(txid);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java
index b71473e..a4782b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java
@@ -50,7 +50,7 @@ import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
@@ -217,9 +217,9 @@ public class TestFilter {
@After
public void tearDown() throws Exception {
- HLog hlog = region.getLog();
+ // Take the WAL reference first, close the region, then close the WAL itself.
+ WAL wal = region.getWAL();
region.close();
- hlog.closeAndDelete();
+ wal.close();
}
@Test
@@ -1488,9 +1488,9 @@ public class TestFilter {
assertEquals(2, resultCount);
scanner.close();
- HLog hlog = testRegion.getLog();
+ WAL wal = testRegion.getWAL();
testRegion.close();
- hlog.closeAndDelete();
+ wal.close();
}
@Test
@@ -2096,8 +2096,8 @@ public class TestFilter {
results.clear();
}
assertFalse(scanner.next(results));
- HLog hlog = testRegion.getLog();
+ WAL wal = testRegion.getWAL();
testRegion.close();
- hlog.closeAndDelete();
+ wal.close();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestInvocationRecordFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestInvocationRecordFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestInvocationRecordFilter.java
index ba712f3..7223573 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestInvocationRecordFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestInvocationRecordFilter.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
@@ -149,9 +149,9 @@ public class TestInvocationRecordFilter {
@After
public void tearDown() throws Exception {
- HLog hlog = region.getLog();
+ // Take the WAL reference first, close the region, then close the WAL itself.
+ WAL wal = region.getWAL();
region.close();
- hlog.closeAndDelete();
+ wal.close();
}
/**
@@ -179,4 +179,4 @@ public class TestInvocationRecordFilter {
return true;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java
index ead2c47..9b610e0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java
@@ -18,12 +18,15 @@
package org.apache.hadoop.hbase.fs;
-
+import java.io.FileNotFoundException;
+import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.net.ServerSocket;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,6 +38,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LargeTests;
@@ -42,8 +46,10 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -278,7 +284,32 @@ public class TestBlockReorder {
int nbTest = 0;
while (nbTest < 10) {
- htu.getHBaseAdmin().rollHLogWriter(targetRs.getServerName().toString());
+ final List<HRegion> regions = targetRs.getOnlineRegions(h.getName());
+ final CountDownLatch latch = new CountDownLatch(regions.size());
+ // listen for successful log rolls
+ final WALActionsListener listener = new WALActionsListener.Base() {
+ @Override
+ public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+ latch.countDown();
+ }
+ };
+ for (HRegion region : regions) {
+ region.getWAL().registerWALActionsListener(listener);
+ }
+
+ htu.getHBaseAdmin().rollWALWriter(targetRs.getServerName());
+
+ // wait
+ try {
+ latch.await();
+ } catch (InterruptedException exception) {
+ LOG.warn("Interrupted while waiting for the wal of '" + targetRs + "' to roll. If later " +
+ "tests fail, it's probably because we should still be waiting.");
+ Thread.currentThread().interrupt();
+ }
+ for (HRegion region : regions) {
+ region.getWAL().unregisterWALActionsListener(listener);
+ }
// We need a sleep as the namenode is informed asynchronously
Thread.sleep(100);
@@ -294,37 +325,52 @@ public class TestBlockReorder {
// As we wrote a put, we should have at least one log file.
Assert.assertTrue(hfs.length >= 1);
for (HdfsFileStatus hf : hfs) {
- LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir);
- String logFile = rootDir + "/" + hf.getLocalName();
- FileStatus fsLog = rfs.getFileStatus(new Path(logFile));
-
- LOG.info("Checking log file: " + logFile);
- // Now checking that the hook is up and running
- // We can't call directly getBlockLocations, it's not available in HFileSystem
- // We're trying multiple times to be sure, as the order is random
-
- BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1);
- if (bls.length > 0) {
- BlockLocation bl = bls[0];
-
- LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile + " ");
- for (int i = 0; i < bl.getHosts().length - 1; i++) {
- LOG.info(bl.getHosts()[i] + " " + logFile);
- Assert.assertNotSame(bl.getHosts()[i], host4);
- }
- String last = bl.getHosts()[bl.getHosts().length - 1];
- LOG.info(last + " " + logFile);
- if (host4.equals(last)) {
- nbTest++;
- LOG.info(logFile + " is on the new datanode and is ok");
- if (bl.getHosts().length == 3) {
- // We can test this case from the file system as well
- // Checking the underlying file system. Multiple times as the order is random
- testFromDFS(dfs, logFile, repCount, host4);
-
- // now from the master
- testFromDFS(mdfs, logFile, repCount, host4);
+ // Because this is a live cluster, log files might get archived while we're processing
+ try {
+ LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir);
+ String logFile = rootDir + "/" + hf.getLocalName();
+ FileStatus fsLog = rfs.getFileStatus(new Path(logFile));
+
+ LOG.info("Checking log file: " + logFile);
+ // Now checking that the hook is up and running
+ // We can't call directly getBlockLocations, it's not available in HFileSystem
+ // We're trying multiple times to be sure, as the order is random
+
+ BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1);
+ if (bls.length > 0) {
+ BlockLocation bl = bls[0];
+
+ LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile + " ");
+ for (int i = 0; i < bl.getHosts().length - 1; i++) {
+ LOG.info(bl.getHosts()[i] + " " + logFile);
+ Assert.assertNotSame(bl.getHosts()[i], host4);
}
+ String last = bl.getHosts()[bl.getHosts().length - 1];
+ LOG.info(last + " " + logFile);
+ if (host4.equals(last)) {
+ nbTest++;
+ LOG.info(logFile + " is on the new datanode and is ok");
+ if (bl.getHosts().length == 3) {
+ // We can test this case from the file system as well
+ // Checking the underlying file system. Multiple times as the order is random
+ testFromDFS(dfs, logFile, repCount, host4);
+
+ // now from the master
+ testFromDFS(mdfs, logFile, repCount, host4);
+ }
+ }
+ }
+ } catch (FileNotFoundException exception) {
+ LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " +
+ "archived out from under us so we'll ignore and retry. If this test hangs " +
+ "indefinitely you should treat this failure as a symptom.", exception);
+ } catch (RemoteException exception) {
+ if (exception.unwrapRemoteException() instanceof FileNotFoundException) {
+ LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " +
+ "archived out from under us so we'll ignore and retry. If this test hangs " +
+ "indefinitely you should treat this failure as a symptom.", exception);
+ } else {
+ throw exception;
}
}
}
@@ -414,7 +460,7 @@ public class TestBlockReorder {
// Check that it will be possible to extract a ServerName from our construction
Assert.assertNotNull("log= " + pseudoLogFile,
- HLogUtil.getServerNameFromHLogDirectoryName(dfs.getConf(), pseudoLogFile));
+ DefaultWALProvider.getServerNameFromWALDirectoryName(dfs.getConf(), pseudoLogFile));
// And check we're doing the right reorder.
lrb.reorderBlocks(conf, l, pseudoLogFile);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index 2d5060e..478fde3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -65,7 +65,7 @@ public class TestHeapSize {
static final Log LOG = LogFactory.getLog(TestHeapSize.class);
// List of classes implementing HeapSize
// BatchOperation, BatchUpdate, BlockIndex, Entry, Entry<K,V>, HStoreKey
- // KeyValue, LruBlockCache, LruHashMap<K,V>, Put, HLogKey
+ // KeyValue, LruBlockCache, LruHashMap<K,V>, Put, WALKey
@BeforeClass
public static void beforeClass() throws Exception {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
index 22acfa9..5f890d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
@@ -17,232 +17,26 @@
*/
package org.apache.hadoop.hbase.mapreduce;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogRecordReader;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.MapReduceTestUtil;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
+import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogKeyRecordReader;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.experimental.categories.Category;
/**
- * JUnit tests for the HLogRecordReader
+ * JUnit tests for the record reader in HLogInputFormat
*/
@Category(MediumTests.class)
-public class TestHLogRecordReader {
- private final Log LOG = LogFactory.getLog(getClass());
- private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static Configuration conf;
- private static FileSystem fs;
- private static Path hbaseDir;
- private static final TableName tableName =
- TableName.valueOf(getName());
- private static final byte [] rowName = tableName.getName();
- private static final HRegionInfo info = new HRegionInfo(tableName,
- Bytes.toBytes(""), Bytes.toBytes(""), false);
- private static final byte [] family = Bytes.toBytes("column");
- private static final byte [] value = Bytes.toBytes("value");
- private static HTableDescriptor htd;
- private static Path logDir;
- private static String logName;
-
- private static String getName() {
- return "TestHLogRecordReader";
- }
-
- @Before
- public void setUp() throws Exception {
- FileStatus[] entries = fs.listStatus(hbaseDir);
- for (FileStatus dir : entries) {
- fs.delete(dir.getPath(), true);
- }
-
- }
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- // Make block sizes small.
- conf = TEST_UTIL.getConfiguration();
- conf.setInt("dfs.blocksize", 1024 * 1024);
- conf.setInt("dfs.replication", 1);
- TEST_UTIL.startMiniDFSCluster(1);
-
- conf = TEST_UTIL.getConfiguration();
- fs = TEST_UTIL.getDFSCluster().getFileSystem();
+public class TestHLogRecordReader extends TestWALRecordReader {
- hbaseDir = TEST_UTIL.createRootDir();
-
- logName = HConstants.HREGION_LOGDIR_NAME;
- logDir = new Path(hbaseDir, logName);
-
- htd = new HTableDescriptor(tableName);
- htd.addFamily(new HColumnDescriptor(family));
+ /**
+ * Supplies an HLogKey (rather than a plain WALKey) built from {@code info}'s encoded
+ * region name and {@code tableName}; both are presumably inherited from
+ * TestWALRecordReader -- verify.
+ * NOTE(review): other HLogKey call sites in this patch pass a timestamp as the third
+ * argument -- confirm that passing sequenceid here is intended.
+ */
+ @Override
+ protected WALKey getWalKey(final long sequenceid) {
+ return new HLogKey(info.getEncodedNameAsBytes(), tableName, sequenceid);
}
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
+ /**
+ * Supplies the record reader under test: a new HLogKeyRecordReader from HLogInputFormat.
+ */
+ @Override
+ protected WALRecordReader getReader() {
+ return new HLogKeyRecordReader();
}
-
- /**
- * Test partial reads from the log based on passed time range
- * @throws Exception
- */
- @Test
- public void testPartialRead() throws Exception {
- HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf);
- // This test depends on timestamp being millisecond based and the filename of the WAL also
- // being millisecond based.
- long ts = System.currentTimeMillis();
- WALEdit edit = new WALEdit();
- final AtomicLong sequenceId = new AtomicLong(0);
- edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
- log.append(info, tableName, edit, ts, htd, sequenceId);
- edit = new WALEdit();
- edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
- log.append(info, tableName, edit, ts+1, htd, sequenceId);
- LOG.info("Before 1st WAL roll " + log.getFilenum());
- log.rollWriter();
- LOG.info("Past 1st WAL roll " + log.getFilenum());
-
- Thread.sleep(1);
- long ts1 = System.currentTimeMillis();
-
- edit = new WALEdit();
- edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
- log.append(info, tableName, edit, ts1+1, htd, sequenceId);
- edit = new WALEdit();
- edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
- log.append(info, tableName, edit, ts1+2, htd, sequenceId);
- log.close();
- LOG.info("Closed WAL " + log.getFilenum());
-
-
- HLogInputFormat input = new HLogInputFormat();
- Configuration jobConf = new Configuration(conf);
- jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
- jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts);
-
- // only 1st file is considered, and only its 1st entry is used
- List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
-
- assertEquals(1, splits.size());
- testSplit(splits.get(0), Bytes.toBytes("1"));
-
- jobConf.setLong(HLogInputFormat.START_TIME_KEY, ts+1);
- jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts1+1);
- splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
- // both files need to be considered
- assertEquals(2, splits.size());
- // only the 2nd entry from the 1st file is used
- testSplit(splits.get(0), Bytes.toBytes("2"));
- // only the 1nd entry from the 2nd file is used
- testSplit(splits.get(1), Bytes.toBytes("3"));
- }
-
- /**
- * Test basic functionality
- * @throws Exception
- */
- @Test
- public void testHLogRecordReader() throws Exception {
- HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf);
- byte [] value = Bytes.toBytes("value");
- final AtomicLong sequenceId = new AtomicLong(0);
- WALEdit edit = new WALEdit();
- edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
- System.currentTimeMillis(), value));
- log.append(info, tableName, edit,
- System.currentTimeMillis(), htd, sequenceId);
-
- Thread.sleep(1); // make sure 2nd log gets a later timestamp
- long secondTs = System.currentTimeMillis();
- log.rollWriter();
-
- edit = new WALEdit();
- edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
- System.currentTimeMillis(), value));
- log.append(info, tableName, edit,
- System.currentTimeMillis(), htd, sequenceId);
- log.close();
- long thirdTs = System.currentTimeMillis();
-
- // should have 2 log files now
- HLogInputFormat input = new HLogInputFormat();
- Configuration jobConf = new Configuration(conf);
- jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
-
- // make sure both logs are found
- List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
- assertEquals(2, splits.size());
-
- // should return exactly one KV
- testSplit(splits.get(0), Bytes.toBytes("1"));
- // same for the 2nd split
- testSplit(splits.get(1), Bytes.toBytes("2"));
-
- // now test basic time ranges:
-
- // set an endtime, the 2nd log file can be ignored completely.
- jobConf.setLong(HLogInputFormat.END_TIME_KEY, secondTs-1);
- splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
- assertEquals(1, splits.size());
- testSplit(splits.get(0), Bytes.toBytes("1"));
-
- // now set a start time
- jobConf.setLong(HLogInputFormat.END_TIME_KEY, Long.MAX_VALUE);
- jobConf.setLong(HLogInputFormat.START_TIME_KEY, thirdTs);
- splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
- // both logs need to be considered
- assertEquals(2, splits.size());
- // but both readers skip all edits
- testSplit(splits.get(0));
- testSplit(splits.get(1));
- }
-
- /**
- * Create a new reader from the split, and match the edits against the passed columns.
- */
- private void testSplit(InputSplit split, byte[]... columns) throws Exception {
- HLogRecordReader reader = new HLogRecordReader();
- reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
-
- for (byte[] column : columns) {
- assertTrue(reader.nextKeyValue());
- Cell cell = reader.getCurrentValue().getCells().get(0);
- if (!Bytes.equals(column, cell.getQualifier())) {
- assertTrue("expected [" + Bytes.toString(column) + "], actual ["
- + Bytes.toString(cell.getQualifier()) + "]", false);
- }
- }
- assertFalse(reader.nextKeyValue());
- reader.close();
- }
-
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
index 3ce657e..7dfc7d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java
@@ -59,10 +59,10 @@ import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.mapreduce.Job;
@@ -643,10 +643,10 @@ public class TestImportExport {
String importTableName = "importTestDurability1";
Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
- // Register the hlog listener for the import table
+ // Register the wal listener for the import table
TableWALActionListener walListener = new TableWALActionListener(importTableName);
- HLog hLog = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL();
- hLog.registerWALActionsListener(walListener);
+ WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(null);
+ wal.registerWALActionsListener(walListener);
// Run the import with SKIP_WAL
args =
@@ -661,9 +661,9 @@ public class TestImportExport {
// Run the import with the default durability option
importTableName = "importTestDurability2";
importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
- hLog.unregisterWALActionsListener(walListener);
+ wal.unregisterWALActionsListener(walListener);
walListener = new TableWALActionListener(importTableName);
- hLog.registerWALActionsListener(walListener);
+ wal.registerWALActionsListener(walListener);
args = new String[] { importTableName, FQ_OUTPUT_DIR };
assertTrue(runImport(args));
//Assert that the wal is visisted
@@ -673,10 +673,10 @@ public class TestImportExport {
}
/**
- * This listens to the {@link #visitLogEntryBeforeWrite(HTableDescriptor, HLogKey, WALEdit)} to
+ * This listens to the {@link #visitLogEntryBeforeWrite(HTableDescriptor, WALKey, WALEdit)} to
* identify that an entry is written to the Write Ahead Log for the given table.
*/
- private static class TableWALActionListener implements WALActionsListener {
+ private static class TableWALActionListener extends WALActionsListener.Base {
private String tableName;
private boolean isVisited = false;
@@ -686,42 +686,7 @@ public class TestImportExport {
}
@Override
- public void preLogRoll(Path oldPath, Path newPath) throws IOException {
- // Not interested in this method.
- }
-
- @Override
- public void postLogRoll(Path oldPath, Path newPath) throws IOException {
- // Not interested in this method.
- }
-
- @Override
- public void preLogArchive(Path oldPath, Path newPath) throws IOException {
- // Not interested in this method.
- }
-
- @Override
- public void postLogArchive(Path oldPath, Path newPath) throws IOException {
- // Not interested in this method.
- }
-
- @Override
- public void logRollRequested() {
- // Not interested in this method.
- }
-
- @Override
- public void logCloseRequested() {
- // Not interested in this method.
- }
-
- @Override
- public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {
- // Not interested in this method.
- }
-
- @Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {
+ public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
if (tableName.equalsIgnoreCase(htd.getNameAsString())) {
isVisited = true;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
index a2ec2ec..5bb672d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java
@@ -45,8 +45,8 @@ public class TestTableMapReduceUtil {
Job job = new Job(configuration, "tableName");
// test
TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
- Text.class, job, false, HLogInputFormat.class);
- assertEquals(HLogInputFormat.class, job.getInputFormatClass());
+ Text.class, job, false, WALInputFormat.class);
+ assertEquals(WALInputFormat.class, job.getInputFormatClass());
assertEquals(Import.Importer.class, job.getMapperClass());
assertEquals(LongWritable.class, job.getOutputKeyClass());
assertEquals(Text.class, job.getOutputValueClass());
@@ -59,8 +59,8 @@ public class TestTableMapReduceUtil {
Configuration configuration = new Configuration();
Job job = new Job(configuration, "tableName");
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
- Import.Importer.class, Text.class, Text.class, job, false, HLogInputFormat.class);
- assertEquals(HLogInputFormat.class, job.getInputFormatClass());
+ Import.Importer.class, Text.class, Text.class, job, false, WALInputFormat.class);
+ assertEquals(WALInputFormat.class, job.getInputFormatClass());
assertEquals(Import.Importer.class, job.getMapperClass());
assertEquals(LongWritable.class, job.getOutputKeyClass());
assertEquals(Text.class, job.getOutputValueClass());
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
index 03fa9f2..9669df9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -45,9 +45,9 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.WALPlayer.HLogKeyValueMapper;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
@@ -106,7 +106,7 @@ public class TestWALPlayer {
t1.delete(d);
// replay the WAL, map table 1 to table 2
- HLog log = cluster.getRegionServer(0).getWAL();
+ WAL log = cluster.getRegionServer(0).getWAL(null);
log.rollWriter();
String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
.getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
@@ -129,17 +129,26 @@ public class TestWALPlayer {
}
/**
- * Test HLogKeyValueMapper setup and map
+ * Test WALKeyValueMapper setup and map
*/
@Test
- public void testHLogKeyValueMapper() throws Exception {
+ public void testWALKeyValueMapper() throws Exception {
+ testWALKeyValueMapper(WALPlayer.TABLES_KEY);
+ }
+
+ @Test
+ public void testWALKeyValueMapperWithDeprecatedConfig() throws Exception {
+ testWALKeyValueMapper("hlog.input.tables");
+ }
+
+ private void testWALKeyValueMapper(final String tableConfigKey) throws Exception {
Configuration configuration = new Configuration();
- configuration.set(WALPlayer.TABLES_KEY, "table");
- HLogKeyValueMapper mapper = new HLogKeyValueMapper();
- HLogKey key = mock(HLogKey.class);
+ configuration.set(tableConfigKey, "table");
+ WALKeyValueMapper mapper = new WALKeyValueMapper();
+ WALKey key = mock(WALKey.class);
when(key.getTablename()).thenReturn(TableName.valueOf("table"));
@SuppressWarnings("unchecked")
- Mapper<HLogKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context =
+ Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context =
mock(Context.class);
when(context.getConfiguration()).thenReturn(configuration);
@@ -191,7 +200,7 @@ public class TestWALPlayer {
assertTrue(data.toString().contains("ERROR: Wrong number of arguments:"));
assertTrue(data.toString().contains("Usage: WALPlayer [options] <wal inputdir>" +
" <tables> [<tableMappings>]"));
- assertTrue(data.toString().contains("-Dhlog.bulk.output=/path/for/output"));
+ assertTrue(data.toString().contains("-Dwal.bulk.output=/path/for/output"));
}
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
new file mode 100644
index 0000000..62b0f1d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * JUnit tests for the WALRecordReader
+ */
+@Category(MediumTests.class)
+public class TestWALRecordReader {
+ private final Log LOG = LogFactory.getLog(getClass());
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static Configuration conf;
+ private static FileSystem fs;
+ private static Path hbaseDir;
+ // visible for TestHLogRecordReader
+ static final TableName tableName = TableName.valueOf(getName());
+ private static final byte [] rowName = tableName.getName();
+ // visible for TestHLogRecordReader
+ static final HRegionInfo info = new HRegionInfo(tableName,
+ Bytes.toBytes(""), Bytes.toBytes(""), false);
+ private static final byte [] family = Bytes.toBytes("column");
+ private static final byte [] value = Bytes.toBytes("value");
+ private static HTableDescriptor htd;
+ private static Path logDir;
+
+ /**
+ * Returns the fixed name of this test; it is used to build {@code tableName} and is passed
+ * to the {@code WALFactory} constructor in the tests.
+ */
+ private static String getName() {
+ return "TestWALRecordReader";
+ }
+
+ /**
+ * Empties the test root directory before each test by recursively deleting every entry
+ * directly under it.
+ */
+ @Before
+ public void setUp() throws Exception {
+ for (FileStatus child : fs.listStatus(hbaseDir)) {
+ fs.delete(child.getPath(), true);
+ }
+ }
+ /**
+ * Starts a mini DFS cluster and initializes the fixtures shared by all tests in this class:
+ * the configuration, file system, root directory, WAL directory and table descriptor.
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf = TEST_UTIL.getConfiguration();
+ // Configure DFS with a 1 MB block size and a replication of 1 before starting the cluster.
+ conf.setInt("dfs.blocksize", 1024 * 1024);
+ conf.setInt("dfs.replication", 1);
+ TEST_UTIL.startMiniDFSCluster(1);
+
+ fs = TEST_UTIL.getDFSCluster().getFileSystem();
+ hbaseDir = TEST_UTIL.createRootDir();
+ logDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
+
+ htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor(family));
+ }
+
+ /** Shuts down the mini cluster owned by {@code TEST_UTIL} once all tests have run. */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Test partial reads from the log based on the passed time range.
+ * Writes two edits, rolls the WAL, writes two more edits, then checks which splits and
+ * which entries {@code WALInputFormat} returns for different start/end time settings.
+ * @throws Exception
+ */
+ @Test
+ public void testPartialRead() throws Exception {
+ final WALFactory walfactory = new WALFactory(conf, null, getName());
+ WAL log = walfactory.getWAL(info.getEncodedNameAsBytes());
+ // This test depends on timestamp being millisecond based and the filename of the WAL also
+ // being millisecond based.
+ long ts = System.currentTimeMillis();
+ // Before the roll: edit "1" stamped ts and edit "2" stamped ts+1.
+ WALEdit edit = new WALEdit();
+ final AtomicLong sequenceId = new AtomicLong(0);
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
+ log.append(htd, info, getWalKey(ts), edit, sequenceId, true, null);
+ edit = new WALEdit();
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
+ log.append(htd, info, getWalKey(ts+1), edit, sequenceId,
+ true, null);
+ log.sync();
+ LOG.info("Before 1st WAL roll " + log.toString());
+ log.rollWriter();
+ LOG.info("Past 1st WAL roll " + log.toString());
+
+ // NOTE(review): presumably this sleep keeps ts1 later than the pre-roll timestamps - confirm.
+ Thread.sleep(1);
+ long ts1 = System.currentTimeMillis();
+
+ // After the roll: edit "3" stamped ts1+1 and edit "4" stamped ts1+2.
+ edit = new WALEdit();
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
+ log.append(htd, info, getWalKey(ts1+1), edit, sequenceId,
+ true, null);
+ edit = new WALEdit();
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
+ log.append(htd, info, getWalKey(ts1+2), edit, sequenceId,
+ true, null);
+ log.sync();
+ log.shutdown();
+ walfactory.shutdown();
+ LOG.info("Closed WAL " + log.toString());
+
+
+ WALInputFormat input = new WALInputFormat();
+ Configuration jobConf = new Configuration(conf);
+ jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
+ jobConf.setLong(WALInputFormat.END_TIME_KEY, ts);
+
+ // only 1st file is considered, and only its 1st entry is used
+ List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+
+ assertEquals(1, splits.size());
+ testSplit(splits.get(0), Bytes.toBytes("1"));
+
+ // narrow the range to [ts+1, ts1+1]
+ jobConf.setLong(WALInputFormat.START_TIME_KEY, ts+1);
+ jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1+1);
+ splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+ // both files need to be considered
+ assertEquals(2, splits.size());
+ // only the 2nd entry from the 1st file is used
+ testSplit(splits.get(0), Bytes.toBytes("2"));
+ // only the 1st entry from the 2nd file is used
+ testSplit(splits.get(1), Bytes.toBytes("3"));
+ }
+
+ /**
+ * Test basic functionality: one edit is written before a WAL roll and one after it, then
+ * the splits and entries returned by {@code WALInputFormat} are checked without a time
+ * range, with an end time only, and with a start time later than both edits.
+ * @throws Exception
+ */
+ @Test
+ public void testWALRecordReader() throws Exception {
+ final WALFactory walfactory = new WALFactory(conf, null, getName());
+ WAL log = walfactory.getWAL(info.getEncodedNameAsBytes());
+ // NOTE(review): this local shadows the static field "value", which holds the same bytes.
+ byte [] value = Bytes.toBytes("value");
+ final AtomicLong sequenceId = new AtomicLong(0);
+ // edit "1" is written before the roll
+ WALEdit edit = new WALEdit();
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
+ System.currentTimeMillis(), value));
+ long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true,
+ null);
+ log.sync(txid);
+
+ Thread.sleep(1); // make sure 2nd log gets a later timestamp
+ long secondTs = System.currentTimeMillis();
+ log.rollWriter();
+
+ // edit "2" is written after the roll
+ edit = new WALEdit();
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
+ System.currentTimeMillis(), value));
+ txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true,
+ null);
+ log.sync(txid);
+ log.shutdown();
+ walfactory.shutdown();
+ // taken after the last append, so it is not earlier than any edit's timestamp
+ long thirdTs = System.currentTimeMillis();
+
+ // should have 2 log files now
+ WALInputFormat input = new WALInputFormat();
+ Configuration jobConf = new Configuration(conf);
+ jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString());
+
+ // make sure both logs are found
+ List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+ assertEquals(2, splits.size());
+
+ // should return exactly one KV
+ testSplit(splits.get(0), Bytes.toBytes("1"));
+ // same for the 2nd split
+ testSplit(splits.get(1), Bytes.toBytes("2"));
+
+ // now test basic time ranges:
+
+ // set an endtime, the 2nd log file can be ignored completely.
+ jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs-1);
+ splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+ assertEquals(1, splits.size());
+ testSplit(splits.get(0), Bytes.toBytes("1"));
+
+ // now set a start time
+ jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE);
+ jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs);
+ splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
+ // both logs need to be considered
+ assertEquals(2, splits.size());
+ // but both readers skip all edits
+ testSplit(splits.get(0));
+ testSplit(splits.get(1));
+ }
+
+ /**
+ * Builds the key used for an appended edit, for this test's region and table.
+ * @param time value handed to the {@code WALKey} constructor as its third argument; every
+ * caller in this class passes a millisecond timestamp, not a sequence id
+ * @return a new key for {@code info} and {@code tableName}
+ */
+ protected WALKey getWalKey(final long time) {
+ return new WALKey(info.getEncodedNameAsBytes(), tableName, time);
+ }
+
+ /**
+ * Creates the record reader that {@code testSplit} reads splits with.
+ * NOTE(review): protected, presumably so a subclass can supply a different reader - confirm.
+ * @return a new {@code WALKeyRecordReader}
+ */
+ protected WALRecordReader getReader() {
+ return new WALKeyRecordReader();
+ }
+
+ /**
+ * Create a new reader from the split, and match the edits against the passed columns.
+ * For each expected column, in order, the reader must yield an entry whose first cell has
+ * that qualifier; after the last expected column the reader must have no more entries.
+ * @param split the split to read
+ * @param columns expected qualifiers in read order; when none are given the split must
+ * yield no entries
+ * @throws Exception if the reader fails to initialize, read or close
+ */
+ private void testSplit(InputSplit split, byte[]... columns) throws Exception {
+ final WALRecordReader reader = getReader();
+ reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
+ // Close the reader even when an assertion fails so a failing test does not leak it.
+ try {
+ for (byte[] column : columns) {
+ assertTrue(reader.nextKeyValue());
+ Cell cell = reader.getCurrentValue().getCells().get(0);
+ assertTrue("expected [" + Bytes.toString(column) + "], actual ["
+ + Bytes.toString(cell.getQualifier()) + "]",
+ Bytes.equals(column, cell.getQualifier()));
+ }
+ assertFalse(reader.nextKeyValue());
+ } finally {
+ reader.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 8aac384..d613852 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -97,7 +97,7 @@ import org.apache.hadoop.hbase.regionserver.Leases;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -527,7 +527,7 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
}
@Override
- public HLog getWAL(HRegionInfo regionInfo) throws IOException {
+ public WAL getWAL(HRegionInfo regionInfo) throws IOException {
// TODO Auto-generated method stub
return null;
}