You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2014/12/02 18:20:43 UTC
[04/21] hbase git commit: HBASE-12522 Backport of write-ahead-log
refactoring and follow-ons.
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java
deleted file mode 100644
index dfc89bc..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.LargeTests;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import static org.junit.Assert.*;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-@Category(LargeTests.class)
-@RunWith(Parameterized.class)
-public class TestReplicationHLogReaderManager {
-
- private static HBaseTestingUtility TEST_UTIL;
- private static Configuration conf;
- private static Path hbaseDir;
- private static FileSystem fs;
- private static MiniDFSCluster cluster;
- private static final TableName tableName = TableName.valueOf("tablename");
- private static final byte [] family = Bytes.toBytes("column");
- private static final byte [] qualifier = Bytes.toBytes("qualifier");
- private static final HRegionInfo info = new HRegionInfo(tableName,
- HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
- private static final HTableDescriptor htd = new HTableDescriptor(tableName);
-
- private HLog log;
- private ReplicationHLogReaderManager logManager;
- private PathWatcher pathWatcher;
- private int nbRows;
- private int walEditKVs;
- private final AtomicLong sequenceId = new AtomicLong(1);
-
- @Parameters
- public static Collection<Object[]> parameters() {
- // Try out different combinations of row count and KeyValue count
- int[] NB_ROWS = { 1500, 60000 };
- int[] NB_KVS = { 1, 100 };
- // whether compression is used
- Boolean[] BOOL_VALS = { false, true };
- List<Object[]> parameters = new ArrayList<Object[]>();
- for (int nbRows : NB_ROWS) {
- for (int walEditKVs : NB_KVS) {
- for (boolean b : BOOL_VALS) {
- Object[] arr = new Object[3];
- arr[0] = nbRows;
- arr[1] = walEditKVs;
- arr[2] = b;
- parameters.add(arr);
- }
- }
- }
- return parameters;
- }
-
- public TestReplicationHLogReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
- this.nbRows = nbRows;
- this.walEditKVs = walEditKVs;
- TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
- enableCompression);
- }
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- TEST_UTIL = new HBaseTestingUtility();
- conf = TEST_UTIL.getConfiguration();
- TEST_UTIL.startMiniDFSCluster(3);
-
- hbaseDir = TEST_UTIL.createRootDir();
- cluster = TEST_UTIL.getDFSCluster();
- fs = cluster.getFileSystem();
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
- }
-
- @Before
- public void setUp() throws Exception {
- logManager = new ReplicationHLogReaderManager(fs, conf);
- List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
- pathWatcher = new PathWatcher();
- listeners.add(pathWatcher);
- log = HLogFactory.createHLog(fs, hbaseDir, "test", conf, listeners, "some server");
- }
-
- @After
- public void tearDown() throws Exception {
- log.closeAndDelete();
- }
-
- @Test
- public void test() throws Exception {
- // Grab the path that was generated when the log rolled as part of its creation
- Path path = pathWatcher.currentPath;
-
- assertEquals(0, logManager.getPosition());
-
- appendToLog();
-
- // There's one edit in the log, read it. Reading past it needs to return nulls
- assertNotNull(logManager.openReader(path));
- logManager.seek();
- HLog.Entry entry = logManager.readNextAndSetPosition();
- assertNotNull(entry);
- entry = logManager.readNextAndSetPosition();
- assertNull(entry);
- logManager.closeReader();
- long oldPos = logManager.getPosition();
-
- appendToLog();
-
- // Read the newly added entry, make sure we made progress
- assertNotNull(logManager.openReader(path));
- logManager.seek();
- entry = logManager.readNextAndSetPosition();
- assertNotEquals(oldPos, logManager.getPosition());
- assertNotNull(entry);
- logManager.closeReader();
- oldPos = logManager.getPosition();
-
- log.rollWriter();
-
- // We rolled but we still should see the end of the first log and not get data
- assertNotNull(logManager.openReader(path));
- logManager.seek();
- entry = logManager.readNextAndSetPosition();
- assertEquals(oldPos, logManager.getPosition());
- assertNull(entry);
- logManager.finishCurrentFile();
-
- path = pathWatcher.currentPath;
-
- for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
- log.rollWriter();
- logManager.openReader(path);
- logManager.seek();
- for (int i = 0; i < nbRows; i++) {
- HLog.Entry e = logManager.readNextAndSetPosition();
- if (e == null) {
- fail("Should have enough entries");
- }
- }
- }
-
- private void appendToLog() throws IOException {
- appendToLogPlus(1);
- }
-
- private void appendToLogPlus(int count) throws IOException {
- log.append(info, tableName, getWALEdits(count), System.currentTimeMillis(), htd, sequenceId);
- }
-
- private WALEdit getWALEdits(int count) {
- WALEdit edit = new WALEdit();
- for (int i = 0; i < count; i++) {
- edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
- System.currentTimeMillis(), qualifier));
- }
- return edit;
- }
-
- class PathWatcher implements WALActionsListener {
-
- Path currentPath;
-
- @Override
- public void preLogRoll(Path oldPath, Path newPath) throws IOException {
- currentPath = newPath;
- }
-
- @Override
- public void postLogRoll(Path oldPath, Path newPath) throws IOException {}
-
- @Override
- public void preLogArchive(Path oldPath, Path newPath) throws IOException {}
-
- @Override
- public void postLogArchive(Path oldPath, Path newPath) throws IOException {}
-
- @Override
- public void logRollRequested() {}
-
- @Override
- public void logCloseRequested() {}
-
- @Override
- public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {}
-
- @Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {}
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 7344190..66ed6b7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -51,11 +51,11 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -144,6 +145,7 @@ public class TestReplicationSourceManager {
ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
ZKClusterId.setClusterId(zkw, new ClusterId());
+ FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
fs = FileSystem.get(conf);
oldLogDir = new Path(utility.getDataTestDir(),
HConstants.HREGION_OLDLOGDIR_NAME);
@@ -193,8 +195,9 @@ public class TestReplicationSourceManager {
List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
listeners.add(replication);
- HLog hlog = HLogFactory.createHLog(fs, utility.getDataTestDir(), "testLogRoll",
- conf, listeners, URLEncoder.encode("regionserver:60020", "UTF8"));
+ final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners,
+ URLEncoder.encode("regionserver:60020", "UTF8"));
+ final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes());
final AtomicLong sequenceId = new AtomicLong(1);
manager.init();
HTableDescriptor htd = new HTableDescriptor();
@@ -202,12 +205,12 @@ public class TestReplicationSourceManager {
// Testing normal log rolling every 20
for(long i = 1; i < 101; i++) {
if(i > 1 && i % 20 == 0) {
- hlog.rollWriter();
+ wal.rollWriter();
}
LOG.info(i);
- HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
- System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
- hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId);
+ final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test,
+ System.currentTimeMillis()), edit, sequenceId, true ,null);
+ wal.sync(txid);
}
// Simulate a rapid insert that's followed
@@ -218,22 +221,26 @@ public class TestReplicationSourceManager {
LOG.info(baseline + " and " + time);
for (int i = 0; i < 3; i++) {
- hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId);
+ wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test,
+ System.currentTimeMillis()), edit, sequenceId, true, null);
}
+ wal.sync();
- assertEquals(6, manager.getHLogs().get(slaveId).size());
+ assertEquals(6, manager.getWALs().get(slaveId).size());
- hlog.rollWriter();
+ wal.rollWriter();
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
"1", 0, false, false);
- hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId);
+ wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), test,
+ System.currentTimeMillis()), edit, sequenceId, true, null);
+ wal.sync();
- assertEquals(1, manager.getHLogs().size());
+ assertEquals(1, manager.getWALs().size());
- // TODO Need a case with only 2 HLogs and we only want to delete the first one
+ // TODO Need a case with only 2 WALs where only the first one should be deleted
}
@Test
@@ -305,12 +312,12 @@ public class TestReplicationSourceManager {
new Long(1), new Long(2)));
w1.start();
w1.join(5000);
- assertEquals(1, manager.getHlogsByIdRecoveredQueues().size());
+ assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
String id = "1-" + server.getServerName().getServerName();
- assertEquals(files, manager.getHlogsByIdRecoveredQueues().get(id));
+ assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id));
manager.cleanOldLogs("log2", id, true);
// log1 should be deleted
- assertEquals(Sets.newHashSet("log2"), manager.getHlogsByIdRecoveredQueues().get(id));
+ assertEquals(Sets.newHashSet("log2"), manager.getWalsByIdRecoveredQueues().get(id));
}
@Test
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
new file mode 100644
index 0000000..b9f4928
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationWALReaderManager.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import static org.junit.Assert.*;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+@Category(LargeTests.class)
+@RunWith(Parameterized.class)
+public class TestReplicationWALReaderManager {
+
+ private static HBaseTestingUtility TEST_UTIL;
+ private static Configuration conf;
+ private static Path hbaseDir;
+ private static FileSystem fs;
+ private static MiniDFSCluster cluster;
+ private static final TableName tableName = TableName.valueOf("tablename");
+ private static final byte [] family = Bytes.toBytes("column");
+ private static final byte [] qualifier = Bytes.toBytes("qualifier");
+ private static final HRegionInfo info = new HRegionInfo(tableName,
+ HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
+ private static final HTableDescriptor htd = new HTableDescriptor(tableName);
+
+ private WAL log;
+ private ReplicationWALReaderManager logManager;
+ private PathWatcher pathWatcher;
+ private int nbRows;
+ private int walEditKVs;
+ private final AtomicLong sequenceId = new AtomicLong(1);
+
+ @Parameters
+ public static Collection<Object[]> parameters() {
+ // Try out different combinations of row count and KeyValue count
+ int[] NB_ROWS = { 1500, 60000 };
+ int[] NB_KVS = { 1, 100 };
+ // whether compression is used
+ Boolean[] BOOL_VALS = { false, true };
+ List<Object[]> parameters = new ArrayList<Object[]>();
+ for (int nbRows : NB_ROWS) {
+ for (int walEditKVs : NB_KVS) {
+ for (boolean b : BOOL_VALS) {
+ Object[] arr = new Object[3];
+ arr[0] = nbRows;
+ arr[1] = walEditKVs;
+ arr[2] = b;
+ parameters.add(arr);
+ }
+ }
+ }
+ return parameters;
+ }
+
+ public TestReplicationWALReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
+ this.nbRows = nbRows;
+ this.walEditKVs = walEditKVs;
+ TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
+ enableCompression);
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL = new HBaseTestingUtility();
+ conf = TEST_UTIL.getConfiguration();
+ TEST_UTIL.startMiniDFSCluster(3);
+
+ hbaseDir = TEST_UTIL.createRootDir();
+ cluster = TEST_UTIL.getDFSCluster();
+ fs = cluster.getFileSystem();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ logManager = new ReplicationWALReaderManager(fs, conf);
+ List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
+ pathWatcher = new PathWatcher();
+ listeners.add(pathWatcher);
+ final WALFactory wals = new WALFactory(conf, listeners, "some server");
+ log = wals.getWAL(info.getEncodedNameAsBytes());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ log.close();
+ }
+
+ @Test
+ public void test() throws Exception {
+ // Grab the path that was generated when the log rolled as part of its creation
+ Path path = pathWatcher.currentPath;
+
+ assertEquals(0, logManager.getPosition());
+
+ appendToLog();
+
+ // There's one edit in the log; read it. Reading past it must return null
+ assertNotNull(logManager.openReader(path));
+ logManager.seek();
+ WAL.Entry entry = logManager.readNextAndSetPosition();
+ assertNotNull(entry);
+ entry = logManager.readNextAndSetPosition();
+ assertNull(entry);
+ logManager.closeReader();
+ long oldPos = logManager.getPosition();
+
+ appendToLog();
+
+ // Read the newly added entry, make sure we made progress
+ assertNotNull(logManager.openReader(path));
+ logManager.seek();
+ entry = logManager.readNextAndSetPosition();
+ assertNotEquals(oldPos, logManager.getPosition());
+ assertNotNull(entry);
+ logManager.closeReader();
+ oldPos = logManager.getPosition();
+
+ log.rollWriter();
+
+ // We rolled but we still should see the end of the first log and not get data
+ assertNotNull(logManager.openReader(path));
+ logManager.seek();
+ entry = logManager.readNextAndSetPosition();
+ assertEquals(oldPos, logManager.getPosition());
+ assertNull(entry);
+ logManager.finishCurrentFile();
+
+ path = pathWatcher.currentPath;
+
+ for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
+ log.rollWriter();
+ logManager.openReader(path);
+ logManager.seek();
+ for (int i = 0; i < nbRows; i++) {
+ WAL.Entry e = logManager.readNextAndSetPosition();
+ if (e == null) {
+ fail("Should have enough entries");
+ }
+ }
+ }
+
+ private void appendToLog() throws IOException {
+ appendToLogPlus(1);
+ }
+
+ private void appendToLogPlus(int count) throws IOException {
+ final long txid = log.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis()), getWALEdits(count), sequenceId, true, null);
+ log.sync(txid);
+ }
+
+ private WALEdit getWALEdits(int count) {
+ WALEdit edit = new WALEdit();
+ for (int i = 0; i < count; i++) {
+ edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
+ System.currentTimeMillis(), qualifier));
+ }
+ return edit;
+ }
+
+ class PathWatcher extends WALActionsListener.Base {
+
+ Path currentPath;
+
+ @Override
+ public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+ currentPath = newPath;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
index d87a884..5557781 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java
@@ -68,7 +68,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.visibility.VisibilityController.VisibilityReplication;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Assert;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSVisitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSVisitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSVisitor.java
index 01ceb3c..394fa6c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSVisitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSVisitor.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.wal.WALSplitter;
import org.junit.*;
import org.junit.experimental.categories.Category;
@@ -177,7 +177,7 @@ public class TestFSVisitor {
private void createRecoverEdits(final Path tableDir, final Set<String> tableRegions,
final Set<String> recoverEdits) throws IOException {
for (String region: tableRegions) {
- Path regionEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(new Path(tableDir, region));
+ Path regionEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(new Path(tableDir, region));
long seqId = System.currentTimeMillis();
for (int i = 0; i < 3; ++i) {
String editName = String.format("%019d", seqId + i);
@@ -190,12 +190,20 @@ public class TestFSVisitor {
}
/*
+ * Old style
* |-.logs/
* |----server5,5,1351969633508/
* |-------server5,5,1351969633508.0
* |----server6,6,1351969633512/
* |-------server6,6,1351969633512.0
* |-------server6,6,1351969633512.3
+ * New style
+ * |-.logs/
+ * |----server3,5,1351969633508/
+ * |-------server3,5,1351969633508.default.0
+ * |----server4,6,1351969633512/
+ * |-------server4,6,1351969633512.default.0
+ * |-------server4,6,1351969633512.some_provider.3
*/
private void createLogs(final Path logDir, final Set<String> servers,
final Set<String> logs) throws IOException {
@@ -203,6 +211,13 @@ public class TestFSVisitor {
String server = String.format("server%d,%d,%d", s, s, System.currentTimeMillis());
servers.add(server);
Path serverLogDir = new Path(logDir, server);
+ if (s % 2 == 0) {
+ if (s % 3 == 0) {
+ server += ".default";
+ } else {
+ server += "." + s;
+ }
+ }
fs.mkdirs(serverLogDir);
for (int i = 0; i < 5; ++i) {
String logfile = server + '.' + i;
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
index 0992287..4f6e3ab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
@@ -1131,7 +1131,7 @@ public class TestHBaseFsck {
setupTable(table);
assertEquals(ROWKEYS.length, countRows());
- // make sure data in regions, if in hlog only there is no data loss
+ // flush so data is in the regions; if it were only in the WAL there would be no data loss
TEST_UTIL.getHBaseAdmin().flush(table);
// Mess it up by leaving a hole in the hdfs data
@@ -1164,7 +1164,7 @@ public class TestHBaseFsck {
setupTable(table);
assertEquals(ROWKEYS.length, countRows());
- // make sure data in regions, if in hlog only there is no data loss
+ // flush so data is in the regions; if it were only in the WAL there would be no data loss
TEST_UTIL.getHBaseAdmin().flush(table);
// Mess it up by deleting hdfs dirs
@@ -1298,14 +1298,14 @@ public class TestHBaseFsck {
TableName.valueOf("testFixByTable2");
try {
setupTable(table1);
- // make sure data in regions, if in hlog only there is no data loss
+ // flush so data is in the regions; if it were only in the WAL there would be no data loss
TEST_UTIL.getHBaseAdmin().flush(table1);
// Mess them up by leaving a hole in the hdfs data
deleteRegion(conf, tbl.getTableDescriptor(), Bytes.toBytes("B"),
Bytes.toBytes("C"), false, false, true); // don't rm meta
setupTable(table2);
- // make sure data in regions, if in hlog only there is no data loss
+ // flush so data is in the regions; if it were only in the WAL there would be no data loss
TEST_UTIL.getHBaseAdmin().flush(table2);
// Mess them up by leaving a hole in the hdfs data
deleteRegion(conf, tbl.getTableDescriptor(), Bytes.toBytes("B"),
@@ -1345,7 +1345,7 @@ public class TestHBaseFsck {
setupTable(table);
assertEquals(ROWKEYS.length, countRows());
- // make sure data in regions, if in hlog only there is no data loss
+ // flush so data is in the regions; if it were only in the WAL there would be no data loss
TEST_UTIL.getHBaseAdmin().flush(table);
HRegionLocation location = tbl.getRegionLocation("B");
@@ -1425,7 +1425,7 @@ public class TestHBaseFsck {
setupTable(table);
assertEquals(ROWKEYS.length, countRows());
- // make sure data in regions, if in hlog only there is no data loss
+ // flush so data is in the regions; if it were only in the WAL there would be no data loss
TEST_UTIL.getHBaseAdmin().flush(table);
HRegionLocation location = tbl.getRegionLocation("B");
@@ -1475,7 +1475,7 @@ public class TestHBaseFsck {
setupTable(table);
assertEquals(ROWKEYS.length, countRows());
- // make sure data in regions, if in hlog only there is no data loss
+ // flush so data is in the regions; if it were only in the WAL there would be no data loss
TEST_UTIL.getHBaseAdmin().flush(table);
HRegionLocation location = tbl.getRegionLocation("B");
@@ -2330,7 +2330,7 @@ public class TestHBaseFsck {
setupTable(table);
assertEquals(ROWKEYS.length, countRows());
- // make sure data in regions, if in hlog only there is no data loss
+ // flush so data is in the regions; if it were only in the WAL there would be no data loss
TEST_UTIL.getHBaseAdmin().flush(table);
HRegionInfo region1 = tbl.getRegionLocation("A").getRegionInfo();
HRegionInfo region2 = tbl.getRegionLocation("B").getRegionInfo();
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java
index 563d51d..5ab0d43 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestMergeTool.java
@@ -41,8 +41,8 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.util.ToolRunner;
import org.junit.experimental.categories.Category;
@@ -61,6 +61,7 @@ public class TestMergeTool extends HBaseTestCase {
private HTableDescriptor desc;
private byte [][][] rows;
private MiniDFSCluster dfsCluster = null;
+ private WALFactory wals;
@Override
public void setUp() throws Exception {
@@ -143,6 +144,7 @@ public class TestMergeTool extends HBaseTestCase {
// we will end up with a local file system
super.setUp();
+ wals = new WALFactory(conf, null, "TestMergeTool");
try {
// Create meta region
createMetaRegion();
@@ -183,6 +185,7 @@ public class TestMergeTool extends HBaseTestCase {
HRegion.closeHRegion(r);
}
}
+ wals.close();
TEST_UTIL.shutdownMiniCluster();
}
@@ -196,7 +199,7 @@ public class TestMergeTool extends HBaseTestCase {
* @throws Exception
*/
private HRegion mergeAndVerify(final String msg, final String regionName1,
- final String regionName2, final HLog log, final int upperbound)
+ final String regionName2, final WAL log, final int upperbound)
throws Exception {
Merge merger = new Merge(this.conf);
LOG.info(msg);
@@ -269,39 +272,26 @@ public class TestMergeTool extends HBaseTestCase {
// Close the region and delete the log
HRegion.closeHRegion(regions[i]);
}
-
- // Create a log that we can reuse when we need to open regions
- Path logPath = new Path("/tmp");
- String logName = HConstants.HREGION_LOGDIR_NAME + "_"
- + System.currentTimeMillis();
- LOG.info("Creating log " + logPath.toString() + "/" + logName);
-
- HLog log = HLogFactory.createHLog(this.fs, logPath,
- logName, this.conf);
-
- try {
- // Merge Region 0 and Region 1
- HRegion merged = mergeAndVerify("merging regions 0 and 1 ",
- this.sourceRegions[0].getRegionNameAsString(),
- this.sourceRegions[1].getRegionNameAsString(), log, 2);
-
- // Merge the result of merging regions 0 and 1 with region 2
- merged = mergeAndVerify("merging regions 0+1 and 2",
- merged.getRegionInfo().getRegionNameAsString(),
- this.sourceRegions[2].getRegionNameAsString(), log, 3);
-
- // Merge the result of merging regions 0, 1 and 2 with region 3
- merged = mergeAndVerify("merging regions 0+1+2 and 3",
- merged.getRegionInfo().getRegionNameAsString(),
- this.sourceRegions[3].getRegionNameAsString(), log, 4);
-
- // Merge the result of merging regions 0, 1, 2 and 3 with region 4
- merged = mergeAndVerify("merging regions 0+1+2+3 and 4",
- merged.getRegionInfo().getRegionNameAsString(),
- this.sourceRegions[4].getRegionNameAsString(), log, rows.length);
- } finally {
- log.closeAndDelete();
- }
+ WAL log = wals.getWAL(new byte[]{});
+ // Merge Region 0 and Region 1
+ HRegion merged = mergeAndVerify("merging regions 0 and 1 ",
+ this.sourceRegions[0].getRegionNameAsString(),
+ this.sourceRegions[1].getRegionNameAsString(), log, 2);
+
+ // Merge the result of merging regions 0 and 1 with region 2
+ merged = mergeAndVerify("merging regions 0+1 and 2",
+ merged.getRegionInfo().getRegionNameAsString(),
+ this.sourceRegions[2].getRegionNameAsString(), log, 3);
+
+ // Merge the result of merging regions 0, 1 and 2 with region 3
+ merged = mergeAndVerify("merging regions 0+1+2 and 3",
+ merged.getRegionInfo().getRegionNameAsString(),
+ this.sourceRegions[3].getRegionNameAsString(), log, 4);
+
+ // Merge the result of merging regions 0, 1, 2 and 3 with region 4
+ merged = mergeAndVerify("merging regions 0+1+2+3 and 4",
+ merged.getRegionInfo().getRegionNameAsString(),
+ this.sourceRegions[4].getRegionNameAsString(), log, rows.length);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java
new file mode 100644
index 0000000..3212822
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+
+// imports for things that haven't moved yet
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * This is a utility class, used by tests, which fails the operation specified by the FailureType enum.
+ */
+@InterfaceAudience.Private
+public class FaultyFSLog extends FSHLog {
+ public enum FailureType {
+ NONE, APPEND, SYNC
+ }
+ FailureType ft = FailureType.NONE;
+
+ public FaultyFSLog(FileSystem fs, Path rootDir, String logName, Configuration conf)
+ throws IOException {
+ super(fs, rootDir, logName, conf);
+ }
+
+ public void setFailureType(FailureType fType) {
+ this.ft = fType;
+ }
+
+ @Override
+ public void sync(long txid) throws IOException {
+ if (this.ft == FailureType.SYNC) {
+ throw new IOException("sync");
+ }
+ super.sync(txid);
+ }
+
+ @Override
+ public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
+ AtomicLong sequenceId, boolean isInMemstore, List<Cell> cells) throws IOException {
+ if (this.ft == FailureType.APPEND) {
+ throw new IOException("append");
+ }
+ return super.append(htd, info, key, edits, sequenceId, isInMemstore, cells);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
new file mode 100644
index 0000000..6bedc5e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
@@ -0,0 +1,332 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+// imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+@Category(MediumTests.class)
+public class TestDefaultWALProvider {
+ protected static final Log LOG = LogFactory.getLog(TestDefaultWALProvider.class);
+
+ protected static Configuration conf;
+ protected static FileSystem fs;
+ protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @Rule
+ public final TestName currentTest = new TestName();
+
+ @Before
+ public void setUp() throws Exception {
+ FileStatus[] entries = fs.listStatus(new Path("/"));
+ for (FileStatus dir : entries) {
+ fs.delete(dir.getPath(), true);
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // Make block sizes small.
+ TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
+ // quicker heartbeat interval for faster DN death notification
+ TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
+ TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
+ TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
+
+ // faster failover with cluster.shutdown();fs.close() idiom
+ TEST_UTIL.getConfiguration()
+ .setInt("hbase.ipc.client.connect.max.retries", 1);
+ TEST_UTIL.getConfiguration().setInt(
+ "dfs.client.block.recovery.retries", 1);
+ TEST_UTIL.getConfiguration().setInt(
+ "hbase.ipc.client.connection.maxidletime", 500);
+ TEST_UTIL.startMiniDFSCluster(3);
+
+ // Set up a working space for our tests.
+ TEST_UTIL.createRootDir();
+ conf = TEST_UTIL.getConfiguration();
+ fs = TEST_UTIL.getDFSCluster().getFileSystem();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ static String getName() {
+ return "TestDefaultWALProvider";
+ }
+
+ @Test
+ public void testGetServerNameFromWALDirectoryName() throws IOException {
+ ServerName sn = ServerName.valueOf("hn", 450, 1398);
+ String hl = FSUtils.getRootDir(conf) + "/" +
+ DefaultWALProvider.getWALDirectoryName(sn.toString());
+
+ // Must not throw exception
+ assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, null));
+ assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf,
+ FSUtils.getRootDir(conf).toUri().toString()));
+ assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, ""));
+ assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, " "));
+ assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, hl));
+ assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, hl + "qdf"));
+ assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, "sfqf" + hl + "qdf"));
+
+ final String wals = "/WALs/";
+ ServerName parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf,
+ FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
+ "/localhost%2C32984%2C1343316388997.1343316390417");
+ assertEquals("standard", sn, parsed);
+
+ parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf, hl + "/qdf");
+ assertEquals("subdir", sn, parsed);
+
+ parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf,
+ FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
+ "-splitting/localhost%3A57020.1340474893931");
+ assertEquals("split", sn, parsed);
+ }
+
+
+ protected void addEdits(WAL log, HRegionInfo hri, TableName tableName,
+ int times, AtomicLong sequenceId) throws IOException {
+ HTableDescriptor htd = new HTableDescriptor();
+ htd.addFamily(new HColumnDescriptor("row"));
+
+ final byte [] row = Bytes.toBytes("row");
+ for (int i = 0; i < times; i++) {
+ long timestamp = System.currentTimeMillis();
+ WALEdit cols = new WALEdit();
+ cols.add(new KeyValue(row, row, row, timestamp, row));
+ log.append(htd, hri, getWalKey(hri.getEncodedNameAsBytes(), tableName, timestamp), cols,
+ sequenceId, true, null);
+ }
+ log.sync();
+ }
+
+ /**
+ * Builds the WALKey used by addEdits; overridden by TestDefaultWALProviderWithHLogKey.
+ */
+ WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp) {
+ return new WALKey(info, tableName, timestamp);
+ }
+
+ /**
+ * helper method to simulate region flush for a WAL.
+ * @param wal
+ * @param regionEncodedName
+ */
+ protected void flushRegion(WAL wal, byte[] regionEncodedName) {
+ wal.startCacheFlush(regionEncodedName);
+ wal.completeCacheFlush(regionEncodedName);
+ }
+
+ private static final byte[] UNSPECIFIED_REGION = new byte[]{};
+
+ @Test
+ public void testLogCleaning() throws Exception {
+ LOG.info("testLogCleaning");
+ final TableName tableName =
+ TableName.valueOf("testLogCleaning");
+ final TableName tableName2 =
+ TableName.valueOf("testLogCleaning2");
+ final Configuration localConf = new Configuration(conf);
+ localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
+ final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
+ final AtomicLong sequenceId = new AtomicLong(1);
+ try {
+ HRegionInfo hri = new HRegionInfo(tableName,
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ HRegionInfo hri2 = new HRegionInfo(tableName2,
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ // we want to mix edits from regions, so pick our own identifier.
+ final WAL log = wals.getWAL(UNSPECIFIED_REGION);
+
+ // Add a single edit and make sure that rolling won't remove the file
+ // Before HBASE-3198 it used to delete it
+ addEdits(log, hri, tableName, 1, sequenceId);
+ log.rollWriter();
+ assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(log));
+
+ // See if there's anything wrong with more than 1 edit
+ addEdits(log, hri, tableName, 2, sequenceId);
+ log.rollWriter();
+ assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
+
+ // Now mix edits from 2 regions, still no flushing
+ addEdits(log, hri, tableName, 1, sequenceId);
+ addEdits(log, hri2, tableName2, 1, sequenceId);
+ addEdits(log, hri, tableName, 1, sequenceId);
+ addEdits(log, hri2, tableName2, 1, sequenceId);
+ log.rollWriter();
+ assertEquals(3, DefaultWALProvider.getNumRolledLogFiles(log));
+
+ // Flush the first region, we expect to see the first two files getting
+ // archived. We need to append something or writer won't be rolled.
+ addEdits(log, hri2, tableName2, 1, sequenceId);
+ log.startCacheFlush(hri.getEncodedNameAsBytes());
+ log.completeCacheFlush(hri.getEncodedNameAsBytes());
+ log.rollWriter();
+ assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
+
+ // Flush the second region, which removes all the remaining output files
+ // since the oldest was completely flushed and the two others only contain
+ // flush information
+ addEdits(log, hri2, tableName2, 1, sequenceId);
+ log.startCacheFlush(hri2.getEncodedNameAsBytes());
+ log.completeCacheFlush(hri2.getEncodedNameAsBytes());
+ log.rollWriter();
+ assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(log));
+ } finally {
+ if (wals != null) {
+ wals.close();
+ }
+ }
+ }
+
+ /**
+ * Tests wal archiving by adding data, doing flushing/rolling and checking we archive old logs
+ * and also don't archive "live logs" (that is, a log with un-flushed entries).
+ * <p>
+ * This is what it does:
+ * It creates two regions, and does a series of inserts along with log rolling.
+ * Whenever a WAL is rolled, HLogBase checks previous wals for archiving. A wal is eligible for
+ * archiving if all the regions which have entries in that wal file have flushed past
+ * their maximum sequence id in that wal file.
+ * <p>
+ * @throws IOException
+ */
+ @Test
+ public void testWALArchiving() throws IOException {
+ LOG.debug("testWALArchiving");
+ TableName table1 = TableName.valueOf("t1");
+ TableName table2 = TableName.valueOf("t2");
+ final Configuration localConf = new Configuration(conf);
+ localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
+ final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
+ try {
+ final WAL wal = wals.getWAL(UNSPECIFIED_REGION);
+ assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
+ HRegionInfo hri1 = new HRegionInfo(table1, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW);
+ HRegionInfo hri2 = new HRegionInfo(table2, HConstants.EMPTY_START_ROW,
+ HConstants.EMPTY_END_ROW);
+ // ensure that we don't split the regions.
+ hri1.setSplit(false);
+ hri2.setSplit(false);
+ // variables to mock region sequenceIds.
+ final AtomicLong sequenceId1 = new AtomicLong(1);
+ final AtomicLong sequenceId2 = new AtomicLong(1);
+ // start with the testing logic: insert a waledit, and roll writer
+ addEdits(wal, hri1, table1, 1, sequenceId1);
+ wal.rollWriter();
+ // assert that the wal is rolled
+ assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(wal));
+ // add edits in the second wal file, and roll writer.
+ addEdits(wal, hri1, table1, 1, sequenceId1);
+ wal.rollWriter();
+ // assert that the wal is rolled
+ assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
+ // add three waledits to table1, and flush the region.
+ addEdits(wal, hri1, table1, 3, sequenceId1);
+ flushRegion(wal, hri1.getEncodedNameAsBytes());
+ // roll log; all old logs should be archived.
+ wal.rollWriter();
+ assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
+ // add an edit to table2, and roll writer
+ addEdits(wal, hri2, table2, 1, sequenceId2);
+ wal.rollWriter();
+ assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(wal));
+ // add edits for table1, and roll writer
+ addEdits(wal, hri1, table1, 2, sequenceId1);
+ wal.rollWriter();
+ assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
+ // add edits for table2, and flush hri1.
+ addEdits(wal, hri2, table2, 2, sequenceId2);
+ flushRegion(wal, hri1.getEncodedNameAsBytes());
+ // the log : region-sequenceId map is
+ // log1: region2 (unflushed)
+ // log2: region1 (flushed)
+ // log3: region2 (unflushed)
+ // roll the writer; log2 should be archived.
+ wal.rollWriter();
+ assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
+ // flush region2, and all logs should be archived.
+ addEdits(wal, hri2, table2, 2, sequenceId2);
+ flushRegion(wal, hri2.getEncodedNameAsBytes());
+ wal.rollWriter();
+ assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
+ } finally {
+ if (wals != null) {
+ wals.close();
+ }
+ }
+ }
+
+ /**
+ * Write to a log file with three concurrent threads and verify that all data is written.
+ * @throws Exception
+ */
+ @Test
+ public void testConcurrentWrites() throws Exception {
+ // Run the WPE tool with three threads writing 3000 edits each concurrently.
+ // When done, verify that all edits were written.
+ int errCode = WALPerformanceEvaluation.
+ innerMain(new Configuration(TEST_UTIL.getConfiguration()),
+ new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"});
+ assertEquals(0, errCode);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java
new file mode 100644
index 0000000..1e70be5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProviderWithHLogKey.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+
+@Category(LargeTests.class)
+public class TestDefaultWALProviderWithHLogKey extends TestDefaultWALProvider {
+ @Override
+ WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp) {
+ return new HLogKey(info, tableName, timestamp);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
new file mode 100644
index 0000000..437599d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.log4j.Level;
+
+// imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestSecureWAL {
+ static final Log LOG = LogFactory.getLog(TestSecureWAL.class);
+ static {
+ ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hbase.regionserver.wal"))
+ .getLogger().setLevel(Level.ALL);
+ };
+ static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
+ conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
+ conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
+ WAL.Reader.class);
+ conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
+ WALProvider.Writer.class);
+ conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
+ FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
+ }
+
+ @Test
+ public void testSecureWAL() throws Exception {
+ TableName tableName = TableName.valueOf("TestSecureWAL");
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor(tableName.getName()));
+ HRegionInfo regioninfo = new HRegionInfo(tableName,
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
+ final int total = 10;
+ final byte[] row = Bytes.toBytes("row");
+ final byte[] family = Bytes.toBytes("family");
+ final byte[] value = Bytes.toBytes("Test value");
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ final WALFactory wals = new WALFactory(TEST_UTIL.getConfiguration(), null, "TestSecureWAL");
+ final AtomicLong sequenceId = new AtomicLong(1);
+
+ // Write the WAL
+ final WAL wal = wals.getWAL(regioninfo.getEncodedNameAsBytes());
+
+ for (int i = 0; i < total; i++) {
+ WALEdit kvs = new WALEdit();
+ kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
+ wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis()), kvs, sequenceId, true, null);
+ }
+ wal.sync();
+ final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
+ wals.shutdown();
+
+ // Ensure edits are not plaintext
+ long length = fs.getFileStatus(walPath).getLen();
+ FSDataInputStream in = fs.open(walPath);
+ byte[] fileData = new byte[(int)length];
+ IOUtils.readFully(in, fileData);
+ in.close();
+ assertFalse("Cells appear to be plaintext", Bytes.contains(fileData, value));
+
+ // Confirm the WAL can be read back
+ WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
+ int count = 0;
+ WAL.Entry entry = new WAL.Entry();
+ while (reader.next(entry) != null) {
+ count++;
+ List<Cell> cells = entry.getEdit().getCells();
+ assertTrue("Should be one KV per WALEdit", cells.size() == 1);
+ for (Cell cell: cells) {
+ byte[] thisRow = cell.getRow();
+ assertTrue("Incorrect row", Bytes.equals(thisRow, row));
+ byte[] thisFamily = cell.getFamily();
+ assertTrue("Incorrect family", Bytes.equals(thisFamily, family));
+ byte[] thisValue = cell.getValue();
+ assertTrue("Incorrect value", Bytes.equals(thisValue, value));
+ }
+ }
+ assertEquals("Should have read back as many KVs as written", total, count);
+ reader.close();
+ }
+
+}