You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2018/02/21 18:04:41 UTC
hbase git commit: HBASE-20037 Race when calling SequenceIdAccounting.resetHighest
Repository: hbase
Updated Branches:
refs/heads/branch-2 2b4df5e36 -> 30c2dcd88
HBASE-20037 Race when calling SequenceIdAccounting.resetHighest
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/30c2dcd8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/30c2dcd8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/30c2dcd8
Branch: refs/heads/branch-2
Commit: 30c2dcd8832d6148fc1c4a430f90e7660b1af3c1
Parents: 2b4df5e
Author: zhangduo <zh...@apache.org>
Authored: Wed Feb 21 18:19:01 2018 +0800
Committer: Michael Stack <st...@apache.org>
Committed: Wed Feb 21 10:04:33 2018 -0800
----------------------------------------------------------------------
.../hbase/regionserver/wal/AbstractFSWAL.java | 37 +++++++++--------
.../hbase/regionserver/wal/AsyncFSWAL.java | 42 ++++++++++----------
.../wal/AsyncProtobufLogWriter.java | 4 +-
.../hadoop/hbase/regionserver/wal/FSHLog.java | 6 +--
.../regionserver/wal/ProtobufLogWriter.java | 11 +++--
.../regionserver/wal/SequenceIdAccounting.java | 40 ++++++++++---------
6 files changed, 74 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/30c2dcd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 14fbe10..ce8dafa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -661,9 +661,26 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
}
+ protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) {
+ int oldNumEntries = this.numEntries.getAndSet(0);
+ String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
+ if (oldPath != null) {
+ this.walFile2Props.put(oldPath,
+ new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
+ this.totalLogSize.addAndGet(oldFileLen);
+ LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
+ CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
+ newPathString);
+ } else {
+ LOG.info("New WAL {}", newPathString);
+ }
+ }
+
/**
+ * <p>
* Cleans up current writer closing it and then puts in place the passed in
* <code>nextWriter</code>.
+ * </p>
* <p>
* <ul>
* <li>In the case of creating a new WAL, oldPath will be null.</li>
@@ -672,26 +689,17 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
* <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
* null.</li>
* </ul>
+ * </p>
* @param oldPath may be null
* @param newPath may be null
* @param nextWriter may be null
* @return the passed in <code>newPath</code>
* @throws IOException if there is a problem flushing or closing the underlying FS
*/
+ @VisibleForTesting
Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) {
- long oldFileLen = doReplaceWriter(oldPath, newPath, nextWriter);
- int oldNumEntries = this.numEntries.getAndSet(0);
- final String newPathString = (null == newPath ? null : CommonFSUtils.getPath(newPath));
- if (oldPath != null) {
- this.walFile2Props.put(oldPath,
- new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
- this.totalLogSize.addAndGet(oldFileLen);
- LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
- ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString);
- } else {
- LOG.info("New WAL " + newPathString);
- }
+ doReplaceWriter(oldPath, newPath, nextWriter);
return newPath;
}
}
@@ -1021,10 +1029,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected abstract W createWriterInstance(Path path)
throws IOException, CommonFSUtils.StreamLacksCapabilityException;
- /**
- * @return old wal file size
- */
- protected abstract long doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
+ protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter)
throws IOException;
protected abstract void doShutdown() throws IOException;
http://git-wip-us.apache.org/repos/asf/hbase/blob/30c2dcd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 19d89df..d22d1ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -631,11 +631,29 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
+ private long closeWriter() {
+ AsyncWriter oldWriter = this.writer;
+ if (oldWriter != null) {
+ long fileLength = oldWriter.getLength();
+ closeExecutor.execute(() -> {
+ try {
+ oldWriter.close();
+ } catch (IOException e) {
+ LOG.warn("close old writer failed", e);
+ }
+ });
+ return fileLength;
+ } else {
+ return 0L;
+ }
+ }
+
@Override
- protected long doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
+ protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
throws IOException {
waitForSafePoint();
- final AsyncWriter oldWriter = this.writer;
+ long oldFileLen = closeWriter();
+ logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
this.writer = nextWriter;
if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) {
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
@@ -654,13 +672,12 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
} finally {
consumeLock.unlock();
}
- return executeClose(closeExecutor, oldWriter);
}
@Override
protected void doShutdown() throws IOException {
waitForSafePoint();
- executeClose(closeExecutor, writer);
+ closeWriter();
closeExecutor.shutdown();
try {
if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) {
@@ -698,23 +715,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
}
- private static long executeClose(ExecutorService closeExecutor, AsyncWriter writer) {
- long fileLength;
- if (writer != null) {
- fileLength = writer.getLength();
- closeExecutor.execute(() -> {
- try {
- writer.close();
- } catch (IOException e) {
- LOG.warn("close old writer failed", e);
- }
- });
- } else {
- fileLength = 0L;
- }
- return fileLength;
- }
-
@Override
protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
writer.append(entry);
http://git-wip-us.apache.org/repos/asf/hbase/blob/30c2dcd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index 9688bbd..67258ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
-
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -34,13 +33,14 @@ import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
http://git-wip-us.apache.org/repos/asf/hbase/blob/30c2dcd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index a9a3835..d927b7e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -298,7 +298,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
}
@Override
- protected long doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException {
+ protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) throws IOException {
// Ask the ring buffer writer to pause at a safe point. Once we do this, the writer
// thread will eventually pause. An error hereafter needs to release the writer thread
// regardless -- hence the finally block below. Note, this method is called from the FSHLog
@@ -320,7 +320,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
zigzagLatch = this.ringBufferEventHandler.attainSafePoint();
}
afterCreatingZigZagLatch();
- long oldFileLen = 0L;
try {
// Wait on the safe point to be achieved. Send in a sync in case nothing has hit the
// ring buffer between the above notification of writer that we want it to go to
@@ -343,6 +342,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
LOG.warn(
"Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage());
}
+ long oldFileLen = 0L;
// It is at the safe point. Swap out writer from under the blocked writer thread.
// TODO: This is close is inline with critical section. Should happen in background?
if (this.writer != null) {
@@ -363,6 +363,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
}
}
}
+ logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
this.writer = nextWriter;
if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) {
this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream();
@@ -397,7 +398,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
}
}
}
- return oldFileLen;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/30c2dcd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index aeb2c19..9d36429 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
@@ -25,15 +23,16 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
-import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
-import org.apache.hadoop.hbase.wal.FSHLogProvider;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
* Writer for protobuf-based WAL.
http://git-wip-us.apache.org/repos/asf/hbase/blob/30c2dcd8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
index f4cacb2..e14ce0c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -29,25 +27,28 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ImmutableByteArray;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ImmutableByteArray;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
+ * <p>
* Accounting of sequence ids per region and then by column family. So we can our accounting
* current, call startCacheFlush and then finishedCacheFlush or abortCacheFlush so this instance can
* keep abreast of the state of sequence id persistence. Also call update per append.
+ * </p>
* <p>
* For the implementation, we assume that all the {@code encodedRegionName} passed in is gotten by
- * {@link HRegionInfo#getEncodedNameAsBytes()}. So it is safe to use it as a hash key. And for
- * family name, we use {@link ImmutableByteArray} as key. This is because hash based map is much
- * faster than RBTree or CSLM and here we are on the critical write path. See HBASE-16278 for more
- * details.
+ * {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()}. So it is safe to use
+ * it as a hash key. And for family name, we use {@link ImmutableByteArray} as key. This is because
+ * hash based map is much faster than RBTree or CSLM and here we are on the critical write path. See
+ * HBASE-16278 for more details.
+ * </p>
*/
@InterfaceAudience.Private
class SequenceIdAccounting {
@@ -93,14 +94,17 @@ class SequenceIdAccounting {
*/
private final Map<byte[], Map<ImmutableByteArray, Long>> flushingSequenceIds = new HashMap<>();
- /**
- * Map of region encoded names to the latest/highest region sequence id. Updated on each
- * call to append.
- * <p>
- * This map uses byte[] as the key, and uses reference equality. It works in our use case as we
- * use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns
- * the same array.
- */
+ /**
+ * <p>
+ * Map of region encoded names to the latest/highest region sequence id. Updated on each call to
+ * append.
+ * </p>
+ * <p>
+ * This map uses byte[] as the key, and uses reference equality. It works in our use case as we
+ * use {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()} as keys. For a
+ * given region, it always returns the same array.
+ * </p>
+ */
private Map<byte[], Long> highestSequenceIds = new HashMap<>();
/**