Posted to commits@hbase.apache.org by zh...@apache.org on 2023/03/10 15:03:55 UTC
[hbase] branch branch-2 updated: HBASE-27632 Refactor WAL.Reader implementation so we can better support WAL splitting and replication (#5055)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new cec05562639 HBASE-27632 Refactor WAL.Reader implementation so we can better support WAL splitting and replication (#5055)
cec05562639 is described below
commit cec05562639c3e01d588a7e864f6d476b66a65f1
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Fri Mar 10 21:54:20 2023 +0800
HBASE-27632 Refactor WAL.Reader implementation so we can better support WAL splitting and replication (#5055)
Signed-off-by: GeorryHuang <hu...@apache.org>
(cherry picked from commit e48c4485db8d9255510ee1cc9cf465e14de637d7)
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogTestHelper.java
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
---
.../hbase/IntegrationTestIngestWithEncryption.java | 4 -
.../hadoop/hbase/mapreduce/WALInputFormat.java | 56 +-
.../hadoop/hbase/master/region/MasterRegion.java | 3 +
.../store/region/WALProcedurePrettyPrinter.java | 3 +-
.../apache/hadoop/hbase/regionserver/HRegion.java | 33 +-
.../hbase/regionserver/wal/AbstractFSWAL.java | 5 +-
.../wal/AbstractProtobufLogWriter.java | 11 +-
.../wal/AbstractProtobufWALReader.java | 564 +++++++++++++++++++++
.../hadoop/hbase/regionserver/wal/Compressor.java | 10 +-
.../hbase/regionserver/wal/ProtobufLogReader.java | 529 -------------------
.../regionserver/wal/ProtobufWALStreamReader.java | 136 +++++
.../regionserver/wal/ProtobufWALTailingReader.java | 331 ++++++++++++
.../hadoop/hbase/regionserver/wal/ReaderBase.java | 185 -------
.../regionserver/wal/SecureProtobufLogReader.java | 149 ------
.../regionserver/wal/WALHeaderEOFException.java | 44 ++
.../regionserver/ReplicationSourceWALReader.java | 179 +++----
.../SerialReplicationSourceWALReader.java | 33 +-
.../replication/regionserver/WALEntryStream.java | 464 +++++++++--------
.../hadoop/hbase/wal/AbstractFSWALProvider.java | 75 +--
.../wal/AbstractRecoveredEditsOutputSink.java | 3 +-
.../main/java/org/apache/hadoop/hbase/wal/WAL.java | 15 -
.../org/apache/hadoop/hbase/wal/WALFactory.java | 111 ++--
.../apache/hadoop/hbase/wal/WALPrettyPrinter.java | 14 +-
.../org/apache/hadoop/hbase/wal/WALSplitter.java | 26 +-
.../apache/hadoop/hbase/wal/WALStreamReader.java | 64 +++
.../apache/hadoop/hbase/wal/WALTailingReader.java | 148 ++++++
.../TestSequenceIdMonotonicallyIncreasing.java | 7 +-
.../hadoop/hbase/master/AbstractTestDLS.java | 16 -
.../hadoop/hbase/regionserver/TestHRegion.java | 14 +-
.../regionserver/TestHRegionReplayEvents.java | 8 +-
.../hbase/regionserver/TestRecoveredEdits.java | 3 +-
.../TestWALMonotonicallyIncreasingSeqId.java | 9 +-
.../regionserver/wal/AbstractTestProtobufLog.java | 69 +--
.../regionserver/wal/AbstractTestWALReplay.java | 14 +-
...der.java => FaultyProtobufWALStreamReader.java} | 20 +-
.../hbase/regionserver/wal/TestDurability.java | 10 +-
.../hbase/regionserver/wal/TestLogRolling.java | 8 +-
.../regionserver/wal/TestSecureAsyncWALReplay.java | 3 -
.../regionserver/wal/TestSecureWALReplay.java | 3 -
.../replication/SerialReplicationTestBase.java | 16 +-
.../TestReplicationEmptyWALRecovery.java | 7 +-
.../hbase/replication/TestSerialReplication.java | 12 +-
.../regionserver/TestBasicWALEntryStream.java | 213 ++++----
.../TestBasicWALEntryStreamAsyncFSWAL.java | 3 +
.../TestBasicWALEntryStreamFSHLog.java | 3 +
.../TestRaceWhenCreatingReplicationSource.java | 8 +-
.../regionserver/TestReplicationSource.java | 4 +-
.../TestWALEntryStreamDifferentCounts.java | 11 +-
.../regionserver/WALEntryStreamTestBase.java | 42 +-
.../hadoop/hbase/wal/CompressedWALTestBase.java | 2 +-
.../hadoop/hbase/wal/NoEOFWALStreamReader.java | 95 ++++
.../hadoop/hbase/wal/TestParsePartialWALFile.java | 213 ++++++++
.../org/apache/hadoop/hbase/wal/TestSecureWAL.java | 33 +-
.../apache/hadoop/hbase/wal/TestWALFactory.java | 302 +++++------
.../hbase/wal/TestWALOpenAfterDNRollingStart.java | 4 +-
.../hadoop/hbase/wal/TestWALReaderOnSecureWAL.java | 225 --------
.../org/apache/hadoop/hbase/wal/TestWALSplit.java | 66 +--
.../hadoop/hbase/wal/WALPerformanceEvaluation.java | 5 +-
58 files changed, 2548 insertions(+), 2095 deletions(-)
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java
index ef2dcc56afb..de69b5fb841 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestIngestWithEncryption.java
@@ -26,11 +26,9 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.EncryptionTest;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Before;
@@ -54,8 +52,6 @@ public class IntegrationTestIngestWithEncryption extends IntegrationTestIngest {
conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
- conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
- Reader.class);
conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
Writer.class);
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
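A side effect worth noting in the hunk above: readers now detect encryption from the WAL header itself at open time (see initDecryptor in the new AbstractProtobufWALReader below), so the reader implementation is no longer pluggable through "hbase.regionserver.hlog.reader.impl" and only the writer side still needs wiring. A minimal sketch of the surviving secure-WAL setup, assuming a test Configuration named conf as in the code above:

    // only the writer class remains configurable; the reader figures out
    // encryption from the WAL header on its own
    conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
      Writer.class);
    conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);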
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
index 60c3e46249e..7362f585d31 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -32,12 +32,14 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -135,7 +137,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
* HLogInputFormat.
*/
static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> {
- private Reader reader = null;
+ private WALStreamReader reader = null;
// visible until we can remove the deprecated HLogInputFormat
Entry currentEntry = new Entry();
private long startTime;
@@ -144,6 +146,47 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
private Path logFile;
private long currentPos;
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
+ justification = "HDFS-4380")
+ private WALStreamReader openReader(Path path, long startPosition) throws IOException {
+ long retryInterval = 2000; // 2 sec
+ int maxAttempts = 30;
+ int attempt = 0;
+ Exception ee = null;
+ WALStreamReader reader = null;
+ while (reader == null && attempt++ < maxAttempts) {
+ try {
+ // Detect if this is a new file, if so get a new reader else
+ // reset the current reader so that we see the new data
+ reader =
+ WALFactory.createStreamReader(path.getFileSystem(conf), path, conf, startPosition);
+ return reader;
+ } catch (LeaseNotRecoveredException lnre) {
+ // HBASE-15019 the WAL was not closed due to some hiccup.
+ LOG.warn("Try to recover the WAL lease " + path, lnre);
+ AbstractFSWALProvider.recoverLease(conf, path);
+ reader = null;
+ ee = lnre;
+ } catch (NullPointerException npe) {
+ // Workaround for race condition in HDFS-4380
+ // which throws a NPE if we open a file before any data node has the most recent block
+ // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
+ LOG.warn("Got NPE opening reader, will retry.");
+ reader = null;
+ ee = npe;
+ }
+ if (reader == null) {
+ // sleep before next attempt
+ try {
+ Thread.sleep(retryInterval);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ throw new IOException("Could not open reader", ee);
+ }
+
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
@@ -158,8 +201,7 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
private void openReader(Path path) throws IOException {
closeReader();
- reader = AbstractFSWALProvider.openReader(path, conf);
- seek();
+ reader = openReader(path, currentPos > 0 ? currentPos : -1);
setCurrentPath(path);
}
@@ -174,12 +216,6 @@ public class WALInputFormat extends InputFormat<WALKey, WALEdit> {
}
}
- private void seek() throws IOException {
- if (currentPos != 0) {
- reader.seek(currentPos);
- }
- }
-
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (reader == null) {
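With the seek() helper gone, resuming is handled by passing the saved position straight to the factory. A minimal usage sketch of the new one-way reader API introduced by this commit (error handling elided; a startPosition of -1 means "start from the beginning"):

    try (WALStreamReader reader =
      WALFactory.createStreamReader(path.getFileSystem(conf), path, conf, -1)) {
      for (WAL.Entry entry; (entry = reader.next()) != null;) {
        // process entry.getKey() and entry.getEdit()
      }
    }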
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
index 1f21e375ce9..2c0c98d86f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java
@@ -273,6 +273,9 @@ public final class MasterRegion {
WAL wal = createWAL(walFactory, walRoller, serverName, walFs, walRootDir, regionInfo);
conf.set(HRegion.SPECIAL_RECOVERED_EDITS_DIR,
replayEditsDir.makeQualified(walFs.getUri(), walFs.getWorkingDirectory()).toString());
+ // we do not do WAL splitting here, so it is possible to have uncleanly closed WAL files; we
+ // need to ignore EOFException.
+ conf.setBoolean(HRegion.RECOVERED_EDITS_IGNORE_EOF, true);
return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, td, wal, null, null);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/WALProcedurePrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/WALProcedurePrettyPrinter.java
index 3070d2732b7..b76680d0fdb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/WALProcedurePrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/WALProcedurePrettyPrinter.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -88,7 +89,7 @@ public class WALProcedurePrettyPrinter extends AbstractHBaseTool {
protected int doWork() throws Exception {
Path path = new Path(file);
FileSystem fs = path.getFileSystem(conf);
- try (WAL.Reader reader = WALFactory.createReader(fs, path, conf)) {
+ try (WALStreamReader reader = WALFactory.createStreamReader(fs, path, conf)) {
for (;;) {
WAL.Entry entry = reader.next();
if (entry == null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 7edc43f297a..61bd79b0cae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -183,6 +183,7 @@ import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -263,6 +264,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final String SPECIAL_RECOVERED_EDITS_DIR =
"hbase.hregion.special.recovered.edits.dir";
+ /**
+ * Mainly used for the master local region, where we replay the WAL file directly without
+ * splitting, so it is possible to have WAL files which are not closed cleanly. In that case,
+ * hitting EOF is expected, so it should not be considered a critical problem.
+ */
+ public static final String RECOVERED_EDITS_IGNORE_EOF =
+ "hbase.hregion.recovered.edits.ignore.eof";
+
/**
* Whether to use {@link MetaCellComparator} even if we are not meta region. Used when creating
* master local region.
@@ -5286,9 +5295,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
MonitoredTask status = TaskMonitor.get().createStatus(msg);
status.setStatus("Opening recovered edits");
- WAL.Reader reader = null;
- try {
- reader = WALFactory.createReader(fs, edits, conf);
+ try (WALStreamReader reader = WALFactory.createStreamReader(fs, edits, conf)) {
long currentEditSeqId = -1;
long currentReplaySeqId = -1;
long firstSeqIdInLog = -1;
@@ -5442,12 +5449,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
coprocessorHost.postReplayWALs(this.getRegionInfo(), edits);
}
} catch (EOFException eof) {
- Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
- msg = "EnLongAddered EOF. Most likely due to Master failure during "
- + "wal splitting, so we have this data in another edit. Continuing, but renaming " + edits
- + " as " + p + " for region " + this;
- LOG.warn(msg, eof);
- status.abort(msg);
+ if (!conf.getBoolean(RECOVERED_EDITS_IGNORE_EOF, false)) {
+ Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits);
+ msg = "EnLongAddered EOF. Most likely due to Master failure during "
+ + "wal splitting, so we have this data in another edit. Continuing, but renaming "
+ + edits + " as " + p + " for region " + this;
+ LOG.warn(msg, eof);
+ status.abort(msg);
+ } else {
+ LOG.warn("EOF while replaying recover edits and config '{}' is true so "
+ + "we will ignore it and continue", RECOVERED_EDITS_IGNORE_EOF, eof);
+ }
} catch (IOException ioe) {
// If the IOE resulted from bad file format,
// then this problem is idempotent and retrying won't help
@@ -5474,9 +5486,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return currentEditSeqId;
} finally {
status.cleanup();
- if (reader != null) {
- reader.close();
- }
}
}
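The new flag is consumed only in the EOF branch above; the rest of replay is unchanged. For a caller that replays WAL files directly without prior splitting (the master local region case), opting in is a single switch, sketched here against a Configuration prepared before the region is opened:

    // tolerate an EOF at the tail of an uncleanly closed WAL instead of moving
    // the file aside as a bad edits file
    conf.setBoolean(HRegion.RECOVERED_EDITS_IGNORE_EOF, true);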
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 18b8f7990c8..24e8b9b26a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -105,8 +105,9 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
* (smaller) than the most-recent flush.
* <p>
- * To read an WAL, call
- * {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}. *
+ * To read a WAL, call {@link WALFactory#createStreamReader(FileSystem, Path)} for a one-way read,
+ * or call {@link WALFactory#createTailingReader(FileSystem, Path, Configuration, long)} for
+ * replication, where we may want to tail the active WAL file.
* <h2>Failure Semantic</h2> If an exception on append or sync, roll the WAL because the current WAL
* is now a lame duck; any more appends or syncs will fail also with the same original exception. If
* we have made successful appends to the WAL and we then are unable to sync them, our current
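The two entry points named in the updated javadoc, as a short sketch (the trailing long is the start position and -1 means read from the beginning; createTailingReader is assumed here to return the new WALTailingReader, whose read loop is not shown in this excerpt):

    // one-way read of a (normally closed) WAL file
    WALStreamReader streamReader = WALFactory.createStreamReader(fs, path, conf);
    // tailing read of a possibly still-open WAL file, e.g. for replication
    WALTailingReader tailingReader = WALFactory.createTailingReader(fs, path, conf, -1);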
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
index 8437fef3bc2..d89ef6f2110 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
-import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
-import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE;
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC;
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_MAGIC;
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.WAL_TRAILER_WARN_SIZE;
import java.io.IOException;
import java.io.OutputStream;
@@ -185,8 +187,7 @@ public abstract class AbstractProtobufLogWriter {
headerBuilder.setValueCompressionAlgorithm(
CompressionContext.getValueCompressionAlgorithm(conf).ordinal());
}
- length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC,
- buildWALHeader(conf, headerBuilder)));
+ length.set(writeMagicAndWALHeader(PB_WAL_MAGIC, buildWALHeader(conf, headerBuilder)));
initAfterHeader(doCompress);
@@ -257,7 +258,7 @@ public abstract class AbstractProtobufLogWriter {
LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " + trailerSize
+ " > " + this.trailerWarnSize);
}
- length.set(writeWALTrailerAndMagic(trailer, ProtobufLogReader.PB_WAL_COMPLETE_MAGIC));
+ length.set(writeWALTrailerAndMagic(trailer, PB_WAL_COMPLETE_MAGIC));
this.trailerWritten = true;
} catch (IOException ioe) {
LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufWALReader.java
new file mode 100644
index 00000000000..3af49107d2c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufWALReader.java
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.security.Key;
+import java.security.KeyException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Cipher;
+import org.apache.hadoop.hbase.io.crypto.Decryptor;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.util.LRUDictionary;
+import org.apache.hadoop.hbase.security.EncryptionUtil;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EncryptionTest;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
+
+/**
+ * Base class for protobuf based WAL readers.
+ */
+@InterfaceAudience.Private
+public abstract class AbstractProtobufWALReader
+ implements AbstractFSWALProvider.Initializer, Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractProtobufWALReader.class);
+
+ // public for WALFactory until we move everything to o.a.h.h.wal
+ public static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
+
+ // public for TestWALSplit
+ public static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
+
+ /**
+ * Configuration name of the WALTrailer's warning size. If a WALTrailer's size is greater than
+ * the configured size, providers should log a warning; e.g. this is used with the Protobuf
+ * reader/writer.
+ */
+ static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
+ static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
+
+ private static final List<String> WRITER_CLS_NAMES =
+ ImmutableList.of(ProtobufLogWriter.class.getSimpleName(),
+ AsyncProtobufLogWriter.class.getSimpleName(), SecureProtobufLogWriter.class.getSimpleName(),
+ SecureAsyncProtobufLogWriter.class.getSimpleName());
+
+ protected Configuration conf;
+
+ protected FileSystem fs;
+
+ protected Path path;
+
+ protected long fileLength;
+
+ protected FSDataInputStream inputStream;
+
+ protected CompressionContext compressionCtx;
+ protected boolean hasCompression = false;
+ protected boolean hasTagCompression = false;
+ protected boolean hasValueCompression = false;
+ protected Compression.Algorithm valueCompressionType;
+
+ protected Codec.Decoder cellDecoder;
+ protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
+
+ protected long walEditsStopOffset;
+ protected boolean trailerPresent;
+ protected WALTrailer trailer;
+ // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
+ // than this size, it is written/read respectively, with a WARN message in the log.
+ protected int trailerWarnSize;
+
+ // cell codec classname
+ protected String codecClsName;
+
+ protected Decryptor decryptor;
+
+ /**
+ * Get or create the input stream used by cell decoder.
+ * <p/>
+ * For implementing replication, we may need to limit the bytes we can read, so here we provide a
+ * method that subclasses can use to wrap the original input stream.
+ */
+ protected abstract InputStream getCellCodecInputStream(FSDataInputStream stream);
+
+ /**
+ * Skip to the given position.
+ */
+ protected abstract void skipTo(long position) throws IOException;
+
+ @Override
+ public void init(FileSystem fs, Path path, Configuration conf, long startPosition)
+ throws IOException {
+ this.conf = conf;
+ this.path = path;
+ this.fs = fs;
+ this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
+
+ Pair<FSDataInputStream, FileStatus> pair = open();
+ FSDataInputStream stream = pair.getFirst();
+ FileStatus stat = pair.getSecond();
+ boolean initSucceeded = false;
+ try {
+ // read the header
+ WALProtos.WALHeader header = readHeader(stream);
+ // initialize metadata and fields
+ initDecryptor(header);
+ initCompression(header);
+ initWALCellCodec(header, getCellCodecInputStream(stream));
+
+ // read trailer if available
+ readTrailer(stream, stat);
+
+ // this is intentional as we do not want the above methods to use the inputStream field. For
+ // implementing the tailing reader, we need to wrap the input stream when creating the cell
+ // decoder, so we need to make sure the above methods do not accidentally use the stored
+ // inputStream directly and cause trouble. If a method needs an input stream, we just
+ // pass the input stream in, like readHeader and readTrailer.
+ this.inputStream = stream;
+
+ // seek to the given position if it is not -1
+ if (startPosition >= 0 && startPosition != inputStream.getPos()) {
+ if (compressionCtx != null) {
+ // skip to the position, as we need to construct the compression dictionary
+ skipTo(startPosition);
+ } else {
+ // just seek to the position
+ stream.seek(startPosition);
+ }
+ }
+ initSucceeded = true;
+ } finally {
+ if (!initSucceeded) {
+ Closeables.close(stream, initSucceeded);
+ inputStream = null;
+ }
+ }
+ }
+
+ private Pair<FSDataInputStream, FileStatus> openArchivedWAL() throws IOException {
+ Path archivedWAL = AbstractFSWALProvider.findArchivedLog(path, conf);
+ if (archivedWAL != null) {
+ // try open from oldWAL dir
+ return Pair.newPair(fs.open(archivedWAL), fs.getFileStatus(archivedWAL));
+ } else {
+ return null;
+ }
+ }
+
+ protected final Pair<FSDataInputStream, FileStatus> open() throws IOException {
+ try {
+ return Pair.newPair(fs.open(path), fs.getFileStatus(path));
+ } catch (FileNotFoundException e) {
+ Pair<FSDataInputStream, FileStatus> pair = openArchivedWAL();
+ if (pair != null) {
+ return pair;
+ } else {
+ throw e;
+ }
+ } catch (RemoteException re) {
+ IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
+ if (!(ioe instanceof FileNotFoundException)) {
+ throw ioe;
+ }
+ Pair<FSDataInputStream, FileStatus> pair = openArchivedWAL();
+ if (pair != null) {
+ return pair;
+ } else {
+ throw ioe;
+ }
+ }
+ }
+
+ protected final WALProtos.WALHeader readHeader(FSDataInputStream stream) throws IOException {
+ byte[] magic = new byte[PB_WAL_MAGIC.length];
+ try {
+ stream.readFully(magic);
+ } catch (EOFException e) {
+ throw new WALHeaderEOFException("EOF while reading PB WAL magic", e);
+ }
+ if (!Arrays.equals(PB_WAL_MAGIC, magic)) {
+ throw new IOException("Invalid PB WAL magic " + Bytes.toStringBinary(magic) + ", expected "
+ + Bytes.toStringBinary(PB_WAL_MAGIC));
+ }
+ WALProtos.WALHeader header;
+ try {
+ header = ProtobufUtil.parseDelimitedFrom(stream, WALProtos.WALHeader.parser());
+ } catch (InvalidProtocolBufferException e) {
+ if (ProtobufUtil.isEOF(e)) {
+ throw new WALHeaderEOFException("EOF while reading PB header", e);
+ } else {
+ throw e;
+ }
+ } catch (EOFException e) {
+ throw new WALHeaderEOFException("EOF while reading PB header", e);
+ }
+ if (header == null) {
+ throw new WALHeaderEOFException("EOF while reading PB header");
+ }
+ if (header.hasWriterClsName() && !getWriterClsNames().contains(header.getWriterClsName())) {
+ throw new IOException("Got unknown writer class: " + header.getWriterClsName());
+ }
+ return header;
+ }
+
+ private void initDecryptor(WALProtos.WALHeader header) throws IOException {
+ if (!header.hasEncryptionKey()) {
+ return;
+ }
+ EncryptionTest.testKeyProvider(conf);
+ EncryptionTest.testCipherProvider(conf);
+
+ // Retrieve a usable key
+ byte[] keyBytes = header.getEncryptionKey().toByteArray();
+ Key key = null;
+ String walKeyName = conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY);
+ // First try the WAL key, if one is configured
+ if (walKeyName != null) {
+ try {
+ key = EncryptionUtil.unwrapWALKey(conf, walKeyName, keyBytes);
+ } catch (KeyException e) {
+ LOG.debug("Unable to unwrap key with WAL key '{}'", walKeyName, e);
+ key = null;
+ }
+ }
+ if (key == null) {
+ String masterKeyName =
+ conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName());
+ try {
+ // Then, try the cluster master key
+ key = EncryptionUtil.unwrapWALKey(conf, masterKeyName, keyBytes);
+ } catch (KeyException e) {
+ // If the current master key fails to unwrap, try the alternate, if
+ // one is configured
+ LOG.debug("Unable to unwrap key with current master key '{}'", masterKeyName, e);
+ String alternateKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
+ if (alternateKeyName != null) {
+ try {
+ key = EncryptionUtil.unwrapWALKey(conf, alternateKeyName, keyBytes);
+ } catch (KeyException ex) {
+ throw new IOException(ex);
+ }
+ } else {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ // Use the algorithm the key wants
+ Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm());
+ if (cipher == null) {
+ throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available");
+ }
+
+ // Set up the decryptor for this WAL
+
+ decryptor = cipher.getDecryptor();
+ decryptor.setKey(key);
+
+ LOG.debug("Initialized secure protobuf WAL: cipher={}", cipher.getName());
+ }
+
+ private void initCompression(WALProtos.WALHeader header) throws IOException {
+ this.hasCompression = header.hasHasCompression() && header.getHasCompression();
+ if (!hasCompression) {
+ return;
+ }
+ this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
+ this.hasValueCompression = header.hasHasValueCompression() && header.getHasValueCompression();
+ if (header.hasValueCompressionAlgorithm()) {
+ try {
+ this.valueCompressionType =
+ Compression.Algorithm.values()[header.getValueCompressionAlgorithm()];
+ } catch (ArrayIndexOutOfBoundsException e) {
+ throw new IOException("Invalid compression type", e);
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Initializing compression context for {}: isRecoveredEdits={}"
+ + ", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}",
+ path, CommonFSUtils.isRecoveredEdits(path), hasTagCompression, hasValueCompression,
+ valueCompressionType);
+ }
+ try {
+ compressionCtx =
+ new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path),
+ hasTagCompression, hasValueCompression, valueCompressionType);
+ } catch (Exception e) {
+ throw new IOException("Failed to initialize CompressionContext", e);
+ }
+ }
+
+ private WALCellCodec getCodec(Configuration conf, String cellCodecClsName,
+ CompressionContext compressionContext) throws IOException {
+ return WALCellCodec.create(conf, cellCodecClsName, compressionContext);
+ }
+
+ protected final void initWALCellCodec(WALProtos.WALHeader header, InputStream inputStream)
+ throws IOException {
+ String cellCodecClsName = header.hasCellCodecClsName() ? header.getCellCodecClsName() : null;
+ if (decryptor != null && SecureWALCellCodec.class.getName().equals(cellCodecClsName)) {
+ WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, decryptor);
+ this.cellDecoder = codec.getDecoder(inputStream);
+ // We do not support compression with WAL encryption
+ this.compressionCtx = null;
+ this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();
+ this.hasCompression = false;
+ this.hasTagCompression = false;
+ this.hasValueCompression = false;
+ } else {
+ WALCellCodec codec = getCodec(conf, cellCodecClsName, compressionCtx);
+ this.cellDecoder = codec.getDecoder(inputStream);
+ if (this.hasCompression) {
+ this.byteStringUncompressor = codec.getByteStringUncompressor();
+ } else {
+ this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();
+ }
+ }
+ this.codecClsName = cellCodecClsName;
+ }
+
+ protected final void readTrailer(FSDataInputStream stream, FileStatus stat) throws IOException {
+ this.fileLength = stat.getLen();
+ this.walEditsStopOffset = this.fileLength;
+ long currentPos = stream.getPos();
+ // we will reset walEditsStopOffset if a trailer is available
+ trailerPresent = setTrailerIfPresent(stream);
+ if (currentPos != stream.getPos()) {
+ // seek back
+ stream.seek(currentPos);
+ }
+ }
+
+ /**
+ * To check whether a trailer is present in a WAL, it seeks to position (fileLength -
+ * PB_WAL_COMPLETE_MAGIC.size() - Bytes.SIZEOF_INT). It reads the int value to know the size of
+ * the trailer, and checks whether the trailer is present at the end or not by comparing the last
+ * PB_WAL_COMPLETE_MAGIC.size() bytes. In case trailer is not present, it returns false;
+ * otherwise, sets the trailer and sets this.walEditsStopOffset variable up to the point just
+ * before the trailer.
+ * <p/>
+ * The trailer is ignored in case:
+ * <ul>
+ * <li>fileLength is 0 or not correct (when file is under recovery, etc).
+ * <li>the trailer size is negative.
+ * </ul>
+ * In case the trailer size > this.trailerWarnSize, it is read after a WARN message.
+ * @return true if a valid trailer is present
+ */
+ private boolean setTrailerIfPresent(FSDataInputStream stream) throws IOException {
+ try {
+ long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
+ if (trailerSizeOffset <= 0) {
+ // no trailer possible.
+ return false;
+ }
+ stream.seek(trailerSizeOffset);
+ // read the int as trailer size.
+ int trailerSize = stream.readInt();
+ ByteBuffer buf = ByteBuffer.allocate(PB_WAL_COMPLETE_MAGIC.length);
+ stream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
+ if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
+ LOG.trace("No trailer found.");
+ return false;
+ }
+ if (trailerSize < 0) {
+ LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer");
+ return false;
+ } else if (trailerSize > this.trailerWarnSize) {
+ // continue reading after warning the user.
+ LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
+ + trailerSize + " > " + this.trailerWarnSize);
+ }
+ // seek to the position where trailer starts.
+ long positionOfTrailer = trailerSizeOffset - trailerSize;
+ stream.seek(positionOfTrailer);
+ // read the trailer.
+ buf = ByteBuffer.allocate(trailerSize);// for trailer.
+ stream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
+ trailer = WALTrailer.parseFrom(buf.array());
+ this.walEditsStopOffset = positionOfTrailer;
+ return true;
+ } catch (IOException ioe) {
+ LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
+ }
+ return false;
+ }
+
+ protected final boolean reachWALEditsStopOffset(long pos) {
+ if (trailerPresent && pos > 0 && pos == walEditsStopOffset) {
+ LOG.trace("Reached end of expected edits area at offset {}", pos);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Returns names of the accepted writer classes
+ */
+ public List<String> getWriterClsNames() {
+ return WRITER_CLS_NAMES;
+ }
+
+ /**
+ * Returns the cell codec classname
+ */
+ public String getCodecClsName() {
+ return codecClsName;
+ }
+
+ public long getPosition() throws IOException {
+ return inputStream != null ? inputStream.getPos() : -1;
+ }
+
+ public long trailerSize() {
+ if (trailerPresent) {
+ // sizeof PB_WAL_COMPLETE_MAGIC + sizeof trailerSize + trailer
+ final long calculatedSize =
+ (long) PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT + trailer.getSerializedSize();
+ final long expectedSize = fileLength - walEditsStopOffset;
+ if (expectedSize != calculatedSize) {
+ LOG.warn("After parsing the trailer, we expect the total footer to be {} bytes, but we "
+ + "calculate it as being {}", expectedSize, calculatedSize);
+ }
+ return expectedSize;
+ } else {
+ return -1L;
+ }
+ }
+
+ protected final String getPositionQuietly() {
+ try {
+ long pos = getPosition();
+ return pos >= 0 ? Long.toString(pos) : "<unknown>";
+ } catch (Exception e) {
+ LOG.warn("failed to get position, ignoring", e);
+ return "<unknown>";
+ }
+ }
+
+ protected final IOException extractHiddenEof(Exception ex) {
+ // There are two problems we are dealing with here. Hadoop stream throws generic exception
+ // for EOF, not EOFException; and scanner further hides it inside RuntimeException.
+ IOException ioEx = null;
+ if (ex instanceof EOFException) {
+ return (EOFException) ex;
+ } else if (ex instanceof IOException) {
+ ioEx = (IOException) ex;
+ } else if (
+ ex instanceof RuntimeException && ex.getCause() != null
+ && ex.getCause() instanceof IOException
+ ) {
+ ioEx = (IOException) ex.getCause();
+ }
+ if ((ioEx != null) && (ioEx.getMessage() != null)) {
+ if (ioEx.getMessage().contains("EOF")) {
+ return ioEx;
+ }
+ return null;
+ }
+ return null;
+ }
+
+ /**
+ * This is used to determine whether we have already reached the WALTrailer. As the size and magic
+ * are at the end of the WAL file, it is possible that these two parts are missing while writing,
+ * in which case we consider there to be no trailer. And when we actually reach the WALTrailer, we
+ * will try to decode it as a WALKey and fail, but the error can vary as we are actually parsing a
+ * WALTrailer.
+ * @return whether this is a WALTrailer, so we should throw EOF to the upper layer: the file is done
+ */
+ protected final boolean isWALTrailer(long startPosition) throws IOException {
+ // We have nothing in the WALTrailer PB message now so its size is just an int length and a
+ // magic at the end
+ int trailerSize = PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT;
+ if (fileLength - startPosition >= trailerSize) {
+ // We still have more than trailerSize bytes before reaching the EOF so this is not a trailer.
+ // We also test for == here because if this is a valid trailer, we can read it while opening
+ // the reader so we should not reach here
+ return false;
+ }
+ inputStream.seek(startPosition);
+ for (int i = 0; i < 4; i++) {
+ int r = inputStream.read();
+ if (r == -1) {
+ // we have reached EOF while reading the length, and all bytes read are 0, so we assume this
+ // is a partial trailer
+ return true;
+ }
+ if (r != 0) {
+ // the length is not 0, should not be a trailer
+ return false;
+ }
+ }
+ for (int i = 0; i < PB_WAL_COMPLETE_MAGIC.length; i++) {
+ int r = inputStream.read();
+ if (r == -1) {
+ // we have reached EOF while reading the magic, and all bytes read are matched, so we assume
+ // this is a partial trailer
+ return true;
+ }
+ if (r != (PB_WAL_COMPLETE_MAGIC[i] & 0xFF)) {
+ // does not match magic, should not be a trailer
+ return false;
+ }
+ }
+ // in fact we should not reach here, as this means the trailer bytes are all matched and
+ // complete, then we should not call this method...
+ return true;
+ }
+
+ @Override
+ public void close() {
+ if (inputStream != null) {
+ IOUtils.closeQuietly(inputStream);
+ inputStream = null;
+ }
+ }
+}
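The two abstract hooks, getCellCodecInputStream and skipTo, are what the concrete readers added in this commit (ProtobufWALStreamReader, ProtobufWALTailingReader) fill in differently. A hypothetical minimal subclass, given only to illustrate the contract and not part of the commit:

    class ExampleWALReader extends AbstractProtobufWALReader {
      @Override
      protected InputStream getCellCodecInputStream(FSDataInputStream stream) {
        // simplest case: hand the raw stream to the cell decoder; a tailing
        // reader would instead wrap it to bound how many bytes may be read
        return stream;
      }

      @Override
      protected void skipTo(long position) throws IOException {
        // called instead of a plain seek when compression is enabled: entries up
        // to 'position' must be re-read so the compression dictionary is rebuilt
      }
    }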
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
index bed31530d87..aebc8e07725 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.io.WritableUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -69,21 +70,22 @@ public class Compressor {
FileSystem inFS = input.getFileSystem(conf);
FileSystem outFS = output.getFileSystem(conf);
- WAL.Reader in = WALFactory.createReaderIgnoreCustomClass(inFS, input, conf);
+ WALStreamReader in = WALFactory.createStreamReader(inFS, input, conf);
WALProvider.Writer out = null;
try {
- if (!(in instanceof ReaderBase)) {
+ if (!(in instanceof AbstractProtobufWALReader)) {
System.err.println("Cannot proceed, invalid reader type: " + in.getClass().getName());
return;
}
- boolean compress = ((ReaderBase) in).hasCompression();
+ boolean compress = ((AbstractProtobufWALReader) in).hasCompression;
conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress);
out = WALFactory.createWALWriter(outFS, output, conf);
WAL.Entry e = null;
- while ((e = in.next()) != null)
+ while ((e = in.next()) != null) {
out.append(e);
+ }
} finally {
in.close();
if (out != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
deleted file mode 100644
index a7ca1827845..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader.Builder;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
-
-/**
- * A Protobuf based WAL has the following structure:
- * <p>
- * <PB_WAL_MAGIC><WALHeader><WALEdits>...<WALEdits><Trailer>
- * <TrailerSize> <PB_WAL_COMPLETE_MAGIC>
- * </p>
- * The Reader reads meta information (WAL Compression state, WALTrailer, etc) in
- * ProtobufLogReader#initReader(FSDataInputStream). A WALTrailer is an extensible structure which is
- * appended at the end of the WAL. This is empty for now; it can contain some meta information such
- * as Region level stats, etc in future.
- */
-@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX,
- HBaseInterfaceAudience.CONFIG })
-public class ProtobufLogReader extends ReaderBase {
- private static final Logger LOG = LoggerFactory.getLogger(ProtobufLogReader.class);
- // public for WALFactory until we move everything to o.a.h.h.wal
- @InterfaceAudience.Private
- public static final byte[] PB_WAL_MAGIC = Bytes.toBytes("PWAL");
- // public for TestWALSplit
- @InterfaceAudience.Private
- public static final byte[] PB_WAL_COMPLETE_MAGIC = Bytes.toBytes("LAWP");
- /**
- * Configuration name of WAL Trailer's warning size. If a waltrailer's size is greater than the
- * configured size, providers should log a warning. e.g. this is used with Protobuf reader/writer.
- */
- static final String WAL_TRAILER_WARN_SIZE = "hbase.regionserver.waltrailer.warn.size";
- static final int DEFAULT_WAL_TRAILER_WARN_SIZE = 1024 * 1024; // 1MB
-
- protected FSDataInputStream inputStream;
- protected Codec.Decoder cellDecoder;
- protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
- protected boolean hasCompression = false;
- protected boolean hasTagCompression = false;
- protected boolean hasValueCompression = false;
- protected Compression.Algorithm valueCompressionType = null;
- // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit
- // entry in the wal, the inputstream's position is equal to walEditsStopOffset.
- private long walEditsStopOffset;
- private boolean trailerPresent;
- protected WALTrailer trailer;
- // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger
- // than this size, it is written/read respectively, with a WARN message in the log.
- protected int trailerWarnSize;
- private static List<String> writerClsNames = new ArrayList<>();
- static {
- writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
- writerClsNames.add(AsyncProtobufLogWriter.class.getSimpleName());
- }
-
- // cell codec classname
- private String codecClsName = null;
-
- // a flag indicate that whether we need to reset compression context when seeking back
- private boolean resetCompression;
-
- @InterfaceAudience.Private
- public long trailerSize() {
- if (trailerPresent) {
- // sizeof PB_WAL_COMPLETE_MAGIC + sizof trailerSize + trailer
- final long calculatedSize =
- (long) PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT + trailer.getSerializedSize();
- final long expectedSize = fileLength - walEditsStopOffset;
- if (expectedSize != calculatedSize) {
- LOG.warn("After parsing the trailer, we expect the total footer to be {} bytes, but we "
- + "calculate it as being {}", expectedSize, calculatedSize);
- }
- return expectedSize;
- } else {
- return -1L;
- }
- }
-
- enum WALHdrResult {
- EOF, // stream is at EOF when method starts
- SUCCESS,
- UNKNOWN_WRITER_CLS // name of writer class isn't recognized
- }
-
- // context for WALHdr carrying information such as Cell Codec classname
- static class WALHdrContext {
- WALHdrResult result;
- String cellCodecClsName;
-
- WALHdrContext(WALHdrResult result, String cellCodecClsName) {
- this.result = result;
- this.cellCodecClsName = cellCodecClsName;
- }
-
- WALHdrResult getResult() {
- return result;
- }
-
- String getCellCodecClsName() {
- return cellCodecClsName;
- }
- }
-
- public ProtobufLogReader() {
- super();
- }
-
- @Override
- public void close() throws IOException {
- if (this.inputStream != null) {
- this.inputStream.close();
- this.inputStream = null;
- }
- }
-
- @Override
- public long getPosition() throws IOException {
- return inputStream.getPos();
- }
-
- @Override
- public void reset() throws IOException {
- String clsName = initInternal(null, false);
- if (resetCompression) {
- resetCompression();
- }
- initAfterCompression(clsName); // We need a new decoder (at least).
- }
-
- @Override
- public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
- throws IOException {
- this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
- super.init(fs, path, conf, stream);
- }
-
- @Override
- protected String initReader(FSDataInputStream stream) throws IOException {
- return initInternal(stream, true);
- }
-
- /*
- * Returns names of the accepted writer classes
- */
- public List<String> getWriterClsNames() {
- return writerClsNames;
- }
-
- /*
- * Returns the cell codec classname
- */
- public String getCodecClsName() {
- return codecClsName;
- }
-
- protected WALHdrContext readHeader(Builder builder, FSDataInputStream stream) throws IOException {
- boolean res = builder.mergeDelimitedFrom(stream);
- if (!res) return new WALHdrContext(WALHdrResult.EOF, null);
- if (builder.hasWriterClsName() && !getWriterClsNames().contains(builder.getWriterClsName())) {
- return new WALHdrContext(WALHdrResult.UNKNOWN_WRITER_CLS, null);
- }
- String clsName = null;
- if (builder.hasCellCodecClsName()) {
- clsName = builder.getCellCodecClsName();
- }
- return new WALHdrContext(WALHdrResult.SUCCESS, clsName);
- }
-
- private String initInternal(FSDataInputStream stream, boolean isFirst) throws IOException {
- close();
- if (!isFirst) {
- // Re-compute the file length.
- this.fileLength = fs.getFileStatus(path).getLen();
- }
- long expectedPos = PB_WAL_MAGIC.length;
- if (stream == null) {
- stream = fs.open(path);
- stream.seek(expectedPos);
- }
- if (stream.getPos() != expectedPos) {
- throw new IOException("The stream is at invalid position: " + stream.getPos());
- }
- // Initialize metadata or, when we reset, just skip the header.
- WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder();
- WALHdrContext hdrCtxt = readHeader(builder, stream);
- WALHdrResult walHdrRes = hdrCtxt.getResult();
- if (walHdrRes == WALHdrResult.EOF) {
- throw new EOFException("Couldn't read WAL PB header");
- }
- if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) {
- throw new IOException("Got unknown writer class: " + builder.getWriterClsName());
- }
- if (isFirst) {
- WALProtos.WALHeader header = builder.build();
- this.hasCompression = header.hasHasCompression() && header.getHasCompression();
- this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
- this.hasValueCompression = header.hasHasValueCompression() && header.getHasValueCompression();
- if (header.hasValueCompressionAlgorithm()) {
- try {
- this.valueCompressionType =
- Compression.Algorithm.values()[header.getValueCompressionAlgorithm()];
- } catch (ArrayIndexOutOfBoundsException e) {
- throw new IOException("Invalid compression type", e);
- }
- }
- }
- this.inputStream = stream;
- this.walEditsStopOffset = this.fileLength;
- long currentPosition = stream.getPos();
- trailerPresent = setTrailerIfPresent();
- this.seekOnFs(currentPosition);
- if (LOG.isTraceEnabled()) {
- LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset
- + ", fileLength: " + this.fileLength + ", " + "trailerPresent: "
- + (trailerPresent ? "true, size: " + trailer.getSerializedSize() : "false")
- + ", currentPosition: " + currentPosition);
- }
-
- codecClsName = hdrCtxt.getCellCodecClsName();
-
- return hdrCtxt.getCellCodecClsName();
- }
-
- /**
- * To check whether a trailer is present in a WAL, it seeks to position (fileLength -
- * PB_WAL_COMPLETE_MAGIC.size() - Bytes.SIZEOF_INT). It reads the int value to know the size of
- * the trailer, and checks whether the trailer is present at the end or not by comparing the last
- * PB_WAL_COMPLETE_MAGIC.size() bytes. In case trailer is not present, it returns false;
- * otherwise, sets the trailer and sets this.walEditsStopOffset variable up to the point just
- * before the trailer.
- * <ul>
- * The trailer is ignored in case:
- * <li>fileLength is 0 or not correct (when file is under recovery, etc).
- * <li>the trailer size is negative.
- * </ul>
- * <p>
- * In case the trailer size > this.trailerMaxSize, it is read after a WARN message.
- * @return true if a valid trailer is present
- */
- private boolean setTrailerIfPresent() {
- try {
- long trailerSizeOffset = this.fileLength - (PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT);
- if (trailerSizeOffset <= 0) return false;// no trailer possible.
- this.seekOnFs(trailerSizeOffset);
- // read the int as trailer size.
- int trailerSize = this.inputStream.readInt();
- ByteBuffer buf = ByteBuffer.allocate(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length);
- this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
- if (!Arrays.equals(buf.array(), PB_WAL_COMPLETE_MAGIC)) {
- LOG.trace("No trailer found.");
- return false;
- }
- if (trailerSize < 0) {
- LOG.warn("Invalid trailer Size " + trailerSize + ", ignoring the trailer");
- return false;
- } else if (trailerSize > this.trailerWarnSize) {
- // continue reading after warning the user.
- LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum configured size : "
- + trailerSize + " > " + this.trailerWarnSize);
- }
- // seek to the position where trailer starts.
- long positionOfTrailer = trailerSizeOffset - trailerSize;
- this.seekOnFs(positionOfTrailer);
- // read the trailer.
- buf = ByteBuffer.allocate(trailerSize);// for trailer.
- this.inputStream.readFully(buf.array(), buf.arrayOffset(), buf.capacity());
- trailer = WALTrailer.parseFrom(buf.array());
- this.walEditsStopOffset = positionOfTrailer;
- return true;
- } catch (IOException ioe) {
- LOG.warn("Got IOE while reading the trailer. Continuing as if no trailer is present.", ioe);
- }
- return false;
- }
-
- protected WALCellCodec getCodec(Configuration conf, String cellCodecClsName,
- CompressionContext compressionContext) throws IOException {
- return WALCellCodec.create(conf, cellCodecClsName, compressionContext);
- }
-
- @Override
- protected void initAfterCompression() throws IOException {
- initAfterCompression(null);
- }
-
- @Override
- protected void initAfterCompression(String cellCodecClsName) throws IOException {
- WALCellCodec codec = getCodec(this.conf, cellCodecClsName, this.compressionContext);
- this.cellDecoder = codec.getDecoder(this.inputStream);
- if (this.hasCompression) {
- this.byteStringUncompressor = codec.getByteStringUncompressor();
- } else {
- this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();
- }
- }
-
- @Override
- protected boolean hasCompression() {
- return this.hasCompression;
- }
-
- @Override
- protected boolean hasTagCompression() {
- return this.hasTagCompression;
- }
-
- @Override
- protected boolean hasValueCompression() {
- return this.hasValueCompression;
- }
-
- @Override
- protected Compression.Algorithm getValueCompressionAlgorithm() {
- return this.valueCompressionType;
- }
-
- @Override
- protected boolean readNext(Entry entry) throws IOException {
- resetCompression = false;
- // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
- long originalPosition = this.inputStream.getPos();
- if (trailerPresent && originalPosition > 0 && originalPosition == this.walEditsStopOffset) {
- LOG.trace("Reached end of expected edits area at offset {}", originalPosition);
- return false;
- }
- boolean resetPosition = false;
- try {
- WALKey walKey;
- try {
- walKey = ProtobufUtil.parseDelimitedFrom(inputStream, WALKey.parser());
- } catch (InvalidProtocolBufferException e) {
- if (ProtobufUtil.isEOF(e) || isWALTrailer(originalPosition)) {
- // only rethrow EOF if it indicates an EOF, or we have reached the partial WALTrailer
- resetPosition = true;
- throw (EOFException) new EOFException("Invalid PB, EOF? Ignoring; originalPosition="
- + originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
- } else {
- throw e;
- }
- } catch (EOFException e) {
- // append more detailed information
- throw (EOFException) new EOFException("EOF while reading WAL key; originalPosition="
- + originalPosition + ", currentPosition=" + this.inputStream.getPos()).initCause(e);
- }
- entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
- if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
- LOG.debug("WALKey has no KVs that follow it; trying the next one. current offset={}",
- this.inputStream.getPos());
- return true;
- }
- // Starting from here, we will start to read cells, which will change the content in
- // compression dictionary, so if we fail in the below operations, when resetting, we also need
- // to clear the compression context, and read from the beginning to reconstruct the
- // compression dictionary, instead of seeking to the position directly.
- // This is very useful for saving the extra effort for reconstructing the compression
- // dictionary, as DFSInputStream implement the available method, so in most cases we will
- // not reach here if there are not enough data.
- resetCompression = true;
- int expectedCells = walKey.getFollowingKvCount();
- long posBefore = this.inputStream.getPos();
- try {
- int actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
- if (expectedCells != actualCells) {
- resetPosition = true;
- throw new EOFException("Only read " + actualCells); // other info added in catch
- }
- } catch (Exception ex) {
- String posAfterStr = "<unknown>";
- try {
- posAfterStr = this.inputStream.getPos() + "";
- } catch (Throwable t) {
- LOG.trace("Error getting pos for error message - ignoring", t);
- }
- String message = " while reading " + expectedCells + " WAL KVs; started reading at "
- + posBefore + " and read up to " + posAfterStr;
- IOException realEofEx = extractHiddenEof(ex);
- throw (EOFException) new EOFException("EOF " + message)
- .initCause(realEofEx != null ? realEofEx : ex);
- }
- if (trailerPresent && this.inputStream.getPos() > this.walEditsStopOffset) {
- LOG.error(
- "Read WALTrailer while reading WALEdits. wal: " + this.path + ", inputStream.getPos(): "
- + this.inputStream.getPos() + ", walEditsStopOffset: " + this.walEditsStopOffset);
- throw new EOFException("Read WALTrailer while reading WALEdits");
- }
- } catch (EOFException eof) {
- // If originalPosition is < 0, it is rubbish and we cannot use it (probably local fs)
- if (originalPosition < 0) {
- LOG.debug(
- "Encountered a malformed edit, but can't seek back to last good position "
- + "because originalPosition is negative. last offset={}",
- this.inputStream.getPos(), eof);
- throw eof;
- }
- // If stuck at the same place and we got an exception, lets go back at the beginning.
- if (inputStream.getPos() == originalPosition) {
- if (resetPosition) {
- LOG.debug("Encountered a malformed edit, seeking to the beginning of the WAL since "
- + "current position and original position match at {}", originalPosition);
- seekOnFs(0);
- } else {
- LOG.debug("EOF at position {}", originalPosition);
- }
- } else {
- // Else restore our position to original location in hope that next time through we will
- // read successfully.
- LOG.debug("Encountered a malformed edit, seeking back to last good position in file, "
- + "from {} to {}", inputStream.getPos(), originalPosition, eof);
- seekOnFs(originalPosition);
- }
- return false;
- }
- return true;
- }
-
- private IOException extractHiddenEof(Exception ex) {
- // There are two problems we are dealing with here. Hadoop stream throws generic exception
- // for EOF, not EOFException; and scanner further hides it inside RuntimeException.
- IOException ioEx = null;
- if (ex instanceof EOFException) {
- return (EOFException) ex;
- } else if (ex instanceof IOException) {
- ioEx = (IOException) ex;
- } else if (
- ex instanceof RuntimeException && ex.getCause() != null
- && ex.getCause() instanceof IOException
- ) {
- ioEx = (IOException) ex.getCause();
- }
- if ((ioEx != null) && (ioEx.getMessage() != null)) {
- if (ioEx.getMessage().contains("EOF")) return ioEx;
- return null;
- }
- return null;
- }
-
- /**
- * This is used to determine whether we have already reached the WALTrailer. As the size and magic
- * are at the end of the WAL file, it is possible that these two options are missing while
- * writing, so we will consider there is no trailer. And when we actually reach the WALTrailer, we
- * will try to decode it as WALKey and we will fail but the error could vary as it is parsing
- * WALTrailer actually.
- * @return whether this is a WALTrailer and we should throw EOF to upper layer the file is done
- */
- private boolean isWALTrailer(long startPosition) throws IOException {
- // We have nothing in the WALTrailer PB message now so its size is just a int length size and a
- // magic at the end
- int trailerSize = PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT;
- if (fileLength - startPosition >= trailerSize) {
- // We still have more than trailerSize bytes before reaching the EOF so this is not a trailer.
- // We also test for == here because if this is a valid trailer, we can read it while opening
- // the reader so we should not reach here
- return false;
- }
- inputStream.seek(startPosition);
- for (int i = 0; i < 4; i++) {
- int r = inputStream.read();
- if (r == -1) {
- // we have reached EOF while reading the length, and all bytes read are 0, so we assume this
- // is a partial trailer
- return true;
- }
- if (r != 0) {
- // the length is not 0, should not be a trailer
- return false;
- }
- }
- for (int i = 0; i < PB_WAL_COMPLETE_MAGIC.length; i++) {
- int r = inputStream.read();
- if (r == -1) {
- // we have reached EOF while reading the magic, and all bytes read are matched, so we assume
- // this is a partial trailer
- return true;
- }
- if (r != (PB_WAL_COMPLETE_MAGIC[i] & 0xFF)) {
- // does not match magic, should not be a trailer
- return false;
- }
- }
- // in fact we should not reach here, as this means the trailer bytes are all matched and
- // complete, then we should not call this method...
- return true;
- }
-
- @Override
- protected void seekOnFs(long pos) throws IOException {
- this.inputStream.seek(pos);
- }
-}
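For reference, the file-tail layout that the trailer probing above (and its replacement in
AbstractProtobufWALReader) relies on, reconstructed from the code in this hunk:

    // tail of a cleanly closed protobuf WAL file:
    //   ... WAL edits ... | WALTrailer PB message | int trailerSize | PB_WAL_COMPLETE_MAGIC
    //                     ^ walEditsStopOffset    ^ trailerSizeOffset
    // isWALTrailer() treats an all-zero length followed by a possibly truncated magic as a
    // partial trailer, i.e. a clean EOF rather than corruption.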
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALStreamReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALStreamReader.java
new file mode 100644
index 00000000000..626921401aa
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALStreamReader.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+/**
+ * A one way stream reader for reading a protobuf based WAL file.
+ */
+@InterfaceAudience.Private
+public class ProtobufWALStreamReader extends AbstractProtobufWALReader
+ implements WALStreamReader, AbstractFSWALProvider.Initializer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ProtobufWALStreamReader.class);
+
+ @Override
+ public Entry next(Entry reuse) throws IOException {
+ long originalPosition = getPosition();
+ if (reachWALEditsStopOffset(originalPosition)) {
+ return null;
+ }
+ WALProtos.WALKey walKey;
+ try {
+ // for a one way stream reader, we do not care about the exact position where we hit the
+ // EOF or IOE, so just use the helper method to parse the WALKey; in the tailing reader, we
+ // read the varint size ourselves
+ walKey = ProtobufUtil.parseDelimitedFrom(inputStream, WALProtos.WALKey.parser());
+ } catch (InvalidProtocolBufferException e) {
+ if (ProtobufUtil.isEOF(e) || isWALTrailer(originalPosition)) {
+ // InvalidProtocolBufferException.truncatedMessage indicates an EOF,
+ // or we have started to read a partial WALTrailer; in both cases throw EOF
+ throw (EOFException) new EOFException("EOF while reading WALKey, originalPosition="
+ + originalPosition + ", currentPosition=" + inputStream.getPos()).initCause(e);
+ } else {
+ // For all other types of IPBEs, the WAL key is broken; throw an IOException out to let
+ // the upper layer know (the partial WALTrailer case was handled above)
+ throw (IOException) new IOException("Error while reading WALKey, originalPosition="
+ + originalPosition + ", currentPosition=" + inputStream.getPos()).initCause(e);
+ }
+ }
+ Entry entry = reuse;
+ if (entry == null) {
+ entry = new Entry();
+ }
+ entry.getKey().readFieldsFromPb(walKey, byteStringUncompressor);
+ if (!walKey.hasFollowingKvCount() || walKey.getFollowingKvCount() == 0) {
+ LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
+ inputStream.getPos());
+ return entry;
+ }
+ int expectedCells = walKey.getFollowingKvCount();
+ long posBefore = getPosition();
+ int actualCells;
+ try {
+ actualCells = entry.getEdit().readFromCells(cellDecoder, expectedCells);
+ } catch (Exception e) {
+ String message = " while reading " + expectedCells + " WAL KVs; started reading at "
+ + posBefore + " and read up to " + getPositionQuietly();
+ IOException realEofEx = extractHiddenEof(e);
+ if (realEofEx != null) {
+ throw (EOFException) new EOFException("EOF " + message).initCause(realEofEx);
+ } else {
+ // do not throw an EOFException as this could be some other type of error; throwing EOF
+ // would make the upper layer consider the file fully read and could cause data loss.
+ throw new IOException("Error " + message, e);
+ }
+ }
+ if (expectedCells != actualCells) {
+ throw new EOFException("Only read " + actualCells + " cells, expected " + expectedCells
+ + "; started reading at " + posBefore + " and read up to " + getPositionQuietly());
+ }
+ long posAfter = this.inputStream.getPos();
+ if (trailerPresent && posAfter > this.walEditsStopOffset) {
+ LOG.error("Read WALTrailer while reading WALEdits. wal: {}, inputStream.getPos(): {},"
+ + " walEditsStopOffset: {}", path, posAfter, walEditsStopOffset);
+ throw new EOFException("Read WALTrailer while reading WALEdits; started reading at "
+ + posBefore + " and read up to " + posAfter);
+ }
+ return entry;
+ }
+
+ @Override
+ protected InputStream getCellCodecInputStream(FSDataInputStream stream) {
+ // just return the original input stream
+ return stream;
+ }
+
+ @Override
+ protected void skipTo(long position) throws IOException {
+ Entry entry = new Entry();
+ for (;;) {
+ entry = next(entry);
+ if (entry == null) {
+ throw new EOFException("Can not skip to the given position " + position
+ + " as we have already reached the end of file");
+ }
+ long pos = inputStream.getPos();
+ if (pos > position) {
+ throw new IOException("Can not skip to the given position " + position + ", stopped at "
+ + pos + " which is already beyond the give position, malformed WAL?");
+ }
+ if (pos == position) {
+ return;
+ }
+ }
+ }
+}
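To make the intended calling pattern concrete, here is a minimal, hypothetical driver for this
one way reader. It is only a sketch: the createStreamReader factory call and the process()
consumer are illustrative assumptions, not part of this patch.

    // Sketch only; see the hedges above. Reads a closed WAL from start to finish.
    private void readClosedWAL(FileSystem fs, Path walPath, Configuration conf) throws IOException {
      try (WALStreamReader reader = WALFactory.createStreamReader(fs, walPath, conf)) {
        for (WAL.Entry entry; (entry = reader.next()) != null;) {
          process(entry); // hypothetical consumer
        }
      } catch (WALHeaderEOFException e) {
        // nothing was ever synced to this file, safe to skip (see the new exception below)
      } catch (EOFException e) {
        // for a file without a trailer, next() signals end-of-file by throwing EOF while
        // parsing the next WALKey, as implemented above
      }
    }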
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java
new file mode 100644
index 00000000000..62091acdd1d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufWALTailingReader.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.io.DelegatingInputStream;
+import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALTailingReader;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
+import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+/**
+ * A WAL reader for replication. It supports reset, so it can be used to tail a WAL file that is
+ * currently being written.
+ */
+@InterfaceAudience.Private
+public class ProtobufWALTailingReader extends AbstractProtobufWALReader
+ implements WALTailingReader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ProtobufWALTailingReader.class);
+
+ private DelegatingInputStream delegatingInput;
+
+ private static final class ReadWALKeyResult {
+ final State state;
+ final Entry entry;
+ final int followingKvCount;
+
+ public ReadWALKeyResult(State state, Entry entry, int followingKvCount) {
+ this.state = state;
+ this.entry = entry;
+ this.followingKvCount = followingKvCount;
+ }
+ }
+
+ private static final ReadWALKeyResult KEY_ERROR_AND_RESET =
+ new ReadWALKeyResult(State.ERROR_AND_RESET, null, 0);
+
+ private static final ReadWALKeyResult KEY_EOF_AND_RESET =
+ new ReadWALKeyResult(State.EOF_AND_RESET, null, 0);
+
+ private IOException unwrapIPBE(IOException e) {
+ if (e instanceof InvalidProtocolBufferException) {
+ return ((InvalidProtocolBufferException) e).unwrapIOException();
+ } else {
+ return e;
+ }
+ }
+
+ private ReadWALKeyResult readWALKey(long originalPosition) {
+ int firstByte;
+ try {
+ firstByte = delegatingInput.read();
+ } catch (IOException e) {
+ LOG.warn("Failed to read wal key length first byte", e);
+ return KEY_ERROR_AND_RESET;
+ }
+ if (firstByte == -1) {
+ return KEY_EOF_AND_RESET;
+ }
+ int size;
+ try {
+ size = CodedInputStream.readRawVarint32(firstByte, delegatingInput);
+ } catch (IOException e) {
+ // if we are reading a partial WALTrailer, the size will just be 0 so we will not get an
+ // exception here, so we do not need to check whether it is a partial WALTrailer.
+ if (
+ e instanceof InvalidProtocolBufferException
+ && ProtobufUtil.isEOF((InvalidProtocolBufferException) e)
+ ) {
+ LOG.info("EOF while reading WALKey, originalPosition={}, currentPosition={}, error={}",
+ originalPosition, getPositionQuietly(), e.toString());
+ return KEY_EOF_AND_RESET;
+ } else {
+ LOG.warn("Failed to read wal key length", e);
+ return KEY_ERROR_AND_RESET;
+ }
+ }
+ if (size < 0) {
+ LOG.warn("Negative pb message size read: {}, malformed WAL file?", size);
+ return KEY_ERROR_AND_RESET;
+ }
+ int available;
+ try {
+ available = delegatingInput.available();
+ } catch (IOException e) {
+ LOG.warn("Failed to get available bytes", e);
+ return KEY_ERROR_AND_RESET;
+ }
+ if (available > 0 && available < size) {
+ LOG.info(
+ "Available stream not enough for edit, available={}, " + "entry size={} at offset={}",
+ available, size, getPositionQuietly());
+ return KEY_EOF_AND_RESET;
+ }
+ WALProtos.WALKey walKey;
+ try {
+ if (available > 0) {
+ walKey = WALProtos.WALKey.parseFrom(ByteStreams.limit(delegatingInput, size));
+ } else {
+ byte[] content = new byte[size];
+ ByteStreams.readFully(delegatingInput, content);
+ walKey = WALProtos.WALKey.parseFrom(content);
+ }
+ } catch (IOException e) {
+ e = unwrapIPBE(e);
+ if (
+ e instanceof EOFException || (e instanceof InvalidProtocolBufferException
+ && ProtobufUtil.isEOF((InvalidProtocolBufferException) e))
+ ) {
+ LOG.info("EOF while reading WALKey, originalPosition={}, currentPosition={}, error={}",
+ originalPosition, getPositionQuietly(), e.toString());
+ return KEY_EOF_AND_RESET;
+ } else {
+ boolean isWALTrailer;
+ try {
+ isWALTrailer = isWALTrailer(originalPosition);
+ } catch (IOException ioe) {
+ LOG.warn("Error while testing whether this is a partial WAL trailer, originalPosition={},"
+ + " currentPosition={}", originalPosition, getPositionQuietly(), e);
+ return KEY_ERROR_AND_RESET;
+ }
+ if (isWALTrailer) {
+ LOG.info("Reached partial WAL Trailer(EOF) while reading WALKey, originalPosition={},"
+ + " currentPosition={}", originalPosition, getPositionQuietly(), e);
+ return KEY_EOF_AND_RESET;
+ } else {
+ // for all other type of IPBEs or IOEs, it means the WAL key is broken
+ LOG.warn("Error while reading WALKey, originalPosition={}, currentPosition={}",
+ originalPosition, getPositionQuietly(), e);
+ return KEY_ERROR_AND_RESET;
+ }
+ }
+ }
+ Entry entry = new Entry();
+ try {
+ entry.getKey().readFieldsFromPb(walKey, byteStringUncompressor);
+ } catch (IOException e) {
+ LOG.warn("Failed to read wal key fields from pb message", e);
+ return KEY_ERROR_AND_RESET;
+ }
+ return new ReadWALKeyResult(State.NORMAL, entry,
+ walKey.hasFollowingKvCount() ? walKey.getFollowingKvCount() : 0);
+ }
+
+ private Result editEof() {
+ return hasCompression
+ ? State.EOF_AND_RESET_COMPRESSION.getResult()
+ : State.EOF_AND_RESET.getResult();
+ }
+
+ private Result editError() {
+ return hasCompression
+ ? State.ERROR_AND_RESET_COMPRESSION.getResult()
+ : State.ERROR_AND_RESET.getResult();
+ }
+
+ private Result readWALEdit(Entry entry, int followingKvCount) {
+ long posBefore;
+ try {
+ posBefore = inputStream.getPos();
+ } catch (IOException e) {
+ LOG.warn("failed to get position", e);
+ return State.ERROR_AND_RESET.getResult();
+ }
+ if (followingKvCount == 0) {
+ LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
+ posBefore);
+ return new Result(State.NORMAL, entry, posBefore);
+ }
+ int actualCells;
+ try {
+ actualCells = entry.getEdit().readFromCells(cellDecoder, followingKvCount);
+ } catch (Exception e) {
+ String message = " while reading " + followingKvCount + " WAL KVs; started reading at "
+ + posBefore + " and read up to " + getPositionQuietly();
+ IOException realEofEx = extractHiddenEof(e);
+ if (realEofEx != null) {
+ LOG.warn("EOF " + message, realEofEx);
+ return editEof();
+ } else {
+ LOG.warn("Error " + message, e);
+ return editError();
+ }
+ }
+ if (actualCells != followingKvCount) {
+ LOG.warn("Only read {} cells, expected {}; started reading at {} and read up to {}",
+ actualCells, followingKvCount, posBefore, getPositionQuietly());
+ return editEof();
+ }
+ long posAfter;
+ try {
+ posAfter = inputStream.getPos();
+ } catch (IOException e) {
+ LOG.warn("failed to get position", e);
+ return editError();
+ }
+ if (trailerPresent && posAfter > this.walEditsStopOffset) {
+ LOG.error("Read WALTrailer while reading WALEdits. wal: {}, inputStream.getPos(): {},"
+ + " walEditsStopOffset: {}", path, posAfter, walEditsStopOffset);
+ return editEof();
+ }
+ return new Result(State.NORMAL, entry, posAfter);
+ }
+
+ @Override
+ public Result next(long limit) {
+ long originalPosition;
+ try {
+ originalPosition = inputStream.getPos();
+ } catch (IOException e) {
+ LOG.warn("failed to get position", e);
+ return State.EOF_AND_RESET.getResult();
+ }
+ if (reachWALEditsStopOffset(originalPosition)) {
+ return State.EOF_WITH_TRAILER.getResult();
+ }
+ if (limit < 0) {
+ // this should be a closed WAL file, so set no limit, i.e., just use the original inputStream
+ delegatingInput.setDelegate(inputStream);
+ } else if (limit <= originalPosition) {
+ // no data available, just return EOF
+ return State.EOF_AND_RESET.getResult();
+ } else {
+ // calculate the remaining bytes we are allowed to read and set it as the read limit
+ delegatingInput.setDelegate(ByteStreams.limit(inputStream, limit - originalPosition));
+ }
+ ReadWALKeyResult readKeyResult = readWALKey(originalPosition);
+ if (readKeyResult.state != State.NORMAL) {
+ return readKeyResult.state.getResult();
+ }
+ return readWALEdit(readKeyResult.entry, readKeyResult.followingKvCount);
+ }
+
+ private void skipHeader(FSDataInputStream stream) throws IOException {
+ stream.seek(PB_WAL_MAGIC.length);
+ int headerLength = StreamUtils.readRawVarint32(stream);
+ stream.seek(stream.getPos() + headerLength);
+ }
+
+ @Override
+ public void resetTo(long position, boolean resetCompression) throws IOException {
+ close();
+ Pair<FSDataInputStream, FileStatus> pair = open();
+ boolean resetSucceed = false;
+ try {
+ if (!trailerPresent) {
+ // try to read the trailer this time
+ readTrailer(pair.getFirst(), pair.getSecond());
+ }
+ inputStream = pair.getFirst();
+ delegatingInput.setDelegate(inputStream);
+ if (position < 0) {
+ // read from the beginning
+ if (compressionCtx != null) {
+ compressionCtx.clear();
+ }
+ skipHeader(inputStream);
+ } else if (resetCompression && compressionCtx != null) {
+ // clear compressionCtx and skip to the expected position, to fill up the dictionary again
+ compressionCtx.clear();
+ skipHeader(inputStream);
+ if (position != inputStream.getPos()) {
+ skipTo(position);
+ }
+ } else {
+ // just seek to the expected position
+ inputStream.seek(position);
+ }
+ resetSucceed = true;
+ } finally {
+ if (!resetSucceed) {
+ // close the input stream to avoid resource leak
+ close();
+ }
+ }
+ }
+
+ @Override
+ protected InputStream getCellCodecInputStream(FSDataInputStream stream) {
+ delegatingInput = new DelegatingInputStream(stream);
+ return delegatingInput;
+ }
+
+ @Override
+ protected void skipTo(long position) throws IOException {
+ for (;;) {
+ Result result = next(-1);
+ if (result.getState() != State.NORMAL) {
+ throw new IOException("Can not skip to the given position " + position + ", stopped at "
+ + result.getEntryEndPos() + " which is still before the give position");
+ }
+ if (result.getEntryEndPos() == position) {
+ return;
+ }
+ if (result.getEntryEndPos() > position) {
+ throw new IOException("Can not skip to the given position " + position + ", stopped at "
+ + result.getEntryEndPos() + " which is already beyond the give position, malformed WAL?");
+ }
+ }
+ }
+}
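A rough sketch of the retry loop this tailing reader is built for (replication keeps one open
on a WAL that may still be growing). The State constants come from the new WALTailingReader
interface; ship(), the committed length, and the Result accessors used here are simplifying
assumptions for illustration.

    // Sketch only; see the hedges above.
    void tailOnePass(WALTailingReader reader, long committedLength, long lastShippedPosition)
        throws IOException {
      for (;;) {
        WALTailingReader.Result result = reader.next(committedLength);
        switch (result.getState()) {
          case NORMAL:
            ship(result.getEntry()); // hypothetical consumer
            lastShippedPosition = result.getEntryEndPos();
            break;
          case EOF_AND_RESET:
          case ERROR_AND_RESET:
            // plain seek back; the compression dictionary is untouched
            reader.resetTo(lastShippedPosition, false);
            return;
          case EOF_AND_RESET_COMPRESSION:
          case ERROR_AND_RESET_COMPRESSION:
            // cell reading may have polluted the dictionary, so replay from the start of
            // the file to rebuild it before continuing
            reader.resetTo(lastShippedPosition, true);
            return;
          case EOF_WITH_TRAILER:
            return; // the file is cleanly closed, move on to the next WAL
        }
      }
    }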
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
deleted file mode 100644
index 5caceeac09b..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.util.LRUDictionary;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
-public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
- private static final Logger LOG = LoggerFactory.getLogger(ReaderBase.class);
- protected Configuration conf;
- protected FileSystem fs;
- protected Path path;
- protected long edit = 0;
- protected long fileLength;
- /**
- * Compression context to use reading. Can be null if no compression.
- */
- protected CompressionContext compressionContext = null;
- private boolean emptyCompressionContext = true;
-
- /**
- * Default constructor.
- */
- public ReaderBase() {
- }
-
- @Override
- public void init(FileSystem fs, Path path, Configuration conf, FSDataInputStream stream)
- throws IOException {
- this.conf = conf;
- this.path = path;
- this.fs = fs;
- this.fileLength = this.fs.getFileStatus(path).getLen();
- String cellCodecClsName = initReader(stream);
-
- boolean compression = hasCompression();
- if (compression) {
- // If compression is enabled, new dictionaries are created here.
- try {
- if (compressionContext == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Initializing compression context for {}: isRecoveredEdits={}"
- + ", hasTagCompression={}, hasValueCompression={}, valueCompressionType={}",
- path, CommonFSUtils.isRecoveredEdits(path), hasTagCompression(),
- hasValueCompression(), getValueCompressionAlgorithm());
- }
- compressionContext =
- new CompressionContext(LRUDictionary.class, CommonFSUtils.isRecoveredEdits(path),
- hasTagCompression(), hasValueCompression(), getValueCompressionAlgorithm());
- } else {
- compressionContext.clear();
- }
- } catch (Exception e) {
- throw new IOException("Failed to initialize CompressionContext", e);
- }
- }
- initAfterCompression(cellCodecClsName);
- }
-
- @Override
- public Entry next() throws IOException {
- return next(null);
- }
-
- @Override
- public Entry next(Entry reuse) throws IOException {
- Entry e = reuse;
- if (e == null) {
- e = new Entry();
- }
-
- boolean hasEntry = false;
- try {
- hasEntry = readNext(e);
- } catch (IllegalArgumentException iae) {
- TableName tableName = e.getKey().getTableName();
- if (tableName != null && tableName.equals(TableName.OLD_ROOT_TABLE_NAME)) {
- // It is old ROOT table edit, ignore it
- LOG.info("Got an old ROOT edit, ignoring ");
- return next(e);
- } else throw iae;
- }
- edit++;
- if (compressionContext != null && emptyCompressionContext) {
- emptyCompressionContext = false;
- }
- return hasEntry ? e : null;
- }
-
- @Override
- public void seek(long pos) throws IOException {
- if (compressionContext != null && emptyCompressionContext) {
- while (next() != null) {
- if (getPosition() == pos) {
- emptyCompressionContext = false;
- break;
- }
- }
- }
- seekOnFs(pos);
- }
-
- /**
- * Clear the {@link ReaderBase#compressionContext}, and also set {@link #emptyCompressionContext}
- * to true, so when seeking, we will try to skip to the position and reconstruct the dictionary.
- */
- protected final void resetCompression() {
- if (compressionContext != null) {
- compressionContext.clear();
- emptyCompressionContext = true;
- }
- }
-
- /**
- * Initializes the log reader with a particular stream (may be null). Reader assumes ownership of
- * the stream if not null and may use it. Called once.
- * @return the class name of cell Codec, null if such information is not available
- */
- protected abstract String initReader(FSDataInputStream stream) throws IOException;
-
- /**
- * Initializes the compression after the shared stuff has been initialized. Called once.
- */
- protected abstract void initAfterCompression() throws IOException;
-
- /**
- * Initializes the compression after the shared stuff has been initialized. Called once.
- * @param cellCodecClsName class name of cell Codec
- */
- protected abstract void initAfterCompression(String cellCodecClsName) throws IOException;
-
- /** Returns Whether compression is enabled for this log. */
- protected abstract boolean hasCompression();
-
- /** Returns Whether tag compression is enabled for this log. */
- protected abstract boolean hasTagCompression();
-
- /** Returns Whether value compression is enabled for this log. */
- protected abstract boolean hasValueCompression();
-
- /** Returns Value compression algorithm for this log. */
- protected abstract Compression.Algorithm getValueCompressionAlgorithm();
-
- /**
- * Read next entry.
- * @param e The entry to read into.
- * @return Whether there was anything to read.
- */
- protected abstract boolean readNext(Entry e) throws IOException;
-
- /**
- * Performs a filesystem-level seek to a certain position in an underlying file.
- */
- protected abstract void seekOnFs(long pos) throws IOException;
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java
deleted file mode 100644
index 863739c72f2..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.wal;
-
-import java.io.IOException;
-import java.security.Key;
-import java.security.KeyException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.io.crypto.Cipher;
-import org.apache.hadoop.hbase.io.crypto.Decryptor;
-import org.apache.hadoop.hbase.io.crypto.Encryption;
-import org.apache.hadoop.hbase.security.EncryptionUtil;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.EncryptionTest;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
-
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class SecureProtobufLogReader extends ProtobufLogReader {
-
- private static final Logger LOG = LoggerFactory.getLogger(SecureProtobufLogReader.class);
-
- private Decryptor decryptor = null;
- private static List<String> writerClsNames = new ArrayList<>();
- static {
- writerClsNames.add(ProtobufLogWriter.class.getSimpleName());
- writerClsNames.add(SecureProtobufLogWriter.class.getSimpleName());
- writerClsNames.add(AsyncProtobufLogWriter.class.getSimpleName());
- writerClsNames.add(SecureAsyncProtobufLogWriter.class.getSimpleName());
- }
-
- @Override
- public List<String> getWriterClsNames() {
- return writerClsNames;
- }
-
- @Override
- protected WALHdrContext readHeader(WALHeader.Builder builder, FSDataInputStream stream)
- throws IOException {
- WALHdrContext hdrCtxt = super.readHeader(builder, stream);
- WALHdrResult result = hdrCtxt.getResult();
- // We need to unconditionally handle the case where the WAL has a key in
- // the header, meaning it is encrypted, even if ENABLE_WAL_ENCRYPTION is
- // no longer set in the site configuration.
- if (result == WALHdrResult.SUCCESS && builder.hasEncryptionKey()) {
- // Serialized header data has been merged into the builder from the
- // stream.
-
- EncryptionTest.testKeyProvider(conf);
- EncryptionTest.testCipherProvider(conf);
-
- // Retrieve a usable key
-
- byte[] keyBytes = builder.getEncryptionKey().toByteArray();
- Key key = null;
- String walKeyName = conf.get(HConstants.CRYPTO_WAL_KEY_NAME_CONF_KEY);
- // First try the WAL key, if one is configured
- if (walKeyName != null) {
- try {
- key = EncryptionUtil.unwrapWALKey(conf, walKeyName, keyBytes);
- } catch (KeyException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Unable to unwrap key with WAL key '" + walKeyName + "'");
- }
- key = null;
- }
- }
- if (key == null) {
- String masterKeyName =
- conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName());
- try {
- // Then, try the cluster master key
- key = EncryptionUtil.unwrapWALKey(conf, masterKeyName, keyBytes);
- } catch (KeyException e) {
- // If the current master key fails to unwrap, try the alternate, if
- // one is configured
- if (LOG.isDebugEnabled()) {
- LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
- }
- String alternateKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
- if (alternateKeyName != null) {
- try {
- key = EncryptionUtil.unwrapWALKey(conf, alternateKeyName, keyBytes);
- } catch (KeyException ex) {
- throw new IOException(ex);
- }
- } else {
- throw new IOException(e);
- }
- }
- }
-
- // Use the algorithm the key wants
-
- Cipher cipher = Encryption.getCipher(conf, key.getAlgorithm());
- if (cipher == null) {
- throw new IOException("Cipher '" + key.getAlgorithm() + "' is not available");
- }
-
- // Set up the decryptor for this WAL
-
- decryptor = cipher.getDecryptor();
- decryptor.setKey(key);
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Initialized secure protobuf WAL: cipher=" + cipher.getName());
- }
- }
-
- return hdrCtxt;
- }
-
- @Override
- protected void initAfterCompression(String cellCodecClsName) throws IOException {
- if (decryptor != null && cellCodecClsName.equals(SecureWALCellCodec.class.getName())) {
- WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, decryptor);
- this.cellDecoder = codec.getDecoder(this.inputStream);
- // We do not support compression with WAL encryption
- this.compressionContext = null;
- this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();
- this.hasCompression = false;
- } else {
- super.initAfterCompression(cellCodecClsName);
- }
- }
-
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALHeaderEOFException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALHeaderEOFException.java
new file mode 100644
index 00000000000..d88e89ab7b3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALHeaderEOFException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.EOFException;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A special EOFException to indicate that the EOF happens when we read the header of a WAL file.
+ * <p/>
+ * This usually means the WAL file just contains nothing and we are safe to skip over it.
+ */
+@InterfaceAudience.Private
+public class WALHeaderEOFException extends EOFException {
+
+ private static final long serialVersionUID = -4544368452826740759L;
+
+ public WALHeaderEOFException() {
+ }
+
+ public WALHeaderEOFException(String s) {
+ super(s);
+ }
+
+ public WALHeaderEOFException(String s, Throwable cause) {
+ super(s);
+ initCause(cause);
+ }
+}
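A small usage sketch for this exception: callers such as WAL splitting can treat an EOF inside
the header as an empty, skippable file instead of an error. openReader() and replay() below are
placeholders, not APIs from this patch.

    // Sketch only; see the hedges above.
    try (WALStreamReader reader = openReader(path)) {
      replay(reader);
    } catch (WALHeaderEOFException e) {
      // EOF while reading the header means the file was created but nothing was ever
      // synced to it, so there is nothing to replay and the file can be skipped
      LOG.info("Skipping empty WAL {}", path, e);
    }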
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index d6351ea0eab..4e1d76a9764 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -17,13 +17,11 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
-import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
@@ -35,7 +33,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -75,7 +72,6 @@ class ReplicationSourceWALReader extends Thread {
private long currentPosition;
private final long sleepForRetries;
private final int maxRetriesMultiplier;
- private final boolean eofAutoRecovery;
// Indicates whether this particular worker is running
private boolean isReaderRunning = true;
@@ -115,7 +111,6 @@ class ReplicationSourceWALReader extends Thread {
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
// 5 minutes @ 1 sec per
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
- this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
this.walGroupId = walGroupId;
LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : "
@@ -124,14 +119,30 @@ class ReplicationSourceWALReader extends Thread {
+ ", replicationBatchQueueCapacity=" + batchCount);
}
+ private void replicationDone() throws InterruptedException {
+ // we're done with the current queue; either this is a recovered queue, or it is the special
+ // group for a sync replication peer and the peer has been transitioned to DA or S state.
+ LOG.debug("Stopping the replication source wal reader");
+ setReaderRunning(false);
+ // shuts down shipper thread immediately
+ entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
+ }
+
+ protected final int sleep(int sleepMultiplier) {
+ if (sleepMultiplier < maxRetriesMultiplier) {
+ sleepMultiplier++;
+ }
+ Threads.sleep(sleepForRetries * sleepMultiplier);
+ return sleepMultiplier;
+ }
+
@Override
public void run() {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
WALEntryBatch batch = null;
- try (WALEntryStream entryStream =
- new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(),
- source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId)) {
+ try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition,
+ source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
batch = null;
if (!source.isPeerEnabled()) {
@@ -141,34 +152,47 @@ class ReplicationSourceWALReader extends Thread {
if (!checkQuota()) {
continue;
}
- batch = tryAdvanceStreamAndCreateWALBatch(entryStream);
- if (batch == null) {
- // got no entries and didn't advance position in WAL
- handleEmptyWALEntryBatch();
- entryStream.reset(); // reuse stream
+ Path currentPath = entryStream.getCurrentPath();
+ WALEntryStream.HasNext hasNext = entryStream.hasNext();
+ if (hasNext == WALEntryStream.HasNext.NO) {
+ replicationDone();
+ return;
+ }
+ // first, check whether we have switched to a new file; if so, we need to manually add an
+ // EOF entry batch to the queue
+ if (currentPath != null && switched(entryStream, currentPath)) {
+ entryBatchQueue.put(WALEntryBatch.endOfFile(currentPath));
+ continue;
+ }
+ if (hasNext == WALEntryStream.HasNext.RETRY) {
+ // sleep and retry
+ sleepMultiplier = sleep(sleepMultiplier);
continue;
}
- // if we have already switched a file, skip reading and put it directly to the ship queue
- if (!batch.isEndOfFile()) {
- readWALEntries(entryStream, batch);
- currentPosition = entryStream.getPosition();
+ if (hasNext == WALEntryStream.HasNext.RETRY_IMMEDIATELY) {
+ // retry immediately, this usually means we have switched a file
+ continue;
}
+ // below are all for hasNext == YES
+ batch = createBatch(entryStream);
+ readWALEntries(entryStream, batch);
+ currentPosition = entryStream.getPosition();
// need to propagate the batch even it has no entries since it may carry the last
// sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
sleepMultiplier = 1;
}
- } catch (WALEntryFilterRetryableException | IOException e) { // stream related
- if (!handleEofException(e, batch)) {
- LOG.warn("Failed to read stream of replication entries", e);
- if (sleepMultiplier < maxRetriesMultiplier) {
- sleepMultiplier++;
- }
- Threads.sleep(sleepForRetries * sleepMultiplier);
- }
+ } catch (WALEntryFilterRetryableException e) {
+ // here we have to recreate the WALEntryStream, because when filtering we have already
+ // called next to get the WAL entry and advanced the WALEntryStream; at the WALEntryStream
+ // layer everything looks fine, which is why the catch block is not in the inner block
+ LOG.warn("Failed to filter WAL entries and the filter let us retry later", e);
+ sleepMultiplier = sleep(sleepMultiplier);
} catch (InterruptedException e) {
- LOG.trace("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue");
+ // this usually means we want to quit
+ LOG.warn("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue",
+ e);
Thread.currentThread().interrupt();
}
}
@@ -204,7 +228,7 @@ class ReplicationSourceWALReader extends Thread {
// This is required in case there is any exception while reading entries
// we do not want to lose the existing entries in the batch
protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
- throws IOException, InterruptedException {
+ throws InterruptedException {
Path currentPath = entryStream.getCurrentPath();
for (;;) {
Entry entry = entryStream.next();
@@ -215,111 +239,22 @@ class ReplicationSourceWALReader extends Thread {
break;
}
}
- boolean hasNext = entryStream.hasNext();
+ WALEntryStream.HasNext hasNext = entryStream.hasNext();
// always return if we have switched to a new file
if (switched(entryStream, currentPath)) {
batch.setEndOfFile(true);
break;
}
- if (!hasNext) {
+ if (hasNext != WALEntryStream.HasNext.YES) {
+ // For hasNext values other than YES, it is OK to just break out here.
+ // For RETRY and RETRY_IMMEDIATELY, the correct action is to retry, and for NO, it will
+ // return NO again the next time the method is called, so it is OK to just return here and
+ // let the loop in the upper layer call hasNext again.
break;
}
}
}
- private void handleEmptyWALEntryBatch() throws InterruptedException {
- LOG.trace("Didn't read any new entries from WAL");
- if (logQueue.getQueue(walGroupId).isEmpty()) {
- // we're done with current queue, either this is a recovered queue, or it is the special group
- // for a sync replication peer and the peer has been transited to DA or S state.
- LOG.debug("Stopping the replication source wal reader");
- setReaderRunning(false);
- // shuts down shipper thread immediately
- entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
- } else {
- Thread.sleep(sleepForRetries);
- }
- }
-
- private WALEntryBatch tryAdvanceStreamAndCreateWALBatch(WALEntryStream entryStream)
- throws IOException {
- Path currentPath = entryStream.getCurrentPath();
- if (!entryStream.hasNext()) {
- // check whether we have switched a file
- if (currentPath != null && switched(entryStream, currentPath)) {
- return WALEntryBatch.endOfFile(currentPath);
- } else {
- return null;
- }
- }
- if (currentPath != null) {
- if (switched(entryStream, currentPath)) {
- return WALEntryBatch.endOfFile(currentPath);
- }
- }
- return createBatch(entryStream);
- }
-
- /**
- * This is to handle the EOFException from the WAL entry stream. EOFException should be handled
- * carefully because there are chances of data loss because of never replicating the data. Thus we
- * should always try to ship existing batch of entries here. If there was only one log in the
- * queue before EOF, we ship the empty batch here and since reader is still active, in the next
- * iteration of reader we will stop the reader.
- * <p/>
- * If there was more than one log in the queue before EOF, we ship the existing batch and reset
- * the wal patch and position to the log with EOF, so shipper can remove logs from replication
- * queue
- * @return true only the IOE can be handled
- */
- private boolean handleEofException(Exception e, WALEntryBatch batch) {
- PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
- // Dump the log even if logQueue size is 1 if the source is from recovered Source
- // since we don't add current log to recovered source queue so it is safe to remove.
- if (
- (e instanceof EOFException || e.getCause() instanceof EOFException)
- && (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery
- ) {
- Path path = queue.peek();
- try {
- if (!fs.exists(path)) {
- // There is a chance that wal has moved to oldWALs directory, so look there also.
- path = AbstractFSWALProvider.findArchivedLog(path, conf);
- // path can be null if unable to locate in archiveDir.
- }
- if (path != null && fs.getFileStatus(path).getLen() == 0) {
- LOG.warn("Forcing removal of 0 length log in queue: {}", path);
- logQueue.remove(walGroupId);
- currentPosition = 0;
- if (batch != null) {
- // After we removed the WAL from the queue, we should try shipping the existing batch of
- // entries
- addBatchToShippingQueue(batch);
- }
- return true;
- }
- } catch (IOException ioe) {
- LOG.warn("Couldn't get file length information about log " + path, ioe);
- } catch (InterruptedException ie) {
- LOG.trace("Interrupted while adding WAL batch to ship queue");
- Thread.currentThread().interrupt();
- }
- }
- return false;
- }
-
- /**
- * Update the batch try to ship and return true if shipped
- * @param batch Batch of entries to ship
- * @throws InterruptedException throws interrupted exception
- */
- private void addBatchToShippingQueue(WALEntryBatch batch) throws InterruptedException {
- // need to propagate the batch even it has no entries since it may carry the last
- // sequence id information for serial replication.
- LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
- entryBatchQueue.put(batch);
- }
-
public Path getCurrentPath() {
// if we've read some WAL entries, get the Path we read from
WALEntryBatch batchQueueHead = entryBatchQueue.peek();
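Condensed, the new contract between this reader thread and WALEntryStream.HasNext looks roughly
like the following. This is a simplified restatement of the run() method above (the manual
end-of-file batch on file switch and the peer-enabled check are omitted), not new behavior.

    // Sketch only; see the hedges above.
    private void pollLoop(WALEntryStream entryStream) throws InterruptedException {
      int sleepMultiplier = 1;
      for (;;) {
        switch (entryStream.hasNext()) {
          case YES:
            WALEntryBatch batch = createBatch(entryStream);
            readWALEntries(entryStream, batch);
            entryBatchQueue.put(batch); // propagated even when empty, for sequence id info
            sleepMultiplier = 1;
            break;
          case RETRY:
            sleepMultiplier = sleep(sleepMultiplier); // back off, the tail may still grow
            break;
          case RETRY_IMMEDIATELY:
            break; // usually a file switch, loop again right away
          case NO:
            entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA); // shuts down the shipper
            return;
        }
      }
    }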
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 1a8bbf74a2c..41d95df2821 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* WAL reader for a serial replication peer.
@@ -33,6 +35,8 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader {
+ private static final Logger LOG = LoggerFactory.getLogger(SerialReplicationSourceWALReader.class);
+
// used to store the first cell in an entry before filtering. This is because that if serial
// replication is enabled, we may find out that an entry can not be pushed after filtering. And
// when we try the next time, the cells maybe null since the entry has already been filtered,
@@ -51,7 +55,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
@Override
protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
- throws IOException, InterruptedException {
+ throws InterruptedException {
Path currentPath = entryStream.getCurrentPath();
long positionBefore = entryStream.getPosition();
for (;;) {
@@ -75,13 +79,23 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
entry = filterEntry(entry);
}
if (entry != null) {
- if (!checker.canPush(entry, firstCellInEntryBeforeFiltering)) {
+ int sleepMultiplier = 1;
+ try {
+ if (!checker.canPush(entry, firstCellInEntryBeforeFiltering)) {
+ if (batch.getLastWalPosition() > positionBefore) {
+ // we have something that can push, break
+ break;
+ } else {
+ checker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("failed to check whether we can push the WAL entries", e);
if (batch.getLastWalPosition() > positionBefore) {
// we have something that can push, break
break;
- } else {
- checker.waitUntilCanPush(entry, firstCellInEntryBeforeFiltering);
}
+ sleepMultiplier = sleep(sleepMultiplier);
}
// arrive here means we can push the entry, record the last sequence id
batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
@@ -95,20 +109,23 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
// actually remove the entry.
removeEntryFromStream(entryStream, batch);
}
- boolean hasNext = entryStream.hasNext();
+ WALEntryStream.HasNext hasNext = entryStream.hasNext();
// always return if we have switched to a new file.
if (switched(entryStream, currentPath)) {
batch.setEndOfFile(true);
break;
}
- if (!hasNext) {
+ if (hasNext != WALEntryStream.HasNext.YES) {
+ // For hasNext values other than YES, it is OK to just break out here.
+ // For RETRY and RETRY_IMMEDIATELY, the correct action is to retry, and for NO, it will
+ // return NO again the next time the method is called, so it is OK to just return here and
+ // let the loop in the upper layer call hasNext again.
break;
}
}
}
- private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch)
- throws IOException {
+ private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch) {
entryStream.next();
firstCellInEntryBeforeFiltering = null;
batch.setLastWalPosition(entryStream.getPosition());
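Pulled out of the loop above for clarity, the per-entry decision the serial reader now makes can
be summarized by a hypothetical helper like the one below; the exception handling (degrading a
checker IOException to sleep-and-retry instead of killing the reader) is omitted here.

    // Sketch only; a simplified restatement of the canPush handling above.
    private boolean ensurePushable(Entry entry, Cell firstCell, WALEntryBatch batch,
        long positionBefore) throws IOException, InterruptedException {
      if (checker.canPush(entry, firstCell)) {
        return true; // safe to push, record its sequence id and ship it
      }
      if (batch.getLastWalPosition() > positionBefore) {
        return false; // ship the entries we already have before blocking
      }
      checker.waitUntilCanPush(entry, firstCell); // block until the entry becomes pushable
      return true;
    }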
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index aa059aa30a2..d95d42f2f30 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -26,17 +26,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
-import org.apache.hadoop.hbase.util.CancelableProgressable;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
+import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
-import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
+import org.apache.hadoop.hbase.wal.WALTailingReader;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
@@ -52,7 +50,8 @@ import org.slf4j.LoggerFactory;
class WALEntryStream implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);
- private Reader reader;
+ private WALTailingReader reader;
+ private WALTailingReader.State state;
private Path currentPath;
// cache of next entry for hasNext()
private Entry currentEntry;
@@ -67,56 +66,108 @@ class WALEntryStream implements Closeable {
private final FileSystem fs;
private final Configuration conf;
private final WALFileLengthProvider walFileLengthProvider;
- // which region server the WALs belong to
- private final ServerName serverName;
private final MetricsSource metrics;
+ // we should be able to skip empty WAL files, but for safety, we still provide this config;
+ // see HBASE-18137 for more details
+ private boolean eofAutoRecovery;
+
/**
* Create an entry stream over the given queue at the given start position
* @param logQueue the queue of WAL paths
- * @param conf the {@link Configuration} to use to create {@link Reader} for this
- * stream
+ * @param conf the {@link Configuration} to use to create {@link WALStreamReader}
+ * for this stream
* @param startPosition the position in the first WAL to start reading at
* @param walFileLengthProvider provides the length of the WAL file
* @param serverName the server name which all WALs belong to
* @param metrics the replication metrics
- * @throws IOException throw IO exception from stream
*/
- public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf, long startPosition,
- WALFileLengthProvider walFileLengthProvider, ServerName serverName, MetricsSource metrics,
- String walGroupId) throws IOException {
+ public WALEntryStream(ReplicationSourceLogQueue logQueue, FileSystem fs, Configuration conf,
+ long startPosition, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics,
+ String walGroupId) {
this.logQueue = logQueue;
- this.fs = CommonFSUtils.getWALFileSystem(conf);
+ this.fs = fs;
this.conf = conf;
this.currentPositionOfEntry = startPosition;
this.walFileLengthProvider = walFileLengthProvider;
- this.serverName = serverName;
this.metrics = metrics;
this.walGroupId = walGroupId;
+ this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
+ }
+
+ public enum HasNext {
+ /** means there is a new entry and you can use peek or next to get the current entry */
+ YES,
+ /**
+ * means something went wrong, or we have reached the EOF of the current file but it is not
+ * closed yet and there is no new file in the replication queue yet; you should sleep a while
+ * and try calling hasNext again
+ */
+ RETRY,
+ /**
+ * Usually this means we have finished reading a WAL file, and to simplify the implementation
+ * of this class, we just let the upper layer issue a new hasNext call to open the next
+ * WAL file.
+ */
+ RETRY_IMMEDIATELY,
+ /**
+ * means there is no new entry and the stream has ended; the upper layer should close this
+ * stream and release other resources as well
+ */
+ NO
}
- /** Returns true if there is another WAL {@link Entry} */
- public boolean hasNext() throws IOException {
+ /**
+ * Try to advance the stream if there is no entry yet. See the javadoc for {@link HasNext} for more
+ * details about the meanings of the return values.
+ * <p/>
+ * You can call {@link #peek()} or {@link #next()} to get the actual {@link Entry} if this method
+ * returns {@link HasNext#YES}.
+ */
+ public HasNext hasNext() {
if (currentEntry == null) {
- tryAdvanceEntry();
+ return tryAdvanceEntry();
+ } else {
+ return HasNext.YES;
}
- return currentEntry != null;
}
/**
* Returns the next WAL entry in this stream but does not advance.
+ * <p/>
+ * You must call {@link #hasNext()} before calling this method, and if you have already called
+ * {@link #next()} to consume the current entry, you need to call {@link #hasNext()} again to
+ * advance the stream before calling this method again, otherwise it will always return
+ * {@code null}.
+ * <p/>
+ * The reason here is that we need to use the return value of {@link #hasNext()} to tell the
+ * upper layer whether to retry, so we cannot wrap the {@link #hasNext()} call inside
+ * {@link #peek()} or {@link #next()} as they have their own return values.
+ * @see #hasNext()
+ * @see #next()
*/
- public Entry peek() throws IOException {
- return hasNext() ? currentEntry : null;
+ public Entry peek() {
+ return currentEntry;
}
/**
- * Returns the next WAL entry in this stream and advance the stream.
+ * Returns the next WAL entry in this stream and advances the stream. Will throw an
+ * {@link IllegalStateException} if you do not call {@link #hasNext()} before calling this method.
+ * Please see the javadoc of the {@link #peek()} method to see why we need this.
+ * @throws IllegalStateException if you do not call {@link #hasNext()} before calling this
+ * method
+ * @see #hasNext()
+ * @see #peek()
*/
- public Entry next() throws IOException {
+ public Entry next() {
+ if (currentEntry == null) {
+ throw new IllegalStateException("Call hasNext first");
+ }
Entry save = peek();
currentPositionOfEntry = currentPositionOfReader;
currentEntry = null;
+ state = null;
return save;
}
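
For illustration, a minimal sketch (not part of the patch) of the contract above: hasNext() must be called before peek()/next(), and next() consumes the entry that peek() exposes. We assume an already constructed WALEntryStream named stream.

    // hasNext() must come first; next() would throw IllegalStateException otherwise
    if (stream.hasNext() == WALEntryStream.HasNext.YES) {
      WAL.Entry peeked = stream.peek();   // does not advance the stream
      WAL.Entry consumed = stream.next(); // advances; same entry object as peek()
      assert peeked == consumed;
      // peek() now returns null until hasNext() is called again
    }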
@@ -124,7 +175,7 @@ class WALEntryStream implements Closeable {
* {@inheritDoc}
*/
@Override
- public void close() throws IOException {
+ public void close() {
closeReader();
}
@@ -149,62 +200,171 @@ class WALEntryStream implements Closeable {
return sb.toString();
}
- /**
- * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
- * false)
- */
- public void reset() throws IOException {
- if (reader != null && currentPath != null) {
- resetReader();
- }
- }
-
- private void setPosition(long position) {
- currentPositionOfEntry = position;
- }
-
private void setCurrentPath(Path path) {
this.currentPath = path;
}
- private void tryAdvanceEntry() throws IOException {
- if (checkReader()) {
- boolean beingWritten = readNextEntryAndRecordReaderPosition();
- LOG.trace("Reading WAL {}; currently open for write={}", this.currentPath, beingWritten);
- if (currentEntry == null && !beingWritten) {
- // no more entries in this log file, and the file is already closed, i.e, rolled
- // Before dequeueing, we should always get one more attempt at reading.
- // This is in case more entries came in after we opened the reader, and the log is rolled
- // while we were reading. See HBASE-6758
- resetReader();
- readNextEntryAndRecordReaderPosition();
- if (currentEntry == null) {
- if (checkAllBytesParsed()) { // now we're certain we're done with this log file
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
+ justification = "HDFS-4380")
+ private HasNext tryAdvanceEntry() {
+ if (reader == null) {
+ // try open next WAL file
+ PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
+ Path nextPath = queue.peek();
+ if (nextPath != null) {
+ setCurrentPath(nextPath);
+ // we need to test this prior to creating the reader. Otherwise it is possible that, while
+ // opening the file, the file is still being written so its header is incomplete and we get
+ // a header EOF; but by the time we test whether it is still being written, the data has
+ // already been flushed out, so we consider the file not being written and just skip over
+ // it, losing the data written after opening...
+ boolean beingWritten =
+ walFileLengthProvider.getLogFileSizeIfBeingWritten(nextPath).isPresent();
+ try {
+ reader = WALFactory.createTailingReader(fs, nextPath, conf,
+ currentPositionOfEntry > 0 ? currentPositionOfEntry : -1);
+ } catch (WALHeaderEOFException e) {
+ if (!eofAutoRecovery) {
+ // if we do not enable EOF auto recovery, just let the upper layer retry
+ // the replication will usually be stuck and will need to be fixed manually
+ return HasNext.RETRY;
+ }
+ // we hit EOF while reading the WAL header, usually this means we can just skip over this
+ // file, but we need to be careful that whether this file is still being written, if so we
+ // should retry instead of skipping.
+ LOG.warn("EOF while trying to open WAL reader for path: {}", nextPath, e);
+ if (beingWritten) {
+ // just retry as the file is still being written, maybe next time we could read
+ // something
+ return HasNext.RETRY;
+ } else {
+ // the file is not being written so we are safe to just skip over it
dequeueCurrentLog();
- if (openNextLog()) {
- readNextEntryAndRecordReaderPosition();
+ return HasNext.RETRY_IMMEDIATELY;
+ }
+ } catch (LeaseNotRecoveredException e) {
+ // HBASE-15019 the WAL was not closed due to some hiccup.
+ LOG.warn("Try to recover the WAL lease " + nextPath, e);
+ AbstractFSWALProvider.recoverLease(conf, nextPath);
+ return HasNext.RETRY;
+ } catch (IOException | NullPointerException e) {
+ // For why we need to catch NPE here, see HDFS-4380 for more details
+ LOG.warn("Failed to open WAL reader for path: {}", nextPath, e);
+ return HasNext.RETRY;
+ }
+ } else {
+ // no more files in queue, this could happen for a recovered queue, or for a wal group of a
+ // sync replication peer which has already been transitioned to DA or S.
+ setCurrentPath(null);
+ return HasNext.NO;
+ }
+ } else if (state != null && state != WALTailingReader.State.NORMAL) {
+ // reset before reading
+ try {
+ if (currentPositionOfEntry > 0) {
+ reader.resetTo(currentPositionOfEntry, state.resetCompression());
+ } else {
+ // we will read from the beginning so we should always clear the compression context
+ reader.resetTo(-1, true);
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
+ currentPositionOfEntry, state.resetCompression(), e);
+ // just leave the state as is, and try resetting next time
+ return HasNext.RETRY;
+ }
+ }
+
+ Pair<WALTailingReader.State, Boolean> pair = readNextEntryAndRecordReaderPosition();
+ state = pair.getFirst();
+ boolean beingWritten = pair.getSecond();
+ LOG.trace("Reading WAL {}; result={}, currently open for write={}", this.currentPath, state,
+ beingWritten);
+ switch (state) {
+ case NORMAL:
+ // everything is fine, just return
+ return HasNext.YES;
+ case EOF_WITH_TRAILER:
+ // we have reached the trailer, which means this WAL file has been closed cleanly and we
+ // have finished reading it successfully, just dequeue this file and let the upper
+ // layer open and read the next WAL file
+ dequeueCurrentLog();
+ return HasNext.RETRY_IMMEDIATELY;
+ case EOF_AND_RESET:
+ case EOF_AND_RESET_COMPRESSION:
+ if (!beingWritten) {
+ // no more entries in this log file, and the file is already closed, i.e, rolled
+ // Before dequeuing, we should always get one more attempt at reading.
+ // This is in case more entries came in after we opened the reader, and the log is rolled
+ // while we were reading. See HBASE-6758
+ try {
+ reader.resetTo(currentPositionOfEntry, state.resetCompression());
+ } catch (IOException e) {
+ LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
+ currentPositionOfEntry, state.resetCompression(), e);
+ // just leave the state as is, next time we will try to reset it again, but there is a
+ // nasty problem: we will still eventually reach here and try resetting again to see if
+ // the log has been fully replicated, which is redundant and can be optimized later
+ return HasNext.RETRY;
+ }
+ Pair<WALTailingReader.State, Boolean> p = readNextEntryAndRecordReaderPosition();
+ state = p.getFirst();
+ // the file should no longer be being written at this point
+ assert !p.getSecond();
+ if (state.eof()) {
+ if (checkAllBytesParsed()) {
+ // move to the next wal file and read
+ dequeueCurrentLog();
+ return HasNext.RETRY_IMMEDIATELY;
+ } else {
+ // see HBASE-15983, if checkAllBytesParsed returns false, we need to try reading from the
+ // beginning again. Here we set position to 0 and state to ERROR_AND_RESET_COMPRESSION
+ // so when calling tryAdvanceEntry next time we will reset the reader to the beginning
+ // and read.
+ currentPositionOfEntry = 0;
+ currentPositionOfReader = 0;
+ state = WALTailingReader.State.ERROR_AND_RESET_COMPRESSION;
+ return HasNext.RETRY;
}
}
+ } else {
+ // just sleep a bit and retry to see if there are new entries coming since the file is
+ // still being written
+ return HasNext.RETRY;
}
+ case ERROR_AND_RESET:
+ case ERROR_AND_RESET_COMPRESSION:
+ // we have met an error, just sleep a bit and retry
+ return HasNext.RETRY;
+ default:
+ throw new IllegalArgumentException("Unknown read next result: " + state);
+ }
+ }
+
+ private FileStatus getCurrentPathFileStatus() throws IOException {
+ try {
+ return fs.getFileStatus(currentPath);
+ } catch (FileNotFoundException e) {
+ // try archived path
+ Path archivedWAL = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
+ if (archivedWAL != null) {
+ return fs.getFileStatus(archivedWAL);
+ } else {
+ throw e;
}
- // if currentEntry != null then just return
- // if currentEntry == null but the file is still being written, then we should not switch to
- // the next log either, just return here and try next time to see if there are more entries in
- // the current file
}
- // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
}
// HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
- private boolean checkAllBytesParsed() throws IOException {
+ private boolean checkAllBytesParsed() {
// -1 means the wal wasn't closed cleanly.
final long trailerSize = currentTrailerSize();
FileStatus stat = null;
try {
- stat = fs.getFileStatus(this.currentPath);
- } catch (IOException exception) {
+ stat = getCurrentPathFileStatus();
+ } catch (IOException e) {
LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
- currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat());
+ currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat(), e);
metrics.incrUnknownFileLengthForClosedWAL();
}
// Here we use currentPositionOfReader instead of currentPositionOfEntry.
@@ -228,182 +388,72 @@ class WALEntryStream implements Closeable {
"Processing end of WAL {} at position {}, which is too far away from"
+ " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
- setPosition(0);
- resetReader();
metrics.incrRestartedWALReading();
metrics.incrRepeatedFileBytes(currentPositionOfReader);
return false;
}
}
- if (LOG.isTraceEnabled()) {
- LOG.trace("Reached the end of " + this.currentPath + " and length of the file is "
- + (stat == null ? "N/A" : stat.getLen()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reached the end of {} and length of the file is {}", currentPath,
+ stat == null ? "N/A" : stat.getLen());
}
metrics.incrCompletedWAL();
return true;
}
- private void dequeueCurrentLog() throws IOException {
+ private void dequeueCurrentLog() {
LOG.debug("EOF, closing {}", currentPath);
closeReader();
logQueue.remove(walGroupId);
setCurrentPath(null);
- setPosition(0);
+ currentPositionOfEntry = 0;
+ state = null;
}
/**
* Returns whether the file is opened for writing.
*/
- private boolean readNextEntryAndRecordReaderPosition() throws IOException {
- Entry readEntry = reader.next();
- long readerPos = reader.getPosition();
+ private Pair<WALTailingReader.State, Boolean> readNextEntryAndRecordReaderPosition() {
OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
- if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
- // See HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
- // data, so we need to make sure that we do not read beyond the committed file length.
- if (LOG.isDebugEnabled()) {
- LOG.debug("The provider tells us the valid length for " + currentPath + " is "
- + fileLength.getAsLong() + ", but we have advanced to " + readerPos);
- }
- resetReader();
- return true;
- }
- if (readEntry != null) {
+ WALTailingReader.Result readResult = reader.next(fileLength.orElse(-1));
+ long readerPos = readResult.getEntryEndPos();
+ Entry readEntry = readResult.getEntry();
+ if (readResult.getState() == WALTailingReader.State.NORMAL) {
LOG.trace("reading entry: {} ", readEntry);
metrics.incrLogEditsRead();
metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
- }
- currentEntry = readEntry; // could be null
- this.currentPositionOfReader = readerPos;
- return fileLength.isPresent();
- }
-
- private void closeReader() throws IOException {
- if (reader != null) {
- reader.close();
- reader = null;
- }
- }
-
- // if we don't have a reader, open a reader on the next log
- private boolean checkReader() throws IOException {
- if (reader == null) {
- return openNextLog();
- }
- return true;
- }
-
- // open a reader on the next log in queue
- private boolean openNextLog() throws IOException {
- PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
- Path nextPath = queue.peek();
- if (nextPath != null) {
- openReader(nextPath);
- if (reader != null) {
- return true;
- }
+ // record current entry and reader position
+ currentEntry = readResult.getEntry();
+ this.currentPositionOfReader = readerPos;
} else {
- // no more files in queue, this could only happen for recovered queue.
- setCurrentPath(null);
- }
- return false;
- }
-
- private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
- // If the log was archived, continue reading from there
- Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
- // archivedLog can be null if unable to locate in archiveDir.
- if (archivedLog != null) {
- openReader(archivedLog);
- } else {
- throw fnfe;
- }
- }
-
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
- justification = "HDFS-4380")
- private void openReader(Path path) throws IOException {
- try {
- // Detect if this is a new file, if so get a new reader else
- // reset the current reader so that we see the new data
- if (reader == null || !getCurrentPath().equals(path)) {
- closeReader();
- reader = WALFactory.createReader(fs, path, conf);
- seek();
- setCurrentPath(path);
- } else {
- resetReader();
- }
- } catch (FileNotFoundException fnfe) {
- handleFileNotFound(path, fnfe);
- } catch (RemoteException re) {
- IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
- if (!(ioe instanceof FileNotFoundException)) {
- throw ioe;
- }
- handleFileNotFound(path, (FileNotFoundException) ioe);
- } catch (LeaseNotRecoveredException lnre) {
- // HBASE-15019 the WAL was not closed due to some hiccup.
- LOG.warn("Try to recover the WAL lease " + path, lnre);
- recoverLease(conf, path);
- reader = null;
- } catch (NullPointerException npe) {
- // Workaround for race condition in HDFS-4380
- // which throws a NPE if we open a file before any data node has the most recent block
- // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
- LOG.warn("Got NPE opening reader, will retry.");
- reader = null;
- }
- }
-
- // For HBASE-15019
- private void recoverLease(final Configuration conf, final Path path) {
- try {
- final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf);
- RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
- @Override
- public boolean progress() {
- LOG.debug("recover WAL lease: " + path);
- return true;
- }
- });
- } catch (IOException e) {
- LOG.warn("unable to recover lease for WAL: " + path, e);
- }
- }
-
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
- justification = "HDFS-4380")
- private void resetReader() throws IOException {
- try {
+ LOG.trace("reading entry failed with: {}", readResult.getState());
+ // set current entry to null
currentEntry = null;
- reader.reset();
- seek();
- } catch (FileNotFoundException fnfe) {
- // If the log was archived, continue reading from there
- Path archivedLog = AbstractFSWALProvider.findArchivedLog(currentPath, conf);
- // archivedLog can be null if unable to locate in archiveDir.
- if (archivedLog != null) {
- openReader(archivedLog);
- } else {
- throw fnfe;
+ try {
+ this.currentPositionOfReader = reader.getPosition();
+ } catch (IOException e) {
+ LOG.warn("failed to get current position of reader", e);
+ if (readResult.getState().resetCompression()) {
+ return Pair.newPair(WALTailingReader.State.ERROR_AND_RESET_COMPRESSION,
+ fileLength.isPresent());
+ }
}
- } catch (NullPointerException npe) {
- throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
}
+ return Pair.newPair(readResult.getState(), fileLength.isPresent());
}
- private void seek() throws IOException {
- if (currentPositionOfEntry != 0) {
- reader.seek(currentPositionOfEntry);
+ private void closeReader() {
+ if (reader != null) {
+ reader.close();
+ reader = null;
}
}
private long currentTrailerSize() {
long size = -1L;
- if (reader instanceof ProtobufLogReader) {
- final ProtobufLogReader pblr = (ProtobufLogReader) reader;
- size = pblr.trailerSize();
+ if (reader instanceof AbstractProtobufWALReader) {
+ final AbstractProtobufWALReader pbwr = (AbstractProtobufWALReader) reader;
+ size = pbwr.trailerSize();
}
return size;
}
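
For reference, a hedged sketch of the retry loop an upper layer is expected to run around the HasNext states above; the 100 ms backoff is an illustrative placeholder, not a value taken from this patch, and InterruptedException handling is elided.

    // drive a WALEntryStream until it signals NO
    while (true) {
      WALEntryStream.HasNext hasNext = stream.hasNext();
      if (hasNext == WALEntryStream.HasNext.YES) {
        WAL.Entry entry = stream.next();
        // ... ship or process the entry ...
      } else if (hasNext == WALEntryStream.HasNext.RETRY) {
        Thread.sleep(100); // transient error or EOF on a live file: back off, then retry
      } else if (hasNext == WALEntryStream.HasNext.RETRY_IMMEDIATELY) {
        // a WAL file was finished and dequeued; loop again to open the next one
      } else { // HasNext.NO
        stream.close(); // no more files in the queue, release resources
        break;
      }
    }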
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 7e5e33098c2..7d5974bb2cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.wal;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -28,7 +27,6 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
@@ -40,7 +38,6 @@ import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -69,15 +66,15 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
public static final String SEPARATE_OLDLOGDIR = "hbase.separate.oldlogdir.by.regionserver";
public static final boolean DEFAULT_SEPARATE_OLDLOGDIR = false;
- // Only public so classes back in regionserver.wal can access
- public interface Reader extends WAL.Reader {
+ public interface Initializer {
/**
- * @param fs File system.
- * @param path Path.
- * @param c Configuration.
- * @param s Input stream that may have been pre-opened by the caller; may be null.
+ * A method to initialize a WAL reader.
+ * @param startPosition the start position you want to read from, -1 means start reading from
+ * the first WAL entry. Notice that the first entry does not start at
+ * position 0, as we have several headers, so typically you should not pass 0
+ * here.
*/
- void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException;
+ void init(FileSystem fs, Path path, Configuration c, long startPosition) throws IOException;
}
protected volatile T wal;
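
To make the Initializer contract concrete, a sketch of a test-only reader that could be plugged in through it; MyFaultyWALStreamReader is a hypothetical name, and we rely on ProtobufWALStreamReader implementing this interface (WALFactory verifies that with a Preconditions check, see further below).

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALStreamReader;

    // hypothetical test-only reader which delegates to the default implementation
    public class MyFaultyWALStreamReader extends ProtobufWALStreamReader {
      @Override
      public void init(FileSystem fs, Path path, Configuration c, long startPosition)
        throws IOException {
        // -1 means start from the first entry; never pass 0, headers come first
        super.init(fs, path, c, startPosition);
        // a fault-injecting subclass would arm its failure mode here
      }
    }

Such a class would then be injected in a test via conf.setClass(WALFactory.WAL_STREAM_READER_CLASS_IMPL, MyFaultyWALStreamReader.class, WALStreamReader.class).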
@@ -399,7 +396,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
serverName = null;
LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
}
- if (serverName != null && serverName.getStartcode() < 0) {
+ if (serverName != null && serverName.getStartCode() < 0) {
LOG.warn("Invalid log file path=" + logFile);
serverName = null;
}
@@ -477,62 +474,8 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
return null;
}
- /**
- * Opens WAL reader with retries and additional exception handling
- * @param path path to WAL file
- * @param conf configuration
- * @return WAL Reader instance
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DCN_NULLPOINTER_EXCEPTION",
- justification = "HDFS-4380")
- public static WAL.Reader openReader(Path path, Configuration conf) throws IOException {
- long retryInterval = 2000; // 2 sec
- int maxAttempts = 30;
- int attempt = 0;
- Exception ee = null;
- org.apache.hadoop.hbase.wal.WAL.Reader reader = null;
- while (reader == null && attempt++ < maxAttempts) {
- try {
- // Detect if this is a new file, if so get a new reader else
- // reset the current reader so that we see the new data
- reader = WALFactory.createReader(path.getFileSystem(conf), path, conf);
- return reader;
- } catch (FileNotFoundException fnfe) {
- // If the log was archived, continue reading from there
- Path archivedLog = AbstractFSWALProvider.findArchivedLog(path, conf);
- // archivedLog can be null if unable to locate in archiveDir.
- if (archivedLog != null) {
- return openReader(archivedLog, conf);
- } else {
- throw fnfe;
- }
- } catch (LeaseNotRecoveredException lnre) {
- // HBASE-15019 the WAL was not closed due to some hiccup.
- LOG.warn("Try to recover the WAL lease " + path, lnre);
- recoverLease(conf, path);
- reader = null;
- ee = lnre;
- } catch (NullPointerException npe) {
- // Workaround for race condition in HDFS-4380
- // which throws a NPE if we open a file before any data node has the most recent block
- // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
- LOG.warn("Got NPE opening reader, will retry.");
- reader = null;
- ee = npe;
- }
- if (reader == null) {
- // sleep before next attempt
- try {
- Thread.sleep(retryInterval);
- } catch (InterruptedException e) {
- }
- }
- }
- throw new IOException("Could not open reader", ee);
- }
-
// For HBASE-15019
- private static void recoverLease(final Configuration conf, final Path path) {
+ public static void recoverLease(Configuration conf, Path path) {
try {
final FileSystem dfs = CommonFSUtils.getCurrentFileSystem(conf);
RecoverLeaseFSUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
index 56e137e725f..e463591f518 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java
@@ -161,7 +161,8 @@ abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter, Path dst)
throws IOException {
long dstMinLogSeqNum = -1L;
- try (WAL.Reader reader = walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
+ try (WALStreamReader reader =
+ walSplitter.getWalFactory().createStreamReader(walSplitter.walFS, dst)) {
WAL.Entry entry = reader.next();
if (entry != null) {
dstMinLogSeqNum = entry.getKey().getSequenceId();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 5212242b9ff..79db2a678a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -230,21 +230,6 @@ public interface WAL extends Closeable, WALFileLengthProvider {
@Override
String toString();
- /**
- * When outside clients need to consume persisted WALs, they rely on a provided Reader.
- */
- interface Reader extends Closeable {
- Entry next() throws IOException;
-
- Entry next(Entry reuse) throws IOException;
-
- void seek(long pos) throws IOException;
-
- long getPosition() throws IOException;
-
- void reset() throws IOException;
- }
-
/**
* Utility class that lets us keep track of the edit with it's key.
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index e3968ae3cff..e8a5fa52bff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -28,16 +28,18 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALStreamReader;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALTailingReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
/**
* Entry point for users of the Write Ahead Log. Acts as the shim between internal use and the
* particular WALProvider we use to handle wal requests. Configure which provider gets used with the
@@ -57,6 +59,21 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class WALFactory {
+ /**
+ * Used in tests for injecting a customized stream reader implementation, for example, to inject
+ * faults when reading, etc.
+ * <p/>
+ * After removing the sequence file based WAL, we always use a protobuf based WAL reader, and we
+ * will also determine whether the WAL file is encrypted, and whether we should use
+ * {@link org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec} to decode, by checking the
+ * header of the WAL file, so we do not need to specify a special reader to read the WAL file
+ * either.
+ * <p/>
+ * So typically you should not use this config in production.
+ */
+ public static final String WAL_STREAM_READER_CLASS_IMPL =
+ "hbase.regionserver.wal.stream.reader.impl";
+
private static final Logger LOG = LoggerFactory.getLogger(WALFactory.class);
/**
@@ -93,7 +110,7 @@ public class WALFactory {
/**
* Configuration-specified WAL Reader used when a custom reader is requested
*/
- private final Class<? extends AbstractFSWALProvider.Reader> logReaderClass;
+ private final Class<? extends WALStreamReader> walStreamReaderClass;
/**
* How long to attempt opening in-recovery wals
@@ -111,8 +128,12 @@ public class WALFactory {
// happen prior to provider initialization, in case they need to instantiate a reader/writer.
timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
/* TODO Both of these are probably specific to the fs wal provider */
- logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
- AbstractFSWALProvider.Reader.class);
+ walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL,
+ ProtobufWALStreamReader.class, WALStreamReader.class);
+ Preconditions.checkArgument(
+ AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass),
+ "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(),
+ AbstractFSWALProvider.Initializer.class.getName());
this.conf = conf;
// end required early initialization
@@ -196,8 +217,12 @@ public class WALFactory {
// happen prior to provider initialization, in case they need to instantiate a reader/writer.
timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000);
/* TODO Both of these are probably specific to the fs wal provider */
- logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
- AbstractFSWALProvider.Reader.class);
+ walStreamReaderClass = conf.getClass(WAL_STREAM_READER_CLASS_IMPL,
+ ProtobufWALStreamReader.class, WALStreamReader.class);
+ Preconditions.checkArgument(
+ AbstractFSWALProvider.Initializer.class.isAssignableFrom(walStreamReaderClass),
+ "The wal stream reader class %s is not a sub class of %s", walStreamReaderClass.getName(),
+ AbstractFSWALProvider.Initializer.class.getName());
this.conf = conf;
this.factoryId = factoryId;
this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
@@ -301,25 +326,26 @@ public class WALFactory {
}
}
- public Reader createReader(final FileSystem fs, final Path path) throws IOException {
- return createReader(fs, path, (CancelableProgressable) null);
+ public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException {
+ return createStreamReader(fs, path, (CancelableProgressable) null);
}
/**
- * Create a reader for the WAL. If you are reading from a file that's being written to and need to
- * reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method then just seek
- * back to the last known good position.
+ * Create a one-way stream reader for the WAL.
* @return A WAL reader. Close when done with it.
*/
- public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter)
- throws IOException {
- return createReader(fs, path, reporter, true);
+ public WALStreamReader createStreamReader(FileSystem fs, Path path,
+ CancelableProgressable reporter) throws IOException {
+ return createStreamReader(fs, path, reporter, -1);
}
- public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter,
- boolean allowCustom) throws IOException {
- Class<? extends AbstractFSWALProvider.Reader> lrClass =
- allowCustom ? logReaderClass : ProtobufLogReader.class;
+ /**
+ * Create a one-way stream reader for the WAL, and start reading from the given
+ * {@code startPosition}.
+ * @return A WAL reader. Close when done with it.
+ */
+ public WALStreamReader createStreamReader(FileSystem fs, Path path,
+ CancelableProgressable reporter, long startPosition) throws IOException {
try {
// A wal file could be under recovery, so it may take several
// tries to get it open. Instead of claiming it is corrupted, retry
@@ -327,22 +353,17 @@ public class WALFactory {
long startWaiting = EnvironmentEdgeManager.currentTime();
long openTimeout = timeoutMillis + startWaiting;
int nbAttempt = 0;
- AbstractFSWALProvider.Reader reader = null;
+ WALStreamReader reader = null;
while (true) {
try {
- reader = lrClass.getDeclaredConstructor().newInstance();
- reader.init(fs, path, conf, null);
+ reader = walStreamReaderClass.getDeclaredConstructor().newInstance();
+ ((AbstractFSWALProvider.Initializer) reader).init(fs, path, conf, startPosition);
return reader;
} catch (Exception e) {
// catch Exception so that we close reader for all exceptions. If we don't
// close the reader, we leak a socket.
if (reader != null) {
- try {
- reader.close();
- } catch (IOException exception) {
- LOG.warn("Could not close FSDataInputStream" + exception.getMessage());
- LOG.debug("exception details", exception);
- }
+ reader.close();
}
// Only inspect the Exception to consider retry when it's an IOException
@@ -435,34 +456,30 @@ public class WALFactory {
}
/**
- * Create a reader for the given path, accept custom reader classes from conf. If you already have
- * a WALFactory, you should favor the instance method.
- * @return a WAL Reader, caller must close.
+ * Create a tailing reader for the given path. Mainly used in replication.
*/
- public static Reader createReader(final FileSystem fs, final Path path,
- final Configuration configuration) throws IOException {
- return getInstance(configuration).createReader(fs, path);
+ public static WALTailingReader createTailingReader(FileSystem fs, Path path, Configuration conf,
+ long startPosition) throws IOException {
+ ProtobufWALTailingReader reader = new ProtobufWALTailingReader();
+ reader.init(fs, path, conf, startPosition);
+ return reader;
}
/**
- * Create a reader for the given path, accept custom reader classes from conf. If you already have
- * a WALFactory, you should favor the instance method.
- * @return a WAL Reader, caller must close.
+ * Create a one-way stream reader for a given path.
*/
- static Reader createReader(final FileSystem fs, final Path path,
- final Configuration configuration, final CancelableProgressable reporter) throws IOException {
- return getInstance(configuration).createReader(fs, path, reporter);
+ public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf)
+ throws IOException {
+ return createStreamReader(fs, path, conf, -1);
}
/**
- * Create a reader for the given path, ignore custom reader classes from conf. If you already have
- * a WALFactory, you should favor the instance method. only public pending move of
- * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
- * @return a WAL Reader, caller must close.
+ * Create a one-way stream reader for a given path.
*/
- public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path,
- final Configuration configuration) throws IOException {
- return getInstance(configuration).createReader(fs, path, null, false);
+ public static WALStreamReader createStreamReader(FileSystem fs, Path path, Configuration conf,
+ long startPosition) throws IOException {
+ return getInstance(conf).createStreamReader(fs, path, (CancelableProgressable) null,
+ startPosition);
}
/**
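
Taken together, the two static entry points split along the read pattern; a hedged usage sketch (walPath is illustrative, IOException handling elided):

    // one-way read of a (typically closed) WAL, e.g. for splitting or printing
    try (WALStreamReader reader = WALFactory.createStreamReader(fs, walPath, conf)) {
      for (WAL.Entry entry; (entry = reader.next()) != null;) {
        // process entry
      }
    }

    // tailing a WAL which may still be written, e.g. for replication;
    // -1 means start from the first entry (remember: never pass 0)
    WALTailingReader tailing = WALFactory.createTailingReader(fs, walPath, conf, -1);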
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
index f810e034502..b03357b9332 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.GsonUtil;
@@ -266,10 +266,10 @@ public class WALPrettyPrinter {
throw new IOException(p + " is not a file");
}
- WAL.Reader log = WALFactory.createReader(fs, p, conf);
+ WALStreamReader log = WALFactory.createStreamReader(fs, p, conf, position > 0 ? position : -1);
- if (log instanceof ProtobufLogReader) {
- List<String> writerClsNames = ((ProtobufLogReader) log).getWriterClsNames();
+ if (log instanceof AbstractProtobufWALReader) {
+ List<String> writerClsNames = ((AbstractProtobufWALReader) log).getWriterClsNames();
if (writerClsNames != null && writerClsNames.size() > 0) {
out.print("Writer Classes: ");
for (int i = 0; i < writerClsNames.size(); i++) {
@@ -281,7 +281,7 @@ public class WALPrettyPrinter {
out.println();
}
- String cellCodecClsName = ((ProtobufLogReader) log).getCodecClsName();
+ String cellCodecClsName = ((AbstractProtobufWALReader) log).getCodecClsName();
if (cellCodecClsName != null) {
out.println("Cell Codec Class: " + cellCodecClsName);
}
@@ -292,10 +292,6 @@ public class WALPrettyPrinter {
firstTxn = true;
}
- if (position > 0) {
- log.seek(position);
- }
-
try {
WAL.Entry entry;
while ((entry = log.next()) != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 2ed9ffc96c8..9576b3b3790 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -289,7 +288,7 @@ public class WALSplitter {
int editsSkipped = 0;
MonitoredTask status = TaskMonitor.get()
.createStatus("Splitting " + wal + " to temporary staging area.", false, true);
- Reader walReader = null;
+ WALStreamReader walReader = null;
this.fileBeingSplit = walStatus;
long startTS = EnvironmentEdgeManager.currentTime();
long length = walStatus.getLen();
@@ -406,12 +405,8 @@ public class WALSplitter {
final String log = "Finishing writing output for " + wal + " so closing down";
LOG.debug(log);
status.setStatus(log);
- try {
- if (null != walReader) {
- walReader.close();
- }
- } catch (IOException exception) {
- LOG.warn("Could not close {} reader", wal, exception);
+ if (null != walReader) {
+ walReader.close();
}
try {
if (outputSinkStarted) {
@@ -442,14 +437,14 @@ public class WALSplitter {
}
/**
- * Create a new {@link Reader} for reading logs to split.
+ * Create a new {@link WALStreamReader} for reading logs to split.
* @return Returns null if file has length zero or file can't be found.
*/
- protected Reader getReader(FileStatus walStatus, boolean skipErrors,
+ protected WALStreamReader getReader(FileStatus walStatus, boolean skipErrors,
CancelableProgressable cancel) throws IOException, CorruptedLogFileException {
Path path = walStatus.getPath();
long length = walStatus.getLen();
- Reader in;
+ WALStreamReader in;
// Check for possibly empty file. With appends, currently Hadoop reports a
// zero length even if the file has been sync'd. Revisit if HDFS-376 or
@@ -489,7 +484,7 @@ public class WALSplitter {
return in;
}
- private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
+ private Entry getNextLogLine(WALStreamReader in, Path path, boolean skipErrors)
throws CorruptedLogFileException, IOException {
try {
return in.next();
@@ -524,11 +519,12 @@ public class WALSplitter {
}
/**
- * Create a new {@link Reader} for reading logs to split.
+ * Create a new {@link WALStreamReader} for reading logs to split.
* @return new Reader instance, caller should close
*/
- private Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
- return walFactory.createReader(walFS, curLogFile, reporter);
+ private WALStreamReader getReader(Path curLogFile, CancelableProgressable reporter)
+ throws IOException {
+ return walFactory.createStreamReader(walFS, curLogFile, reporter);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALStreamReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALStreamReader.java
new file mode 100644
index 00000000000..ef927139906
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALStreamReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A one-way WAL reader, without reset and seek support.
+ * <p/>
+ * In most cases you should use this interface to read a WAL file, as the implementation is simple and
+ * robust. For replication, where we want to tail the WAL file which is currently being written, you
+ * should use {@link WALTailingReader} instead.
+ * @see WALTailingReader
+ */
+@InterfaceAudience.Private
+public interface WALStreamReader extends Closeable {
+
+ /**
+ * Read the next entry in WAL.
+ * <p/>
+ * In most cases you should just use this method, especially when reading a closed wal file for
+ * splitting or printing.
+ */
+ default WAL.Entry next() throws IOException {
+ return next(null);
+ }
+
+ /**
+ * Read the next entry in WAL, use the given {@link WAL.Entry} if not {@code null} to hold the
+ * data.
+ * <p/>
+ * Mainly used in MR.
+ * @param reuse the entry to be used for reading, can be {@code null}
+ */
+ WAL.Entry next(WAL.Entry reuse) throws IOException;
+
+ /**
+ * Get the current reading position.
+ */
+ long getPosition() throws IOException;
+
+ /**
+ * Override to remove the 'throws IOException' as we are just a reader.
+ */
+ @Override
+ void close();
+}
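
A short sketch of the reuse variant, which MR-style callers can use to avoid allocating an Entry per call; we assume WAL.Entry has a no-arg constructor, as the reuse pattern elsewhere in the codebase suggests.

    // reuse a single Entry instance across calls
    WAL.Entry reuse = new WAL.Entry();
    for (WAL.Entry e; (e = reader.next(reuse)) != null;) {
      // e is the same object as reuse; copy out what you need before the next call
    }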
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALTailingReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALTailingReader.java
new file mode 100644
index 00000000000..319fb7fb956
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALTailingReader.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A WAL reader designed to tail a WAL file which is currently being written. It adds support for
+ * resetting the reading position, and its {@link #next(long)} method reports a detailed
+ * {@link State} so the caller can decide how to react to errors and partial reads.
+ */
+@InterfaceAudience.Private
+public interface WALTailingReader extends Closeable {
+
+ enum State {
+ /** This means we read an Entry without any error */
+ NORMAL,
+ /**
+ * This means the WAL file has a trailer and we have reached it, which means we have finished
+ * reading this file normally
+ */
+ EOF_WITH_TRAILER,
+ /**
+ * This means we met an error, so the upper layer needs to reset to read again
+ */
+ ERROR_AND_RESET,
+ /**
+ * Mostly the same as {@link #ERROR_AND_RESET} above; the difference is that here we have also
+ * messed up the compression dictionary when reading data, so the upper layer should also clear
+ * the compression context when resetting, which means that when calling the resetTo method, we
+ * need to skip to the position instead of just seeking to it, which will impact performance.
+ */
+ ERROR_AND_RESET_COMPRESSION,
+ /**
+ * This means we reached the EOF and the upper layer needs to reset to see if there is more data.
+ * Notice that this does not mean that there is necessarily more data; the upper layer should
+ * determine whether it needs to reset and read again.
+ */
+ EOF_AND_RESET,
+ /**
+ * Mostly the same as {@link #EOF_AND_RESET} above; the difference is that here we have also
+ * messed up the compression dictionary when reading data, so the upper layer should also clear
+ * the compression context when resetting, which means that when calling the resetTo method, we
+ * need to skip to the position instead of just seeking to it, which will impact performance. The
+ * implementation should try its best not to fall into this situation.
+ */
+ EOF_AND_RESET_COMPRESSION;
+
+ /**
+ * A pre-created result to return, since except for {@link #NORMAL}, the other states do not
+ * need to provide any fields other than the state itself in the returned {@link Result}.
+ */
+ private Result result = new Result(this, null, -1);
+
+ public Result getResult() {
+ return result;
+ }
+
+ public boolean resetCompression() {
+ return this == ERROR_AND_RESET_COMPRESSION || this == EOF_AND_RESET_COMPRESSION;
+ }
+
+ public boolean eof() {
+ return this == EOF_AND_RESET || this == EOF_AND_RESET_COMPRESSION || this == EOF_WITH_TRAILER;
+ }
+ }
+
+ final class Result {
+
+ private final State state;
+ private final Entry entry;
+ private final long entryEndPos;
+
+ public Result(State state, Entry entry, long entryEndPos) {
+ this.state = state;
+ this.entry = entry;
+ this.entryEndPos = entryEndPos;
+ }
+
+ public State getState() {
+ return state;
+ }
+
+ public Entry getEntry() {
+ return entry;
+ }
+
+ public long getEntryEndPos() {
+ return entryEndPos;
+ }
+ }
+
+ /**
+ * Read the next entry and make sure the position after reading does not go beyond the given
+ * {@code limit}.
+ * <p/>
+ * Notice that we will not throw any checked exception out; all the states are represented by the
+ * return value (of course we still log the exceptions). The reason is that, when tailing a WAL
+ * file which is currently being written, we will hit EOFException many times, so it should not be
+ * considered an 'exception', and also, creating an Exception is a bit expensive.
+ * @param limit the position limit. See HBASE-14004 for more details about why we need this
+ * limitation. -1 means no limit.
+ */
+ Result next(long limit);
+
+ /**
+ * Get the current reading position.
+ */
+ long getPosition() throws IOException;
+
+ /**
+ * Reopen the reader to see if new data has arrived, and also seek (or skip) to the given
+ * position.
+ * <p/>
+ * If you want to read from the beginning instead of a given position, please pass -1 as
+ * {@code position}, then the reader will position itself at the first entry. Notice that, since we have a
+ * magic header and a pb header, the first WAL entry is not located at position 0, so passing 0
+ * will cause trouble.
+ * @param position the position we want to start reading from after resetting, or -1 if
+ * you want to start reading from the beginning.
+ * @param resetCompression whether we also need to clear the compression context. If {@code true},
+ * we will use skip instead of seek after resetting.
+ */
+ void resetTo(long position, boolean resetCompression) throws IOException;
+
+ /**
+ * Override to remove the 'throws IOException' as we are just a reader.
+ */
+ @Override
+ void close();
+}
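
To make the State/Result contract concrete, a hedged sketch of one read step against a tailing reader; process and lastGoodPosition are hypothetical, IOException handling is elided, and WALEntryStream above is the real in-tree consumer of this interface.

    WALTailingReader.Result result = reader.next(-1); // -1: no position limit
    switch (result.getState()) {
      case NORMAL:
        process(result.getEntry()); // hypothetical handler
        break;
      case EOF_WITH_TRAILER:
        // cleanly closed file, we are done with it
        break;
      default:
        // EOF_AND_RESET(_COMPRESSION) or ERROR_AND_RESET(_COMPRESSION):
        // go back to the last good position, clearing compression context if required
        reader.resetTo(lastGoodPosition, result.getState().resetCompression());
        break;
    }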
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java
index a01b8542292..b619ad265f5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java
@@ -36,8 +36,9 @@ import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -93,8 +94,8 @@ public class TestSequenceIdMonotonicallyIncreasing {
private long getMaxSeqId(HRegionServer rs, RegionInfo region) throws IOException {
Path walFile = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName();
long maxSeqId = -1L;
- try (WAL.Reader reader =
- WALFactory.createReader(UTIL.getTestFileSystem(), walFile, UTIL.getConfiguration())) {
+ try (WALStreamReader reader =
+ NoEOFWALStreamReader.create(UTIL.getTestFileSystem(), walFile, UTIL.getConfiguration())) {
for (;;) {
WAL.Entry entry = reader.next();
if (entry == null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
index cf1a1cd9a5f..d228c5f6072 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
@@ -30,8 +30,6 @@ import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
@@ -59,7 +57,6 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.junit.After;
@@ -402,19 +399,6 @@ public abstract class AbstractTestDLS {
return;
}
- private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
- int count = 0;
- try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) {
- WAL.Entry e;
- while ((e = in.next()) != null) {
- if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
- count++;
- }
- }
- }
- return count;
- }
-
private void blockUntilNoRIT() throws Exception {
TEST_UTIL.waitUntilNoRegionsInTransition(60000);
}
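
The removed countWAL helper had no remaining callers; if a test still needed it, an equivalent on top of the new API would be a few lines (a sketch, not part of the patch):

    private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
      int count = 0;
      try (WALStreamReader in = WALFactory.createStreamReader(fs, log, conf)) {
        for (WAL.Entry e; (e = in.next()) != null;) {
          if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
            count++;
          }
        }
      }
      return count;
    }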
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index b4583958629..8c42acd2633 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -176,6 +176,7 @@ import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -1080,8 +1081,8 @@ public class TestHRegion {
// now verify that the flush markers are written
wal.shutdown();
- WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal),
- TEST_UTIL.getConfiguration());
+ WALStreamReader reader = WALFactory.createStreamReader(fs,
+ AbstractFSWALProvider.getCurrentFileName(wal), TEST_UTIL.getConfiguration());
try {
List<WAL.Entry> flushDescriptors = new ArrayList<>();
long lastFlushSeqId = -1;
@@ -1132,14 +1133,7 @@ public class TestHRegion {
}
writer.close();
} finally {
- if (null != reader) {
- try {
- reader.close();
- } catch (IOException exception) {
- LOG.warn("Problem closing wal: " + exception.getMessage());
- LOG.debug("exception details", exception);
- }
- }
+ reader.close();
}
// close the region now, and reopen again
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index bdc81974f90..4c7384a34ed 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -73,11 +73,13 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.AfterClass;
@@ -143,7 +145,7 @@ public class TestHRegionReplayEvents {
private RegionInfo primaryHri, secondaryHri;
private HRegion primaryRegion, secondaryRegion;
private WAL walPrimary, walSecondary;
- private WAL.Reader reader;
+ private WALStreamReader reader;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -317,8 +319,8 @@ public class TestHRegionReplayEvents {
return Integer.parseInt(Bytes.toString(put.getRow()));
}
- WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
- return WALFactory.createReader(TEST_UTIL.getTestFileSystem(),
+ private WALStreamReader createWALReaderForPrimary() throws FileNotFoundException, IOException {
+ return NoEOFWALStreamReader.create(TEST_UTIL.getTestFileSystem(),
AbstractFSWALProvider.getCurrentFileName(walPrimary), TEST_UTIL.getConfiguration());
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
index 8c4a1eee0f7..15209baff9d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
@@ -179,7 +180,7 @@ public class TestRecoveredEdits {
int count = 0;
// Read all cells from recover edits
List<Cell> walCells = new ArrayList<>();
- try (WAL.Reader reader = WALFactory.createReader(fs, edits, conf)) {
+ try (WALStreamReader reader = WALFactory.createStreamReader(fs, edits, conf)) {
WAL.Entry entry;
while ((entry = reader.next()) != null) {
WALKey key = entry.getKey();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
index 3291b39b6a0..6ea7d6dccc1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -203,11 +204,11 @@ public class TestWALMonotonicallyIncreasingSeqId {
TEST_UTIL.cleanupTestDir();
}
- private WAL.Reader createReader(Path logPath, Path oldWalsDir) throws IOException {
+ private WALStreamReader createReader(Path logPath, Path oldWalsDir) throws IOException {
try {
- return wals.createReader(fileSystem, logPath);
+ return wals.createStreamReader(fileSystem, logPath);
} catch (IOException e) {
- return wals.createReader(fileSystem, new Path(oldWalsDir, logPath.getName()));
+ return wals.createStreamReader(fileSystem, new Path(oldWalsDir, logPath.getName()));
}
}
@@ -229,7 +230,7 @@ public class TestWALMonotonicallyIncreasingSeqId {
Thread.sleep(10);
Path hbaseDir = new Path(walConf.get(HConstants.HBASE_DIR));
Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
- try (WAL.Reader reader = createReader(logPath, oldWalsDir)) {
+ try (WALStreamReader reader = createReader(logPath, oldWalsDir)) {
long currentMaxSeqid = 0;
for (WAL.Entry e; (e = reader.next()) != null;) {
if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
index 6a4800fb1da..e765fb8a4a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
@@ -53,6 +53,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
/**
@@ -139,13 +141,12 @@ public abstract class AbstractTestProtobufLog<W extends Closeable> {
Path path = new Path(dir, "tempwal");
// delete the log if already exists, for test only
fs.delete(path, true);
+ HRegionInfo hri =
+ new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ fs.mkdirs(dir);
W writer = null;
- ProtobufLogReader reader = null;
try {
- HRegionInfo hri =
- new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
- HTableDescriptor htd = new HTableDescriptor(tableName);
- fs.mkdirs(dir);
// Write log in pb format.
writer = createWriter(path);
for (int i = 0; i < recordCount; ++i) {
@@ -162,39 +163,41 @@ public abstract class AbstractTestProtobufLog<W extends Closeable> {
append(writer, new WAL.Entry(key, edit));
}
sync(writer);
- if (withTrailer) writer.close();
-
- // Now read the log using standard means.
- reader = (ProtobufLogReader) wals.createReader(fs, path);
if (withTrailer) {
- assertNotNull(reader.trailer);
- } else {
- assertNull(reader.trailer);
+ writer.close();
+ writer = null;
}
- for (int i = 0; i < recordCount; ++i) {
- WAL.Entry entry = reader.next();
- assertNotNull(entry);
- assertEquals(columnCount, entry.getEdit().size());
- assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
- assertEquals(tableName, entry.getKey().getTableName());
- int idx = 0;
- for (Cell val : entry.getEdit().getCells()) {
- assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
- val.getRowLength()));
- String value = i + "" + idx;
- assertArrayEquals(Bytes.toBytes(value), CellUtil.cloneValue(val));
- idx++;
+ // Now read the log using standard means.
+ try (ProtobufWALStreamReader reader =
+ (ProtobufWALStreamReader) wals.createStreamReader(fs, path)) {
+ if (withTrailer) {
+ assertNotNull(reader.trailer);
+ } else {
+ assertNull(reader.trailer);
+ }
+ for (int i = 0; i < recordCount; ++i) {
+ WAL.Entry entry = reader.next();
+ assertNotNull(entry);
+ assertEquals(columnCount, entry.getEdit().size());
+ assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
+ assertEquals(tableName, entry.getKey().getTableName());
+ int idx = 0;
+ for (Cell val : entry.getEdit().getCells()) {
+ assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(),
+ val.getRowLength()));
+ String value = i + "" + idx;
+ assertArrayEquals(Bytes.toBytes(value), CellUtil.cloneValue(val));
+ idx++;
+ }
+ }
+ if (withTrailer) {
+ assertNotNull(reader.trailer);
+ } else {
+ assertNull(reader.trailer);
}
}
- WAL.Entry entry = reader.next();
- assertNull(entry);
} finally {
- if (writer != null) {
- writer.close();
- }
- if (reader != null) {
- reader.close();
- }
+ Closeables.close(writer, true);
}
}
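
A note on the cleanup pattern above: Closeables.close(Closeable, boolean) from the shaded
Guava is null-safe and, when the flag is true, logs an IOException thrown by close() instead
of propagating it, which is what lets the explicit null checks and catch blocks go away. A
minimal sketch, reusing the test's own createWriter helper and path:

    W writer = null;
    try {
      writer = createWriter(path);
      // ... append and sync entries ...
    } finally {
      // null-safe; any IOException from close() is logged, not rethrown
      Closeables.close(writer, true);
    }
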
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index 8dff69ce431..2ef7ece92db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -99,6 +99,7 @@ import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.junit.After;
import org.junit.AfterClass;
@@ -893,15 +894,10 @@ public abstract class AbstractTestWALReplay {
// here we let the DFSInputStream throw an IOException just after the WALHeader.
Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first();
- FSDataInputStream stream = fs.open(editFile);
- stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length);
- Class<? extends AbstractFSWALProvider.Reader> logReaderClass =
- conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class,
- AbstractFSWALProvider.Reader.class);
- AbstractFSWALProvider.Reader reader = logReaderClass.getDeclaredConstructor().newInstance();
- reader.init(this.fs, editFile, conf, stream);
- final long headerLength = stream.getPos();
- reader.close();
+ final long headerLength;
+ try (WALStreamReader reader = WALFactory.createStreamReader(fs, editFile, conf)) {
+ headerLength = reader.getPosition();
+ }
FileSystem spyFs = spy(this.fs);
doAnswer(new Answer<FSDataInputStream>() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufWALStreamReader.java
similarity index 88%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufWALStreamReader.java
index a2fd2c3a125..cbfa90cf38c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufWALStreamReader.java
@@ -18,11 +18,11 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
-import java.util.LinkedList;
+import java.util.ArrayDeque;
import java.util.Queue;
import org.apache.hadoop.hbase.wal.WAL.Entry;
-public class FaultyProtobufLogReader extends ProtobufLogReader {
+public class FaultyProtobufWALStreamReader extends ProtobufWALStreamReader {
// public until class relocates to o.a.h.h.wal
public enum FailureType {
@@ -32,7 +32,7 @@ public class FaultyProtobufLogReader extends ProtobufLogReader {
NONE
}
- Queue<Entry> nextQueue = new LinkedList<>();
+ Queue<Entry> nextQueue = new ArrayDeque<>();
int numberOfFileEntries = 0;
FailureType getFailureType() {
@@ -42,13 +42,15 @@ public class FaultyProtobufLogReader extends ProtobufLogReader {
@Override
public Entry next(Entry reuse) throws IOException {
if (nextQueue.isEmpty()) { // Read the whole thing at once and fake reading
- boolean b;
- do {
+ for (;;) {
Entry e = new Entry();
- b = readNext(e);
+ e = super.next(e);
+ if (e == null) {
+ break;
+ }
nextQueue.offer(e);
numberOfFileEntries++;
- } while (b);
+ }
}
if (nextQueue.size() == this.numberOfFileEntries && getFailureType() == FailureType.BEGINNING) {
@@ -61,10 +63,6 @@ public class FaultyProtobufLogReader extends ProtobufLogReader {
throw new IOException("fake Exception");
}
- if (nextQueue.peek() != null) {
- edit++;
- }
-
Entry e = nextQueue.poll();
if (e.getEdit().isEmpty()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
index 07adbde128e..0cdb35b22ae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -276,14 +277,7 @@ public class TestDurability {
private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception {
Path walPath = AbstractFSWALProvider.getCurrentFileName(log);
- WAL.Reader reader = wals.createReader(FS, walPath);
- int count = 0;
- WAL.Entry entry = new WAL.Entry();
- while (reader.next(entry) != null) {
- count++;
- }
- reader.close();
- assertEquals(expected, count);
+ assertEquals(expected, NoEOFWALStreamReader.count(wals, FS, walPath));
}
// lifted from TestAtomicOperation
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index 130a5e73ba0..c098140fbe9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.BeforeClass;
@@ -515,9 +516,8 @@ public class TestLogRolling extends AbstractTestLogRolling {
TEST_UTIL.getConfiguration(), null);
LOG.debug("Reading WAL " + CommonFSUtils.getPath(p));
- WAL.Reader reader = null;
- try {
- reader = WALFactory.createReader(fs, p, TEST_UTIL.getConfiguration());
+ try (WALStreamReader reader =
+ WALFactory.createStreamReader(fs, p, TEST_UTIL.getConfiguration())) {
WAL.Entry entry;
while ((entry = reader.next()) != null) {
LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells());
@@ -528,8 +528,6 @@ public class TestLogRolling extends AbstractTestLogRolling {
}
} catch (EOFException e) {
LOG.debug("EOF reading file " + CommonFSUtils.getPath(p));
- } finally {
- if (reader != null) reader.close();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java
index 01286a867d4..e4184df553a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureAsyncWALReplay.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@@ -41,8 +40,6 @@ public class TestSecureAsyncWALReplay extends TestAsyncWALReplay {
Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
- conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
- Reader.class);
conf.setClass("hbase.regionserver.hlog.async.writer.impl", SecureAsyncProtobufLogWriter.class,
AsyncWriter.class);
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureWALReplay.java
index eea0e63f7e1..d531707e257 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSecureWALReplay.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -41,8 +40,6 @@ public class TestSecureWALReplay extends TestWALReplay {
Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
- conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
- Reader.class);
conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
Writer.class);
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
index 69ff1c2722d..d4f76584f00 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
@@ -36,10 +36,11 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
-import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -196,12 +197,9 @@ public class SerialReplicationTestBase {
@Override
public boolean evaluate() throws Exception {
- try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
- int count = 0;
- while (reader.next() != null) {
- count++;
- }
- return count >= expectedEntries;
+ try {
+ return NoEOFWALStreamReader.count(FS, logPath, UTIL.getConfiguration())
+ >= expectedEntries;
} catch (IOException e) {
return false;
}
@@ -228,8 +226,8 @@ public class SerialReplicationTestBase {
}
protected final void checkOrder(int expectedEntries) throws IOException {
- try (WAL.Reader reader =
- WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+ try (WALStreamReader reader =
+ NoEOFWALStreamReader.create(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
long seqId = -1L;
int count = 0;
for (Entry entry;;) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
index 78e6ec0be96..63cbfe3119c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
@@ -161,11 +161,14 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
*/
@Test
public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception {
+ final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
+ // make sure only the current active wal file is in the queue
+ verifyNumberOfLogsInQueue(1, numRs);
+
// Disable the replication peer to accumulate the non empty WAL followed by empty WAL
hbaseAdmin.disableReplicationPeer(PEER_ID2);
- int numOfEntriesToReplicate = 20;
- final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
+ int numOfEntriesToReplicate = 20;
// for each RS, create an empty wal with same walGroupId
final List<Path> emptyWalPaths = new ArrayList<>();
long ts = EnvironmentEdgeManager.currentTime();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
index 4e9fa8fe617..7ab82b60cec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -41,9 +41,9 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
-import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.NoEOFWALStreamReader;
import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -107,8 +107,8 @@ public class TestSerialReplication extends SerialReplicationTestBase {
Map<String, Long> regionsToSeqId = new HashMap<>();
regionsToSeqId.put(region.getEncodedName(), -1L);
regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
- try (WAL.Reader reader =
- WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+ try (WALStreamReader reader =
+ NoEOFWALStreamReader.create(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
int count = 0;
for (Entry entry;;) {
entry = reader.next();
@@ -168,8 +168,8 @@ public class TestSerialReplication extends SerialReplicationTestBase {
RegionInfo region = regionsAfterMerge.get(0);
regionsToSeqId.put(region.getEncodedName(), -1L);
regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
- try (WAL.Reader reader =
- WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+ try (WALStreamReader reader =
+ NoEOFWALStreamReader.create(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
int count = 0;
for (Entry entry;;) {
entry = reader.next();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
index eda89b232c3..efd76854250 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.HasNext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -69,17 +71,33 @@ import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
+ @Parameter
+ public boolean isCompressionEnabled;
+
+ @Parameters(name = "{index}: isCompressionEnabled={0}")
+ public static Iterable<Object[]> data() {
+ return Arrays.asList(new Object[] { false }, new Object[] { true });
+ }
+
@Before
public void setUp() throws Exception {
+ CONF.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, isCompressionEnabled);
initWAL();
}
+ private Entry next(WALEntryStream entryStream) {
+ assertEquals(HasNext.YES, entryStream.hasNext());
+ return entryStream.next();
+ }
+
/**
* Tests basic reading of log appends
*/
@@ -88,24 +106,24 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
appendToLogAndSync();
long oldPos;
try (WALEntryStream entryStream =
- new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
+ new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
// There's one edit in the log, read it. Reading past it needs to throw exception
- assertTrue(entryStream.hasNext());
+ assertEquals(HasNext.YES, entryStream.hasNext());
WAL.Entry entry = entryStream.peek();
assertSame(entry, entryStream.next());
assertNotNull(entry);
- assertFalse(entryStream.hasNext());
+ assertEquals(HasNext.RETRY, entryStream.hasNext());
assertNull(entryStream.peek());
- assertNull(entryStream.next());
+ assertThrows(IllegalStateException.class, () -> entryStream.next());
oldPos = entryStream.getPosition();
}
appendToLogAndSync();
- try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log,
- null, new MetricsSource("1"), fakeWalGroupId)) {
+ try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF, oldPos, log,
+ new MetricsSource("1"), fakeWalGroupId)) {
// Read the newly added entry, make sure we made progress
- WAL.Entry entry = entryStream.next();
+ WAL.Entry entry = next(entryStream);
assertNotEquals(oldPos, entryStream.getPosition());
assertNotNull(entry);
oldPos = entryStream.getPosition();
@@ -116,19 +134,20 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
log.rollWriter();
appendToLogAndSync();
- try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log,
- null, new MetricsSource("1"), fakeWalGroupId)) {
- WAL.Entry entry = entryStream.next();
+ try (WALEntryStreamWithRetries entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF,
+ oldPos, log, new MetricsSource("1"), fakeWalGroupId)) {
+ WAL.Entry entry = next(entryStream);
assertNotEquals(oldPos, entryStream.getPosition());
assertNotNull(entry);
// next item should come from the new log
- entry = entryStream.next();
+ entry = next(entryStream);
assertNotEquals(oldPos, entryStream.getPosition());
assertNotNull(entry);
- // no more entries to read
- assertFalse(entryStream.hasNext());
+ // no more entries to read; disable retry, otherwise we will get a "wait too much time" error
+ entryStream.disableRetry();
+ assertEquals(HasNext.RETRY, entryStream.hasNext());
oldPos = entryStream.getPosition();
}
}
@@ -138,25 +157,35 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
* don't mistakenly dequeue the current log thinking we're done with it
*/
@Test
- public void testLogrollWhileStreaming() throws Exception {
+ public void testLogRollWhileStreaming() throws Exception {
appendToLog("1");
- appendToLog("2");// 2
- try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
- new MetricsSource("1"), fakeWalGroupId)) {
- assertEquals("1", getRow(entryStream.next()));
-
- appendToLog("3"); // 3 - comes in after reader opened
- log.rollWriter(); // log roll happening while we're reading
- appendToLog("4"); // 4 - this append is in the rolled log
-
- assertEquals("2", getRow(entryStream.next()));
- assertEquals(2, getQueue().size()); // we should not have dequeued yet since there's still an
- // entry in first log
- assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4
- // and 3 would be skipped
- assertEquals("4", getRow(entryStream.next())); // 4
- assertEquals(1, getQueue().size()); // now we've dequeued and moved on to next log properly
- assertFalse(entryStream.hasNext());
+ // 2
+ appendToLog("2");
+ try (WALEntryStreamWithRetries entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF,
+ 0, log, new MetricsSource("1"), fakeWalGroupId)) {
+ assertEquals("1", getRow(next(entryStream)));
+
+ // 3 - comes in after reader opened
+ appendToLog("3");
+ // log roll happening while we're reading
+ log.rollWriter();
+ // 4 - this append is in the rolled log
+ appendToLog("4");
+
+ assertEquals("2", getRow(next(entryStream)));
+ // we should not have dequeued yet since there's still an entry in first log
+ assertEquals(2, getQueue().size());
+ // if implemented improperly, this would be 4 and 3 would be skipped
+ assertEquals("3", getRow(next(entryStream)));
+ // 4
+ assertEquals("4", getRow(next(entryStream)));
+ // now we've dequeued and moved on to next log properly
+ assertEquals(1, getQueue().size());
+
+ // disable retry so we can get the return value immediately; otherwise we will fail with a
+ // "wait too much time" error
+ entryStream.disableRetry();
+ assertEquals(HasNext.RETRY, entryStream.hasNext());
}
}
@@ -168,21 +197,21 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
public void testNewEntriesWhileStreaming() throws Exception {
appendToLog("1");
try (WALEntryStream entryStream =
- new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
- entryStream.next(); // we've hit the end of the stream at this point
+ new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
+ assertNotNull(next(entryStream)); // we've hit the end of the stream at this point
// some new entries come in while we're streaming
appendToLog("2");
appendToLog("3");
// don't see them
- assertFalse(entryStream.hasNext());
+ assertEquals(HasNext.RETRY, entryStream.hasNext());
- // But we do if we reset
- entryStream.reset();
- assertEquals("2", getRow(entryStream.next()));
- assertEquals("3", getRow(entryStream.next()));
- assertFalse(entryStream.hasNext());
+ // But we do if we retry next time, as the entryStream will reset the reader
+ assertEquals("2", getRow(next(entryStream)));
+ assertEquals("3", getRow(next(entryStream)));
+ // reached the end again
+ assertEquals(HasNext.RETRY, entryStream.hasNext());
}
}
@@ -191,18 +220,18 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
long lastPosition = 0;
appendToLog("1");
try (WALEntryStream entryStream =
- new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
- entryStream.next(); // we've hit the end of the stream at this point
+ new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
+ assertNotNull(next(entryStream)); // we've hit the end of the stream at this point
appendToLog("2");
appendToLog("3");
lastPosition = entryStream.getPosition();
}
// next stream should pick up where we left off
- try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null,
+ try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
new MetricsSource("1"), fakeWalGroupId)) {
- assertEquals("2", getRow(entryStream.next()));
- assertEquals("3", getRow(entryStream.next()));
- assertFalse(entryStream.hasNext()); // done
+ assertEquals("2", getRow(next(entryStream)));
+ assertEquals("3", getRow(next(entryStream)));
+ assertEquals(HasNext.RETRY, entryStream.hasNext()); // done
assertEquals(1, getQueue().size());
}
}
@@ -211,31 +240,30 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
* Tests that if we stop before hitting the end of a stream, we can continue where we left off
* using the last position
*/
-
@Test
public void testPosition() throws Exception {
long lastPosition = 0;
appendEntriesToLogAndSync(3);
// read only one element
- try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null,
+ try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
new MetricsSource("1"), fakeWalGroupId)) {
- entryStream.next();
+ assertNotNull(next(entryStream));
lastPosition = entryStream.getPosition();
}
// there should still be two more entries from where we left off
- try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null,
+ try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, lastPosition, log,
new MetricsSource("1"), fakeWalGroupId)) {
- assertNotNull(entryStream.next());
- assertNotNull(entryStream.next());
- assertFalse(entryStream.hasNext());
+ assertNotNull(next(entryStream));
+ assertNotNull(next(entryStream));
+ assertEquals(HasNext.RETRY, entryStream.hasNext());
}
}
@Test
public void testEmptyStream() throws Exception {
try (WALEntryStream entryStream =
- new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
- assertFalse(entryStream.hasNext());
+ new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
+ assertEquals(HasNext.RETRY, entryStream.hasNext());
}
}
@@ -309,10 +337,10 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
// get ending position
long position;
try (WALEntryStream entryStream =
- new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
- entryStream.next();
- entryStream.next();
- entryStream.next();
+ new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
+ for (int i = 0; i < 3; i++) {
+ assertNotNull(next(entryStream));
+ }
position = entryStream.getPosition();
}
@@ -340,10 +368,10 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
// get ending position
long position;
try (WALEntryStream entryStream =
- new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
- entryStream.next();
- entryStream.next();
- entryStream.next();
+ new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
+ for (int i = 0; i < 3; i++) {
+ assertNotNull(next(entryStream));
+ }
position = entryStream.getPosition();
}
@@ -455,10 +483,10 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
// get ending position
long position;
try (WALEntryStream entryStream =
- new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
- entryStream.next();
- entryStream.next();
- entryStream.next();
+ new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
+ for (int i = 0; i < 3; i++) {
+ assertNotNull(next(entryStream));
+ }
position = entryStream.getPosition();
}
@@ -562,28 +590,24 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
appendToLog("2");
long size = log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong();
AtomicLong fileLength = new AtomicLong(size - 1);
- try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, 0,
- p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"), fakeWalGroupId)) {
- assertTrue(entryStream.hasNext());
- assertNotNull(entryStream.next());
+ try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, CONF, 0,
+ p -> OptionalLong.of(fileLength.get()), new MetricsSource("1"), fakeWalGroupId)) {
+ assertNotNull(next(entryStream));
// can not get log 2
- assertFalse(entryStream.hasNext());
+ assertEquals(HasNext.RETRY, entryStream.hasNext());
Thread.sleep(1000);
- entryStream.reset();
// still can not get log 2
- assertFalse(entryStream.hasNext());
+ assertEquals(HasNext.RETRY, entryStream.hasNext());
// can get log 2 now
fileLength.set(size);
- entryStream.reset();
- assertTrue(entryStream.hasNext());
- assertNotNull(entryStream.next());
+ assertNotNull(next(entryStream));
- assertFalse(entryStream.hasNext());
+ assertEquals(HasNext.RETRY, entryStream.hasNext());
}
}
- /*
+ /**
* Test removal of 0 length log from logQueue if the source is a recovered source and size of
* logQueue is only 1.
*/
@@ -625,13 +649,12 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
ReplicationSource source = mockReplicationSource(true, conf);
ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
// Create a 0 length log.
- Path emptyLog = new Path(fs.getHomeDirectory(), "log.2");
- FSDataOutputStream fsdos = fs.create(emptyLog);
- fsdos.close();
+ Path emptyLog = new Path(fs.getHomeDirectory(), "log.2." + isCompressionEnabled);
+ fs.create(emptyLog).close();
assertEquals(0, fs.getFileStatus(emptyLog).getLen());
localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
- final Path log1 = new Path(fs.getHomeDirectory(), "log.1");
+ final Path log1 = new Path(fs.getHomeDirectory(), "log.1." + isCompressionEnabled);
WALProvider.Writer writer1 = WALFactory.createWALWriter(fs, log1, TEST_UTIL.getConfiguration());
appendEntries(writer1, 3);
localLogQueue.enqueueLog(log1, fakeWalGroupId);
@@ -678,7 +701,7 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
writer.close();
}
- /**
+ /***
* Tests size of log queue is incremented and decremented properly.
*/
@Test
@@ -688,16 +711,21 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
appendToLogAndSync();
log.rollWriter();
+ // wait until the previous WAL file is cleanly closed, so later we can always see
+ // RETRY_IMMEDIATELY instead of RETRY. The wait here is necessary because the closing of a WAL
+ // writer is asynchronous
+ TEST_UTIL.waitFor(30000, () -> fs.getClient().isFileClosed(logQueue.getQueue(fakeWalGroupId)
+ .peek().makeQualified(fs.getUri(), fs.getWorkingDirectory()).toUri().getPath()));
// After rolling there will be 2 wals in the queue
assertEquals(2, logQueue.getMetrics().getSizeOfLogQueue());
try (WALEntryStream entryStream =
- new WALEntryStream(logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
+ new WALEntryStream(logQueue, fs, CONF, 0, log, logQueue.getMetrics(), fakeWalGroupId)) {
// There's one edit in the log, read it.
- assertTrue(entryStream.hasNext());
- WAL.Entry entry = entryStream.next();
- assertNotNull(entry);
- assertFalse(entryStream.hasNext());
+ assertNotNull(next(entryStream));
+ // we've switched to the next WAL, and the previous WAL file is closed cleanly, so it is
+ // RETRY_IMMEDIATELY
+ assertEquals(HasNext.RETRY_IMMEDIATELY, entryStream.hasNext());
}
// After removing one wal, size of log queue will be 1 again.
assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue());
@@ -709,26 +737,25 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
*/
@Test
public void testCleanClosedWALs() throws Exception {
- try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
+ try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, fs, CONF, 0, log,
logQueue.getMetrics(), fakeWalGroupId)) {
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
appendToLogAndSync();
- assertNotNull(entryStream.next());
+ assertNotNull(next(entryStream));
log.rollWriter();
appendToLogAndSync();
- assertNotNull(entryStream.next());
+ assertNotNull(next(entryStream));
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
}
}
/**
* Tests that we handle EOFException properly if the wal has moved to oldWALs directory.
- * @throws Exception exception
*/
@Test
public void testEOFExceptionInOldWALsDirectory() throws Exception {
assertEquals(1, logQueue.getQueueSize(fakeWalGroupId));
- AbstractFSWAL abstractWAL = (AbstractFSWAL) log;
+ AbstractFSWAL<?> abstractWAL = (AbstractFSWAL<?>) log;
Path emptyLogFile = abstractWAL.getCurrentFileName();
log.rollWriter(true);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamAsyncFSWAL.java
index 8e2a01c177e..2c37e34ae4e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamAsyncFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamAsyncFSWAL.java
@@ -26,10 +26,13 @@ import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
/**
* TestBasicWALEntryStream with {@link AsyncFSWALProvider} as the WAL provider.
*/
+@RunWith(Parameterized.class)
@Category({ ReplicationTests.class, MediumTests.class })
public class TestBasicWALEntryStreamAsyncFSWAL extends TestBasicWALEntryStream {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamFSHLog.java
index db3e7fe8cf5..7ab9d3fc5a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamFSHLog.java
@@ -26,10 +26,13 @@ import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
/**
* TestBasicWALEntryStream with {@link FSHLogProvider} as the WAL provider.
*/
+@RunWith(Parameterized.class)
@Category({ ReplicationTests.class, MediumTests.class })
public class TestBasicWALEntryStreamFSHLog extends TestBasicWALEntryStream {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
index 53ee0d720d8..082b42c23b6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
@@ -42,10 +42,10 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
-import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -189,7 +189,8 @@ public class TestRaceWhenCreatingReplicationSource {
@Override
public boolean evaluate() throws Exception {
- try (WAL.Reader reader = WALFactory.createReader(FS, LOG_PATH, UTIL.getConfiguration())) {
+ try (WALStreamReader reader =
+ WALFactory.createStreamReader(FS, LOG_PATH, UTIL.getConfiguration())) {
return reader.next() != null;
} catch (IOException e) {
return false;
@@ -201,7 +202,8 @@ public class TestRaceWhenCreatingReplicationSource {
return "Replication has not catched up";
}
});
- try (WAL.Reader reader = WALFactory.createReader(FS, LOG_PATH, UTIL.getConfiguration())) {
+ try (WALStreamReader reader =
+ WALFactory.createStreamReader(FS, LOG_PATH, UTIL.getConfiguration())) {
Cell cell = reader.next().getEdit().getCells().get(0);
assertEquals(1, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
assertArrayEquals(CF, CellUtil.cloneFamily(cell));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 454b8387fa7..35eb343dbff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -226,7 +227,8 @@ public class TestReplicationSource {
}
writer.close();
- WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration());
+ WALStreamReader reader =
+ WALFactory.createStreamReader(FS, logPath, TEST_UTIL.getConfiguration());
WAL.Entry entry = reader.next();
assertNotNull(entry);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCounts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCounts.java
index 3f151acceca..88f4f4539b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCounts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCounts.java
@@ -18,13 +18,13 @@
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.HasNext;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameter;
@@ -74,16 +74,17 @@ public abstract class TestWALEntryStreamDifferentCounts extends WALEntryStreamTe
log.rollWriter();
try (WALEntryStream entryStream =
- new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
+ new WALEntryStream(logQueue, fs, CONF, 0, log, new MetricsSource("1"), fakeWalGroupId)) {
int i = 0;
- while (entryStream.hasNext()) {
+ while (entryStream.hasNext() == HasNext.YES) {
assertNotNull(entryStream.next());
i++;
}
assertEquals(nbRows, i);
- // should've read all entries
- assertFalse(entryStream.hasNext());
+ // should've read all entries; since the last file is still open for writing, we will get a
+ // RETRY instead of NO here
+ assertEquals(HasNext.RETRY, entryStream.hasNext());
}
}
}
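
As a reading aid for the tri-state hasNext() these tests exercise, here is a sketch of a
consumer loop, assuming the HasNext semantics implied above (YES: an entry is ready; RETRY
and RETRY_IMMEDIATELY: the stream has reset its reader and the caller should poll again,
with or without a pause; NO: the stream is exhausted). The backoff value is arbitrary and
the enclosing method is assumed to declare InterruptedException:

    for (;;) {
      WALEntryStream.HasNext state = entryStream.hasNext(); // idempotent
      if (state == WALEntryStream.HasNext.YES) {
        WAL.Entry entry = entryStream.next();
        // ship or verify the entry
      } else if (state == WALEntryStream.HasNext.NO) {
        break; // nothing more will ever arrive
      } else if (state == WALEntryStream.HasNext.RETRY) {
        Thread.sleep(100); // back off before polling the live WAL again
      }
      // on RETRY_IMMEDIATELY, loop and poll again right away
    }
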
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStreamTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStreamTestBase.java
index 399923b2892..5f29cb3553c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStreamTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStreamTestBase.java
@@ -28,10 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNameTestRule;
-import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
@@ -39,10 +36,10 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
@@ -59,7 +56,7 @@ public abstract class WALEntryStreamTestBase {
protected static final long TEST_TIMEOUT_MS = 5000;
protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected static Configuration CONF;
- protected static FileSystem fs;
+ protected static DistributedFileSystem fs;
protected static MiniDFSCluster cluster;
protected static final TableName tableName = TableName.valueOf("tablename");
protected static final byte[] family = Bytes.toBytes("column");
@@ -80,22 +77,30 @@ public abstract class WALEntryStreamTestBase {
* the test code simpler.
*/
protected static class WALEntryStreamWithRetries extends WALEntryStream {
- // Class member to be able to set a non-final from within a lambda.
- private Entry result;
- public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, Configuration conf,
- long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
- MetricsSource metrics, String walGroupId) throws IOException {
- super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, walGroupId);
+ private boolean retry = true;
+
+ public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, FileSystem fs,
+ Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider,
+ MetricsSource metrics, String walGroupId) {
+ super(logQueue, fs, conf, startPosition, walFileLengthProvider, metrics, walGroupId);
+ }
+
+ public void enableRetry() {
+ retry = true;
+ }
+
+ public void disableRetry() {
+ retry = false;
}
@Override
- public Entry next() {
- Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> {
- result = WALEntryStreamWithRetries.super.next();
- return result != null;
- });
- return result;
+ public HasNext hasNext() {
+ // hasNext is idempotent, so we can call it again and do not need to store its return value
+ if (retry) {
+ TEST_UTIL.waitFor(TEST_TIMEOUT_MS, () -> super.hasNext() == HasNext.YES);
+ }
+ return super.hasNext();
}
}
@@ -146,8 +151,7 @@ public abstract class WALEntryStreamTestBase {
metricsSource.clear();
logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source);
pathWatcher = new PathWatcher();
- final WALFactory wals =
- new WALFactory(CONF, TableNameTestRule.cleanUpTestName(tn.getMethodName()));
+ final WALFactory wals = new WALFactory(CONF, tn.getMethodName().replaceAll("[\\[:]", "_"));
wals.getWALProvider().addWALActionsListener(pathWatcher);
log = wals.getWAL(info);
}
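
A usage sketch for the wrapper above: a test can block, up to the timeout, until an entry
becomes visible, then turn retries off to assert a terminal state without tripping the
waitFor failure (the arguments mirror the call sites in this patch):

    try (WALEntryStreamWithRetries stream = new WALEntryStreamWithRetries(logQueue, fs, CONF,
      0, log, new MetricsSource("1"), fakeWalGroupId)) {
      assertEquals(HasNext.YES, stream.hasNext()); // waits until an entry shows up
      WAL.Entry entry = stream.next();
      stream.disableRetry();
      assertEquals(HasNext.RETRY, stream.hasNext()); // returns immediately now
    }
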
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/CompressedWALTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/CompressedWALTestBase.java
index b5f7dc634c7..cb2e5daa987 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/CompressedWALTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/CompressedWALTestBase.java
@@ -97,7 +97,7 @@ public class CompressedWALTestBase {
wals.shutdown();
// Confirm the WAL can be read back
- try (WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath)) {
+ try (WALStreamReader reader = wals.createStreamReader(TEST_UTIL.getTestFileSystem(), walPath)) {
int count = 0;
WAL.Entry entry = new WAL.Entry();
while (reader.next(entry) != null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/NoEOFWALStreamReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/NoEOFWALStreamReader.java
new file mode 100644
index 00000000000..2eb63b99bf8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/NoEOFWALStreamReader.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.EOFException;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A helper class for writing UTs, where we swallow the EOFException and return null on reaching
+ * EOF, so in UTs we do not need to deal with partial WAL files if this does not affect the
+ * correctness. In production code you usually should not do this, as it may cause data loss if
+ * you always ignore the EOFException.
+ */
+public final class NoEOFWALStreamReader implements WALStreamReader {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NoEOFWALStreamReader.class);
+
+ private WALStreamReader reader;
+
+ private NoEOFWALStreamReader(WALStreamReader reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public Entry next(Entry reuse) throws IOException {
+ try {
+ return reader.next(reuse);
+ } catch (EOFException e) {
+ LOG.warn("Got EOF while reading", e);
+ return null;
+ }
+ }
+
+ @Override
+ public long getPosition() throws IOException {
+ return reader.getPosition();
+ }
+
+ @Override
+ public void close() {
+ reader.close();
+ }
+
+ private int count() throws IOException {
+ int count = 0;
+ Entry entry = new Entry();
+ while (next(entry) != null) {
+ count++;
+ }
+ return count;
+ }
+
+ public static NoEOFWALStreamReader create(FileSystem fs, Path path, Configuration conf)
+ throws IOException {
+ return new NoEOFWALStreamReader(WALFactory.createStreamReader(fs, path, conf));
+ }
+
+ public static NoEOFWALStreamReader create(WALFactory walFactory, FileSystem fs, Path path)
+ throws IOException {
+ return new NoEOFWALStreamReader(walFactory.createStreamReader(fs, path));
+ }
+
+ public static int count(FileSystem fs, Path path, Configuration conf) throws IOException {
+ try (NoEOFWALStreamReader reader = create(fs, path, conf)) {
+ return reader.count();
+ }
+ }
+
+ public static int count(WALFactory walFactory, FileSystem fs, Path path) throws IOException {
+ try (NoEOFWALStreamReader reader = create(walFactory, fs, path)) {
+ return reader.count();
+ }
+ }
+}
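
A usage sketch for this helper, mirroring the call sites updated elsewhere in this patch
(walPath and expectedEntries are illustrative):

    // Count the entries of a WAL file in a test, treating a trailing partial
    // entry as a normal end of file rather than an error.
    int entries = NoEOFWALStreamReader.count(fs, walPath, conf);
    assertEquals(expectedEntries, entries);
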
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestParsePartialWALFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestParsePartialWALFile.java
new file mode 100644
index 00000000000..ae6db543537
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestParsePartialWALFile.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Cell.Type;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * In this test, we write a small WAL file first, and then generate partial WAL files whose
+ * lengths are in the range [0, fileLength) (we test every possible length in the range), to see
+ * if we can successfully get the completed entries, and also get an EOF at the end.
+ * <p/>
+ * It is very important to ensure 3 things:
+ * <ul>
+ * <li>We do not get incorrect entries. Otherwise there will be data corruption.</li>
+ * <li>We can get all the completed entries, i.e., we do not miss any data. Otherwise there will
+ * be data loss.</li>
+ * <li>We finally get an EOF, instead of a general IOException. Otherwise splitting or
+ * replication will be stuck.</li>
+ * </ul>
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestParsePartialWALFile {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestParsePartialWALFile.class);
+
+ private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
+
+ private static FileSystem FS;
+
+ private static TableName TN = TableName.valueOf("test");
+ private static RegionInfo RI = RegionInfoBuilder.newBuilder(TN).build();
+ private static byte[] ROW = Bytes.toBytes("row");
+ private static byte[] FAMILY = Bytes.toBytes("family");
+ private static byte[] QUAL = Bytes.toBytes("qualifier");
+ private static byte[] VALUE = Bytes.toBytes("value");
+
+ @BeforeClass
+ public static void setUp() throws IOException {
+ UTIL.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
+ FS = FileSystem.getLocal(UTIL.getConfiguration());
+ if (!FS.mkdirs(UTIL.getDataTestDir())) {
+ throw new IOException("can not create " + UTIL.getDataTestDir());
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ UTIL.cleanupTestDir();
+ }
+
+ private Path generateBrokenWALFile(byte[] content, int length) throws IOException {
+ Path walFile = UTIL.getDataTestDir("wal-" + length);
+ try (FSDataOutputStream out = FS.create(walFile)) {
+ out.write(content, 0, length);
+ }
+ return walFile;
+ }
+
+ private void assertEntryEquals(WAL.Entry entry, int index) {
+ WALKeyImpl key = entry.getKey();
+ assertEquals(TN, key.getTableName());
+ assertArrayEquals(RI.getEncodedNameAsBytes(), key.getEncodedRegionName());
+ WALEdit edit = entry.getEdit();
+ assertEquals(1, edit.getCells().size());
+ Cell cell = edit.getCells().get(0);
+ assertArrayEquals(ROW, CellUtil.cloneRow(cell));
+ assertArrayEquals(FAMILY, CellUtil.cloneFamily(cell));
+ if (index % 2 == 0) {
+ assertEquals(Type.Put, cell.getType());
+ assertArrayEquals(QUAL, CellUtil.cloneQualifier(cell));
+ assertArrayEquals(VALUE, CellUtil.cloneValue(cell));
+ } else {
+ assertEquals(Type.DeleteFamily, cell.getType());
+ }
+ }
+
+ private void testReadEntry(Path file, int entryCount) throws IOException {
+ try (
+ WALStreamReader reader = WALFactory.createStreamReader(FS, file, UTIL.getConfiguration())) {
+ for (int i = 0; i < entryCount; i++) {
+ assertEntryEquals(reader.next(), i);
+ }
+ assertThrows(EOFException.class, () -> reader.next());
+ }
+ try (WALTailingReader reader =
+ WALFactory.createTailingReader(FS, file, UTIL.getConfiguration(), -1)) {
+ for (int i = 0; i < entryCount; i++) {
+ WALTailingReader.Result result = reader.next(-1);
+ assertEquals(WALTailingReader.State.NORMAL, result.getState());
+ assertEntryEquals(result.getEntry(), i);
+ }
+ WALTailingReader.Result result = reader.next(-1);
+ assertEquals(WALTailingReader.State.EOF_AND_RESET, result.getState());
+ }
+ }
+
+ @Test
+ public void testPartialParse() throws Exception {
+ Path walFile = UTIL.getDataTestDir("wal");
+ long headerLength;
+ List<Long> endOffsets = new ArrayList<>();
+ try (WALProvider.Writer writer =
+ WALFactory.createWALWriter(FS, walFile, UTIL.getConfiguration())) {
+ headerLength = writer.getLength();
+ for (int i = 0; i < 3; i++) {
+ WALKeyImpl key = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, i,
+ EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
+ WALEdit edit = new WALEdit();
+ if (i % 2 == 0) {
+ edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
+ .setRow(ROW).setFamily(FAMILY).setQualifier(QUAL).setValue(VALUE).build());
+ } else {
+ edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
+ .setType(Type.DeleteFamily).setRow(ROW).setFamily(FAMILY).build());
+ }
+ writer.append(new WAL.Entry(key, edit));
+ writer.sync(true);
+ endOffsets.add(writer.getLength());
+ }
+ }
+ long fileLength = FS.getFileStatus(walFile).getLen();
+ byte[] content = new byte[(int) fileLength];
+ try (FSDataInputStream in = FS.open(walFile)) {
+ in.readFully(content);
+ }
+ // partial header, should throw WALHeaderEOFException
+ for (int i = 0; i < headerLength; i++) {
+ Path brokenFile = generateBrokenWALFile(content, i);
+ assertThrows(WALHeaderEOFException.class,
+ () -> WALFactory.createStreamReader(FS, brokenFile, UTIL.getConfiguration()));
+ assertThrows(WALHeaderEOFException.class,
+ () -> WALFactory.createTailingReader(FS, brokenFile, UTIL.getConfiguration(), -1));
+ FS.delete(brokenFile, false);
+ }
+ // partial WAL entries, we should be able to read the completed entries and get an EOF on the incomplete one
+ for (int i = 0; i <= endOffsets.size(); i++) {
+ int startOffset;
+ int endOffset;
+ if (i == 0) {
+ startOffset = (int) headerLength;
+ endOffset = endOffsets.get(i).intValue();
+ } else if (i == endOffsets.size()) {
+ startOffset = endOffsets.get(i - 1).intValue();
+ endOffset = (int) fileLength;
+ } else {
+ startOffset = endOffsets.get(i - 1).intValue();
+ endOffset = endOffsets.get(i).intValue();
+ }
+ for (int j = startOffset; j < endOffset; j++) {
+ Path brokenFile = generateBrokenWALFile(content, j);
+ testReadEntry(brokenFile, i);
+ FS.delete(brokenFile, false);
+ }
+ }
+ // partial trailer, we should be able to read all the entries but get an EOF when trying to read
+ // again, as we do not know it is a trailer
+ for (int i = endOffsets.get(endOffsets.size() - 1).intValue(); i < fileLength; i++) {
+ Path brokenFile = generateBrokenWALFile(content, i);
+ testReadEntry(brokenFile, endOffsets.size());
+ FS.delete(brokenFile, false);
+ }
+ }
+}
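The test above also pins down the difference between the two new reader contracts: a
WALStreamReader signals a truncated file by throwing EOFException (or WALHeaderEOFException if
even the header is incomplete), while a WALTailingReader never throws for that case and instead
returns a Result whose State tells the caller to reset and retry. A condensed sketch of a
tailing loop built from the calls used in testReadEntry above (process() is a hypothetical
consumer, and whatever repositioning a real caller performs after EOF_AND_RESET is not shown in
this excerpt):

  private void tailUntilError(FileSystem fs, Path walFile, Configuration conf) throws Exception {
    try (WALTailingReader reader = WALFactory.createTailingReader(fs, walFile, conf, -1)) {
      for (;;) {
        WALTailingReader.Result result = reader.next(-1);
        if (result.getState() == WALTailingReader.State.NORMAL) {
          process(result.getEntry()); // hand the entry to replication, etc.
        } else if (result.getState() == WALTailingReader.State.EOF_AND_RESET) {
          // We hit the current end of a WAL that may still be growing; back off
          // and poll again instead of treating this as a terminal error.
          Thread.sleep(100);
        } else {
          return; // remaining states (real errors) are left to the caller
        }
      }
    }
  }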
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
index 3b784427b54..67b03fb103c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.SecureAsyncProtobufLogWriter;
-import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -87,8 +86,6 @@ public class TestSecureWAL {
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
- conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
- WAL.Reader.class);
conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
WALProvider.Writer.class);
conf.setClass("hbase.regionserver.hlog.async.writer.impl", SecureAsyncProtobufLogWriter.class,
@@ -146,23 +143,23 @@ public class TestSecureWAL {
assertFalse("Cells appear to be plaintext", Bytes.contains(fileData, value));
// Confirm the WAL can be read back
- WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
int count = 0;
- WAL.Entry entry = new WAL.Entry();
- while (reader.next(entry) != null) {
- count++;
- List<Cell> cells = entry.getEdit().getCells();
- assertTrue("Should be one KV per WALEdit", cells.size() == 1);
- for (Cell cell : cells) {
- assertTrue("Incorrect row", Bytes.equals(cell.getRowArray(), cell.getRowOffset(),
- cell.getRowLength(), row, 0, row.length));
- assertTrue("Incorrect family", Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
- cell.getFamilyLength(), family, 0, family.length));
- assertTrue("Incorrect value", Bytes.equals(cell.getValueArray(), cell.getValueOffset(),
- cell.getValueLength(), value, 0, value.length));
+ try (WALStreamReader reader = wals.createStreamReader(TEST_UTIL.getTestFileSystem(), walPath)) {
+ WAL.Entry entry = new WAL.Entry();
+ while (reader.next(entry) != null) {
+ count++;
+ List<Cell> cells = entry.getEdit().getCells();
+ assertTrue("Should be one KV per WALEdit", cells.size() == 1);
+ for (Cell cell : cells) {
+ assertTrue("Incorrect row", Bytes.equals(cell.getRowArray(), cell.getRowOffset(),
+ cell.getRowLength(), row, 0, row.length));
+ assertTrue("Incorrect family", Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
+ cell.getFamilyLength(), family, 0, family.length));
+ assertTrue("Incorrect value", Bytes.equals(cell.getValueArray(), cell.getValueOffset(),
+ cell.getValueLength(), value, 0, value.length));
+ }
}
+ assertEquals("Should have read back as many KVs as written", total, count);
}
- assertEquals("Should have read back as many KVs as written", total, count);
- reader.close();
}
}
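Also worth noting in the rewrite above: WALStreamReader keeps both next() variants of the old
WAL.Reader, so callers can either let the reader allocate a fresh WAL.Entry per call (as
TestWALFactory does below) or pass in an Entry to reuse across calls, as this test does. A
minimal sketch of the reusing variant, assuming the same wals/walPath as the surrounding test:

  try (WALStreamReader reader = wals.createStreamReader(TEST_UTIL.getTestFileSystem(), walPath)) {
    WAL.Entry reused = new WAL.Entry();
    // next(entry) fills the passed-in Entry and returns null at end of file.
    while (reader.next(reused) != null) {
      for (Cell cell : reused.getEdit().getCells()) {
        // verify cell contents here
      }
    }
  }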
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index d519d3720e5..1afdc6e0fa7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -261,91 +262,63 @@ public class TestWALFactory {
in.close();
final int total = 20;
- WAL.Reader reader = null;
+ RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
+ NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ scopes.put(tableName.getName(), 0);
+ final WAL wal = wals.getWAL(info);
- try {
- RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
- NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- scopes.put(tableName.getName(), 0);
- final WAL wal = wals.getWAL(info);
-
- for (int i = 0; i < total; i++) {
- WALEdit kvs = new WALEdit();
- kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
- wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
- EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
- }
- // Now call sync and try reading. Opening a Reader before you sync just
- // gives you EOFE.
- wal.sync();
- // Open a Reader.
- Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
- reader = wals.createReader(fs, walPath);
- int count = 0;
- WAL.Entry entry = new WAL.Entry();
- while ((entry = reader.next(entry)) != null)
- count++;
- assertEquals(total, count);
- reader.close();
- // Add test that checks to see that an open of a Reader works on a file
- // that has had a sync done on it.
- for (int i = 0; i < total; i++) {
- WALEdit kvs = new WALEdit();
- kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
- wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
- EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
- }
- wal.sync();
- reader = wals.createReader(fs, walPath);
- count = 0;
- while ((entry = reader.next(entry)) != null)
- count++;
- assertTrue(count >= total);
- reader.close();
- // If I sync, should see double the edits.
- wal.sync();
- reader = wals.createReader(fs, walPath);
- count = 0;
- while ((entry = reader.next(entry)) != null)
- count++;
- assertEquals(total * 2, count);
- reader.close();
- // Now do a test that ensures stuff works when we go over block boundary,
- // especially that we return good length on file.
- final byte[] value = new byte[1025 * 1024]; // Make a 1M value.
- for (int i = 0; i < total; i++) {
- WALEdit kvs = new WALEdit();
- kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
- wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
- EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
- }
- // Now I should have written out lots of blocks. Sync then read.
- wal.sync();
- reader = wals.createReader(fs, walPath);
- count = 0;
- while ((entry = reader.next(entry)) != null)
- count++;
- assertEquals(total * 3, count);
- reader.close();
- // shutdown and ensure that Reader gets right length also.
- wal.shutdown();
- reader = wals.createReader(fs, walPath);
- count = 0;
- while ((entry = reader.next(entry)) != null)
- count++;
- assertEquals(total * 3, count);
- reader.close();
- } finally {
- if (reader != null) reader.close();
+ for (int i = 0; i < total; i++) {
+ WALEdit kvs = new WALEdit();
+ kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
+ wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
+ EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
}
+ // Now call sync and try reading. Opening a reader before you sync just
+ // gives you an EOFException.
+ wal.sync();
+ // Open a Reader.
+ Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
+ int count = NoEOFWALStreamReader.count(wals, fs, walPath);
+ assertEquals(total, count);
+ // Check that opening a reader works on a file
+ // that has already had a sync done on it.
+ for (int i = 0; i < total; i++) {
+ WALEdit kvs = new WALEdit();
+ kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
+ wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
+ EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
+ }
+ wal.sync();
+ count = NoEOFWALStreamReader.count(wals, fs, walPath);
+ assertTrue(count >= total);
+ // If we sync again, we should see double the edits.
+ wal.sync();
+ count = NoEOFWALStreamReader.count(wals, fs, walPath);
+ assertEquals(total * 2, count);
+ // Now do a test that ensures things still work when we go over a block boundary,
+ // especially that we return the correct length for the file.
+ final byte[] value = new byte[1025 * 1024]; // Make a value just over 1MB.
+ for (int i = 0; i < total; i++) {
+ WALEdit kvs = new WALEdit();
+ kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
+ wal.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
+ EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
+ }
+ // Now I should have written out lots of blocks. Sync then read.
+ wal.sync();
+ count = NoEOFWALStreamReader.count(wals, fs, walPath);
+ assertEquals(total * 3, count);
+ // Shut down and ensure that the reader also gets the right length.
+ wal.shutdown();
+ count = NoEOFWALStreamReader.count(wals, fs, walPath);
+ assertEquals(total * 3, count);
}
private void verifySplits(final List<Path> splits, final int howmany) throws IOException {
assertEquals(howmany * howmany, splits.size());
for (int i = 0; i < splits.size(); i++) {
LOG.info("Verifying=" + splits.get(i));
- WAL.Reader reader = wals.createReader(fs, splits.get(i));
- try {
+ try (WALStreamReader reader = wals.createStreamReader(fs, splits.get(i))) {
int count = 0;
String previousRegion = null;
long seqno = -1;
@@ -364,8 +337,6 @@ public class TestWALFactory {
count++;
}
assertEquals(howmany, count);
- } finally {
- reader.close();
}
}
}
@@ -475,15 +446,15 @@ public class TestWALFactory {
if (t.exception != null) throw t.exception;
// Make sure you can read all the content
- WAL.Reader reader = wals.createReader(fs, walPath);
int count = 0;
- WAL.Entry entry = new WAL.Entry();
- while (reader.next(entry) != null) {
- count++;
- assertTrue("Should be one KeyValue per WALEdit", entry.getEdit().getCells().size() == 1);
+ try (NoEOFWALStreamReader reader = NoEOFWALStreamReader.create(wals, fs, walPath)) {
+ WAL.Entry entry = new WAL.Entry();
+ while (reader.next(entry) != null) {
+ count++;
+ assertTrue("Should be one KeyValue per WALEdit", entry.getEdit().getCells().size() == 1);
+ }
}
assertEquals(total, count);
- reader.close();
// Reset the lease period
setLeasePeriod.invoke(cluster, new Object[] { 60000L, 3600000L });
@@ -503,31 +474,29 @@ public class TestWALFactory {
scopes.put(fam, 0);
}
byte[] row = Bytes.toBytes("row");
- WAL.Reader reader = null;
- try {
- final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
+ final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
- // Write columns named 1, 2, 3, etc. and then values of single byte
- // 1, 2, 3...
- long timestamp = EnvironmentEdgeManager.currentTime();
- WALEdit cols = new WALEdit();
- for (int i = 0; i < colCount; i++) {
- cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
- timestamp, new byte[] { (byte) (i + '0') }));
- }
- RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(row)
- .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build();
- final WAL log = wals.getWAL(info);
-
- final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(),
- htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
- log.sync(txid);
- log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
- log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
- log.shutdown();
- Path filename = AbstractFSWALProvider.getCurrentFileName(log);
- // Now open a reader on the log and assert append worked.
- reader = wals.createReader(fs, filename);
+ // Write columns named 1, 2, 3, etc. and then values of single byte
+ // 1, 2, 3...
+ long timestamp = EnvironmentEdgeManager.currentTime();
+ WALEdit cols = new WALEdit();
+ for (int i = 0; i < colCount; i++) {
+ cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
+ timestamp, new byte[] { (byte) (i + '0') }));
+ }
+ RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(row)
+ .setEndKey(Bytes.toBytes(Bytes.toString(row) + "1")).build();
+ final WAL log = wals.getWAL(info);
+
+ final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(),
+ htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
+ log.sync(txid);
+ log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
+ log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
+ log.shutdown();
+ Path filename = AbstractFSWALProvider.getCurrentFileName(log);
+ // Now open a reader on the log and assert append worked.
+ try (NoEOFWALStreamReader reader = NoEOFWALStreamReader.create(wals, fs, filename)) {
// Above we added all columns on a single row so we only read one
// entry in the below... that's why we have '1'.
for (int i = 0; i < 1; i++) {
@@ -541,11 +510,7 @@ public class TestWALFactory {
assertTrue(Bytes.equals(row, 0, row.length, cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength()));
assertEquals((byte) (i + '0'), CellUtil.cloneValue(cell)[0]);
- System.out.println(key + " " + val);
- }
- } finally {
- if (reader != null) {
- reader.close();
+ LOG.info(key + " " + val);
}
}
}
@@ -561,28 +526,26 @@ public class TestWALFactory {
scopes.put(fam, 0);
}
byte[] row = Bytes.toBytes("row");
- WAL.Reader reader = null;
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
- try {
- // Write columns named 1, 2, 3, etc. and then values of single byte
- // 1, 2, 3...
- long timestamp = EnvironmentEdgeManager.currentTime();
- WALEdit cols = new WALEdit();
- for (int i = 0; i < colCount; i++) {
- cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
- timestamp, new byte[] { (byte) (i + '0') }));
- }
- RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
- final WAL log = wals.getWAL(hri);
- final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
- htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
- log.sync(txid);
- log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
- log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
- log.shutdown();
- Path filename = AbstractFSWALProvider.getCurrentFileName(log);
- // Now open a reader on the log and assert append worked.
- reader = wals.createReader(fs, filename);
+ // Write columns named 1, 2, 3, etc. and then values of single byte
+ // 1, 2, 3...
+ long timestamp = EnvironmentEdgeManager.currentTime();
+ WALEdit cols = new WALEdit();
+ for (int i = 0; i < colCount; i++) {
+ cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)),
+ timestamp, new byte[] { (byte) (i + '0') }));
+ }
+ RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
+ final WAL log = wals.getWAL(hri);
+ final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
+ htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
+ log.sync(txid);
+ log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
+ log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
+ log.shutdown();
+ Path filename = AbstractFSWALProvider.getCurrentFileName(log);
+ // Now open a reader on the log and assert append worked.
+ try (WALStreamReader reader = wals.createStreamReader(fs, filename)) {
WAL.Entry entry = reader.next();
assertEquals(colCount, entry.getEdit().size());
int idx = 0;
@@ -596,10 +559,6 @@ public class TestWALFactory {
System.out.println(entry.getKey() + " " + val);
idx++;
}
- } finally {
- if (reader != null) {
- reader.close();
- }
}
}
@@ -771,45 +730,34 @@ public class TestWALFactory {
scopes.put(fam, 0);
}
byte[] row = Bytes.toBytes("row");
- WAL.Reader reader = null;
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
- try {
- // Write one column in one edit.
- WALEdit cols = new WALEdit();
- cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes("0"),
- EnvironmentEdgeManager.currentTime(), new byte[] { 0 }));
- final WAL log = customFactory.getWAL(hri);
- final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
- htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
- // Sync the edit to the WAL
- log.sync(txid);
- log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
- log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
- log.shutdown();
-
- // Inject our failure, object is constructed via reflection.
- BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true);
-
- // Now open a reader on the log which will throw an exception when
- // we try to instantiate the custom Codec.
- Path filename = AbstractFSWALProvider.getCurrentFileName(log);
- try {
- reader = customFactory.createReader(proxyFs, filename);
- fail("Expected to see an exception when creating WAL reader");
- } catch (Exception e) {
- // Expected that we get an exception
- }
- // We should have exactly one reader
- assertEquals(1, openedReaders.size());
- // And that reader should be closed.
- long unclosedReaders =
- openedReaders.stream().filter((r) -> !r.isClosed.get()).collect(Collectors.counting());
- assertEquals("Should not find any open readers", 0, (int) unclosedReaders);
- } finally {
- if (reader != null) {
- reader.close();
- }
- }
+ // Write one column in one edit.
+ WALEdit cols = new WALEdit();
+ cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes("0"),
+ EnvironmentEdgeManager.currentTime(), new byte[] { 0 }));
+ final WAL log = customFactory.getWAL(hri);
+ final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
+ htd.getTableName(), EnvironmentEdgeManager.currentTime(), mvcc, scopes), cols);
+ // Sync the edit to the WAL
+ log.sync(txid);
+ log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
+ log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
+ log.shutdown();
+
+ // Inject our failure, object is constructed via reflection.
+ BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true);
+
+ // Now open a reader on the log which will throw an exception when
+ // we try to instantiate the custom Codec.
+ Path filename = AbstractFSWALProvider.getCurrentFileName(log);
+ assertThrows("Expected to see an exception when creating WAL reader", IOException.class,
+ () -> customFactory.createStreamReader(proxyFs, filename));
+ // We should have exactly one reader
+ assertEquals(1, openedReaders.size());
+ // And that reader should be closed.
+ long unclosedReaders =
+ openedReaders.stream().filter((r) -> !r.isClosed.get()).collect(Collectors.counting());
+ assertEquals("Should not find any open readers", 0, unclosedReaders);
}
/**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java
index 3067164221f..096ebe020a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java
@@ -121,8 +121,8 @@ public class TestWALOpenAfterDNRollingStart {
currentFile = new Path(oldLogDir, currentFile.getName());
}
// if the log is not rolled, then we can never open this wal forever.
- try (WAL.Reader reader = WALFactory.createReader(TEST_UTIL.getTestFileSystem(), currentFile,
- TEST_UTIL.getConfiguration())) {
+ try (WALStreamReader reader = WALFactory.createStreamReader(TEST_UTIL.getTestFileSystem(),
+ currentFile, TEST_UTIL.getConfiguration())) {
reader.next();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
deleted file mode 100644
index 1ac0620ed54..00000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.wal;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ByteBufferKeyValue;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
-import org.apache.hadoop.hbase.regionserver.wal.SecureAsyncProtobufLogWriter;
-import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
-import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
-import org.apache.hadoop.hbase.regionserver.wal.SecureWALCellCodec;
-import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-/**
- * Test that verifies WAL written by SecureProtobufLogWriter is not readable by ProtobufLogReader
- */
-@Category({ RegionServerTests.class, SmallTests.class })
-public class TestWALReaderOnSecureWAL {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestWALReaderOnSecureWAL.class);
-
- static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- final byte[] value = Bytes.toBytes("Test value");
-
- private static final String WAL_ENCRYPTION = "hbase.regionserver.wal.encryption";
-
- @Rule
- public TestName currentTest = new TestName();
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- Configuration conf = TEST_UTIL.getConfiguration();
- conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
- conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
- conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
- conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
- CommonFSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
- }
-
- private Path writeWAL(final WALFactory wals, final String tblName, boolean offheap)
- throws IOException {
- Configuration conf = TEST_UTIL.getConfiguration();
- String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
- conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, SecureWALCellCodec.class,
- WALCellCodec.class);
- try {
- TableName tableName = TableName.valueOf(tblName);
- NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- scopes.put(tableName.getName(), 0);
- RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
- final int total = 10;
- final byte[] row = Bytes.toBytes("row");
- final byte[] family = Bytes.toBytes("family");
- final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
-
- // Write the WAL
- WAL wal = wals.getWAL(regionInfo);
- for (int i = 0; i < total; i++) {
- WALEdit kvs = new WALEdit();
- KeyValue kv = new KeyValue(row, family, Bytes.toBytes(i), value);
- if (offheap) {
- ByteBuffer bb = ByteBuffer.allocateDirect(kv.getBuffer().length);
- bb.put(kv.getBuffer());
- ByteBufferKeyValue offheapKV = new ByteBufferKeyValue(bb, 0, kv.getLength());
- kvs.add(offheapKV);
- } else {
- kvs.add(kv);
- }
- wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
- EnvironmentEdgeManager.currentTime(), mvcc, scopes), kvs);
- }
- wal.sync();
- final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
- wal.shutdown();
-
- return walPath;
- } finally {
- // restore the cell codec class
- conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, clsName);
- }
- }
-
- @Test()
- public void testWALReaderOnSecureWALWithKeyValues() throws Exception {
- testSecureWALInternal(false);
- }
-
- @Test()
- public void testWALReaderOnSecureWALWithOffheapKeyValues() throws Exception {
- testSecureWALInternal(true);
- }
-
- private void testSecureWALInternal(boolean offheap) throws IOException, FileNotFoundException {
- Configuration conf = TEST_UTIL.getConfiguration();
- conf.setClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, WAL.Reader.class);
- conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
- WALProvider.Writer.class);
- conf.setClass("hbase.regionserver.hlog.async.writer.impl", SecureAsyncProtobufLogWriter.class,
- WALProvider.AsyncWriter.class);
- conf.setBoolean(WAL_ENCRYPTION, true);
- FileSystem fs = TEST_UTIL.getTestFileSystem();
- final WALFactory wals = new WALFactory(conf, currentTest.getMethodName());
- Path walPath = writeWAL(wals, currentTest.getMethodName(), offheap);
-
- // Insure edits are not plaintext
- long length = fs.getFileStatus(walPath).getLen();
- FSDataInputStream in = fs.open(walPath);
- byte[] fileData = new byte[(int) length];
- IOUtils.readFully(in, fileData);
- in.close();
- assertFalse("Cells appear to be plaintext", Bytes.contains(fileData, value));
-
- // Confirm the WAL cannot be read back by ProtobufLogReader
- try {
- wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
- assertFalse(true);
- } catch (IOException ioe) {
- System.out.println("Expected ioe " + ioe.getMessage());
- }
-
- FileStatus[] listStatus = fs.listStatus(walPath.getParent());
- Path rootdir = CommonFSUtils.getRootDir(conf);
- WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null);
- WALSplitter.SplitWALResult swr = s.splitWAL(listStatus[0], null);
- assertTrue(swr.isCorrupt());
- wals.close();
- }
-
- @Test()
- public void testSecureWALReaderOnWAL() throws Exception {
- Configuration conf = TEST_UTIL.getConfiguration();
- conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
- WAL.Reader.class);
- conf.setClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class,
- WALProvider.Writer.class);
- conf.setBoolean(WAL_ENCRYPTION, false);
- FileSystem fs = TEST_UTIL.getTestFileSystem();
- final WALFactory wals = new WALFactory(conf,
- ServerName.valueOf(currentTest.getMethodName(), 16010, EnvironmentEdgeManager.currentTime())
- .toString());
- Path walPath = writeWAL(wals, currentTest.getMethodName(), false);
-
- // Ensure edits are plaintext
- long length = fs.getFileStatus(walPath).getLen();
- FSDataInputStream in = fs.open(walPath);
- byte[] fileData = new byte[(int) length];
- IOUtils.readFully(in, fileData);
- in.close();
- assertTrue("Cells should be plaintext", Bytes.contains(fileData, value));
-
- // Confirm the WAL can be read back by SecureProtobufLogReader
- try {
- WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
- reader.close();
- } catch (IOException ioe) {
- assertFalse(true);
- }
-
- FileStatus[] listStatus = fs.listStatus(walPath.getParent());
- Path rootdir = CommonFSUtils.getRootDir(conf);
- try {
- WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null);
- s.splitWAL(listStatus[0], null);
- Path file =
- new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), "corrupt");
- assertTrue(!fs.exists(file));
- } catch (IOException ioe) {
- assertTrue("WAL should have been processed", false);
- }
- wals.close();
- }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index 5ee699e7ce4..0c4387c87b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -66,9 +66,9 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
+import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufWALStreamReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -79,7 +79,6 @@ import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -606,10 +605,11 @@ public class TestWALSplit {
@Test
public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
- List<FaultyProtobufLogReader.FailureType> failureTypes =
- Arrays.asList(FaultyProtobufLogReader.FailureType.values()).stream()
- .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList());
- for (FaultyProtobufLogReader.FailureType failureType : failureTypes) {
+ List<FaultyProtobufWALStreamReader.FailureType> failureTypes =
+ Arrays.asList(FaultyProtobufWALStreamReader.FailureType.values()).stream()
+ .filter(x -> x != FaultyProtobufWALStreamReader.FailureType.NONE)
+ .collect(Collectors.toList());
+ for (FaultyProtobufWALStreamReader.FailureType failureType : failureTypes) {
final Set<String> walDirContents = splitCorruptWALs(failureType);
final Set<String> archivedLogs = new HashSet<>();
final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
@@ -627,14 +627,14 @@ public class TestWALSplit {
* @return set of wal names present prior to split attempt.
* @throws IOException if the split process fails
*/
- private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType)
+ private Set<String> splitCorruptWALs(final FaultyProtobufWALStreamReader.FailureType failureType)
throws IOException {
- Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl", Reader.class);
+ String backupClass = conf.get(WALFactory.WAL_STREAM_READER_CLASS_IMPL);
InstrumentedLogWriter.activateFailure = false;
try {
- conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class,
- Reader.class);
+ conf.setClass(WALFactory.WAL_STREAM_READER_CLASS_IMPL, FaultyProtobufWALStreamReader.class,
+ WALStreamReader.class);
conf.set("faultyprotobuflogreader.failuretype", failureType.name());
// Clean up from previous tests or previous loop
try {
@@ -663,21 +663,25 @@ public class TestWALSplit {
WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
return walDirContents;
} finally {
- conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass, Reader.class);
+ if (backupClass != null) {
+ conf.set(WALFactory.WAL_STREAM_READER_CLASS_IMPL, backupClass);
+ } else {
+ conf.unset(WALFactory.WAL_STREAM_READER_CLASS_IMPL);
+ }
}
}
@Test(expected = IOException.class)
public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException {
conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
- splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
+ splitCorruptWALs(FaultyProtobufWALStreamReader.FailureType.BEGINNING);
}
@Test
public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException {
conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
try {
- splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
+ splitCorruptWALs(FaultyProtobufWALStreamReader.FailureType.BEGINNING);
} catch (IOException e) {
LOG.debug("split with 'skip errors' set to 'false' correctly threw");
}
@@ -704,13 +708,12 @@ public class TestWALSplit {
assertEquals(1, splitLog.length);
int actualCount = 0;
- Reader in = wals.createReader(fs, splitLog[0]);
- @SuppressWarnings("unused")
- Entry entry;
- while ((entry = in.next()) != null)
- ++actualCount;
+ try (WALStreamReader in = wals.createStreamReader(fs, splitLog[0])) {
+ while (in.next() != null) {
+ ++actualCount;
+ }
+ }
assertEquals(expectedCount, actualCount);
- in.close();
// should not have stored the EOF files as corrupt
FileStatus[] archivedLogs =
@@ -1047,15 +1050,17 @@ public class TestWALSplit {
/* Produce a mock reader that generates fake entries */
@Override
- protected Reader getReader(FileStatus file, boolean skipErrors,
+ protected WALStreamReader getReader(FileStatus file, boolean skipErrors,
CancelableProgressable reporter) throws IOException, CorruptedLogFileException {
- Reader mockReader = Mockito.mock(Reader.class);
+ WALStreamReader mockReader = Mockito.mock(WALStreamReader.class);
Mockito.doAnswer(new Answer<Entry>() {
int index = 0;
@Override
public Entry answer(InvocationOnMock invocation) throws Throwable {
- if (index >= numFakeEdits) return null;
+ if (index >= numFakeEdits) {
+ return null;
+ }
// Generate r0 through r4 in round robin fashion
int regionIdx = index % regions.size();
@@ -1354,8 +1359,8 @@ public class TestWALSplit {
case TRUNCATE:
fs.delete(path, false);
out = fs.create(path);
- out.write(corrupted_bytes, 0,
- fileSize - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
+ out.write(corrupted_bytes, 0, fileSize
+ - (32 + AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
closeOrFlush(close, out);
break;
@@ -1394,11 +1399,11 @@ public class TestWALSplit {
private int countWAL(Path log) throws IOException {
int count = 0;
- Reader in = wals.createReader(fs, log);
- while (in.next() != null) {
- count++;
+ try (WALStreamReader in = wals.createStreamReader(fs, log)) {
+ while (in.next() != null) {
+ count++;
+ }
}
- in.close();
return count;
}
@@ -1475,7 +1480,8 @@ public class TestWALSplit {
}
private boolean logsAreEqual(Path p1, Path p2) throws IOException {
- try (Reader in1 = wals.createReader(fs, p1); Reader in2 = wals.createReader(fs, p2)) {
+ try (WALStreamReader in1 = wals.createStreamReader(fs, p1);
+ WALStreamReader in2 = wals.createStreamReader(fs, p2)) {
Entry entry1;
Entry entry2;
while ((entry1 = in1.next()) != null) {
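One migration note for anyone with a custom reader plugin, visible in splitCorruptWALs above:
the old "hbase.regionserver.hlog.reader.impl" key (which took a WAL.Reader class) is gone, and
a replacement stream reader is now wired in through WALFactory.WAL_STREAM_READER_CLASS_IMPL and
must implement WALStreamReader. A short sketch, with MyWALStreamReader standing in for a
hypothetical custom implementation:

  Configuration conf = HBaseConfiguration.create();
  conf.setClass(WALFactory.WAL_STREAM_READER_CLASS_IMPL, MyWALStreamReader.class,
    WALStreamReader.class);
  // Readers created through the factory now use the custom implementation.
  try (WALStreamReader reader = WALFactory.createStreamReader(fs, walPath, conf)) {
    for (WAL.Entry entry = reader.next(); entry != null; entry = reader.next()) {
      // process entry
    }
  }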
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
index ad0201ea891..061633f7e21 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
@@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.LogRoller;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.SecureProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.trace.TraceUtil;
@@ -258,8 +257,6 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
Configuration conf = getConf();
conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
- conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class,
- WAL.Reader.class);
conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class,
Writer.class);
conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true);
@@ -377,7 +374,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
*/
private long verify(final WALFactory wals, final Path wal, final boolean verbose)
throws IOException {
- WAL.Reader reader = wals.createReader(wal.getFileSystem(getConf()), wal);
+ WALStreamReader reader = wals.createStreamReader(wal.getFileSystem(getConf()), wal);
long count = 0;
Map<String, Long> sequenceIds = new HashMap<>();
try {