Posted to commits@hbase.apache.org by zh...@apache.org on 2023/02/11 12:21:15 UTC

[hbase] branch branch-2 updated (4ac84b8aa0c -> 1fb311f22a5)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a change to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


    from 4ac84b8aa0c HBASE-27626 Suppress noisy logging in client.ConnectionImplementation (#5019)
     new 7402ebb3a0d HBASE-27602 Remove the impact of operating env on testHFileCleaning (#5003)
     new 1fb311f22a5 HBASE-27621 Also clear the Dictionary when resetting when reading compressed WAL file (#5016)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hadoop/hbase/io/TagCompressionContext.java     |   4 +-
 .../apache/hadoop/hbase/io/util/StreamUtils.java   |  17 ++
 .../hadoop/hbase/regionserver/wal/Compressor.java  |   4 +-
 .../hbase/regionserver/wal/ProtobufLogReader.java  |  16 ++
 .../hadoop/hbase/regionserver/wal/ReaderBase.java  |  13 +-
 .../hbase/regionserver/wal/WALCellCodec.java       |  12 +-
 .../hbase/master/cleaner/TestHFileCleaner.java     |   6 +-
 .../TestWALEntryStreamCompressionReset.java        | 251 +++++++++++++++++++++
 8 files changed, 312 insertions(+), 11 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamCompressionReset.java


[hbase] 01/02: HBASE-27602 Remove the impact of operating env on testHFileCleaning (#5003)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 7402ebb3a0d759fae3fab0a1b43f247d2afccb88
Author: tianhang <ta...@gmail.com>
AuthorDate: Sat Feb 11 19:39:03 2023 +0800

    HBASE-27602 Remove the impact of operating env on testHFileCleaning (#5003)
    
    Co-authored-by: tianhang.tang <ti...@shopee.com>
    Signed-off-by: Duo Zhang <zh...@apache.org>
    (cherry picked from commit e71253f4d8db0f808b4a2d27f66fca37b1ad0632)
---
 .../org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java    | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 68ad6b1d0b9..4f50b472e6c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -71,6 +71,9 @@ public class TestHFileCleaner {
 
   private static DirScanPool POOL;
 
+  private static String MOCK_ARCHIVED_HFILE_DIR =
+    HConstants.HFILE_ARCHIVE_DIRECTORY + "/namespace/table/region";
+
   @BeforeClass
   public static void setupCluster() throws Exception {
     // have to use a minidfs cluster because the localfs doesn't modify file times correctly
@@ -162,8 +165,7 @@ public class TestHFileCleaner {
         + "org.apache.hadoop.hbase.mob.ManualMobMaintHFileCleaner");
     conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl);
     Server server = new DummyServer();
-    Path archivedHfileDir =
-      new Path(UTIL.getDataTestDirOnTestFS(), HConstants.HFILE_ARCHIVE_DIRECTORY);
+    Path archivedHfileDir = new Path(UTIL.getDataTestDirOnTestFS(), MOCK_ARCHIVED_HFILE_DIR);
     FileSystem fs = FileSystem.get(conf);
     HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archivedHfileDir, POOL);
 


[hbase] 02/02: HBASE-27621 Also clear the Dictionary when resetting when reading compressed WAL file (#5016)

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 1fb311f22a5cc464bf0988118eb73617bbf9dfd7
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sat Feb 11 19:34:17 2023 +0800

    HBASE-27621 Also clear the Dictionary when resetting when reading compressed WAL file (#5016)
    
    Signed-off-by: Xiaolin Ha <ha...@apache.org>
    (cherry picked from commit 833b10e8bab7c28457caa854c0f714b489f88fa3)
---
 .../hadoop/hbase/io/TagCompressionContext.java     |   4 +-
 .../apache/hadoop/hbase/io/util/StreamUtils.java   |  17 ++
 .../hadoop/hbase/regionserver/wal/Compressor.java  |   4 +-
 .../hbase/regionserver/wal/ProtobufLogReader.java  |  16 ++
 .../hadoop/hbase/regionserver/wal/ReaderBase.java  |  13 +-
 .../hbase/regionserver/wal/WALCellCodec.java       |  12 +-
 .../TestWALEntryStreamCompressionReset.java        | 251 +++++++++++++++++++++
 7 files changed, 308 insertions(+), 9 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
index 74b0f2db108..f938fdaab35 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TagCompressionContext.java
@@ -107,7 +107,7 @@ public class TagCompressionContext {
     throws IOException {
     int endOffset = offset + length;
     while (offset < endOffset) {
-      byte status = (byte) src.read();
+      byte status = StreamUtils.readByte(src);
       if (status == Dictionary.NOT_IN_DICTIONARY) {
         int tagLen = StreamUtils.readRawVarint32(src);
         offset = Bytes.putAsShort(dest, offset, tagLen);
@@ -115,7 +115,7 @@ public class TagCompressionContext {
         tagDict.addEntry(dest, offset, tagLen);
         offset += tagLen;
       } else {
-        short dictIdx = StreamUtils.toShort(status, (byte) src.read());
+        short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(src));
         byte[] entry = tagDict.getEntry(dictIdx);
         if (entry == null) {
           throw new IOException("Missing dictionary entry for index " + dictIdx);
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
index 97e1e9d3345..0bda535d6b1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.util;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -206,6 +207,22 @@ public class StreamUtils {
     return new Pair<>(result, newOffset - offset);
   }
 
+  /**
+   * Read a byte from the given stream using the read method, and throw EOFException if it returns
+   * -1, like the implementation in {@code DataInputStream}.
+   * <p/>
+   * This is useful because casting the return value of the read method directly to byte makes us
+   * lose the ability to distinguish a byte whose value is -1 from reaching EOF, as casting the
+   * int -1 to byte also returns -1.
+   */
+  public static byte readByte(InputStream in) throws IOException {
+    int r = in.read();
+    if (r < 0) {
+      throw new EOFException();
+    }
+    return (byte) r;
+  }
+
   public static short toShort(byte hi, byte lo) {
     short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
     Preconditions.checkArgument(s >= 0);
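
For context on the new helper, here is a minimal standalone sketch (not part of this commit; the class name is made up for illustration) of why casting the return value of read() to byte is lossy: a real 0xFF data byte and end-of-stream both come out as -1, while readByte surfaces EOF as an exception.

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class ReadByteDemo {

  // Mirrors the new StreamUtils.readByte: turn the -1 sentinel into an
  // EOFException instead of silently casting it to the byte value -1.
  static byte readByte(InputStream in) throws IOException {
    int r = in.read();
    if (r < 0) {
      throw new EOFException();
    }
    return (byte) r;
  }

  public static void main(String[] args) throws IOException {
    // A stream containing a single 0xFF byte, followed by end-of-stream.
    InputStream in = new ByteArrayInputStream(new byte[] { (byte) 0xFF });
    byte first = (byte) in.read();  // -1, a real data byte
    byte second = (byte) in.read(); // -1 again, but this time it means EOF
    System.out.println(first == second); // true: the cast loses the distinction

    // With readByte the second call fails loudly instead of returning -1.
    InputStream in2 = new ByteArrayInputStream(new byte[] { (byte) 0xFF });
    System.out.println(readByte(in2)); // -1 (the data byte)
    try {
      readByte(in2);
    } catch (EOFException e) {
      System.out.println("EOF detected");
    }
  }
}
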
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
index d283a19e45f..bed31530d87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java
@@ -108,7 +108,9 @@ public class Compressor {
       // if this isn't in the dictionary, we need to add to the dictionary.
       byte[] arr = new byte[length];
       in.readFully(arr);
-      if (dict != null) dict.addEntry(arr, 0, length);
+      if (dict != null) {
+        dict.addEntry(arr, 0, length);
+      }
       return arr;
     } else {
       // Status here is the higher-order byte of index of the dictionary entry
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index 12f0efc5728..2e3fb4f5b5f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -98,6 +98,9 @@ public class ProtobufLogReader extends ReaderBase {
   // cell codec classname
   private String codecClsName = null;
 
+  // a flag indicating whether we need to reset the compression context when seeking back
+  private boolean resetCompression;
+
   @InterfaceAudience.Private
   public long trailerSize() {
     if (trailerPresent) {
@@ -160,6 +163,9 @@ public class ProtobufLogReader extends ReaderBase {
   @Override
   public void reset() throws IOException {
     String clsName = initInternal(null, false);
+    if (resetCompression) {
+      resetCompression();
+    }
     initAfterCompression(clsName); // We need a new decoder (at least).
   }
 
@@ -361,6 +367,8 @@ public class ProtobufLogReader extends ReaderBase {
     WALKey.Builder builder = WALKey.newBuilder();
     long size = 0;
     boolean resetPosition = false;
+    // by default, we should reset the compression when seeking back after reading something
+    resetCompression = true;
     try {
       long available = -1;
       try {
@@ -372,6 +380,14 @@ public class ProtobufLogReader extends ReaderBase {
         // available may be < 0 on local fs for instance. If so, can't depend on it.
         available = this.inputStream.available();
         if (available > 0 && available < size) {
+          // If we quit here, we have only read the length, no actual data yet, which means we
+          // have not put anything into the compression dictionary yet, so when seeking back to
+          // the last good position we do not need to reset the compression context.
+          // This saves the extra effort of reconstructing the compression dictionary, which
+          // requires reading from the beginning of the file instead of simply seeking to the
+          // position. Since DFSInputStream implements the available method, in most cases we
+          // will reach here when there is not enough data.
+          resetCompression = false;
           throw new EOFException("Available stream not enough for edit, "
             + "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= "
             + size + " at offset = " + this.inputStream.getPos());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
index 5e14c475ae3..5caceeac09b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
@@ -45,7 +45,7 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
    * Compression context to use reading. Can be null if no compression.
    */
   protected CompressionContext compressionContext = null;
-  protected boolean emptyCompressionContext = true;
+  private boolean emptyCompressionContext = true;
 
   /**
    * Default constructor.
@@ -130,6 +130,17 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
     seekOnFs(pos);
   }
 
+  /**
+   * Clear the {@link ReaderBase#compressionContext}, and also set {@link #emptyCompressionContext}
+   * to true, so when seeking, we will try to skip to the position and reconstruct the dictionary.
+   */
+  protected final void resetCompression() {
+    if (compressionContext != null) {
+      compressionContext.clear();
+      emptyCompressionContext = true;
+    }
+  }
+
   /**
    * Initializes the log reader with a particular stream (may be null). Reader assumes ownership of
    * the stream if not null and may use it. Called once.
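
To tie the two pieces above together: the resetCompression flag in ProtobufLogReader decides whether ReaderBase.resetCompression() runs when seeking back after a partial read. Below is a condensed, hypothetical sketch of that control flow (the interfaces and names are illustrative stand-ins, not the actual HBase classes): the dictionary is cleared, forcing a replay from the beginning of the file, only when payload bytes that could have populated it were actually consumed.

import java.io.EOFException;
import java.io.IOException;

// Illustrative stand-ins for the HBase stream and compression context; these
// are not the real HBase types or method names.
interface SeekableInput {
  long getPos() throws IOException;
  void seek(long pos) throws IOException;
  int available() throws IOException;
  int readLength() throws IOException;           // reads only the entry length
  void readPayload(int size) throws IOException; // reads the payload, feeding the dictionary
}

interface DictionaryContext {
  void clear();
}

public final class ResetOnSeekBackSketch {

  // Condensed sketch of the control flow: clear the dictionary (which later
  // forces a re-read from the start of the file) only when payload bytes that
  // could have fed it were consumed before the failure.
  static void readOneEntry(SeekableInput in, DictionaryContext compression) throws IOException {
    long lastGoodPosition = in.getPos();
    boolean resetCompression = true; // assume the worst by default
    try {
      int size = in.readLength();
      if (in.available() > 0 && in.available() < size) {
        // Only the length was read; nothing reached the dictionary yet, so
        // seeking back to lastGoodPosition is enough on its own.
        resetCompression = false;
        throw new EOFException("not enough data for a full entry");
      }
      in.readPayload(size);
    } catch (EOFException e) {
      if (resetCompression) {
        compression.clear(); // dictionary must be rebuilt by replaying from the beginning
      }
      in.seek(lastGoodPosition);
    }
  }
}
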
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 5b60b10e128..816ce3ed45a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -197,18 +197,20 @@ public class WALCellCodec implements Codec {
 
   private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
     InputStream in = bs.newInput();
-    byte status = (byte) in.read();
+    byte status = StreamUtils.readByte(in);
     if (status == Dictionary.NOT_IN_DICTIONARY) {
       byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
       int bytesRead = in.read(arr);
       if (bytesRead != arr.length) {
         throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
       }
-      if (dict != null) dict.addEntry(arr, 0, arr.length);
+      if (dict != null) {
+        dict.addEntry(arr, 0, arr.length);
+      }
       return arr;
     } else {
       // Status here is the higher-order byte of index of the dictionary entry.
-      short dictIdx = StreamUtils.toShort(status, (byte) in.read());
+      short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
       byte[] entry = dict.getEntry(dictIdx);
       if (entry == null) {
         throw new IOException("Missing dictionary entry for index " + dictIdx);
@@ -350,7 +352,7 @@ public class WALCellCodec implements Codec {
     }
 
     private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
-      byte status = (byte) in.read();
+      byte status = StreamUtils.readByte(in);
       if (status == Dictionary.NOT_IN_DICTIONARY) {
         // status byte indicating that data to be read is not in dictionary.
         // if this isn't in the dictionary, we need to add to the dictionary.
@@ -360,7 +362,7 @@ public class WALCellCodec implements Codec {
         return length;
       } else {
         // the status byte also acts as the higher order byte of the dictionary entry.
-        short dictIdx = StreamUtils.toShort(status, (byte) in.read());
+        short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
         byte[] entry = dict.getEntry(dictIdx);
         if (entry == null) {
           throw new IOException("Missing dictionary entry for index " + dictIdx);
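
The call sites changed above all decode the same status-byte scheme: a literal entry is written as NOT_IN_DICTIONARY followed by its length and bytes (and gets added to the dictionary), while any other status byte is the high-order byte of a two-byte dictionary index. The toy decoder below sketches that scheme; it is not the real codec (it uses a plain short length instead of a varint, a growable list instead of HBase's dictionary implementation, and its own constant value), but it shows how the status byte selects between a literal and a dictionary reference, and why every read goes through an EOF-checking readByte.

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public final class DictDecodeSketch {

  // Illustrative value only; the toy format below is not the real WAL codec.
  static final byte NOT_IN_DICTIONARY = -1;

  // Same idea as StreamUtils.readByte: never let the EOF sentinel masquerade as data.
  static byte readByte(InputStream in) throws IOException {
    int r = in.read();
    if (r < 0) {
      throw new EOFException();
    }
    return (byte) r;
  }

  // Decode one value: either "NOT_IN_DICTIONARY, length, bytes" (a literal that
  // is added to the dictionary) or a two-byte index into the dictionary, where
  // the status byte itself is the high-order byte of the index.
  static byte[] decodeOne(DataInputStream in, List<byte[]> dict) throws IOException {
    byte status = readByte(in);
    if (status == NOT_IN_DICTIONARY) {
      byte[] entry = new byte[in.readShort()]; // toy: plain short instead of a varint
      in.readFully(entry);
      dict.add(entry); // later occurrences can refer to this entry by index
      return entry;
    }
    int dictIdx = ((status & 0xFF) << 8) | (readByte(in) & 0xFF);
    return dict.get(dictIdx);
  }

  public static void main(String[] args) throws IOException {
    // "row1" encoded literally, then referenced again via dictionary index 0.
    byte[] encoded = { NOT_IN_DICTIONARY, 0, 4, 'r', 'o', 'w', '1', 0, 0 };
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
    List<byte[]> dict = new ArrayList<>();
    System.out.println(new String(decodeOne(in, dict))); // row1 (literal)
    System.out.println(new String(decodeOne(in, dict))); // row1 (from the dictionary)
  }
}
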
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamCompressionReset.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamCompressionReset.java
new file mode 100644
index 00000000000..dda72f4304a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamCompressionReset.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.OptionalLong;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
+
+/**
+ * Enable compression and reset the WALEntryStream while reading in ReplicationSourceWALReader.
+ * <p/>
+ * This is used to confirm that we can work well when hitting an EOFException in the middle of
+ * reading a WAL entry while compression is enabled. See HBASE-27621 for more details.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestWALEntryStreamCompressionReset {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestWALEntryStreamCompressionReset.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("reset");
+
+  private static RegionInfo REGION_INFO = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
+
+  private static byte[] FAMILY = Bytes.toBytes("family");
+
+  private static MultiVersionConcurrencyControl MVCC = new MultiVersionConcurrencyControl();
+
+  private static NavigableMap<byte[], Integer> SCOPE;
+
+  private static String GROUP_ID = "group";
+
+  private static FileSystem FS;
+
+  private static ReplicationSource SOURCE;
+
+  private static MetricsSource METRICS_SOURCE;
+
+  private static ReplicationSourceLogQueue LOG_QUEUE;
+
+  private static Path TEMPLATE_WAL_FILE;
+
+  private static int END_OFFSET_OF_WAL_ENTRIES;
+
+  private static Path WAL_FILE;
+
+  private static volatile long WAL_LENGTH;
+
+  private static ReplicationSourceWALReader READER;
+
+  // return the wal path, and also the end offset of the last wal entry
+  private static Pair<Path, Long> generateWAL() throws Exception {
+    Path path = UTIL.getDataTestDir("wal");
+    ProtobufLogWriter writer = new ProtobufLogWriter();
+    writer.init(FS, path, UTIL.getConfiguration(), false, FS.getDefaultBlockSize(path), null);
+    for (int i = 0; i < Byte.MAX_VALUE; i++) {
+      WALEdit edit = new WALEdit();
+      edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
+        .setRow(Bytes.toBytes(i)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier-" + i))
+        .setValue(Bytes.toBytes("v-" + i)).build());
+      writer.append(new WAL.Entry(new WALKeyImpl(REGION_INFO.getEncodedNameAsBytes(), TABLE_NAME,
+        EnvironmentEdgeManager.currentTime(), MVCC, SCOPE), edit));
+    }
+
+    WALEdit edit2 = new WALEdit();
+    edit2.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
+      .setRow(Bytes.toBytes(-1)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier"))
+      .setValue(Bytes.toBytes("vv")).build());
+    edit2.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
+      .setRow(Bytes.toBytes(-1)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier-1"))
+      .setValue(Bytes.toBytes("vvv")).build());
+    writer.append(new WAL.Entry(new WALKeyImpl(REGION_INFO.getEncodedNameAsBytes(), TABLE_NAME,
+      EnvironmentEdgeManager.currentTime(), MVCC, SCOPE), edit2));
+    writer.sync(false);
+    long offset = writer.getSyncedLength();
+    writer.close();
+    return Pair.newPair(path, offset);
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    FS = UTIL.getTestFileSystem();
+    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
+    conf.setInt("replication.source.maxretriesmultiplier", 1);
+    FS.mkdirs(UTIL.getDataTestDir());
+    Pair<Path, Long> pair = generateWAL();
+    TEMPLATE_WAL_FILE = pair.getFirst();
+    END_OFFSET_OF_WAL_ENTRIES = pair.getSecond().intValue();
+    WAL_FILE = UTIL.getDataTestDir("rep_source");
+
+    METRICS_SOURCE = new MetricsSource("reset");
+    SOURCE = mock(ReplicationSource.class);
+    when(SOURCE.isPeerEnabled()).thenReturn(true);
+    when(SOURCE.getWALFileLengthProvider()).thenReturn(p -> OptionalLong.of(WAL_LENGTH));
+    when(SOURCE.getServerWALsBelongTo())
+      .thenReturn(ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime()));
+    when(SOURCE.getSourceMetrics()).thenReturn(METRICS_SOURCE);
+    ReplicationSourceManager rsm = mock(ReplicationSourceManager.class);
+    when(rsm.getTotalBufferUsed()).thenReturn(new AtomicLong());
+    when(rsm.getTotalBufferLimit())
+      .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+    when(rsm.getGlobalMetrics()).thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
+    when(SOURCE.getSourceManager()).thenReturn(rsm);
+
+    LOG_QUEUE = new ReplicationSourceLogQueue(conf, METRICS_SOURCE, SOURCE);
+    LOG_QUEUE.enqueueLog(WAL_FILE, GROUP_ID);
+    READER = new ReplicationSourceWALReader(FS, conf, LOG_QUEUE, 0, e -> e, SOURCE, GROUP_ID);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    READER.setReaderRunning(false);
+    READER.join();
+    UTIL.cleanupTestDir();
+  }
+
+  private void test(byte[] content, FSDataOutputStream out) throws Exception {
+    // minus 15 so the second entry is incomplete
+    // 15 is a magic number here: we want the reader to parse the first cell but not the second
+    // cell, especially not the qualifier of the second cell. The value of the second cell is
+    // 'vvv', which is 3 bytes, plus an 8-byte timestamp, and also the qualifier, family and row
+    // (which should have been compressed), so 15 is a proper value; 14 or 16 would also work here.
+    out.write(content, 0, END_OFFSET_OF_WAL_ENTRIES - 15);
+    out.hflush();
+    WAL_LENGTH = END_OFFSET_OF_WAL_ENTRIES - 15;
+    READER.start();
+    List<WAL.Entry> entries = new ArrayList<>();
+    for (;;) {
+      WALEntryBatch batch = READER.poll(1000);
+      if (batch == null) {
+        break;
+      }
+      entries.addAll(batch.getWalEntries());
+    }
+    // should return all the entries except the last one
+    assertEquals(Byte.MAX_VALUE, entries.size());
+    for (int i = 0; i < Byte.MAX_VALUE; i++) {
+      WAL.Entry entry = entries.get(i);
+      assertEquals(1, entry.getEdit().size());
+      Cell cell = entry.getEdit().getCells().get(0);
+      assertEquals(i, Bytes.toInt(cell.getRowArray(), cell.getRowOffset()));
+      assertEquals(Bytes.toString(FAMILY),
+        Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
+      assertEquals("qualifier-" + i, Bytes.toString(cell.getQualifierArray(),
+        cell.getQualifierOffset(), cell.getQualifierLength()));
+      assertEquals("v-" + i,
+        Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
+    }
+
+    // confirm that we can not get the last one since it is incomplete
+    assertNull(READER.poll(1000));
+    // write the remaining bytes out
+    out.write(content, END_OFFSET_OF_WAL_ENTRIES - 15, 15);
+    out.hflush();
+    WAL_LENGTH = END_OFFSET_OF_WAL_ENTRIES;
+
+    // should get the last entry
+    WALEntryBatch batch = READER.poll(10000);
+    assertEquals(1, batch.getNbEntries());
+    WAL.Entry entry = batch.getWalEntries().get(0);
+    assertEquals(2, entry.getEdit().size());
+    Cell cell2 = entry.getEdit().getCells().get(0);
+    assertEquals(-1, Bytes.toInt(cell2.getRowArray(), cell2.getRowOffset()));
+    assertEquals(Bytes.toString(FAMILY),
+      Bytes.toString(cell2.getFamilyArray(), cell2.getFamilyOffset(), cell2.getFamilyLength()));
+    assertEquals("qualifier", Bytes.toString(cell2.getQualifierArray(), cell2.getQualifierOffset(),
+      cell2.getQualifierLength()));
+    assertEquals("vv",
+      Bytes.toString(cell2.getValueArray(), cell2.getValueOffset(), cell2.getValueLength()));
+
+    Cell cell3 = entry.getEdit().getCells().get(1);
+    assertEquals(-1, Bytes.toInt(cell3.getRowArray(), cell3.getRowOffset()));
+    assertEquals(Bytes.toString(FAMILY),
+      Bytes.toString(cell3.getFamilyArray(), cell3.getFamilyOffset(), cell3.getFamilyLength()));
+    assertEquals("qualifier-1", Bytes.toString(cell3.getQualifierArray(),
+      cell3.getQualifierOffset(), cell3.getQualifierLength()));
+    assertEquals("vvv",
+      Bytes.toString(cell3.getValueArray(), cell3.getValueOffset(), cell3.getValueLength()));
+  }
+
+  @Test
+  public void testReset() throws Exception {
+    byte[] content;
+    try (FSDataInputStream in = FS.open(TEMPLATE_WAL_FILE)) {
+      content = ByteStreams.toByteArray(in);
+    }
+    try (FSDataOutputStream out = FS.create(WAL_FILE)) {
+      test(content, out);
+    }
+  }
+}