You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2014/12/02 18:20:47 UTC
[08/21] hbase git commit: HBASE-12522 Backport of write-ahead-log
refactoring and follow-ons.
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
deleted file mode 100644
index f56ef98..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
+++ /dev/null
@@ -1,566 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
-import org.apache.hadoop.hbase.trace.SpanReceiverHost;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.htrace.Sampler;
-import org.htrace.Trace;
-import org.htrace.TraceScope;
-import org.htrace.impl.ProbabilitySampler;
-
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.reporting.ConsoleReporter;
-
-/**
- * This class runs performance benchmarks for {@link HLog}.
- * See usage for this tool by running:
- * <code>$ hbase org.apache.hadoop.hbase.regionserver.wal.HLogPerformanceEvaluation -h</code>
- */
-@InterfaceAudience.Private
-public final class HLogPerformanceEvaluation extends Configured implements Tool {
- static final Log LOG = LogFactory.getLog(HLogPerformanceEvaluation.class.getName());
- private final MetricsRegistry metrics = new MetricsRegistry();
- private final Meter syncMeter =
- metrics.newMeter(HLogPerformanceEvaluation.class, "syncMeter", "syncs", TimeUnit.MILLISECONDS);
- private final Histogram syncHistogram =
- metrics.newHistogram(HLogPerformanceEvaluation.class, "syncHistogram", "nanos-between-syncs",
- true);
- private final Histogram syncCountHistogram =
- metrics.newHistogram(HLogPerformanceEvaluation.class, "syncCountHistogram", "countPerSync",
- true);
- private final Meter appendMeter =
- metrics.newMeter(HLogPerformanceEvaluation.class, "appendMeter", "bytes",
- TimeUnit.MILLISECONDS);
- private final Histogram latencyHistogram =
- metrics.newHistogram(HLogPerformanceEvaluation.class, "latencyHistogram", "nanos", true);
-
- private HBaseTestingUtility TEST_UTIL;
-
- static final String TABLE_NAME = "HLogPerformanceEvaluation";
- static final String QUALIFIER_PREFIX = "q";
- static final String FAMILY_PREFIX = "cf";
-
- private int numQualifiers = 1;
- private int valueSize = 512;
- private int keySize = 16;
-
- @Override
- public void setConf(Configuration conf) {
- super.setConf(conf);
- TEST_UTIL = new HBaseTestingUtility(conf);
- }
-
- /**
- * Perform HLog.append() of Put object, for the number of iterations requested.
- * Keys and Vaues are generated randomly, the number of column families,
- * qualifiers and key/value size is tunable by the user.
- */
- class HLogPutBenchmark implements Runnable {
- private final long numIterations;
- private final int numFamilies;
- private final boolean noSync;
- private final HRegion region;
- private final int syncInterval;
- private final HTableDescriptor htd;
- private final Sampler loopSampler;
-
- HLogPutBenchmark(final HRegion region, final HTableDescriptor htd,
- final long numIterations, final boolean noSync, final int syncInterval,
- final double traceFreq) {
- this.numIterations = numIterations;
- this.noSync = noSync;
- this.syncInterval = syncInterval;
- this.numFamilies = htd.getColumnFamilies().length;
- this.region = region;
- this.htd = htd;
- String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
- if (spanReceivers == null || spanReceivers.isEmpty()) {
- loopSampler = Sampler.NEVER;
- } else {
- if (traceFreq <= 0.0) {
- LOG.warn("Tracing enabled but traceFreq=0.");
- loopSampler = Sampler.NEVER;
- } else if (traceFreq >= 1.0) {
- loopSampler = Sampler.ALWAYS;
- if (numIterations > 1000) {
- LOG.warn("Full tracing of all iterations will produce a lot of data. Be sure your"
- + " SpanReciever can keep up.");
- }
- } else {
- loopSampler = new ProbabilitySampler(traceFreq);
- }
- }
- }
-
- @Override
- public void run() {
- byte[] key = new byte[keySize];
- byte[] value = new byte[valueSize];
- Random rand = new Random(Thread.currentThread().getId());
- HLog hlog = region.getLog();
- ArrayList<UUID> clusters = new ArrayList<UUID>();
- long nonce = HConstants.NO_NONCE;
-
- TraceScope threadScope =
- Trace.startSpan("HLogPerfEval." + Thread.currentThread().getName());
- try {
- long startTime = System.currentTimeMillis();
- int lastSync = 0;
- for (int i = 0; i < numIterations; ++i) {
- assert Trace.currentSpan() == threadScope.getSpan() : "Span leak detected.";
- TraceScope loopScope = Trace.startSpan("runLoopIter" + i, loopSampler);
- try {
- long now = System.nanoTime();
- Put put = setupPut(rand, key, value, numFamilies);
- WALEdit walEdit = new WALEdit();
- addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
- HRegionInfo hri = region.getRegionInfo();
- hlog.appendNoSync(hri, hri.getTable(), walEdit, clusters, now, htd,
- region.getSequenceId(), true, nonce, nonce);
- if (!this.noSync) {
- if (++lastSync >= this.syncInterval) {
- hlog.sync();
- lastSync = 0;
- }
- }
- latencyHistogram.update(System.nanoTime() - now);
- } finally {
- loopScope.close();
- }
- }
- long totalTime = (System.currentTimeMillis() - startTime);
- logBenchmarkResult(Thread.currentThread().getName(), numIterations, totalTime);
- } catch (Exception e) {
- LOG.error(getClass().getSimpleName() + " Thread failed", e);
- } finally {
- threadScope.close();
- }
- }
- }
-
- @Override
- public int run(String[] args) throws Exception {
- Path rootRegionDir = null;
- int numThreads = 1;
- long numIterations = 1000000;
- int numFamilies = 1;
- int syncInterval = 0;
- boolean noSync = false;
- boolean verify = false;
- boolean verbose = false;
- boolean cleanup = true;
- boolean noclosefs = false;
- long roll = Long.MAX_VALUE;
- boolean compress = false;
- String cipher = null;
- String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
- boolean trace = spanReceivers != null && !spanReceivers.isEmpty();
- double traceFreq = 1.0;
- // Process command line args
- for (int i = 0; i < args.length; i++) {
- String cmd = args[i];
- try {
- if (cmd.equals("-threads")) {
- numThreads = Integer.parseInt(args[++i]);
- } else if (cmd.equals("-iterations")) {
- numIterations = Long.parseLong(args[++i]);
- } else if (cmd.equals("-path")) {
- rootRegionDir = new Path(args[++i]);
- } else if (cmd.equals("-families")) {
- numFamilies = Integer.parseInt(args[++i]);
- } else if (cmd.equals("-qualifiers")) {
- numQualifiers = Integer.parseInt(args[++i]);
- } else if (cmd.equals("-keySize")) {
- keySize = Integer.parseInt(args[++i]);
- } else if (cmd.equals("-valueSize")) {
- valueSize = Integer.parseInt(args[++i]);
- } else if (cmd.equals("-syncInterval")) {
- syncInterval = Integer.parseInt(args[++i]);
- } else if (cmd.equals("-nosync")) {
- noSync = true;
- } else if (cmd.equals("-verify")) {
- verify = true;
- } else if (cmd.equals("-verbose")) {
- verbose = true;
- } else if (cmd.equals("-nocleanup")) {
- cleanup = false;
- } else if (cmd.equals("-noclosefs")) {
- noclosefs = true;
- } else if (cmd.equals("-roll")) {
- roll = Long.parseLong(args[++i]);
- } else if (cmd.equals("-compress")) {
- compress = true;
- } else if (cmd.equals("-encryption")) {
- cipher = args[++i];
- } else if (cmd.equals("-traceFreq")) {
- traceFreq = Double.parseDouble(args[++i]);
- } else if (cmd.equals("-h")) {
- printUsageAndExit();
- } else if (cmd.equals("--help")) {
- printUsageAndExit();
- } else {
- System.err.println("UNEXPECTED: " + cmd);
- printUsageAndExit();
- }
- } catch (Exception e) {
- printUsageAndExit();
- }
- }
-
- if (compress) {
- Configuration conf = getConf();
- conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
- }
-
- if (cipher != null) {
- // Set up HLog for encryption
- Configuration conf = getConf();
- conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
- conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
- conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
- HLog.Reader.class);
- conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
- HLog.Writer.class);
- conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
- conf.set(HConstants.CRYPTO_WAL_ALGORITHM_CONF_KEY, cipher);
- }
-
- // Internal config. goes off number of threads; if more threads than handlers, stuff breaks.
- // In regionserver, number of handlers == number of threads.
- getConf().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, numThreads);
-
- // Run HLog Performance Evaluation
- // First set the fs from configs. In case we are on hadoop1
- FSUtils.setFsDefault(getConf(), FSUtils.getRootDir(getConf()));
- FileSystem fs = FileSystem.get(getConf());
- LOG.info("FileSystem: " + fs);
-
- SpanReceiverHost receiverHost = trace ? SpanReceiverHost.getInstance(getConf()) : null;
- TraceScope scope = Trace.startSpan("HLogPerfEval", trace ? Sampler.ALWAYS : Sampler.NEVER);
-
- try {
- if (rootRegionDir == null) {
- rootRegionDir = TEST_UTIL.getDataTestDirOnTestFS("HLogPerformanceEvaluation");
- }
- rootRegionDir = rootRegionDir.makeQualified(fs);
- cleanRegionRootDir(fs, rootRegionDir);
- // Initialize Table Descriptor
- HTableDescriptor htd = createHTableDescriptor(numFamilies);
- final long whenToRoll = roll;
- final HLog hlog = new FSHLog(fs, rootRegionDir, "wals", getConf()) {
-
- @Override
- public void postSync(final long timeInNanos, final int handlerSyncs) {
- super.postSync(timeInNanos, handlerSyncs);
- syncMeter.mark();
- syncHistogram.update(timeInNanos);
- syncCountHistogram.update(handlerSyncs);
- }
-
- @Override
- public long postAppend(final HLog.Entry entry, final long elapsedTime) {
- long size = super.postAppend(entry, elapsedTime);
- appendMeter.mark(size);
- return size;
- }
- };
- hlog.registerWALActionsListener(new WALActionsListener() {
- private int appends = 0;
-
- @Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
- WALEdit logEdit) {
- this.appends++;
- if (this.appends % whenToRoll == 0) {
- LOG.info("Rolling after " + appends + " edits");
- // We used to do explicit call to rollWriter but changed it to a request
- // to avoid dead lock (there are less threads going on in this class than
- // in the regionserver -- regionserver does not have the issue).
- ((FSHLog)hlog).requestLogRoll();
- }
- }
-
- @Override
- public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {
- }
-
- @Override
- public void preLogRoll(Path oldPath, Path newPath) throws IOException {
- }
-
- @Override
- public void preLogArchive(Path oldPath, Path newPath) throws IOException {
- }
-
- @Override
- public void postLogRoll(Path oldPath, Path newPath) throws IOException {
- }
-
- @Override
- public void postLogArchive(Path oldPath, Path newPath) throws IOException {
- }
-
- @Override
- public void logRollRequested() {
- }
-
- @Override
- public void logCloseRequested() {
- }
- });
- hlog.rollWriter();
- HRegion region = null;
-
- try {
- region = openRegion(fs, rootRegionDir, htd, hlog);
- ConsoleReporter.enable(this.metrics, 30, TimeUnit.SECONDS);
- long putTime =
- runBenchmark(Trace.wrap(
- new HLogPutBenchmark(region, htd, numIterations, noSync, syncInterval, traceFreq)),
- numThreads);
- logBenchmarkResult("Summary: threads=" + numThreads + ", iterations=" + numIterations +
- ", syncInterval=" + syncInterval, numIterations * numThreads, putTime);
-
- if (region != null) {
- closeRegion(region);
- region = null;
- }
- if (verify) {
- Path dir = ((FSHLog) hlog).getDir();
- long editCount = 0;
- FileStatus [] fsss = fs.listStatus(dir);
- if (fsss.length == 0) throw new IllegalStateException("No WAL found");
- for (FileStatus fss: fsss) {
- Path p = fss.getPath();
- if (!fs.exists(p)) throw new IllegalStateException(p.toString());
- editCount += verify(p, verbose);
- }
- long expected = numIterations * numThreads;
- if (editCount != expected) {
- throw new IllegalStateException("Counted=" + editCount + ", expected=" + expected);
- }
- }
- } finally {
- if (region != null) closeRegion(region);
- // Remove the root dir for this test region
- if (cleanup) cleanRegionRootDir(fs, rootRegionDir);
- }
- } finally {
- // We may be called inside a test that wants to keep on using the fs.
- if (!noclosefs) fs.close();
- scope.close();
- if (receiverHost != null) receiverHost.closeReceivers();
- }
-
- return(0);
- }
-
- private static HTableDescriptor createHTableDescriptor(final int numFamilies) {
- HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
- for (int i = 0; i < numFamilies; ++i) {
- HColumnDescriptor colDef = new HColumnDescriptor(FAMILY_PREFIX + i);
- htd.addFamily(colDef);
- }
- return htd;
- }
-
- /**
- * Verify the content of the WAL file.
- * Verify that the file has expected number of edits.
- * @param wal
- * @return Count of edits.
- * @throws IOException
- */
- private long verify(final Path wal, final boolean verbose) throws IOException {
- HLog.Reader reader = HLogFactory.createReader(wal.getFileSystem(getConf()), wal, getConf());
- long count = 0;
- Map<String, Long> sequenceIds = new HashMap<String, Long>();
- try {
- while (true) {
- Entry e = reader.next();
- if (e == null) {
- LOG.debug("Read count=" + count + " from " + wal);
- break;
- }
- count++;
- long seqid = e.getKey().getLogSeqNum();
- if (sequenceIds.containsKey(Bytes.toString(e.getKey().getEncodedRegionName()))) {
- // sequenceIds should be increasing for every regions
- if (sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) >= seqid) {
- throw new IllegalStateException("wal = " + wal.getName() + ", " + "previous seqid = "
- + sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName()))
- + ", current seqid = " + seqid);
- }
- }
- // update the sequence Id.
- sequenceIds.put(Bytes.toString(e.getKey().getEncodedRegionName()), seqid);
- if (verbose) LOG.info("seqid=" + seqid);
- }
- } finally {
- reader.close();
- }
- return count;
- }
-
- private static void logBenchmarkResult(String testName, long numTests, long totalTime) {
- float tsec = totalTime / 1000.0f;
- LOG.info(String.format("%s took %.3fs %.3fops/s", testName, tsec, numTests / tsec));
-
- }
-
- private void printUsageAndExit() {
- System.err.printf("Usage: bin/hbase %s [options]\n", getClass().getName());
- System.err.println(" where [options] are:");
- System.err.println(" -h|-help Show this help and exit.");
- System.err.println(" -threads <N> Number of threads writing on the WAL.");
- System.err.println(" -iterations <N> Number of iterations per thread.");
- System.err.println(" -path <PATH> Path where region's root directory is created.");
- System.err.println(" -families <N> Number of column families to write.");
- System.err.println(" -qualifiers <N> Number of qualifiers to write.");
- System.err.println(" -keySize <N> Row key size in byte.");
- System.err.println(" -valueSize <N> Row/Col value size in byte.");
- System.err.println(" -nocleanup Do NOT remove test data when done.");
- System.err.println(" -noclosefs Do NOT close the filesystem when done.");
- System.err.println(" -nosync Append without syncing");
- System.err.println(" -syncInterval <N> Append N edits and then sync. " +
- "Default=0, i.e. sync every edit.");
- System.err.println(" -verify Verify edits written in sequence");
- System.err.println(" -verbose Output extra info; " +
- "e.g. all edit seq ids when verifying");
- System.err.println(" -roll <N> Roll the way every N appends");
- System.err.println(" -encryption <A> Encrypt the WAL with algorithm A, e.g. AES");
- System.err.println(" -traceFreq <N> Rate of trace sampling. Default: 1.0, " +
- "only respected when tracing is enabled, ie -Dhbase.trace.spanreceiver.classes=...");
- System.err.println("");
- System.err.println("Examples:");
- System.err.println("");
- System.err.println(" To run 100 threads on hdfs with log rolling every 10k edits and " +
- "verification afterward do:");
- System.err.println(" $ ./bin/hbase org.apache.hadoop.hbase.regionserver.wal." +
- "HLogPerformanceEvaluation \\");
- System.err.println(" -conf ./core-site.xml -path hdfs://example.org:7000/tmp " +
- "-threads 100 -roll 10000 -verify");
- System.exit(1);
- }
-
- private HRegion openRegion(final FileSystem fs, final Path dir, final HTableDescriptor htd,
- final HLog hlog)
- throws IOException {
- // Initialize HRegion
- HRegionInfo regionInfo = new HRegionInfo(htd.getTableName());
- return HRegion.createHRegion(regionInfo, dir, getConf(), htd, hlog);
- }
-
- private void closeRegion(final HRegion region) throws IOException {
- if (region != null) {
- region.close();
- HLog wal = region.getLog();
- if (wal != null) wal.close();
- }
- }
-
- private void cleanRegionRootDir(final FileSystem fs, final Path dir) throws IOException {
- if (fs.exists(dir)) {
- fs.delete(dir, true);
- }
- }
-
- private Put setupPut(Random rand, byte[] key, byte[] value, final int numFamilies) {
- rand.nextBytes(key);
- Put put = new Put(key);
- for (int cf = 0; cf < numFamilies; ++cf) {
- for (int q = 0; q < numQualifiers; ++q) {
- rand.nextBytes(value);
- put.add(Bytes.toBytes(FAMILY_PREFIX + cf), Bytes.toBytes(QUALIFIER_PREFIX + q), value);
- }
- }
- return put;
- }
-
- private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
- WALEdit walEdit) {
- for (List<Cell> edits : familyMap.values()) {
- for (Cell cell : edits) {
- walEdit.add(cell);
- }
- }
- }
-
- private long runBenchmark(Runnable runnable, final int numThreads) throws InterruptedException {
- Thread[] threads = new Thread[numThreads];
- long startTime = System.currentTimeMillis();
- for (int i = 0; i < numThreads; ++i) {
- threads[i] = new Thread(runnable, "t" + i);
- threads[i].start();
- }
- for (Thread t : threads) t.join();
- long endTime = System.currentTimeMillis();
- return(endTime - startTime);
- }
-
- /**
- * The guts of the {@link #main} method.
- * Call this method to avoid the {@link #main(String[])} System.exit.
- * @param args
- * @return errCode
- * @throws Exception
- */
- static int innerMain(final Configuration c, final String [] args) throws Exception {
- return ToolRunner.run(c, new HLogPerformanceEvaluation(), args);
- }
-
- public static void main(String[] args) throws Exception {
- System.exit(innerMain(HBaseConfiguration.create(), args));
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtilsForTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtilsForTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtilsForTests.java
deleted file mode 100644
index 80eb8c2..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtilsForTests.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-
-/**
- * An Utility testcase that returns the number of log files that
- * were rolled to be accessed from outside packages.
- *
- * This class makes available methods that are package protected.
- * This is interesting for test only.
- */
-public class HLogUtilsForTests {
-
- /**
- *
- * @param log
- * @return
- */
- public static int getNumRolledLogFiles(HLog log) {
- return ((FSHLog) log).getNumRolledLogFiles();
- }
-
- public static int getNumEntries(HLog log) {
- return ((FSHLog) log).getNumEntries();
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java
new file mode 100644
index 0000000..d7a4618
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedLogWriter.java
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+public class InstrumentedLogWriter extends ProtobufLogWriter {
+
+ public InstrumentedLogWriter() {
+ super();
+ }
+
+ public static boolean activateFailure = false;
+ @Override
+ public void append(Entry entry) throws IOException {
+ super.append(entry);
+ if (activateFailure &&
+ Bytes.equals(entry.getKey().getEncodedRegionName(), "break".getBytes())) {
+ System.out.println(getClass().getName() + ": I will throw an exception now...");
+ throw(new IOException("This exception is instrumented and should only be thrown for testing"
+ ));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
deleted file mode 100644
index d240e66..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/InstrumentedSequenceFileLogWriter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.util.Bytes;
-
-public class InstrumentedSequenceFileLogWriter extends ProtobufLogWriter {
-
- public InstrumentedSequenceFileLogWriter() {
- super();
- }
-
- public static boolean activateFailure = false;
- @Override
- public void append(HLog.Entry entry) throws IOException {
- super.append(entry);
- if (activateFailure && Bytes.equals(entry.getKey().getEncodedRegionName(), "break".getBytes())) {
- System.out.println(getClass().getName() + ": I will throw an exception now...");
- throw(new IOException("This exception is instrumented and should only be thrown for testing"));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
index 221f76e..7c13c00 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
@@ -32,8 +32,9 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
@@ -42,8 +43,13 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
/**
- * Implementation of {@link HLog.Writer} that delegates to
+ * Implementation of {@link WALProvider.Writer} that delegates to
* SequenceFile.Writer. Legacy implementation only used for compat tests.
+ *
+ * Note that because this class writes to the legacy hadoop-specific SequenceFile
+ * format, users of it must write {@link HLogKey} keys and not arbitrary
+ * {@link WALKey}s because the latter are not Writables (nor made to work with
+ * Hadoop serialization).
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class SequenceFileLogWriter extends WriterBase {
@@ -163,7 +169,7 @@ public class SequenceFileLogWriter extends WriterBase {
}
@Override
- public void append(HLog.Entry entry) throws IOException {
+ public void append(WAL.Entry entry) throws IOException {
entry.setCompressionContext(compressionContext);
try {
this.writer.append(entry.getKey(), entry.getEdit());
@@ -213,11 +219,4 @@ public class SequenceFileLogWriter extends WriterBase {
public FSDataOutputStream getWriterFSDataOutputStream() {
return this.writer_out;
}
-
- /**
- * This method is empty as trailer is added only in Protobuf based hlog readers/writers.
- */
- @Override
- public void setWALTrailer(WALTrailer walTrailer) {
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
index a09bfa0..2f515d2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
@@ -37,6 +37,10 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -44,7 +48,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
- * Tests for HLog write durability
+ * Tests for WAL write durability
*/
@Category(MediumTests.class)
public class TestDurability {
@@ -67,6 +71,7 @@ public class TestDurability {
CLUSTER = TEST_UTIL.getDFSCluster();
FS = CLUSTER.getFileSystem();
DIR = TEST_UTIL.getDataTestDirOnTestFS("TestDurability");
+ FSUtils.setRootDir(CONF, DIR);
}
@AfterClass
@@ -76,14 +81,14 @@ public class TestDurability {
@Test
public void testDurability() throws Exception {
- HLog wal = HLogFactory.createHLog(FS, DIR, "hlogdir",
- "hlogdir_archive", CONF);
+ final WALFactory wals = new WALFactory(CONF, null, "TestDurability");
byte[] tableName = Bytes.toBytes("TestDurability");
+ final WAL wal = wals.getWAL(tableName);
HRegion region = createHRegion(tableName, "region", wal, Durability.USE_DEFAULT);
HRegion deferredRegion = createHRegion(tableName, "deferredRegion", wal, Durability.ASYNC_WAL);
region.put(newPut(null));
- verifyHLogCount(wal, 1);
+ verifyWALCount(wals, wal, 1);
// a put through the deferred table does not write to the wal immediately,
// but maybe has been successfully sync-ed by the underlying AsyncWriter +
@@ -91,44 +96,44 @@ public class TestDurability {
deferredRegion.put(newPut(null));
// but will after we sync the wal
wal.sync();
- verifyHLogCount(wal, 2);
+ verifyWALCount(wals, wal, 2);
// a put through a deferred table will be sync with the put sync'ed put
deferredRegion.put(newPut(null));
wal.sync();
- verifyHLogCount(wal, 3);
+ verifyWALCount(wals, wal, 3);
region.put(newPut(null));
- verifyHLogCount(wal, 4);
+ verifyWALCount(wals, wal, 4);
// a put through a deferred table will be sync with the put sync'ed put
deferredRegion.put(newPut(Durability.USE_DEFAULT));
wal.sync();
- verifyHLogCount(wal, 5);
+ verifyWALCount(wals, wal, 5);
region.put(newPut(Durability.USE_DEFAULT));
- verifyHLogCount(wal, 6);
+ verifyWALCount(wals, wal, 6);
// SKIP_WAL never writes to the wal
region.put(newPut(Durability.SKIP_WAL));
deferredRegion.put(newPut(Durability.SKIP_WAL));
- verifyHLogCount(wal, 6);
+ verifyWALCount(wals, wal, 6);
wal.sync();
- verifyHLogCount(wal, 6);
+ verifyWALCount(wals, wal, 6);
// Async overrides sync table default
region.put(newPut(Durability.ASYNC_WAL));
deferredRegion.put(newPut(Durability.ASYNC_WAL));
wal.sync();
- verifyHLogCount(wal, 8);
+ verifyWALCount(wals, wal, 8);
// sync overrides async table default
region.put(newPut(Durability.SYNC_WAL));
deferredRegion.put(newPut(Durability.SYNC_WAL));
- verifyHLogCount(wal, 10);
+ verifyWALCount(wals, wal, 10);
// fsync behaves like sync
region.put(newPut(Durability.FSYNC_WAL));
deferredRegion.put(newPut(Durability.FSYNC_WAL));
- verifyHLogCount(wal, 12);
+ verifyWALCount(wals, wal, 12);
}
@Test
@@ -139,9 +144,9 @@ public class TestDurability {
byte[] col3 = Bytes.toBytes("col3");
// Setting up region
- HLog wal = HLogFactory.createHLog(FS, DIR, "myhlogdir",
- "myhlogdir_archive", CONF);
+ final WALFactory wals = new WALFactory(CONF, null, "TestIncrement");
byte[] tableName = Bytes.toBytes("TestIncrement");
+ final WAL wal = wals.getWAL(tableName);
HRegion region = createHRegion(tableName, "increment", wal, Durability.USE_DEFAULT);
// col1: amount = 1, 1 write back to WAL
@@ -150,7 +155,7 @@ public class TestDurability {
Result res = region.increment(inc1);
assertEquals(1, res.size());
assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
- verifyHLogCount(wal, 1);
+ verifyWALCount(wals, wal, 1);
// col1: amount = 0, 0 write back to WAL
inc1 = new Increment(row1);
@@ -158,7 +163,7 @@ public class TestDurability {
res = region.increment(inc1);
assertEquals(1, res.size());
assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
- verifyHLogCount(wal, 1);
+ verifyWALCount(wals, wal, 1);
// col1: amount = 0, col2: amount = 0, col3: amount = 0
// 0 write back to WAL
@@ -171,7 +176,7 @@ public class TestDurability {
assertEquals(1, Bytes.toLong(res.getValue(FAMILY, col1)));
assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col2)));
assertEquals(0, Bytes.toLong(res.getValue(FAMILY, col3)));
- verifyHLogCount(wal, 1);
+ verifyWALCount(wals, wal, 1);
// col1: amount = 5, col2: amount = 4, col3: amount = 3
// 1 write back to WAL
@@ -184,7 +189,7 @@ public class TestDurability {
assertEquals(6, Bytes.toLong(res.getValue(FAMILY, col1)));
assertEquals(4, Bytes.toLong(res.getValue(FAMILY, col2)));
assertEquals(3, Bytes.toLong(res.getValue(FAMILY, col3)));
- verifyHLogCount(wal, 2);
+ verifyWALCount(wals, wal, 2);
}
private Put newPut(Durability durability) {
@@ -196,11 +201,11 @@ public class TestDurability {
return p;
}
- private void verifyHLogCount(HLog log, int expected) throws Exception {
- Path walPath = ((FSHLog) log).computeFilename();
- HLog.Reader reader = HLogFactory.createReader(FS, walPath, CONF);
+ private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception {
+ Path walPath = DefaultWALProvider.getCurrentFileName(log);
+ WAL.Reader reader = wals.createReader(FS, walPath);
int count = 0;
- HLog.Entry entry = new HLog.Entry();
+ WAL.Entry entry = new WAL.Entry();
while (reader.next(entry) != null) count++;
reader.close();
assertEquals(expected, count);
@@ -208,7 +213,7 @@ public class TestDurability {
// lifted from TestAtomicOperation
private HRegion createHRegion (byte [] tableName, String callingMethod,
- HLog log, Durability durability)
+ WAL log, Durability durability)
throws IOException {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
htd.setDurability(durability);
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
new file mode 100644
index 0000000..6a7b7fa
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -0,0 +1,478 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Provides FSHLog test cases.
+ */
+@Category(MediumTests.class)
+public class TestFSHLog {
+ protected static final Log LOG = LogFactory.getLog(TestFSHLog.class);
+ { // instance initializer: executes every time a TestFSHLog object is constructed
+ ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL); // casts fail unless the Log is Log4J-backed
+ ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL); // namenode LeaseManager logger -> ALL
+ ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem")) // looked up by name
+ .getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); // DFS client logger -> ALL
+ }
+
+ protected static Configuration conf; // assigned in setUpBeforeClass() from TEST_UTIL.getConfiguration()
+ protected static FileSystem fs; // file system of the mini DFS cluster, assigned in setUpBeforeClass()
+ protected static Path dir; // reassigned in setUp(): <root dir>/<current test method name>
+ protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @Rule
+ public final TestName currentTest = new TestName(); // JUnit rule; setUp() reads the method name from it
+
+ /**
+  * Deletes every entry directly under "/" on the test file system, asks TEST_UTIL to create the
+  * HBase root directory, and points the static {@code dir} field at a child of that root named
+  * after the test method that is about to run.
+  * @throws Exception if listing, deleting or creating directories fails
+  */
+ @Before
+ public void setUp() throws Exception {
+   final Path fsRoot = new Path("/");
+   // The loop variable is deliberately not called "dir" so it cannot be confused with the field.
+   for (FileStatus entry : fs.listStatus(fsRoot)) {
+     fs.delete(entry.getPath(), true);
+   }
+   dir = new Path(TEST_UTIL.createRootDir(), currentTest.getMethodName());
+ }
+
+ @After
+ public void tearDown() throws Exception { // currently a no-op; setUp() clears the file system before each test
+ }
+
+ /**
+  * Tunes the shared test configuration (small DFS blocks, short heartbeat and timeout values,
+  * minimal retries), registers {@link SampleRegionWALObserver} as WAL coprocessor, starts a
+  * mini DFS cluster with three datanodes and publishes the configuration and file system
+  * through the static {@code conf} and {@code fs} fields.
+  * @throws Exception if the mini DFS cluster cannot be started
+  */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+   final Configuration testConf = TEST_UTIL.getConfiguration();
+
+   // Make block sizes small.
+   testConf.setInt("dfs.blocksize", 1024 * 1024);
+   // quicker heartbeat interval for faster DN death notification
+   testConf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
+   testConf.setInt("dfs.heartbeat.interval", 1);
+   testConf.setInt("dfs.client.socket-timeout", 5000);
+
+   // faster failover with cluster.shutdown();fs.close() idiom
+   testConf.setInt("hbase.ipc.client.connect.max.retries", 1);
+   testConf.setInt("dfs.client.block.recovery.retries", 1);
+   testConf.setInt("hbase.ipc.client.connection.maxidletime", 500);
+
+   // every WAL created by these tests loads the sample observer
+   testConf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, SampleRegionWALObserver.class.getName());
+
+   TEST_UTIL.startMiniDFSCluster(3);
+
+   conf = TEST_UTIL.getConfiguration();
+   fs = TEST_UTIL.getDFSCluster().getFileSystem();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster(); // NOTE(review): setup calls startMiniDFSCluster(3); presumably this also stops the DFS cluster - confirm
+ }
+
+ /**
+ * A loaded WAL coprocessor won't break existing WAL test cases.
+ * <p>
+ * setUpBeforeClass() registers SampleRegionWALObserver under
+ * CoprocessorHost.WAL_COPROCESSOR_CONF_KEY in the shared configuration. This test builds an
+ * FSHLog from that configuration and asserts that the log's coprocessor host can find the
+ * observer by its class name. The log is closed in a finally block if it was created.
+ * @throws Exception if the FSHLog cannot be created or closed
+ */
+ @Test
+ public void testWALCoprocessorLoaded() throws Exception {
+ // test to see whether the coprocessor is loaded or not.
+ FSHLog log = null;
+ try {
+ log = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
+ HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
+ WALCoprocessorHost host = log.getCoprocessorHost();
+ Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName()); // lookup by class name
+ assertNotNull(c);
+ } finally {
+ if (log != null) { // the constructor may have thrown before log was assigned
+ log.close();
+ }
+ }
+ }
+
+ /**
+  * Appends {@code times} single-cell edits for the given region to {@code log}, then syncs the
+  * log once. Each cell uses the bytes of "row" as row, family, qualifier and value, and the
+  * current wall-clock time as timestamp; the same timestamp is used for the edit's WALKey.
+  * @param log WAL that receives the edits
+  * @param hri region whose encoded name is recorded in every WALKey
+  * @param tableName table name recorded in every WALKey
+  * @param times number of edits to append; nothing is appended when it is not positive
+  * @param sequenceId handed unchanged to every append call
+  * @throws IOException if an append or the final sync fails
+  */
+ protected void addEdits(WAL log, HRegionInfo hri, TableName tableName,
+     int times, AtomicLong sequenceId) throws IOException {
+   final byte[] rowBytes = Bytes.toBytes("row");
+   final HTableDescriptor descriptor = new HTableDescriptor();
+   descriptor.addFamily(new HColumnDescriptor("row"));
+
+   int appended = 0;
+   while (appended < times) {
+     final long now = System.currentTimeMillis();
+     final WALEdit edit = new WALEdit();
+     edit.add(new KeyValue(rowBytes, rowBytes, rowBytes, now, rowBytes));
+     final WALKey key = new WALKey(hri.getEncodedNameAsBytes(), tableName, now);
+     log.append(descriptor, hri, key, edit, sequenceId, true, null);
+     appended++;
+   }
+   log.sync();
+ }
+
+ /**
+ * helper method to simulate region flush for a WAL.
+ * It calls startCacheFlush immediately followed by completeCacheFlush for the given region,
+ * with nothing in between.
+ * @param wal the WAL on which the flush is simulated
+ * @param regionEncodedName encoded name of the region, as bytes, passed to both calls
+ */
+ protected void flushRegion(WAL wal, byte[] regionEncodedName) {
+ wal.startCacheFlush(regionEncodedName);
+ wal.completeCacheFlush(regionEncodedName);
+ }
+
+ /**
+ * tests the log comparator. Ensure that we are not mixing meta logs with non-meta logs (throws
+ * exception if we do). Comparison is based on the timestamp present in the wal name.
+ * <p>
+ * Two FSHLog instances are created over the same directory: a regular one and one built with
+ * DefaultWALProvider.META_WAL_PROVIDER_ID. File names are produced with computeFilename(11) and
+ * computeFilename(12) on each, and each log's LOG_NAME_COMPARATOR is expected to order its own
+ * names and to throw IllegalArgumentException when handed a mix of both kinds.
+ * @throws Exception if a WAL cannot be created or closed
+ */
+ @Test
+ public void testWALComparator() throws Exception {
+ FSHLog wal1 = null;
+ FSHLog walMeta = null;
+ try {
+ wal1 = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
+ HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
+ LOG.debug("Log obtained is: " + wal1);
+ Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
+ Path p1 = wal1.computeFilename(11);
+ Path p2 = wal1.computeFilename(12);
+ // comparing with itself returns 0
+ assertTrue(comp.compare(p1, p1) == 0);
+ // comparing with different filenum.
+ assertTrue(comp.compare(p1, p2) < 0);
+ walMeta = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
+ HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null,
+ DefaultWALProvider.META_WAL_PROVIDER_ID);
+ Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;
+
+ Path p1WithMeta = walMeta.computeFilename(11);
+ Path p2WithMeta = walMeta.computeFilename(12);
+ assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
+ assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
+ // mixing meta and non-meta logs gives error
+ boolean ex = false;
+ try {
+ comp.compare(p1WithMeta, p2); // regular comparator given a meta name first
+ } catch (IllegalArgumentException e) {
+ ex = true;
+ }
+ assertTrue("Comparator doesn't complain while checking meta log files", ex);
+ boolean exMeta = false;
+ try {
+ compMeta.compare(p1WithMeta, p2); // meta comparator given a non-meta name second
+ } catch (IllegalArgumentException e) {
+ exMeta = true;
+ }
+ assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
+ } finally {
+ if (wal1 != null) { // either constructor may have thrown before assignment
+ wal1.close();
+ }
+ if (walMeta != null) {
+ walMeta.close();
+ }
+ }
+ }
+
+ /**
+  * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the
+  * list of regions which should be flushed in order to archive the oldest wal file.
+  * <p>
+  * This method tests this behavior by inserting edits and rolling the wal enough times to reach
+  * the max number of logs threshold ("hbase.regionserver.maxlogs" is set to 1 on a private copy
+  * of the configuration). It checks whether we get the "right regions" for flush on rolling
+  * the wal.
+  * <p>
+  * Region names are compared by content with {@link Bytes#equals(byte[], byte[])}; JUnit's
+  * assertEquals on two byte[] arguments only passes when both are the very same array object.
+  * @throws Exception if creating, appending to, rolling or closing the WAL fails
+  */
+ @Test
+ public void testFindMemStoresEligibleForFlush() throws Exception {
+   LOG.debug("testFindMemStoresEligibleForFlush");
+   Configuration conf1 = HBaseConfiguration.create(conf);
+   conf1.setInt("hbase.regionserver.maxlogs", 1);
+   FSHLog wal = new FSHLog(fs, FSUtils.getRootDir(conf1), dir.toString(),
+       HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
+   TableName t1 = TableName.valueOf("t1");
+   TableName t2 = TableName.valueOf("t2");
+   HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+   HRegionInfo hri2 = new HRegionInfo(t2, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+   // variables to mock region sequenceIds
+   final AtomicLong sequenceId1 = new AtomicLong(1);
+   final AtomicLong sequenceId2 = new AtomicLong(1);
+   // add edits and roll the wal
+   try {
+     addEdits(wal, hri1, t1, 2, sequenceId1);
+     wal.rollWriter();
+     // add some more edits and roll the wal. This would reach the log number threshold
+     addEdits(wal, hri1, t1, 2, sequenceId1);
+     wal.rollWriter();
+     // with above rollWriter call, the max logs limit is reached.
+     assertEquals(2, wal.getNumRolledLogFiles());
+
+     // get the regions to flush; since there is only one region in the oldest wal, it should
+     // return only one region.
+     byte[][] regionsToFlush = wal.findRegionsToForceFlush();
+     assertEquals(1, regionsToFlush.length);
+     assertTrue(Bytes.equals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]));
+     // insert edits in second region
+     addEdits(wal, hri2, t2, 2, sequenceId2);
+     // get the regions to flush, it should still read region1.
+     regionsToFlush = wal.findRegionsToForceFlush();
+     assertEquals(1, regionsToFlush.length);
+     assertTrue(Bytes.equals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]));
+     // flush region 1, and roll the wal file. Only last wal which has entries for region1 should
+     // remain.
+     flushRegion(wal, hri1.getEncodedNameAsBytes());
+     wal.rollWriter();
+     // only one wal should remain now (that is for the second region).
+     assertEquals(1, wal.getNumRolledLogFiles());
+     // flush the second region
+     flushRegion(wal, hri2.getEncodedNameAsBytes());
+     wal.rollWriter(true);
+     // no wal should remain now.
+     assertEquals(0, wal.getNumRolledLogFiles());
+     // add edits both to region 1 and region 2, and roll.
+     addEdits(wal, hri1, t1, 2, sequenceId1);
+     addEdits(wal, hri2, t2, 2, sequenceId2);
+     wal.rollWriter();
+     // add edits and roll the writer, to reach the max logs limit.
+     assertEquals(1, wal.getNumRolledLogFiles());
+     addEdits(wal, hri1, t1, 2, sequenceId1);
+     wal.rollWriter();
+     // it should return two regions to flush, as the oldest wal file has entries
+     // for both regions.
+     regionsToFlush = wal.findRegionsToForceFlush();
+     assertEquals(2, regionsToFlush.length);
+     // flush both regions
+     flushRegion(wal, hri1.getEncodedNameAsBytes());
+     flushRegion(wal, hri2.getEncodedNameAsBytes());
+     wal.rollWriter(true);
+     assertEquals(0, wal.getNumRolledLogFiles());
+     // Add an edit to region1, and roll the wal.
+     addEdits(wal, hri1, t1, 2, sequenceId1);
+     // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
+     wal.startCacheFlush(hri1.getEncodedNameAsBytes());
+     wal.rollWriter();
+     wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
+     assertEquals(1, wal.getNumRolledLogFiles());
+   } finally {
+     // wal was assigned before the try block, so it cannot be null here
+     wal.close();
+   }
+ }
+
+ /**
+  * Simulates WAL append ops for a region and tests
+  * {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} API.
+  * It compares the region sequenceIds with oldestFlushing and oldestUnFlushed entries.
+  * If a region's entries are larger than min of (oldestFlushing, oldestUnFlushed), then the
+  * region should be flushed before archiving this WAL.
+  * <p>
+  * The three maps are plain HashMaps keyed by byte[]. Arrays hash and compare by identity, so
+  * the encoded region name is fetched once and that single array instance is used as the key
+  * everywhere; later put() calls therefore overwrite the earlier entry as intended.
+  */
+ @Test
+ public void testAllRegionsFlushed() {
+   LOG.debug("testAllRegionsFlushed");
+   Map<byte[], Long> oldestFlushingSeqNo = new HashMap<byte[], Long>();
+   Map<byte[], Long> oldestUnFlushedSeqNo = new HashMap<byte[], Long>();
+   Map<byte[], Long> seqNo = new HashMap<byte[], Long>();
+   // create a table
+   TableName t1 = TableName.valueOf("t1");
+   // create a region
+   HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+   // single key instance shared by all three maps (see method comment)
+   final byte[] region1 = hri1.getEncodedNameAsBytes();
+   // variables to mock region sequenceIds
+   final AtomicLong sequenceId1 = new AtomicLong(1);
+   // test empty map
+   assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+   // add entries in the region
+   seqNo.put(region1, sequenceId1.incrementAndGet());
+   oldestUnFlushedSeqNo.put(region1, sequenceId1.get());
+   // should say region1 is not flushed.
+   assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+   // test with entries in oldestFlushing map.
+   oldestUnFlushedSeqNo.clear();
+   oldestFlushingSeqNo.put(region1, sequenceId1.get());
+   assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+   // simulate region flush, i.e., clear oldestFlushing and oldestUnflushed maps
+   oldestFlushingSeqNo.clear();
+   oldestUnFlushedSeqNo.clear();
+   assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+   // insert some large values for region1
+   oldestUnFlushedSeqNo.put(region1, 1000L);
+   seqNo.put(region1, 1500L);
+   assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+
+   // tests when oldestUnFlushed/oldestFlushing contains larger value.
+   // It means region is flushed.
+   oldestFlushingSeqNo.put(region1, 1200L);
+   oldestUnFlushedSeqNo.clear();
+   seqNo.put(region1, 1199L);
+   assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
+ }
+
+ /**
+  * Expects an IOException when a writer is created for a WAL file whose parent directory was
+  * renamed (to "&lt;name&gt;-splitting") after a first writer had been created in it.
+  * <p>
+  * NOTE(review): the FSHLog created here is never closed; confirm whether that is acceptable.
+  * @throws IOException expected by the test annotation; the final fail() is only reached when
+  *         no exception was thrown
+  */
+ @Test(expected=IOException.class)
+ public void testFailedToCreateWALIfParentRenamed() throws IOException {
+   final String name = "testFailedToCreateWALIfParentRenamed";
+   final FSHLog log = new FSHLog(fs, FSUtils.getRootDir(conf), name,
+       HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
+   final long filenum = System.currentTimeMillis();
+
+   // A first writer is created while the WAL directory is still in place.
+   final Path firstWalFile = log.computeFilename(filenum);
+   log.createWriterInstance(firstWalFile);
+
+   // Move the directory holding the WAL files out from under the log.
+   final Path walDir = firstWalFile.getParent();
+   final Path secondWalFile = log.computeFilename(filenum + 1);
+   final Path renamedDir = new Path(walDir.getParent(), walDir.getName() + "-splitting");
+   fs.rename(walDir, renamedDir);
+
+   // This call is the one the test expects to throw.
+   log.createWriterInstance(secondWalFile);
+   fail("It should fail to create the new WAL");
+ }
+
+ /**
+ * Test flush for sure has a sequence id that is beyond the last edit appended. We do this
+ * by slowing appends in the background ring buffer thread while in foreground we call
+ * flush. The addition of the sync over HRegion in flush should fix an issue where flush was
+ * returning before all of its appends had made it out to the WAL (HBASE-11109).
+ * <p>
+ * The WAL is an anonymous FSHLog subclass whose atHeadOfRingBufferEventHandlerAppend() sleeps
+ * 100 ms whenever the goslow flag is set. After seeding the region with puts, the test turns
+ * slow mode on, appends the same WALEdit countPerFamily times directly to the WAL, flushes the
+ * region and reads the region's sequence id. It then turns slow mode off and asserts that the
+ * sequence id read right after the flush is not smaller than the one read afterwards.
+ * @throws IOException if creating, opening, writing to or closing the region or WAL fails
+ * @see HBASE-11109
+ */
+ @Test
+ public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
+ String testName = "testFlushSequenceIdIsGreaterThanAllEditsInHFile";
+ final TableName tableName = TableName.valueOf(testName);
+ final HRegionInfo hri = new HRegionInfo(tableName);
+ final byte[] rowName = tableName.getName();
+ final HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor("f"));
+ HRegion r = HRegion.createHRegion(hri, TEST_UTIL.getDefaultRootDirPath(),
+ TEST_UTIL.getConfiguration(), htd);
+ HRegion.closeHRegion(r); // created and closed at once; reopened below with the doctored WAL
+ final int countPerFamily = 10;
+ final MutableBoolean goslow = new MutableBoolean(false);
+ // subclass and doctor a method.
+ FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDefaultRootDirPath(),
+ testName, conf) {
+ @Override
+ void atHeadOfRingBufferEventHandlerAppend() {
+ if (goslow.isTrue()) {
+ Threads.sleep(100);
+ LOG.debug("Sleeping before appending 100ms"); // note: logged after the sleep has finished
+ }
+ super.atHeadOfRingBufferEventHandlerAppend();
+ }
+ };
+ HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
+ TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
+ EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
+ try {
+ List<Put> puts = null;
+ for (HColumnDescriptor hcd: htd.getFamilies()) { // htd has the single family "f" added above
+ puts =
+ TestWALReplay.addRegionEdits(rowName, hcd.getName(), countPerFamily, ee, region, "x");
+ }
+
+ // Now assert edits made it in.
+ final Get g = new Get(rowName);
+ Result result = region.get(g);
+ assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
+
+ // Construct a WALEdit and add it a few times to the WAL.
+ WALEdit edits = new WALEdit();
+ for (Put p: puts) { // puts holds the list returned for the last family iterated above
+ CellScanner cs = p.cellScanner();
+ while (cs.advance()) {
+ edits.add(cs.current());
+ }
+ }
+ // Add any old cluster id.
+ List<UUID> clusterIds = new ArrayList<UUID>();
+ clusterIds.add(UUID.randomUUID());
+ // Now make appends run slow.
+ goslow.setValue(true);
+ for (int i = 0; i < countPerFamily; i++) {
+ final HRegionInfo info = region.getRegionInfo();
+ final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
+ System.currentTimeMillis(), clusterIds, -1, -1);
+ wal.append(htd, info, logkey, edits, region.getSequenceId(), true, null);
+ }
+ region.flushcache(); // flush while the slowed appends may still be queued
+ // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
+ long currentSequenceId = region.getSequenceId().get();
+ // Now release the appends
+ goslow.setValue(false);
+ synchronized (goslow) { // NOTE(review): nothing in this test waits on goslow; confirm the notify is needed
+ goslow.notifyAll();
+ }
+ assertTrue(currentSequenceId >= region.getSequenceId().get()); // id must not advance after the flush
+ } finally {
+ region.close(true);
+ wal.close();
+ }
+ }
+
+}