Posted to commits@hbase.apache.org by bu...@apache.org on 2014/12/02 18:20:42 UTC
[03/21] hbase git commit: HBASE-12522 Backport of write-ahead-log
refactoring and follow-ons.
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
new file mode 100644
index 0000000..ab23f5f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -0,0 +1,725 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.BindException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+// imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogWriter;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * WAL tests that can be reused across providers.
+ */
+@Category(MediumTests.class)
+public class TestWALFactory {
+ protected static final Log LOG = LogFactory.getLog(TestWALFactory.class);
+
+ protected static Configuration conf;
+ private static MiniDFSCluster cluster;
+ protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ protected static Path hbaseDir;
+
+ protected FileSystem fs;
+ protected Path dir;
+ protected WALFactory wals;
+
+ @Rule
+ public final TestName currentTest = new TestName();
+
+ @Before
+ public void setUp() throws Exception {
+ fs = cluster.getFileSystem();
+ dir = new Path(hbaseDir, currentTest.getMethodName());
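+    // The second WALFactory argument is the list of WALActionsListeners to
+    // preregister on created WALs; null here means none.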
+ wals = new WALFactory(conf, null, currentTest.getMethodName());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // testAppendClose closes the FileSystem, which will prevent us from closing cleanly here.
+ try {
+ wals.close();
+ } catch (IOException exception) {
+ LOG.warn("Encountered exception while closing wal factory. If you have other errors, this" +
+ " may be the cause. Message: " + exception);
+ LOG.debug("Exception details for failure to close wal factory.", exception);
+ }
+ FileStatus[] entries = fs.listStatus(new Path("/"));
+ for (FileStatus dir : entries) {
+ fs.delete(dir.getPath(), true);
+ }
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // Make block sizes small.
+ TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
+ // needed for testAppendClose()
+ TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true);
+ TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
+ // quicker heartbeat interval for faster DN death notification
+ TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
+ TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
+ TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
+
+ // faster failover with cluster.shutdown();fs.close() idiom
+ TEST_UTIL.getConfiguration()
+ .setInt("hbase.ipc.client.connect.max.retries", 1);
+ TEST_UTIL.getConfiguration().setInt(
+ "dfs.client.block.recovery.retries", 1);
+ TEST_UTIL.getConfiguration().setInt(
+ "hbase.ipc.client.connection.maxidletime", 500);
+ TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+ SampleRegionWALObserver.class.getName());
+ TEST_UTIL.startMiniDFSCluster(3);
+
+ conf = TEST_UTIL.getConfiguration();
+ cluster = TEST_UTIL.getDFSCluster();
+
+ hbaseDir = TEST_UTIL.createRootDir();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void canCloseSingleton() throws IOException {
+ WALFactory.getInstance(conf).close();
+ }
+
+ /**
+ * Just write multiple logs then split. Before fix for HADOOP-2283, this
+ * would fail.
+ * @throws IOException
+ */
+ @Test
+ public void testSplit() throws IOException {
+ final TableName tableName = TableName.valueOf(currentTest.getMethodName());
+ final byte [] rowName = tableName.getName();
+ final Path logdir = new Path(hbaseDir,
+ DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName()));
+ Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    final int howmany = 3;
+    HRegionInfo[] infos = new HRegionInfo[howmany];
+ Path tabledir = FSUtils.getTableDir(hbaseDir, tableName);
+ fs.mkdirs(tabledir);
+ for(int i = 0; i < howmany; i++) {
+ infos[i] = new HRegionInfo(tableName,
+ Bytes.toBytes("" + i), Bytes.toBytes("" + (i+1)), false);
+ fs.mkdirs(new Path(tabledir, infos[i].getEncodedName()));
+ LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString());
+ }
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor("column"));
+
+ // Add edits for three regions.
+ final AtomicLong sequenceId = new AtomicLong(1);
+ for (int ii = 0; ii < howmany; ii++) {
+ for (int i = 0; i < howmany; i++) {
+ final WAL log = wals.getWAL(infos[i].getEncodedNameAsBytes());
+ for (int j = 0; j < howmany; j++) {
+ WALEdit edit = new WALEdit();
+ byte [] family = Bytes.toBytes("column");
+ byte [] qualifier = Bytes.toBytes(Integer.toString(j));
+ byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
+ edit.add(new KeyValue(rowName, family, qualifier,
+ System.currentTimeMillis(), column));
+ LOG.info("Region " + i + ": " + edit);
+ log.append(htd, infos[i], new WALKey(infos[i].getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis()), edit, sequenceId, true, null);
+ }
+ log.sync();
+ log.rollWriter();
+ }
+ }
+ wals.shutdown();
+ List<Path> splits = WALSplitter.split(hbaseDir, logdir, oldLogDir, fs, conf, wals);
+ verifySplits(splits, howmany);
+ }
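+  // For reference, a sketch of the layout produced by the split above (not
+  // asserted by this test): WALSplitter.split writes per-region recovered-edits
+  // files, roughly <tabledir>/<encoded-region-name>/recovered.edits/<seqid>.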
+
+ /**
+ * Test new HDFS-265 sync.
+ * @throws Exception
+ */
+ @Test
+ public void Broken_testSync() throws Exception {
+ TableName tableName = TableName.valueOf(currentTest.getMethodName());
+ // First verify that using streams all works.
+ Path p = new Path(dir, currentTest.getMethodName() + ".fsdos");
+ FSDataOutputStream out = fs.create(p);
+ out.write(tableName.getName());
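+    // Newer Hadoop exposes Syncable.hflush() while older versions expose
+    // sync(); look both up reflectively so the test runs against either.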
+ Method syncMethod = null;
+ try {
+ syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
+ } catch (NoSuchMethodException e) {
+ try {
+ syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
+ } catch (NoSuchMethodException ex) {
+ fail("This version of Hadoop supports neither Syncable.sync() " +
+ "nor Syncable.hflush().");
+ }
+ }
+ syncMethod.invoke(out, new Object[]{});
+ FSDataInputStream in = fs.open(p);
+ assertTrue(in.available() > 0);
+ byte [] buffer = new byte [1024];
+ int read = in.read(buffer);
+ assertEquals(tableName.getName().length, read);
+ out.close();
+ in.close();
+
+ final AtomicLong sequenceId = new AtomicLong(1);
+ final int total = 20;
+ WAL.Reader reader = null;
+
+ try {
+ HRegionInfo info = new HRegionInfo(tableName,
+ null,null, false);
+ HTableDescriptor htd = new HTableDescriptor();
+ htd.addFamily(new HColumnDescriptor(tableName.getName()));
+ final WAL wal = wals.getWAL(info.getEncodedNameAsBytes());
+
+ for (int i = 0; i < total; i++) {
+ WALEdit kvs = new WALEdit();
+ kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
+ wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis()), kvs, sequenceId, true, null);
+ }
+      // Now call sync and try reading. Opening a Reader before you sync just
+      // gives you an EOFException.
+ wal.sync();
+ // Open a Reader.
+ Path walPath = DefaultWALProvider.getCurrentFileName(wal);
+ reader = wals.createReader(fs, walPath);
+ int count = 0;
+ WAL.Entry entry = new WAL.Entry();
+ while ((entry = reader.next(entry)) != null) count++;
+ assertEquals(total, count);
+ reader.close();
+      // Check that opening a Reader also works on a file
+      // that has had a sync done on it.
+ for (int i = 0; i < total; i++) {
+ WALEdit kvs = new WALEdit();
+ kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
+ wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis()), kvs, sequenceId, true, null);
+ }
+ wal.sync();
+ reader = wals.createReader(fs, walPath);
+ count = 0;
+ while((entry = reader.next(entry)) != null) count++;
+ assertTrue(count >= total);
+ reader.close();
+ // If I sync, should see double the edits.
+ wal.sync();
+ reader = wals.createReader(fs, walPath);
+ count = 0;
+ while((entry = reader.next(entry)) != null) count++;
+ assertEquals(total * 2, count);
+ reader.close();
+      // Now make sure reads work when we cross a block boundary,
+      // especially that we return good length on file.
+      final byte [] value = new byte[1025 * 1024]; // A value slightly over 1MB.
+ for (int i = 0; i < total; i++) {
+ WALEdit kvs = new WALEdit();
+ kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
+ wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis()), kvs, sequenceId, true, null);
+ }
+ // Now I should have written out lots of blocks. Sync then read.
+ wal.sync();
+ reader = wals.createReader(fs, walPath);
+ count = 0;
+ while((entry = reader.next(entry)) != null) count++;
+ assertEquals(total * 3, count);
+ reader.close();
+      // Shut down and ensure that the Reader also gets the right length.
+ wal.shutdown();
+ reader = wals.createReader(fs, walPath);
+ count = 0;
+ while((entry = reader.next(entry)) != null) count++;
+ assertEquals(total * 3, count);
+ reader.close();
+ } finally {
+ if (reader != null) reader.close();
+ }
+ }
+
+ private void verifySplits(final List<Path> splits, final int howmany)
+ throws IOException {
+ assertEquals(howmany * howmany, splits.size());
+ for (int i = 0; i < splits.size(); i++) {
+ LOG.info("Verifying=" + splits.get(i));
+ WAL.Reader reader = wals.createReader(fs, splits.get(i));
+ try {
+ int count = 0;
+ String previousRegion = null;
+ long seqno = -1;
+ WAL.Entry entry = new WAL.Entry();
+ while((entry = reader.next(entry)) != null) {
+ WALKey key = entry.getKey();
+ String region = Bytes.toString(key.getEncodedRegionName());
+ // Assert that all edits are for same region.
+ if (previousRegion != null) {
+ assertEquals(previousRegion, region);
+ }
+ LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getLogSeqNum());
+ assertTrue(seqno < key.getLogSeqNum());
+ seqno = key.getLogSeqNum();
+ previousRegion = region;
+ count++;
+ }
+ assertEquals(howmany, count);
+ } finally {
+ reader.close();
+ }
+ }
+ }
+
+ /*
+   * We pass different values to recoverFileLease() so that different code paths are covered.
+ *
+   * For this test to pass, the following are required:
+ * 1. HDFS-200 (append support)
+ * 2. HDFS-988 (SafeMode should freeze file operations
+ * [FSNamesystem.nextGenerationStampForBlock])
+ * 3. HDFS-142 (on restart, maintain pendingCreates)
+ */
+ @Test (timeout=300000)
+ public void testAppendClose() throws Exception {
+ TableName tableName =
+ TableName.valueOf(currentTest.getMethodName());
+ HRegionInfo regioninfo = new HRegionInfo(tableName,
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
+
+ final WAL wal = wals.getWAL(regioninfo.getEncodedNameAsBytes());
+ final AtomicLong sequenceId = new AtomicLong(1);
+ final int total = 20;
+
+ HTableDescriptor htd = new HTableDescriptor();
+ htd.addFamily(new HColumnDescriptor(tableName.getName()));
+
+ for (int i = 0; i < total; i++) {
+ WALEdit kvs = new WALEdit();
+ kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
+ wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis()), kvs, sequenceId, true, null);
+ }
+ // Now call sync to send the data to HDFS datanodes
+ wal.sync();
+ int namenodePort = cluster.getNameNodePort();
+ final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
+
+ // Stop the cluster. (ensure restart since we're sharing MiniDFSCluster)
+ try {
+ DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
+ dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_ENTER);
+ TEST_UTIL.shutdownMiniDFSCluster();
+ try {
+ // wal.writer.close() will throw an exception,
+ // but still call this since it closes the LogSyncer thread first
+ wal.shutdown();
+ } catch (IOException e) {
+ LOG.info(e);
+ }
+ fs.close(); // closing FS last so DFSOutputStream can't call close
+ LOG.info("STOPPED first instance of the cluster");
+ } finally {
+ // Restart the cluster
+ while (cluster.isClusterUp()){
+ LOG.error("Waiting for cluster to go down");
+ Thread.sleep(1000);
+ }
+ assertFalse(cluster.isClusterUp());
+ cluster = null;
+ for (int i = 0; i < 100; i++) {
+ try {
+ cluster = TEST_UTIL.startMiniDFSClusterForTestWAL(namenodePort);
+ break;
+ } catch (BindException e) {
+ LOG.info("Sleeping. BindException bringing up new cluster");
+ Threads.sleep(1000);
+ }
+ }
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+ LOG.info("STARTED second instance.");
+ }
+
+ // set the lease period to be 1 second so that the
+ // namenode triggers lease recovery upon append request
+ Method setLeasePeriod = cluster.getClass()
+ .getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE});
+ setLeasePeriod.setAccessible(true);
+ setLeasePeriod.invoke(cluster, 1000L, 1000L);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.info(e);
+ }
+
+ // Now try recovering the log, like the HMaster would do
+ final FileSystem recoveredFs = fs;
+ final Configuration rlConf = conf;
+
+ class RecoverLogThread extends Thread {
+ public Exception exception = null;
+ public void run() {
+ try {
+ FSUtils.getInstance(fs, rlConf)
+ .recoverFileLease(recoveredFs, walPath, rlConf, null);
+ } catch (IOException e) {
+ exception = e;
+ }
+ }
+ }
+
+ RecoverLogThread t = new RecoverLogThread();
+ t.start();
+ // Timeout after 60 sec. Without correct patches, would be an infinite loop
+ t.join(60 * 1000);
+ if(t.isAlive()) {
+ t.interrupt();
+ throw new Exception("Timed out waiting for WAL.recoverLog()");
+ }
+
+ if (t.exception != null)
+ throw t.exception;
+
+ // Make sure you can read all the content
+ WAL.Reader reader = wals.createReader(fs, walPath);
+ int count = 0;
+ WAL.Entry entry = new WAL.Entry();
+ while (reader.next(entry) != null) {
+ count++;
+ assertTrue("Should be one KeyValue per WALEdit",
+ entry.getEdit().getCells().size() == 1);
+ }
+ assertEquals(total, count);
+ reader.close();
+
+ // Reset the lease period
+ setLeasePeriod.invoke(cluster, new Object[]{new Long(60000), new Long(3600000)});
+ }
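+  // The recover-then-read sequence above mirrors what the master does when
+  // splitting the logs of a dead regionserver; in brief (using only the APIs
+  // exercised above):
+  //   FSUtils.getInstance(fs, conf).recoverFileLease(fs, walPath, conf, null);
+  //   WAL.Reader r = wals.createReader(fs, walPath); // safe once lease recovery completes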
+
+ /**
+ * Tests that we can write out an edit, close, and then read it back in again.
+ * @throws IOException
+ */
+ @Test
+ public void testEditAdd() throws IOException {
+ final int COL_COUNT = 10;
+ final TableName tableName =
+ TableName.valueOf("tablename");
+ final byte [] row = Bytes.toBytes("row");
+ WAL.Reader reader = null;
+ try {
+ final AtomicLong sequenceId = new AtomicLong(1);
+
+ // Write columns named 1, 2, 3, etc. and then values of single byte
+ // 1, 2, 3...
+ long timestamp = System.currentTimeMillis();
+ WALEdit cols = new WALEdit();
+ for (int i = 0; i < COL_COUNT; i++) {
+ cols.add(new KeyValue(row, Bytes.toBytes("column"),
+ Bytes.toBytes(Integer.toString(i)),
+ timestamp, new byte[] { (byte)(i + '0') }));
+ }
+ HRegionInfo info = new HRegionInfo(tableName,
+ row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
+ HTableDescriptor htd = new HTableDescriptor();
+ htd.addFamily(new HColumnDescriptor("column"));
+ final WAL log = wals.getWAL(info.getEncodedNameAsBytes());
+
+ final long txid = log.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis()), cols, sequenceId, true, null);
+ log.sync(txid);
+ log.startCacheFlush(info.getEncodedNameAsBytes());
+ log.completeCacheFlush(info.getEncodedNameAsBytes());
+ log.shutdown();
+ Path filename = DefaultWALProvider.getCurrentFileName(log);
+ // Now open a reader on the log and assert append worked.
+ reader = wals.createReader(fs, filename);
+      // Above we added all columns on a single row so we only read one
+      // entry in the below... that's why we have '1'.
+ for (int i = 0; i < 1; i++) {
+ WAL.Entry entry = reader.next(null);
+ if (entry == null) break;
+ WALKey key = entry.getKey();
+ WALEdit val = entry.getEdit();
+ assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
+ assertTrue(tableName.equals(key.getTablename()));
+ Cell cell = val.getCells().get(0);
+ assertTrue(Bytes.equals(row, cell.getRow()));
+ assertEquals((byte)(i + '0'), cell.getValue()[0]);
+ System.out.println(key + " " + val);
+ }
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+
+  /**
+   * Tests appending an edit with multiple cells and reading it back.
+   * @throws IOException
+   */
+ @Test
+ public void testAppend() throws IOException {
+ final int COL_COUNT = 10;
+ final TableName tableName =
+ TableName.valueOf("tablename");
+ final byte [] row = Bytes.toBytes("row");
+ WAL.Reader reader = null;
+ final AtomicLong sequenceId = new AtomicLong(1);
+ try {
+ // Write columns named 1, 2, 3, etc. and then values of single byte
+ // 1, 2, 3...
+ long timestamp = System.currentTimeMillis();
+ WALEdit cols = new WALEdit();
+ for (int i = 0; i < COL_COUNT; i++) {
+ cols.add(new KeyValue(row, Bytes.toBytes("column"),
+ Bytes.toBytes(Integer.toString(i)),
+ timestamp, new byte[] { (byte)(i + '0') }));
+ }
+ HRegionInfo hri = new HRegionInfo(tableName,
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ HTableDescriptor htd = new HTableDescriptor();
+ htd.addFamily(new HColumnDescriptor("column"));
+ final WAL log = wals.getWAL(hri.getEncodedNameAsBytes());
+ final long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis()), cols, sequenceId, true, null);
+ log.sync(txid);
+ log.startCacheFlush(hri.getEncodedNameAsBytes());
+ log.completeCacheFlush(hri.getEncodedNameAsBytes());
+ log.shutdown();
+ Path filename = DefaultWALProvider.getCurrentFileName(log);
+ // Now open a reader on the log and assert append worked.
+ reader = wals.createReader(fs, filename);
+ WAL.Entry entry = reader.next();
+ assertEquals(COL_COUNT, entry.getEdit().size());
+ int idx = 0;
+ for (Cell val : entry.getEdit().getCells()) {
+ assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
+ entry.getKey().getEncodedRegionName()));
+ assertTrue(tableName.equals(entry.getKey().getTablename()));
+ assertTrue(Bytes.equals(row, val.getRow()));
+ assertEquals((byte)(idx + '0'), val.getValue()[0]);
+ System.out.println(entry.getKey() + " " + val);
+ idx++;
+ }
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
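+  // The append-then-sync(txid) idiom above is the durability contract these
+  // tests rely on; a sketch, using only the WAL API exercised here:
+  //   long txid = log.append(htd, hri, key, edits, sequenceId, true, null);
+  //   log.sync(txid); // edits up to txid are durable once sync returns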
+
+ /**
+   * Test that registered listeners can visit entries before they are
+   * written to the WAL.
+ * @throws Exception
+ */
+ @Test
+ public void testVisitors() throws Exception {
+ final int COL_COUNT = 10;
+ final TableName tableName =
+ TableName.valueOf("tablename");
+ final byte [] row = Bytes.toBytes("row");
+ final DumbWALActionsListener visitor = new DumbWALActionsListener();
+ final AtomicLong sequenceId = new AtomicLong(1);
+ long timestamp = System.currentTimeMillis();
+ HTableDescriptor htd = new HTableDescriptor();
+ htd.addFamily(new HColumnDescriptor("column"));
+
+ HRegionInfo hri = new HRegionInfo(tableName,
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ final WAL log = wals.getWAL(hri.getEncodedNameAsBytes());
+ log.registerWALActionsListener(visitor);
+ for (int i = 0; i < COL_COUNT; i++) {
+ WALEdit cols = new WALEdit();
+ cols.add(new KeyValue(row, Bytes.toBytes("column"),
+ Bytes.toBytes(Integer.toString(i)),
+ timestamp, new byte[]{(byte) (i + '0')}));
+ log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis()), cols, sequenceId, true, null);
+ }
+ log.sync();
+ assertEquals(COL_COUNT, visitor.increments);
+ log.unregisterWALActionsListener(visitor);
+ WALEdit cols = new WALEdit();
+ cols.add(new KeyValue(row, Bytes.toBytes("column"),
+ Bytes.toBytes(Integer.toString(11)),
+ timestamp, new byte[]{(byte) (11 + '0')}));
+ log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis()), cols, sequenceId, true, null);
+ log.sync();
+ assertEquals(COL_COUNT, visitor.increments);
+ }
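+  // Listener lifecycle demonstrated above, in brief:
+  //   log.registerWALActionsListener(listener);   // subsequent appends are observed
+  //   log.unregisterWALActionsListener(listener); // later appends are not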
+
+ /**
+ * A loaded WAL coprocessor won't break existing WAL test cases.
+ */
+ @Test
+ public void testWALCoprocessorLoaded() throws Exception {
+ // test to see whether the coprocessor is loaded or not.
+ WALCoprocessorHost host = wals.getWAL(UNSPECIFIED_REGION).getCoprocessorHost();
+ Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
+ assertNotNull(c);
+ }
+
+  /**
+   * Tests that a WAL written in the legacy pre-protobuf SequenceFile format
+   * can still be read back.
+   * @throws IOException
+   */
+ @Test
+ public void testReadLegacyLog() throws IOException {
+ final int columnCount = 5;
+ final int recordCount = 5;
+ final TableName tableName =
+ TableName.valueOf("tablename");
+ final byte[] row = Bytes.toBytes("row");
+ long timestamp = System.currentTimeMillis();
+ Path path = new Path(dir, "tempwal");
+ SequenceFileLogWriter sflw = null;
+ WAL.Reader reader = null;
+ try {
+ HRegionInfo hri = new HRegionInfo(tableName,
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ fs.mkdirs(dir);
+ // Write log in pre-PB format.
+ sflw = new SequenceFileLogWriter();
+ sflw.init(fs, path, conf, false);
+ for (int i = 0; i < recordCount; ++i) {
+ WALKey key = new HLogKey(
+ hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
+ WALEdit edit = new WALEdit();
+ for (int j = 0; j < columnCount; ++j) {
+ if (i == 0) {
+ htd.addFamily(new HColumnDescriptor("column" + j));
+ }
+ String value = i + "" + j;
+ edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value)));
+ }
+ sflw.append(new WAL.Entry(key, edit));
+ }
+ sflw.sync();
+ sflw.close();
+
+ // Now read the log using standard means.
+ reader = wals.createReader(fs, path);
+ assertTrue(reader instanceof SequenceFileLogReader);
+ for (int i = 0; i < recordCount; ++i) {
+ WAL.Entry entry = reader.next();
+ assertNotNull(entry);
+ assertEquals(columnCount, entry.getEdit().size());
+ assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
+ assertEquals(tableName, entry.getKey().getTablename());
+ int idx = 0;
+ for (Cell val : entry.getEdit().getCells()) {
+ assertTrue(Bytes.equals(row, val.getRow()));
+ String value = i + "" + idx;
+ assertArrayEquals(Bytes.toBytes(value), val.getValue());
+ idx++;
+ }
+ }
+ WAL.Entry entry = reader.next();
+ assertNull(entry);
+ } finally {
+ if (sflw != null) {
+ sflw.close();
+ }
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
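+  // Note: the assertion that the reader is a SequenceFileLogReader relies on
+  // the factory detecting the on-disk format and falling back to the legacy
+  // reader when the protobuf WAL magic is absent.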
+
+ static class DumbWALActionsListener extends WALActionsListener.Base {
+ int increments = 0;
+
+ @Override
+ public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey,
+ WALEdit logEdit) {
+ increments++;
+ }
+
+ @Override
+ public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
+ increments++;
+ }
+ }
+
+ private static final byte[] UNSPECIFIED_REGION = new byte[]{};
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFiltering.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFiltering.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFiltering.java
new file mode 100644
index 0000000..1ce3d14
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFiltering.java
@@ -0,0 +1,153 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ServiceException;
+
+@Category(MediumTests.class)
+public class TestWALFiltering {
+ private static final int NUM_MASTERS = 1;
+ private static final int NUM_RS = 4;
+
+ private static final TableName TABLE_NAME =
+ TableName.valueOf("TestWALFiltering");
+ private static final byte[] CF1 = Bytes.toBytes("MyCF1");
+ private static final byte[] CF2 = Bytes.toBytes("MyCF2");
+ private static final byte[][] FAMILIES = { CF1, CF2 };
+
+ private HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @Before
+ public void setUp() throws Exception {
+ TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
+ fillTable();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ private void fillTable() throws IOException, InterruptedException {
+ Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILIES, 3,
+ Bytes.toBytes("row0"), Bytes.toBytes("row99"), NUM_RS);
+ Random rand = new Random(19387129L);
+ for (int iStoreFile = 0; iStoreFile < 4; ++iStoreFile) {
+ for (int iRow = 0; iRow < 100; ++iRow) {
+ final byte[] row = Bytes.toBytes("row" + iRow);
+ Put put = new Put(row);
+ Delete del = new Delete(row);
+ for (int iCol = 0; iCol < 10; ++iCol) {
+ final byte[] cf = rand.nextBoolean() ? CF1 : CF2;
+ final long ts = Math.abs(rand.nextInt());
+ final byte[] qual = Bytes.toBytes("col" + iCol);
+ if (rand.nextBoolean()) {
+ final byte[] value = Bytes.toBytes("value_for_row_" + iRow +
+ "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" +
+ ts + "_random_" + rand.nextLong());
+ put.add(cf, qual, ts, value);
+ } else if (rand.nextDouble() < 0.8) {
+ del.deleteColumn(cf, qual, ts);
+ } else {
+ del.deleteColumns(cf, qual, ts);
+ }
+ }
+ table.put(put);
+ table.delete(del);
+ table.flushCommits();
+ }
+ }
+ TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
+ }
+
+ @Test
+ public void testFlushedSequenceIdsSentToHMaster()
+ throws IOException, InterruptedException, ServiceException {
+ SortedMap<byte[], Long> allFlushedSequenceIds =
+ new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+ for (int i = 0; i < NUM_RS; ++i) {
+ flushAllRegions(i);
+ }
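+    // Give the regionservers time to report their flushed sequence ids to the master.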
+ Thread.sleep(10000);
+ HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+ for (int i = 0; i < NUM_RS; ++i) {
+ for (byte[] regionName : getRegionsByServer(i)) {
+ if (allFlushedSequenceIds.containsKey(regionName)) {
+ GetLastFlushedSequenceIdRequest req =
+ RequestConverter.buildGetLastFlushedSequenceIdRequest(regionName);
+
+ assertEquals((long)allFlushedSequenceIds.get(regionName),
+ master.getMasterRpcServices().getLastFlushedSequenceId(
+ null, req).getLastFlushedSequenceId());
+ }
+ }
+ }
+ }
+
+ private List<byte[]> getRegionsByServer(int rsId) throws IOException {
+ List<byte[]> regionNames = Lists.newArrayList();
+ HRegionServer hrs = getRegionServer(rsId);
+ for (HRegion r : hrs.getOnlineRegions(TABLE_NAME)) {
+ regionNames.add(r.getRegionName());
+ }
+ return regionNames;
+ }
+
+ private HRegionServer getRegionServer(int rsId) {
+ return TEST_UTIL.getMiniHBaseCluster().getRegionServer(rsId);
+ }
+
+ private void flushAllRegions(int rsId)
+ throws ServiceException, IOException {
+ HRegionServer hrs = getRegionServer(rsId);
+ for (byte[] regionName : getRegionsByServer(rsId)) {
+ FlushRegionRequest request =
+ RequestConverter.buildFlushRegionRequest(regionName);
+ hrs.getRSRpcServices().flushRegion(null, request);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
new file mode 100644
index 0000000..c3d77e9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
@@ -0,0 +1,186 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
+import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+// imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Simple testing of a few WAL methods.
+ */
+@Category(SmallTests.class)
+public class TestWALMethods {
+  private static final byte[] TEST_REGION = Bytes.toBytes("test_region");
+ private static final TableName TEST_TABLE =
+ TableName.valueOf("test_table");
+
+ private final HBaseTestingUtility util = new HBaseTestingUtility();
+
+ /**
+ * Assert that getSplitEditFilesSorted returns files in expected order and
+ * that it skips moved-aside files.
+ * @throws IOException
+ */
+ @Test public void testGetSplitEditFilesSorted() throws IOException {
+ FileSystem fs = FileSystem.get(util.getConfiguration());
+ Path regiondir = util.getDataTestDir("regiondir");
+ fs.delete(regiondir, true);
+ fs.mkdirs(regiondir);
+ Path recoverededits = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+ String first = WALSplitter.formatRecoveredEditsFileName(-1);
+ createFile(fs, recoverededits, first);
+ createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(0));
+ createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(1));
+ createFile(fs, recoverededits, WALSplitter
+ .formatRecoveredEditsFileName(11));
+ createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(2));
+ createFile(fs, recoverededits, WALSplitter
+ .formatRecoveredEditsFileName(50));
+ String last = WALSplitter.formatRecoveredEditsFileName(Long.MAX_VALUE);
+ createFile(fs, recoverededits, last);
+ createFile(fs, recoverededits,
+ Long.toString(Long.MAX_VALUE) + "." + System.currentTimeMillis());
+
+ final Configuration walConf = new Configuration(util.getConfiguration());
+ FSUtils.setRootDir(walConf, regiondir);
+ (new WALFactory(walConf, null, "dummyLogName")).getWAL(new byte[]{});
+
+ NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
+ assertEquals(7, files.size());
+ assertEquals(files.pollFirst().getName(), first);
+ assertEquals(files.pollLast().getName(), last);
+ assertEquals(files.pollFirst().getName(),
+ WALSplitter
+ .formatRecoveredEditsFileName(0));
+ assertEquals(files.pollFirst().getName(),
+ WALSplitter
+ .formatRecoveredEditsFileName(1));
+ assertEquals(files.pollFirst().getName(),
+ WALSplitter
+ .formatRecoveredEditsFileName(2));
+ assertEquals(files.pollFirst().getName(),
+ WALSplitter
+ .formatRecoveredEditsFileName(11));
+ }
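+  // The ordering above works because formatRecoveredEditsFileName zero-pads
+  // the sequence id to a fixed width, so lexicographic file-name order matches
+  // numeric order; the extra "<seqid>.<timestamp>" file is a moved-aside edits
+  // file and is skipped.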
+
+ private void createFile(final FileSystem fs, final Path testdir,
+ final String name)
+ throws IOException {
+ FSDataOutputStream fdos = fs.create(new Path(testdir, name), true);
+ fdos.close();
+ }
+
+ @Test
+ public void testRegionEntryBuffer() throws Exception {
+ WALSplitter.RegionEntryBuffer reb = new WALSplitter.RegionEntryBuffer(
+ TEST_TABLE, TEST_REGION);
+ assertEquals(0, reb.heapSize());
+
+ reb.appendEntry(createTestLogEntry(1));
+ assertTrue(reb.heapSize() > 0);
+ }
+
+ @Test
+ public void testEntrySink() throws Exception {
+ Configuration conf = new Configuration();
+ RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
+ RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
+ WALSplitter splitter = new WALSplitter(WALFactory.getInstance(conf),
+ conf, mock(Path.class), mock(FileSystem.class), null, null, mode);
+
+ EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024);
+ for (int i = 0; i < 1000; i++) {
+ WAL.Entry entry = createTestLogEntry(i);
+ sink.appendEntry(entry);
+ }
+
+ assertTrue(sink.totalBuffered > 0);
+ long amountInChunk = sink.totalBuffered;
+ // Get a chunk
+ RegionEntryBuffer chunk = sink.getChunkToWrite();
+ assertEquals(chunk.heapSize(), amountInChunk);
+
+ // Make sure it got marked that a thread is "working on this"
+ assertTrue(sink.isRegionCurrentlyWriting(TEST_REGION));
+
+ // Insert some more entries
+ for (int i = 0; i < 500; i++) {
+ WAL.Entry entry = createTestLogEntry(i);
+ sink.appendEntry(entry);
+ }
+ // Asking for another chunk shouldn't work since the first one
+ // is still writing
+ assertNull(sink.getChunkToWrite());
+
+ // If we say we're done writing the first chunk, then we should be able
+ // to get the second
+ sink.doneWriting(chunk);
+
+ RegionEntryBuffer chunk2 = sink.getChunkToWrite();
+ assertNotNull(chunk2);
+ assertNotSame(chunk, chunk2);
+ long amountInChunk2 = sink.totalBuffered;
+ // The second chunk had fewer rows than the first
+ assertTrue(amountInChunk2 < amountInChunk);
+
+ sink.doneWriting(chunk2);
+ assertEquals(0, sink.totalBuffered);
+ }
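+  // A typical consumer loop for EntryBuffers, sketched with only the methods
+  // exercised above (writeBuffer is a hypothetical sink):
+  //   RegionEntryBuffer buf;
+  //   while ((buf = sink.getChunkToWrite()) != null) {
+  //     writeBuffer(buf);      // hypothetical: flush the chunk somewhere
+  //     sink.doneWriting(buf); // lets the region hand out its next chunk
+  //   }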
+
+ private WAL.Entry createTestLogEntry(int i) {
+ long seq = i;
+ long now = i * 1000;
+
+ WALEdit edit = new WALEdit();
+ edit.add(KeyValueTestUtil.create("row", "fam", "qual", 1234, "val"));
+ WALKey key = new WALKey(TEST_REGION, TEST_TABLE, seq, now,
+ HConstants.DEFAULT_CLUSTER_ID);
+ WAL.Entry entry = new WAL.Entry(key, edit);
+ return entry;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
new file mode 100644
index 0000000..a13afd0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.log4j.Level;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+// imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
+import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec;
+
+/*
+ * Test that a WAL written by SecureProtobufLogWriter is not readable by
+ * ProtobufLogReader, and that SecureProtobufLogReader can still read an
+ * unencrypted WAL.
+ */
+@Category(MediumTests.class)
+public class TestWALReaderOnSecureWAL {
+ static final Log LOG = LogFactory.getLog(TestWALReaderOnSecureWAL.class);
+ static {
+ ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hbase.regionserver.wal"))
+ .getLogger().setLevel(Level.ALL);
+  }
+ static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ final byte[] value = Bytes.toBytes("Test value");
+
+ private static final String WAL_ENCRYPTION = "hbase.regionserver.wal.encryption";
+
+ @Rule
+ public TestName currentTest = new TestName();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
+ conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
+ conf.setBoolean("hbase.hlog.split.skip.errors", true);
+ conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
+ FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
+ }
+
+ private Path writeWAL(final WALFactory wals, final String tblName) throws IOException {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
+ conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, SecureWALCellCodec.class,
+ WALCellCodec.class);
+ try {
+ TableName tableName = TableName.valueOf(tblName);
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor(tableName.getName()));
+ HRegionInfo regioninfo = new HRegionInfo(tableName,
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
+ final int total = 10;
+ final byte[] row = Bytes.toBytes("row");
+ final byte[] family = Bytes.toBytes("family");
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ Path logDir = TEST_UTIL.getDataTestDir(tblName);
+ final AtomicLong sequenceId = new AtomicLong(1);
+
+ // Write the WAL
+ WAL wal = wals.getWAL(regioninfo.getEncodedNameAsBytes());
+ for (int i = 0; i < total; i++) {
+ WALEdit kvs = new WALEdit();
+ kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
+ wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis()), kvs, sequenceId, true, null);
+ }
+ wal.sync();
+ final Path walPath = DefaultWALProvider.getCurrentFileName(wal);
+ wal.shutdown();
+
+ return walPath;
+ } finally {
+ // restore the cell codec class
+ conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, clsName);
+ }
+ }
+
+  @Test
+ public void testWALReaderOnSecureWAL() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
+ WAL.Reader.class);
+ conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
+ WALProvider.Writer.class);
+ conf.setBoolean(WAL_ENCRYPTION, true);
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ final WALFactory wals = new WALFactory(conf, null, currentTest.getMethodName());
+ Path walPath = writeWAL(wals, currentTest.getMethodName());
+
+    // Ensure edits are not plaintext
+ long length = fs.getFileStatus(walPath).getLen();
+ FSDataInputStream in = fs.open(walPath);
+ byte[] fileData = new byte[(int)length];
+ IOUtils.readFully(in, fileData);
+ in.close();
+ assertFalse("Cells appear to be plaintext", Bytes.contains(fileData, value));
+
+ // Confirm the WAL cannot be read back by ProtobufLogReader
+ try {
+ WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
+      fail("Reading a secure WAL with ProtobufLogReader should have thrown an IOException");
+ } catch (IOException ioe) {
+ // expected IOE
+ }
+
+ FileStatus[] listStatus = fs.listStatus(walPath.getParent());
+ RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
+ RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
+ Path rootdir = FSUtils.getRootDir(conf);
+ try {
+ WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, mode);
+ s.splitLogFile(listStatus[0], null);
+ Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
+ "corrupt");
+ assertTrue(fs.exists(file));
+ // assertFalse("log splitting should have failed", true);
+ } catch (IOException ioe) {
+ assertTrue("WAL should have been sidelined", false);
+ }
+ wals.close();
+ }
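+  // With hbase.hlog.split.skip.errors=true (set in setUpBeforeClass), the
+  // splitter sidelines an unreadable WAL into a "corrupt" directory under the
+  // split-log dir instead of failing, which is what the assertion above checks.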
+
+  @Test
+ public void testSecureWALReaderOnWAL() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
+ WAL.Reader.class);
+ conf.setClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class,
+ WALProvider.Writer.class);
+ conf.setBoolean(WAL_ENCRYPTION, false);
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ final WALFactory wals = new WALFactory(conf, null, currentTest.getMethodName());
+ Path walPath = writeWAL(wals, currentTest.getMethodName());
+
+ // Ensure edits are plaintext
+ long length = fs.getFileStatus(walPath).getLen();
+ FSDataInputStream in = fs.open(walPath);
+ byte[] fileData = new byte[(int)length];
+ IOUtils.readFully(in, fileData);
+ in.close();
+ assertTrue("Cells should be plaintext", Bytes.contains(fileData, value));
+
+ // Confirm the WAL can be read back by SecureProtobufLogReader
+ try {
+ WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
+ reader.close();
+ } catch (IOException ioe) {
+      fail("Should have been able to read the WAL back with SecureProtobufLogReader");
+ }
+
+ FileStatus[] listStatus = fs.listStatus(walPath.getParent());
+ RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
+ RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
+ Path rootdir = FSUtils.getRootDir(conf);
+ try {
+ WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null, mode);
+ s.splitLogFile(listStatus[0], null);
+ Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
+ "corrupt");
+      assertFalse(fs.exists(file));
+ } catch (IOException ioe) {
+ assertTrue("WAL should have been processed", false);
+ }
+ wals.close();
+ }
+}