You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2020/07/04 13:15:10 UTC
[hbase] branch branch-2 updated: HBASE-24625
AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced
file length. (#1970)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new f834919 HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced file length. (#1970)
f834919 is described below
commit f8349199290a642c91908dd13037227f9eaebb35
Author: chenglei <ch...@apache.org>
AuthorDate: Sat Jul 4 21:00:35 2020 +0800
HBASE-24625 AsyncFSWAL.getLogFileSizeIfBeingWritten does not return the expected synced file length. (#1970)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../hadoop/hbase/io/asyncfs/AsyncFSOutput.java | 5 +++
.../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java | 5 +++
.../hbase/io/asyncfs/WrapperAsyncFSOutput.java | 13 +++++-
.../hbase/regionserver/wal/AbstractFSWAL.java | 2 +-
.../regionserver/wal/AsyncProtobufLogWriter.java | 5 +++
.../hbase/regionserver/wal/ProtobufLogWriter.java | 11 +++++
.../org/apache/hadoop/hbase/wal/WALProvider.java | 17 ++++++++
.../regionserver/TestFailedAppendAndSync.java | 49 ++++++++++++----------
.../hadoop/hbase/regionserver/TestHRegion.java | 5 +++
.../hadoop/hbase/regionserver/TestWALLockup.java | 10 +++++
.../hbase/regionserver/wal/TestAsyncFSWAL.java | 5 +++
.../regionserver/wal/TestAsyncFSWALDurability.java | 5 +++
.../regionserver/wal/TestFSHLogDurability.java | 5 +++
.../hbase/regionserver/wal/TestLogRolling.java | 10 +++++
14 files changed, 123 insertions(+), 24 deletions(-)
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
index 3c520b8..059ca00 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
@@ -89,4 +89,9 @@ public interface AsyncFSOutput extends Closeable {
*/
@Override
void close() throws IOException;
+
+ /**
+ * @return byteSize successfully synced to underlying filesystem.
+ */
+ long getSyncedLength();
}
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index ed5bbf0..457b7c1 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -574,4 +574,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
public boolean isBroken() {
return state == State.BROKEN;
}
+
+ @Override
+ public long getSyncedLength() {
+ return this.ackedBlockLength;
+ }
}
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java
index bbb4e54..39f1f71 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java
@@ -45,6 +45,8 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
private final ExecutorService executor;
+ private volatile long syncedLength = 0;
+
public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) {
this.out = out;
this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
@@ -91,7 +93,11 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
out.hflush();
}
}
- future.complete(out.getPos());
+ long pos = out.getPos();
+ if(pos > this.syncedLength) {
+ this.syncedLength = pos;
+ }
+ future.complete(pos);
} catch (IOException e) {
future.completeExceptionally(e);
return;
@@ -124,4 +130,9 @@ public class WrapperAsyncFSOutput implements AsyncFSOutput {
public boolean isBroken() {
return false;
}
+
+ @Override
+ public long getSyncedLength() {
+ return this.syncedLength;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index bf53352..a978dbe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -1061,7 +1061,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
Path currentPath = getOldPath();
if (path.equals(currentPath)) {
W writer = this.writer;
- return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
+ return writer != null ? OptionalLong.of(writer.getSyncedLength()) : OptionalLong.empty();
} else {
return OptionalLong.empty();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index e731611..8c944b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -231,4 +231,9 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
protected OutputStream getOutputStreamForCellEncoder() {
return asyncOutputWrapper;
}
+
+ @Override
+ public long getSyncedLength() {
+ return this.output.getSyncedLength();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index ff08da8..4bbc13d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -19,11 +19,14 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.util.AtomicUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
@@ -46,6 +49,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
protected FSDataOutputStream output;
+ private final AtomicLong syncedLength = new AtomicLong(0);
+
@Override
public void append(Entry entry) throws IOException {
entry.getKey().getBuilder(compressor).
@@ -85,6 +90,12 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
} else {
fsdos.hflush();
}
+ AtomicUtils.updateMax(this.syncedLength, fsdos.getPos());
+ }
+
+ @Override
+ public long getSyncedLength() {
+ return this.syncedLength.get();
}
public FSDataOutputStream getStream() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index 6f0b983..c3bd149 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -25,6 +25,7 @@ import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.yetus.audience.InterfaceAudience;
@@ -74,6 +75,22 @@ public interface WALProvider {
interface WriterBase extends Closeable {
long getLength();
+ /**
+ * NOTE: This method was added for the {@link WALFileLengthProvider} used by replication.
+ * Consider the case where we use {@link AsyncFSWAL}: we write to 3 DNs concurrently, and
+ * according to the visibility guarantee of HDFS, the data becomes available immediately
+ * on arriving at a DN, since every DN is considered to be the last one in the pipeline.
+ * This means replication may read uncommitted data and replicate it to the remote cluster,
+ * causing data inconsistency.
+ * The method {@link WriterBase#getLength} may return a length that includes data which is
+ * only in the HDFS client buffer and not yet successfully synced to HDFS, so this method
+ * returns the length successfully synced to HDFS; the replication thread may only read the
+ * WAL file currently being written up to this length.
+ * See also HBASE-14004 and this document for more details:
+ * https://docs.google.com/document/d/11AyWtGhItQs6vsLRIx32PwTxmBY3libXwGXI25obVEY/edit#
+ * @return byteSize successfully synced to underlying filesystem.
+ */
+ long getSyncedLength();
}
// Writers are used internally. Users outside of the WAL should be relying on the
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index 25ea112..198e64b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -130,35 +130,40 @@ public class TestFailedAppendAndSync {
@Override
protected Writer createWriterInstance(Path path) throws IOException {
final Writer w = super.createWriterInstance(path);
- return new Writer() {
- @Override
- public void close() throws IOException {
- w.close();
- }
+ return new Writer() {
+ @Override
+ public void close() throws IOException {
+ w.close();
+ }
- @Override
- public void sync(boolean forceSync) throws IOException {
- if (throwSyncException) {
- throw new IOException("FAKE! Failed to replace a bad datanode...");
- }
- w.sync(forceSync);
+ @Override
+ public void sync(boolean forceSync) throws IOException {
+ if (throwSyncException) {
+ throw new IOException("FAKE! Failed to replace a bad datanode...");
}
+ w.sync(forceSync);
+ }
- @Override
- public void append(Entry entry) throws IOException {
- if (throwAppendException) {
- throw new IOException("FAKE! Failed to replace a bad datanode...");
- }
- w.append(entry);
+ @Override
+ public void append(Entry entry) throws IOException {
+ if (throwAppendException) {
+ throw new IOException("FAKE! Failed to replace a bad datanode...");
}
+ w.append(entry);
+ }
- @Override
- public long getLength() {
- return w.getLength();
- }
- };
+ @Override
+ public long getLength() {
+ return w.getLength();
}
+
+ @Override
+ public long getSyncedLength() {
+ return w.getSyncedLength();
+ }
+ };
}
+ }
// Make up mocked server and services.
RegionServerServices services = mock(RegionServerServices.class);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 3448eb7..41ca8a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -1251,6 +1251,11 @@ public class TestHRegion {
public long getLength() {
return w.getLength();
}
+
+ @Override
+ public long getSyncedLength() {
+ return w.getSyncedLength();
+ }
};
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index a50ef78..21f1774 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -190,6 +190,11 @@ public class TestWALLockup {
public long getLength() {
return w.getLength();
}
+
+ @Override
+ public long getSyncedLength() {
+ return w.getSyncedLength();
+ }
};
}
}
@@ -374,6 +379,11 @@ public class TestWALLockup {
public long getLength() {
return w.getLength();
}
+
+ @Override
+ public long getSyncedLength() {
+ return w.getSyncedLength();
+ }
};
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
index 704cdfa..f31a908 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
@@ -156,6 +156,11 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
}
@Override
+ public long getSyncedLength() {
+ return writer.getSyncedLength();
+ }
+
+ @Override
public CompletableFuture<Long> sync(boolean forceSync) {
CompletableFuture<Long> result = writer.sync(forceSync);
if (failedCount.incrementAndGet() < 1000) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
index 353f549..a482d93 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALDurability.java
@@ -110,6 +110,11 @@ class CustomAsyncFSWAL extends AsyncFSWAL {
}
@Override
+ public long getSyncedLength() {
+ return writer.getSyncedLength();
+ }
+
+ @Override
public CompletableFuture<Long> sync(boolean forceSync) {
writerSyncFlag = forceSync;
return writer.sync(forceSync);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java
index 9c46058..3c25044 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLogDurability.java
@@ -85,6 +85,11 @@ class CustomFSHLog extends FSHLog {
}
@Override
+ public long getSyncedLength() {
+ return writer.getSyncedLength();
+ }
+
+ @Override
public void sync(boolean forceSync) throws IOException {
writerSyncFlag = forceSync;
writer.sync(forceSync);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index 691250a..0712b59 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -174,6 +174,11 @@ public class TestLogRolling extends AbstractTestLogRolling {
public long getLength() {
return oldWriter1.getLength();
}
+
+ @Override
+ public long getSyncedLength() {
+ return oldWriter1.getSyncedLength();
+ }
};
log.setWriter(newWriter1);
@@ -231,6 +236,11 @@ public class TestLogRolling extends AbstractTestLogRolling {
public long getLength() {
return oldWriter2.getLength();
}
+
+ @Override
+ public long getSyncedLength() {
+ return oldWriter2.getSyncedLength();
+ }
};
log.setWriter(newWriter2);