You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/12/25 04:24:50 UTC
[hbase] branch branch-2 updated: HBASE-23326 Implement a
ProcedureStore which stores procedures in a HRegion (#941)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 5cae75e HBASE-23326 Implement a ProcedureStore which stores procedures in a HRegion (#941)
5cae75e is described below
commit 5cae75e12435190c519dd1a3e8aa89ff308c2f19
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Wed Dec 25 12:02:12 2019 +0800
HBASE-23326 Implement a ProcedureStore which stores procedures in a HRegion (#941)
Signed-off-by: Guanghao Zhang <zg...@apache.org>
Signed-off-by: stack <st...@apache.org>
---
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 8 +-
.../org/apache/hadoop/hbase/client/RegionInfo.java | 2 +-
.../procedure2/CompletedProcedureCleaner.java | 3 +
.../hadoop/hbase/procedure2/ProcedureUtil.java | 13 +
.../store/InMemoryProcedureIterator.java | 94 ++++
...edureStoreException.java => LeaseRecovery.java} | 28 +-
.../hbase/procedure2/store/ProcedureStore.java | 24 +-
.../WALProcedureTree.java => ProcedureTree.java} | 134 +----
...eStoreException.java => ProtoAndProcedure.java} | 38 +-
.../procedure2/store/{ => wal}/BitSetNode.java | 7 +-
.../wal/CorruptedWALProcedureStoreException.java | 3 +
.../store/{ => wal}/ProcedureStoreTracker.java | 7 +-
.../procedure2/store/wal/ProcedureWALFile.java | 6 +-
.../procedure2/store/wal/ProcedureWALFormat.java | 6 +-
.../store/wal/ProcedureWALFormatReader.java | 13 +-
.../store/wal/ProcedureWALPrettyPrinter.java | 3 +
.../procedure2/store/wal/WALProcedureMap.java | 3 +
.../procedure2/store/wal/WALProcedureStore.java | 21 +-
.../hbase/procedure2/ProcedureTestingUtility.java | 127 ++---
...ALProcedureTree.java => TestProcedureTree.java} | 12 +-
.../wal/ProcedureWALPerformanceEvaluation.java | 3 +-
.../procedure2/store/{ => wal}/TestBitSetNode.java | 4 +-
.../store/{ => wal}/TestProcedureStoreTracker.java | 2 +-
.../store/wal/TestWALProcedureStore.java | 6 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 38 +-
.../hbase/master/procedure/MasterProcedureEnv.java | 6 +-
.../master/procedure/MasterProcedureUtil.java | 10 +-
.../store/region/RegionFlusherAndCompactor.java | 239 +++++++++
.../store/region/RegionProcedureStore.java | 584 +++++++++++++++++++++
.../region/RegionProcedureStoreWALRoller.java | 120 +++++
.../apache/hadoop/hbase/regionserver/HRegion.java | 216 ++++----
.../hadoop/hbase/regionserver/LogRoller.java | 189 +------
.../hbase/regionserver/wal/AbstractFSWAL.java | 5 +-
.../LogRoller.java => wal/AbstractWALRoller.java} | 100 ++--
.../org/apache/hadoop/hbase/wal/WALFactory.java | 4 +-
.../resources/hbase-webapps/master/procedures.jsp | 114 ----
.../hbase/master/TestLoadProcedureError.java | 2 +-
.../hbase/master/TestMasterMetricsWrapper.java | 6 +-
.../procedure/TestMasterProcedureWalLease.java | 238 ---------
.../region/RegionProcedureStoreTestHelper.java | 54 ++
.../region/RegionProcedureStoreTestProcedure.java | 77 +++
.../store/region/TestRegionProcedureStore.java | 159 ++++++
.../region/TestRegionProcedureStoreMigration.java | 143 +++++
.../region/TestRegionProcedureStoreWALCleaner.java | 129 +++++
.../TestRegionServerCrashDisableWAL.java | 3 +-
45 files changed, 2038 insertions(+), 965 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 98dda51..1527f10 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -1182,12 +1182,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
private byte[] toEncodeRegionName(byte[] regionName) {
- try {
- return RegionInfo.isEncodedRegionName(regionName) ? regionName
- : Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
- } catch (IOException e) {
- return regionName;
- }
+ return RegionInfo.isEncodedRegionName(regionName) ? regionName :
+ Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
}
private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java
index 2f9e88d..a971332 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionInfo.java
@@ -351,7 +351,7 @@ public interface RegionInfo {
* @return True if <code>regionName</code> represents an encoded name.
*/
@InterfaceAudience.Private // For use by internals only.
- public static boolean isEncodedRegionName(byte[] regionName) throws IOException {
+ public static boolean isEncodedRegionName(byte[] regionName) {
// If not parseable as region name, presume encoded. TODO: add stringency; e.g. if hex.
return parseRegionNameOrReturnNull(regionName) == null && regionName.length <= MD5_HEX_LENGTH;
}
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureCleaner.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureCleaner.java
index e51b77b..796a8e4 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureCleaner.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/CompletedProcedureCleaner.java
@@ -134,5 +134,8 @@ class CompletedProcedureCleaner<TEnvironment> extends ProcedureInMemoryChore<TEn
if (batchCount > 0) {
store.delete(batchIds, 0, batchCount);
}
+ // let the store do some cleanup works, i.e, delete the place marker for preserving the max
+ // procedure id.
+ store.cleanup();
}
}
\ No newline at end of file
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
index 30201ca..c557c20 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureUtil.java
@@ -370,4 +370,17 @@ public final class ProcedureUtil {
.setMaxSleepTime(maxSleepTimeMs).setBackoffPolicy(new ExponentialBackoffPolicyWithLimit());
return new RetryCounter(retryConfig);
}
+
+ public static boolean isFinished(ProcedureProtos.Procedure proc) {
+ if (!proc.hasParentId()) {
+ switch (proc.getState()) {
+ case ROLLEDBACK:
+ case SUCCESS:
+ return true;
+ default:
+ break;
+ }
+ }
+ return false;
+ }
}
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/InMemoryProcedureIterator.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/InMemoryProcedureIterator.java
new file mode 100644
index 0000000..aba71b9
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/InMemoryProcedureIterator.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A procedure iterator which holds all the procedure protos in memory. For fast access.
+ */
+@InterfaceAudience.Private
+public class InMemoryProcedureIterator implements ProcedureIterator {
+
+ private final List<ProtoAndProcedure> procs;
+
+ private Iterator<ProtoAndProcedure> iter;
+
+ private ProtoAndProcedure current;
+
+ public InMemoryProcedureIterator(List<ProtoAndProcedure> procs) {
+ this.procs = procs;
+ reset();
+ }
+
+ @Override
+ public void reset() {
+ iter = procs.iterator();
+ if (iter.hasNext()) {
+ current = iter.next();
+ } else {
+ current = null;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return current != null;
+ }
+
+ private void checkNext() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public boolean isNextFinished() {
+ checkNext();
+ return ProcedureUtil.isFinished(current.getProto());
+ }
+
+ private void moveToNext() {
+ if (iter.hasNext()) {
+ current = iter.next();
+ } else {
+ current = null;
+ }
+ }
+
+ @Override
+ public void skipNext() {
+ checkNext();
+ moveToNext();
+ }
+
+ @Override
+ public Procedure<?> next() throws IOException {
+ checkNext();
+ Procedure<?> proc = current.getProcedure();
+ moveToNext();
+ return proc;
+ }
+}
\ No newline at end of file
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/LeaseRecovery.java
similarity index 62%
copy from hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
copy to hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/LeaseRecovery.java
index ba4480f..7a9ea1b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/LeaseRecovery.java
@@ -15,29 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.procedure2.store.wal;
+package org.apache.hadoop.hbase.procedure2.store;
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
-/**
- * Thrown when a procedure WAL is corrupted
- */
@InterfaceAudience.Private
-public class CorruptedWALProcedureStoreException extends HBaseIOException {
-
- private static final long serialVersionUID = -3407300445435898074L;
-
- /** default constructor */
- public CorruptedWALProcedureStoreException() {
- super();
- }
+public interface LeaseRecovery {
- /**
- * Constructor
- * @param s message
- */
- public CorruptedWALProcedureStoreException(String s) {
- super(s);
- }
-}
+ void recoverFileLease(FileSystem fs, Path path) throws IOException;
+}
\ No newline at end of file
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 4398888..4965e17 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -24,9 +24,18 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
/**
- * The ProcedureStore is used by the executor to persist the state of each procedure execution.
- * This allows to resume the execution of pending/in-progress procedures in case
- * of machine failure or service shutdown.
+ * The ProcedureStore is used by the executor to persist the state of each procedure execution. This
+ * allows to resume the execution of pending/in-progress procedures in case of machine failure or
+ * service shutdown.
+ * <p/>
+ * Notice that, the implementation must guarantee that the maxProcId when loading is the maximum one
+ * in the whole history, not only the current live procedures. This is very important as for
+ * executing remote procedures, we have some nonce checks at region server side to prevent executing
+ * non-idempotent operations more than once. If the procedure id could go back, then we may
+ * accidentally ignore some important operations such as region assign or unassign.<br/>
+ * This may lead to some garbages so we provide a {@link #cleanup()} method, the framework will call
+ * this method periodically and the store implementation could do some clean up works in this
+ * method.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@@ -240,4 +249,13 @@ public interface ProcedureStore {
* @param count the number of IDs to delete
*/
void delete(long[] procIds, int offset, int count);
+
+ /**
+ * Will be called by the framework to give the store a chance to do some clean up works.
+ * <p/>
+ * Notice that this is for periodical clean up work, not for the clean up after close, if you want
+ * to close the store just call the {@link #stop(boolean)} method above.
+ */
+ default void cleanup() {
+ }
}
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureTree.java
similarity index 69%
rename from hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java
rename to hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureTree.java
index 6e624b4..4e615b9 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureTree.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureTree.java
@@ -15,18 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.procedure2.store.wal;
+package org.apache.hadoop.hbase.procedure2.store;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
@@ -50,9 +47,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
* we will also consider them as corrupted. Please see the code in {@link #checkOrphan(Map)} method.
*/
@InterfaceAudience.Private
-public final class WALProcedureTree {
+public final class ProcedureTree {
- private static final Logger LOG = LoggerFactory.getLogger(WALProcedureTree.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ProcedureTree.class);
private static final class Entry {
@@ -78,50 +75,18 @@ public final class WALProcedureTree {
}
}
- // when loading we will iterator the procedures twice, so use this class to cache the deserialized
- // result to prevent deserializing multiple times.
- private static final class ProtoAndProc {
- private final ProcedureProtos.Procedure proto;
+ private final List<ProtoAndProcedure> validProcs = new ArrayList<>();
- private Procedure<?> proc;
+ private final List<ProtoAndProcedure> corruptedProcs = new ArrayList<>();
- public ProtoAndProc(ProcedureProtos.Procedure proto) {
- this.proto = proto;
- }
-
- public Procedure<?> getProc() throws IOException {
- if (proc == null) {
- proc = ProcedureUtil.convertToProcedure(proto);
- }
- return proc;
- }
- }
-
- private final List<ProtoAndProc> validProcs = new ArrayList<>();
-
- private final List<ProtoAndProc> corruptedProcs = new ArrayList<>();
-
- private static boolean isFinished(ProcedureProtos.Procedure proc) {
- if (!proc.hasParentId()) {
- switch (proc.getState()) {
- case ROLLEDBACK:
- case SUCCESS:
- return true;
- default:
- break;
- }
- }
- return false;
- }
-
- private WALProcedureTree(Map<Long, Entry> procMap) {
+ private ProcedureTree(Map<Long, Entry> procMap) {
List<Entry> rootEntries = buildTree(procMap);
for (Entry rootEntry : rootEntries) {
checkReady(rootEntry, procMap);
}
checkOrphan(procMap);
- Comparator<ProtoAndProc> cmp =
- (p1, p2) -> Long.compare(p1.proto.getProcId(), p2.proto.getProcId());
+ Comparator<ProtoAndProcedure> cmp =
+ (p1, p2) -> Long.compare(p1.getProto().getProcId(), p2.getProto().getProcId());
Collections.sort(validProcs, cmp);
Collections.sort(corruptedProcs, cmp);
}
@@ -144,7 +109,7 @@ public final class WALProcedureTree {
}
private void collectStackId(Entry entry, Map<Integer, List<Entry>> stackId2Proc,
- MutableInt maxStackId) {
+ MutableInt maxStackId) {
if (LOG.isDebugEnabled()) {
LOG.debug("Procedure {} stack ids={}", entry, entry.proc.getStackIdList());
}
@@ -159,8 +124,8 @@ public final class WALProcedureTree {
}
private void addAllToCorruptedAndRemoveFromProcMap(Entry entry,
- Map<Long, Entry> remainingProcMap) {
- corruptedProcs.add(new ProtoAndProc(entry.proc));
+ Map<Long, Entry> remainingProcMap) {
+ corruptedProcs.add(new ProtoAndProcedure(entry.proc));
remainingProcMap.remove(entry.proc.getProcId());
for (Entry e : entry.subProcs) {
addAllToCorruptedAndRemoveFromProcMap(e, remainingProcMap);
@@ -168,7 +133,7 @@ public final class WALProcedureTree {
}
private void addAllToValidAndRemoveFromProcMap(Entry entry, Map<Long, Entry> remainingProcMap) {
- validProcs.add(new ProtoAndProc(entry.proc));
+ validProcs.add(new ProtoAndProcedure(entry.proc));
remainingProcMap.remove(entry.proc.getProcId());
for (Entry e : entry.subProcs) {
addAllToValidAndRemoveFromProcMap(e, remainingProcMap);
@@ -180,7 +145,7 @@ public final class WALProcedureTree {
// remainingProcMap, so at last, if there are still procedures in the map, we know that there are
// orphan procedures.
private void checkReady(Entry rootEntry, Map<Long, Entry> remainingProcMap) {
- if (isFinished(rootEntry.proc)) {
+ if (ProcedureUtil.isFinished(rootEntry.proc)) {
if (!rootEntry.subProcs.isEmpty()) {
LOG.error("unexpected active children for root-procedure: {}", rootEntry);
rootEntry.subProcs.forEach(e -> LOG.error("unexpected active children: {}", e));
@@ -217,86 +182,23 @@ public final class WALProcedureTree {
private void checkOrphan(Map<Long, Entry> procMap) {
procMap.values().forEach(entry -> {
LOG.error("Orphan procedure: {}", entry);
- corruptedProcs.add(new ProtoAndProc(entry.proc));
+ corruptedProcs.add(new ProtoAndProcedure(entry.proc));
});
}
- private static final class Iter implements ProcedureIterator {
-
- private final List<ProtoAndProc> procs;
-
- private Iterator<ProtoAndProc> iter;
-
- private ProtoAndProc current;
-
- public Iter(List<ProtoAndProc> procs) {
- this.procs = procs;
- reset();
- }
-
- @Override
- public void reset() {
- iter = procs.iterator();
- if (iter.hasNext()) {
- current = iter.next();
- } else {
- current = null;
- }
- }
-
- @Override
- public boolean hasNext() {
- return current != null;
- }
-
- private void checkNext() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- }
-
- @Override
- public boolean isNextFinished() {
- checkNext();
- return isFinished(current.proto);
- }
-
- private void moveToNext() {
- if (iter.hasNext()) {
- current = iter.next();
- } else {
- current = null;
- }
- }
-
- @Override
- public void skipNext() {
- checkNext();
- moveToNext();
- }
-
- @Override
- public Procedure<?> next() throws IOException {
- checkNext();
- Procedure<?> proc = current.getProc();
- moveToNext();
- return proc;
- }
- }
-
public ProcedureIterator getValidProcs() {
- return new Iter(validProcs);
+ return new InMemoryProcedureIterator(validProcs);
}
public ProcedureIterator getCorruptedProcs() {
- return new Iter(corruptedProcs);
+ return new InMemoryProcedureIterator(corruptedProcs);
}
- public static WALProcedureTree build(Collection<ProcedureProtos.Procedure> procedures) {
+ public static ProcedureTree build(Collection<ProcedureProtos.Procedure> procedures) {
Map<Long, Entry> procMap = new HashMap<>();
for (ProcedureProtos.Procedure proc : procedures) {
procMap.put(proc.getProcId(), new Entry(proc));
}
- return new WALProcedureTree(procMap);
+ return new ProcedureTree(procMap);
}
}
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProtoAndProcedure.java
similarity index 51%
copy from hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
copy to hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProtoAndProcedure.java
index ba4480f..0cdc480 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProtoAndProcedure.java
@@ -15,29 +15,37 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.procedure2.store.wal;
+package org.apache.hadoop.hbase.procedure2.store;
-import org.apache.hadoop.hbase.HBaseIOException;
+import java.io.IOException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
/**
- * Thrown when a procedure WAL is corrupted
+ * when loading we will iterator the procedures twice, so use this class to cache the deserialized
+ * result to prevent deserializing multiple times.
*/
@InterfaceAudience.Private
-public class CorruptedWALProcedureStoreException extends HBaseIOException {
+public class ProtoAndProcedure {
+ private final ProcedureProtos.Procedure proto;
+
+ private Procedure<?> proc;
- private static final long serialVersionUID = -3407300445435898074L;
+ public ProtoAndProcedure(ProcedureProtos.Procedure proto) {
+ this.proto = proto;
+ }
- /** default constructor */
- public CorruptedWALProcedureStoreException() {
- super();
+ public Procedure<?> getProcedure() throws IOException {
+ if (proc == null) {
+ proc = ProcedureUtil.convertToProcedure(proto);
+ }
+ return proc;
}
- /**
- * Constructor
- * @param s message
- */
- public CorruptedWALProcedureStoreException(String s) {
- super(s);
+ public ProcedureProtos.Procedure getProto() {
+ return proto;
}
-}
+}
\ No newline at end of file
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/BitSetNode.java
similarity index 97%
rename from hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java
rename to hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/BitSetNode.java
index 78d2d91..98416a5 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/BitSetNode.java
@@ -15,13 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.procedure2.store;
+package org.apache.hadoop.hbase.procedure2.store.wal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.DeleteState;
+import org.apache.hadoop.hbase.procedure2.store.wal.ProcedureStoreTracker.DeleteState;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
@@ -51,7 +51,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
* For a non-partial BitSetNode, the initial modified value is 0 and deleted value is 1. For the
* partial one, the initial modified value is 0 and the initial deleted value is also 0. In
* {@link #unsetPartialFlag()} we will reset the deleted to 1 if it is not modified.
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ * use the new region based procedure store.
*/
+@Deprecated
@InterfaceAudience.Private
class BitSetNode {
private static final long WORD_MASK = 0xffffffffffffffffL;
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
index ba4480f..dc9d16c 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java
@@ -22,7 +22,10 @@ import org.apache.yetus.audience.InterfaceAudience;
/**
* Thrown when a procedure WAL is corrupted
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ * use the new region based procedure store.
*/
+@Deprecated
@InterfaceAudience.Private
public class CorruptedWALProcedureStoreException extends HBaseIOException {
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureStoreTracker.java
similarity index 98%
rename from hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
rename to hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureStoreTracker.java
index 03e2ce3..3436e8b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureStoreTracker.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.procedure2.store;
+package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException;
import java.util.Arrays;
@@ -36,9 +36,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
*
* It can be used by the ProcedureStore to identify which procedures are already
* deleted/completed to avoid the deserialization step on restart
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ * use the new region based procedure store.
*/
+@Deprecated
@InterfaceAudience.Private
-public class ProcedureStoreTracker {
+class ProcedureStoreTracker {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureStoreTracker.class);
// Key is procedure id corresponding to first bit of the bitmap.
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
index aeae900..947d5bd 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,9 +32,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
/**
* Describes a WAL File
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ * use the new region based procedure store.
*/
+@Deprecated
@InterfaceAudience.Private
-public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
+class ProcedureWALFile implements Comparable<ProcedureWALFile> {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFile.class);
private ProcedureWALHeader header;
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
index 9686593..bc60584 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.yetus.audience.InterfaceAudience;
@@ -40,9 +39,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
/**
* Helper class that contains the WAL serialization utils.
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ * use the new region based procedure store.
*/
+@Deprecated
@InterfaceAudience.Private
-public final class ProcedureWALFormat {
+final class ProcedureWALFormat {
static final byte LOG_TYPE_STREAM = 0;
static final byte LOG_TYPE_COMPACTED = 1;
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index 1b19abb..31150ca 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,9 +31,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
/**
* Helper class that loads the procedures stored in a WAL.
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ * use the new region based procedure store.
*/
+@Deprecated
@InterfaceAudience.Private
-public class ProcedureWALFormatReader {
+class ProcedureWALFormatReader {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class);
/**
@@ -44,8 +47,8 @@ public class ProcedureWALFormatReader {
* See the comments of {@link WALProcedureMap} for more details.
* <p/>
* After reading all the proc wal files, we will use the procedures in the procedureMap to build a
- * {@link WALProcedureTree}, and then give the result to the upper layer. See the comments of
- * {@link WALProcedureTree} and the code in {@link #finish()} for more details.
+ * {@link ProcedureTree}, and then give the result to the upper layer. See the comments of
+ * {@link ProcedureTree} and the code in {@link #finish()} for more details.
*/
private final WALProcedureMap localProcedureMap = new WALProcedureMap();
private final WALProcedureMap procedureMap = new WALProcedureMap();
@@ -144,7 +147,7 @@ public class ProcedureWALFormatReader {
// build the procedure execution tree. When building we will verify that whether a procedure is
// valid.
- WALProcedureTree tree = WALProcedureTree.build(procedureMap.getProcedures());
+ ProcedureTree tree = ProcedureTree.build(procedureMap.getProcedures());
loader.load(tree.getValidProcs());
loader.handleCorrupted(tree.getCorruptedProcs());
}
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java
index a11a46b..89e32c3 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java
@@ -48,7 +48,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
/**
* ProcedureWALPrettyPrinter prints the contents of a given ProcedureWAL file
* @see WALProcedureStore#main(String[]) if you want to check parse of a directory of WALs.
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ * use the new region based procedure store.
*/
+@Deprecated
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@InterfaceStability.Evolving
public class ProcedureWALPrettyPrinter extends Configured implements Tool {
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java
index 9cda1bc..5e1983f 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java
@@ -46,7 +46,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
* {@link #merge(WALProcedureMap)} method of the global one and pass the local one in. In this
* method, for the same procedure, the one comes earlier will win, as we read the proc wal files
* from new to old(the reverse order).
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ * use the new region based procedure store.
*/
+@Deprecated
@InterfaceAudience.Private
class WALProcedureMap {
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index ae1088a..c685d8b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -45,9 +45,9 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -106,7 +106,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
* deleted.
* @see ProcedureWALPrettyPrinter for printing content of a single WAL.
* @see #main(String[]) to parse a directory of MasterWALProcs.
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
+ * use the new region based procedure store.
*/
+@Deprecated
@InterfaceAudience.Private
public class WALProcedureStore extends ProcedureStoreBase {
private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class);
@@ -115,10 +118,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
- public interface LeaseRecovery {
- void recoverFileLease(FileSystem fs, Path path) throws IOException;
- }
-
public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY =
"hbase.procedure.store.wal.warn.threshold";
private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 10;
@@ -233,12 +232,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
- public WALProcedureStore(final Configuration conf, final LeaseRecovery leaseRecovery)
- throws IOException {
- this(conf,
- new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR),
- new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME),
- leaseRecovery);
+ public WALProcedureStore(Configuration conf, LeaseRecovery leaseRecovery) throws IOException {
+ this(conf, new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR),
+ new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME),
+ leaseRecovery);
}
@VisibleForTesting
@@ -1411,7 +1408,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
System.exit(-1);
}
WALProcedureStore store = new WALProcedureStore(conf, new Path(args[0]), null,
- new WALProcedureStore.LeaseRecovery() {
+ new LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
// no-op
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index b885ba5..6c66a49 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
@@ -52,13 +53,13 @@ public final class ProcedureTestingUtility {
}
public static ProcedureStore createStore(final Configuration conf, final Path dir)
- throws IOException {
+ throws IOException {
return createWalStore(conf, dir);
}
public static WALProcedureStore createWalStore(final Configuration conf, final Path dir)
- throws IOException {
- return new WALProcedureStore(conf, dir, null, new WALProcedureStore.LeaseRecovery() {
+ throws IOException {
+ return new WALProcedureStore(conf, dir, null, new LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
// no-op
@@ -66,13 +67,13 @@ public final class ProcedureTestingUtility {
});
}
- public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
- boolean abort, boolean startWorkers) throws Exception {
- restart(procExecutor, false, true, null, null, null, abort, startWorkers);
+ public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort,
+ boolean startWorkers) throws Exception {
+ restart(procExecutor, false, true, null, null, null, abort, startWorkers);
}
- public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
- boolean abort) throws Exception {
+ public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort)
+ throws Exception {
restart(procExecutor, false, true, null, null, null, abort, true);
}
@@ -81,12 +82,12 @@ public final class ProcedureTestingUtility {
}
public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
- boolean abortOnCorruption) throws IOException {
+ boolean abortOnCorruption) throws IOException {
initAndStartWorkers(procExecutor, numThreads, abortOnCorruption, true);
}
public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads,
- boolean abortOnCorruption, boolean startWorkers) throws IOException {
+ boolean abortOnCorruption, boolean startWorkers) throws IOException {
procExecutor.init(numThreads, abortOnCorruption);
if (startWorkers) {
procExecutor.startWorkers();
@@ -94,16 +95,16 @@ public final class ProcedureTestingUtility {
}
public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
- boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
- Callable<Void> actionBeforeStartWorker, Callable<Void> startAction) throws Exception {
+ boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
+ Callable<Void> actionBeforeStartWorker, Callable<Void> startAction) throws Exception {
restart(procExecutor, avoidTestKillDuringRestart, failOnCorrupted, stopAction,
actionBeforeStartWorker, startAction, false, true);
}
public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
- boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
- Callable<Void> actionBeforeStartWorker, Callable<Void> startAction, boolean abort,
- boolean startWorkers) throws Exception {
+ boolean avoidTestKillDuringRestart, boolean failOnCorrupted, Callable<Void> stopAction,
+ Callable<Void> actionBeforeStartWorker, Callable<Void> startAction, boolean abort,
+ boolean startWorkers) throws Exception {
final ProcedureStore procStore = procExecutor.getStore();
final int storeThreads = procExecutor.getCorePoolSize();
final int execThreads = procExecutor.getCorePoolSize();
@@ -144,15 +145,20 @@ public final class ProcedureTestingUtility {
}
public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader)
- throws Exception {
- procStore.stop(false);
+ throws Exception {
+ storeRestart(procStore, false, loader);
+ }
+
+ public static void storeRestart(ProcedureStore procStore, boolean abort,
+ ProcedureStore.ProcedureLoader loader) throws Exception {
+ procStore.stop(abort);
procStore.start(procStore.getNumThreads());
procStore.recoverLease();
procStore.load(loader);
}
public static LoadCounter storeRestartAndAssert(ProcedureStore procStore, long maxProcId,
- long runnableCount, int completedCount, int corruptedCount) throws Exception {
+ long runnableCount, int completedCount, int corruptedCount) throws Exception {
final LoadCounter loader = new LoadCounter();
storeRestart(procStore, loader);
assertEquals(maxProcId, loader.getMaxProcId());
@@ -169,19 +175,19 @@ public final class ProcedureTestingUtility {
}
public static <TEnv> void setKillIfHasParent(ProcedureExecutor<TEnv> procExecutor,
- boolean value) {
+ boolean value) {
createExecutorTesting(procExecutor);
procExecutor.testing.killIfHasParent = value;
}
public static <TEnv> void setKillIfSuspended(ProcedureExecutor<TEnv> procExecutor,
- boolean value) {
+ boolean value) {
createExecutorTesting(procExecutor);
procExecutor.testing.killIfSuspended = value;
}
public static <TEnv> void setKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
- boolean value) {
+ boolean value) {
createExecutorTesting(procExecutor);
procExecutor.testing.killBeforeStoreUpdate = value;
LOG.warn("Set Kill before store update to: " + procExecutor.testing.killBeforeStoreUpdate);
@@ -189,7 +195,7 @@ public final class ProcedureTestingUtility {
}
public static <TEnv> void setToggleKillBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
- boolean value) {
+ boolean value) {
createExecutorTesting(procExecutor);
procExecutor.testing.toggleKillBeforeStoreUpdate = value;
assertSingleExecutorForKillTests(procExecutor);
@@ -210,27 +216,27 @@ public final class ProcedureTestingUtility {
}
public static <TEnv> void setKillAndToggleBeforeStoreUpdate(ProcedureExecutor<TEnv> procExecutor,
- boolean value) {
+ boolean value) {
ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, value);
ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value);
assertSingleExecutorForKillTests(procExecutor);
}
- private static <TEnv> void assertSingleExecutorForKillTests(
- final ProcedureExecutor<TEnv> procExecutor) {
+ private static <TEnv> void
+ assertSingleExecutorForKillTests(final ProcedureExecutor<TEnv> procExecutor) {
if (procExecutor.testing == null) {
return;
}
if (procExecutor.testing.killBeforeStoreUpdate ||
- procExecutor.testing.toggleKillBeforeStoreUpdate) {
- assertEquals("expected only one executor running during test with kill/restart",
- 1, procExecutor.getCorePoolSize());
+ procExecutor.testing.toggleKillBeforeStoreUpdate) {
+ assertEquals("expected only one executor running during test with kill/restart", 1,
+ procExecutor.getCorePoolSize());
}
}
public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc)
- throws IOException {
+ throws IOException {
NoopProcedureStore procStore = new NoopProcedureStore();
ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore);
procStore.start(1);
@@ -248,14 +254,14 @@ public final class ProcedureTestingUtility {
}
public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
- final long nonceGroup, final long nonce) {
+ final long nonceGroup, final long nonce) {
long procId = submitProcedure(procExecutor, proc, nonceGroup, nonce);
waitProcedure(procExecutor, procId);
return procId;
}
public static <TEnv> long submitProcedure(ProcedureExecutor<TEnv> procExecutor, Procedure proc,
- final long nonceGroup, final long nonce) {
+ final long nonceGroup, final long nonce) {
final NonceKey nonceKey = procExecutor.createNonceKey(nonceGroup, nonce);
long procId = procExecutor.registerNonce(nonceKey);
assertFalse(procId >= 0);
@@ -301,13 +307,12 @@ public final class ProcedureTestingUtility {
}
public static <TEnv> void assertProcNotYetCompleted(ProcedureExecutor<TEnv> procExecutor,
- long procId) {
+ long procId) {
assertFalse("expected a running proc", procExecutor.isFinished(procId));
assertEquals(null, procExecutor.getResult(procId));
}
- public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor,
- long procId) {
+ public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor, long procId) {
Procedure<?> result = procExecutor.getResult(procId);
assertTrue("expected procedure result", result != null);
assertProcNotFailed(result);
@@ -318,7 +323,7 @@ public final class ProcedureTestingUtility {
}
public static <TEnv> Throwable assertProcFailed(final ProcedureExecutor<TEnv> procExecutor,
- final long procId) {
+ final long procId) {
Procedure<?> result = procExecutor.getResult(procId);
assertTrue("expected procedure result", result != null);
return assertProcFailed(result);
@@ -332,7 +337,8 @@ public final class ProcedureTestingUtility {
public static void assertIsAbortException(final Procedure<?> result) {
Throwable cause = assertProcFailed(result);
- assertTrue("expected abort exception, got "+ cause, cause instanceof ProcedureAbortedException);
+ assertTrue("expected abort exception, got " + cause,
+ cause instanceof ProcedureAbortedException);
}
public static void assertIsTimeoutException(final Procedure<?> result) {
@@ -353,31 +359,30 @@ public final class ProcedureTestingUtility {
}
/**
- * Run through all procedure flow states TWICE while also restarting
- * procedure executor at each step; i.e force a reread of procedure store.
- *
- *<p>It does
- * <ol><li>Execute step N - kill the executor before store update
+ * Run through all procedure flow states TWICE while also restarting procedure executor at each
+ * step; i.e force a reread of procedure store.
+ * <p>
+ * It does
+ * <ol>
+ * <li>Execute step N - kill the executor before store update
* <li>Restart executor/store
* <li>Execute step N - and then save to store
* </ol>
- *
- *<p>This is a good test for finding state that needs persisting and steps that are not
- * idempotent.
+ * <p>
+ * This is a good test for finding state that needs persisting and steps that are not idempotent.
*/
public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
- final long procId) throws Exception {
+ final long procId) throws Exception {
testRecoveryAndDoubleExecution(procExec, procId, false);
}
public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
- final long procId, final boolean expectFailure) throws Exception {
+ final long procId, final boolean expectFailure) throws Exception {
testRecoveryAndDoubleExecution(procExec, procId, expectFailure, null);
}
public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
- final long procId, final boolean expectFailure, final Runnable customRestart)
- throws Exception {
+ final long procId, final boolean expectFailure, final Runnable customRestart) throws Exception {
Procedure proc = procExec.getProcedure(procId);
waitProcedure(procExec, procId);
assertEquals(false, procExec.isRunning());
@@ -401,11 +406,12 @@ public final class ProcedureTestingUtility {
}
public static class NoopProcedure<TEnv> extends Procedure<TEnv> {
- public NoopProcedure() {}
+ public NoopProcedure() {
+ }
@Override
protected Procedure<TEnv>[] execute(TEnv env)
- throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+ throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
return null;
}
@@ -419,18 +425,16 @@ public final class ProcedureTestingUtility {
}
@Override
- protected void serializeStateData(ProcedureStateSerializer serializer)
- throws IOException {
+ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
@Override
- protected void deserializeStateData(ProcedureStateSerializer serializer)
- throws IOException {
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
}
public static class NoopStateMachineProcedure<TEnv, TState>
- extends StateMachineProcedure<TEnv, TState> {
+ extends StateMachineProcedure<TEnv, TState> {
private TState initialState;
private TEnv env;
@@ -444,7 +448,7 @@ public final class ProcedureTestingUtility {
@Override
protected Flow executeFromState(TEnv env, TState tState)
- throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+ throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
return null;
}
@@ -472,7 +476,8 @@ public final class ProcedureTestingUtility {
public static class TestProcedure extends NoopProcedure<Void> {
private byte[] data = null;
- public TestProcedure() {}
+ public TestProcedure() {
+ }
public TestProcedure(long procId) {
this(procId, 0);
@@ -510,16 +515,14 @@ public final class ProcedureTestingUtility {
}
@Override
- protected void serializeStateData(ProcedureStateSerializer serializer)
- throws IOException {
+ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
ByteString dataString = ByteString.copyFrom((data == null) ? new byte[0] : data);
BytesValue.Builder builder = BytesValue.newBuilder().setValue(dataString);
serializer.serialize(builder.build());
}
@Override
- protected void deserializeStateData(ProcedureStateSerializer serializer)
- throws IOException {
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
BytesValue bytesValue = serializer.deserialize(BytesValue.class);
ByteString dataString = bytesValue.getValue();
@@ -603,7 +606,7 @@ public final class ProcedureTestingUtility {
}
public boolean isRunnable(final long procId) {
- for (Procedure proc: runnable) {
+ for (Procedure proc : runnable) {
if (proc.getProcId() == procId) {
return true;
}
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureTree.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureTree.java
similarity index 93%
rename from hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureTree.java
rename to hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureTree.java
index 890d0e3..29d114a 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureTree.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureTree.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.procedure2.store.wal;
+package org.apache.hadoop.hbase.procedure2.store;
import static org.junit.Assert.assertEquals;
@@ -41,11 +41,11 @@ import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
@Category({ MasterTests.class, SmallTests.class })
-public class TestWALProcedureTree {
+public class TestProcedureTree {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestWALProcedureTree.class);
+ HBaseClassTestRule.forClass(TestProcedureTree.class);
public static final class TestProcedure extends Procedure<Void> {
@@ -123,7 +123,7 @@ public class TestWALProcedureTree {
proc1.addStackIndex(1);
TestProcedure proc2 = createProc(3, 2);
proc2.addStackIndex(3);
- WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2));
+ ProcedureTree tree = ProcedureTree.build(toProtos(proc0, proc1, proc2));
List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
assertEquals(0, validProcs.size());
List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());
@@ -141,7 +141,7 @@ public class TestWALProcedureTree {
proc1.addStackIndex(1);
TestProcedure proc2 = createProc(3, 2);
proc2.addStackIndex(1);
- WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2));
+ ProcedureTree tree = ProcedureTree.build(toProtos(proc0, proc1, proc2));
List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
assertEquals(0, validProcs.size());
List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());
@@ -161,7 +161,7 @@ public class TestWALProcedureTree {
proc2.addStackIndex(0);
TestProcedure proc3 = createProc(5, 4);
proc3.addStackIndex(1);
- WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2, proc3));
+ ProcedureTree tree = ProcedureTree.build(toProtos(proc0, proc1, proc2, proc3));
List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
assertEquals(3, validProcs.size());
List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
index 2f37f0c..7ad26d7 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
@@ -246,7 +247,7 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
private static class NoSyncWalProcedureStore extends WALProcedureStore {
public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException {
- super(conf, logDir, null, new WALProcedureStore.LeaseRecovery() {
+ super(conf, logDir, null, new LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
// no-op
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestBitSetNode.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestBitSetNode.java
similarity index 96%
rename from hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestBitSetNode.java
rename to hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestBitSetNode.java
index cfc3809..9d897cf 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestBitSetNode.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestBitSetNode.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.procedure2.store;
+package org.apache.hadoop.hbase.procedure2.store.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.DeleteState;
+import org.apache.hadoop.hbase.procedure2.store.wal.ProcedureStoreTracker.DeleteState;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestProcedureStoreTracker.java
similarity index 99%
rename from hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
rename to hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestProcedureStoreTracker.java
index 24c4ad0..25678e3 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestProcedureStoreTracker.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.procedure2.store;
+package org.apache.hadoop.hbase.procedure2.store.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 5c7f532..9049ffe 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -42,8 +42,8 @@ import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
+import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.io.IOUtils;
@@ -627,7 +627,7 @@ public class TestWALProcedureStore {
// simulate another active master removing the wals
procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null,
- new WALProcedureStore.LeaseRecovery() {
+ new LeaseRecovery() {
private int count = 0;
@Override
@@ -795,7 +795,7 @@ public class TestWALProcedureStore {
}
private WALProcedureStore createWALProcedureStore(Configuration conf) throws IOException {
- return new WALProcedureStore(conf, new WALProcedureStore.LeaseRecovery() {
+ return new WALProcedureStore(conf, new LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
// no-op
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 40b0566..880cf80 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -161,8 +161,9 @@ import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
-import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.quotas.MasterQuotasObserver;
import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
@@ -331,6 +332,10 @@ public class HMaster extends HRegionServer implements MasterServices {
"hbase.master.wait.on.service.seconds";
public static final int DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS = 5 * 60;
+ public static final String HBASE_MASTER_CLEANER_INTERVAL = "hbase.master.cleaner.interval";
+
+ public static final int DEFAULT_HBASE_MASTER_CLEANER_INTERVAL = 600 * 1000;
+
// Metrics for the HMaster
final MetricsMaster metricsMaster;
// file system manager for the master FS operations
@@ -425,7 +430,7 @@ public class HMaster extends HRegionServer implements MasterServices {
private SnapshotQuotaObserverChore snapshotQuotaChore;
private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
- private WALProcedureStore procedureStore;
+ private ProcedureStore procedureStore;
// handle table states
private TableStateManager tableStateManager;
@@ -909,10 +914,8 @@ public class HMaster extends HRegionServer implements MasterServices {
this.masterActiveTime = System.currentTimeMillis();
// TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
- // Only initialize the MemStoreLAB when master carry table
- if (LoadBalancer.isTablesOnMaster(conf)) {
- initializeMemStoreChunkCreator();
- }
+ // always initialize the MemStoreLAB as we use a region to store procedure now.
+ initializeMemStoreChunkCreator();
this.fileSystemManager = new MasterFileSystem(conf);
this.walManager = new MasterWalManager(this);
@@ -1422,18 +1425,19 @@ public class HMaster extends HRegionServer implements MasterServices {
this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, conf.getInt(
SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT));
- // We depend on there being only one instance of this executor running
- // at a time. To do concurrency, would need fencing of enable/disable of
- // tables.
- // Any time changing this maxThreads to > 1, pls see the comment at
- // AccessController#postCompletedCreateTableAction
- this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
- startProcedureExecutor();
+ // We depend on there being only one instance of this executor running
+ // at a time. To do concurrency, would need fencing of enable/disable of
+ // tables.
+ // Any time changing this maxThreads to > 1, pls see the comment at
+ // AccessController#postCompletedCreateTableAction
+ this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
+ startProcedureExecutor();
// Create cleaner thread pool
cleanerPool = new DirScanPool(conf);
// Start log cleaner thread
- int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000);
+ int cleanerInterval =
+ conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool);
getChoreService().scheduleChore(logCleaner);
@@ -1537,7 +1541,7 @@ public class HMaster extends HRegionServer implements MasterServices {
private void createProcedureExecutor() throws IOException {
MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
procedureStore =
- new WALProcedureStore(conf, new MasterProcedureEnv.WALStoreLeaseRecovery(this));
+ new RegionProcedureStore(this, new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
procedureStore.registerListener(new ProcedureStoreListener() {
@Override
@@ -2707,10 +2711,10 @@ public class HMaster extends HRegionServer implements MasterServices {
}
public int getNumWALFiles() {
- return procedureStore != null ? procedureStore.getActiveLogs().size() : 0;
+ return 0;
}
- public WALProcedureStore getWalProcedureStore() {
+ public ProcedureStore getProcedureStore() {
return procedureStore;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
index 4fcf7e0..a3772fd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
-import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -47,10 +47,10 @@ public class MasterProcedureEnv implements ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureEnv.class);
@InterfaceAudience.Private
- public static class WALStoreLeaseRecovery implements WALProcedureStore.LeaseRecovery {
+ public static class FsUtilsLeaseRecovery implements LeaseRecovery {
private final MasterServices master;
- public WALStoreLeaseRecovery(final MasterServices master) {
+ public FsUtilsLeaseRecovery(final MasterServices master) {
this.master = master;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
index 1e488b6..49bf5c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java
@@ -146,8 +146,14 @@ public final class MasterProcedureUtil {
/**
* Pattern used to validate a Procedure WAL file name see
* {@link #validateProcedureWALFilename(String)} for description.
+ * @deprecated Since 2.3.0, will be removed in 4.0.0. We do not use this style of procedure wal
+ * file name any more.
*/
- private static final Pattern pattern = Pattern.compile(".*pv2-\\d{20}.log");
+ @Deprecated
+ private static final Pattern PATTERN = Pattern.compile(".*pv2-\\d{20}.log");
+
+ // Use the character $ to let the log cleaner know that this is not the normal wal file.
+ public static final String ARCHIVED_PROC_WAL_SUFFIX = "$masterproc$";
/**
* A Procedure WAL file name is of the format: pv-<wal-id>.log where wal-id is 20 digits.
@@ -155,7 +161,7 @@ public final class MasterProcedureUtil {
* @return <tt>true</tt> if the filename matches a Procedure WAL, <tt>false</tt> otherwise
*/
public static boolean validateProcedureWALFilename(String filename) {
- return pattern.matcher(filename).matches();
+ return PATTERN.matcher(filename).matches() || filename.endsWith(ARCHIVED_PROC_WAL_SUFFIX);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
new file mode 100644
index 0000000..fb24802
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * As there is no RegionServerServices for the procedure store region, we need to implement the
+ * flush and compaction logic on our own.
+ * <p/>
+ * The flush logic is very simple, every time after calling a modification method in
+ * {@link RegionProcedureStore}, we will call the {@link #onUpdate()} method below, and in this
+ * method, we will check the memstore size and if it is above the flush size, we will call
+ * {@link HRegion#flush(boolean)} to force flush all stores.
+ * <p/>
+ * And for compaction, the logic is also very simple. After flush, we will check the store file
+ * count, if it is above the compactMin, we will do a major compaction.
+ */
+@InterfaceAudience.Private
+class RegionFlusherAndCompactor implements Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RegionFlusherAndCompactor.class);
+
+ static final String FLUSH_SIZE_KEY = "hbase.procedure.store.region.flush.size";
+
+ private static final long DEFAULT_FLUSH_SIZE = 16L * 1024 * 1024;
+
+ static final String FLUSH_PER_CHANGES_KEY = "hbase.procedure.store.region.flush.per.changes";
+
+ private static final long DEFAULT_FLUSH_PER_CHANGES = 1_000_000;
+
+ static final String FLUSH_INTERVAL_MS_KEY = "hbase.procedure.store.region.flush.interval.ms";
+
+ // default to flush every 15 minutes, for safety
+ private static final long DEFAULT_FLUSH_INTERVAL_MS = TimeUnit.MINUTES.toMillis(15);
+
+ static final String COMPACT_MIN_KEY = "hbase.procedure.store.region.compact.min";
+
+ private static final int DEFAULT_COMPACT_MIN = 4;
+
+ private final Abortable abortable;
+
+ private final HRegion region;
+
+ // as we can only count this outside the region's write/flush process so it is not accurate, but
+ // it is enough.
+ private final AtomicLong changesAfterLastFlush = new AtomicLong(0);
+
+ private final long flushSize;
+
+ private final long flushPerChanges;
+
+ private final long flushIntervalMs;
+
+ private final int compactMin;
+
+ private final Thread flushThread;
+
+ private final Lock flushLock = new ReentrantLock();
+
+ private final Condition flushCond = flushLock.newCondition();
+
+ private boolean flushRequest = false;
+
+ private long lastFlushTime;
+
+ private final ExecutorService compactExecutor;
+
+ private final Lock compactLock = new ReentrantLock();
+
+ private boolean compactRequest = false;
+
+ private volatile boolean closed = false;
+
+ RegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region) {
+ this.abortable = abortable;
+ this.region = region;
+ flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
+ flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
+ flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
+ compactMin = conf.getInt(COMPACT_MIN_KEY, DEFAULT_COMPACT_MIN);
+ flushThread = new Thread(this::flushLoop, "Procedure-Region-Store-Flusher");
+ flushThread.setDaemon(true);
+ flushThread.start();
+ compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+ .setNameFormat("Procedure-Region-Store-Compactor").setDaemon(true).build());
+ }
+
+ // inject our flush related configurations
+ static void setupConf(Configuration conf) {
+ long flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
+ conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSize);
+ long flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
+ conf.setLong(HRegion.MEMSTORE_FLUSH_PER_CHANGES, flushPerChanges);
+ long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
+ conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, flushIntervalMs);
+ }
+
+ private void compact() {
+ try {
+ region.compact(true);
+ } catch (IOException e) {
+ LOG.error("Failed to compact procedure store region", e);
+ }
+ compactLock.lock();
+ try {
+ if (needCompaction()) {
+ compactExecutor.execute(this::compact);
+ } else {
+ compactRequest = false;
+ }
+ } finally {
+ compactLock.unlock();
+ }
+ }
+
+ private boolean needCompaction() {
+ return Iterables.getOnlyElement(region.getStores()).getStorefilesCount() >= compactMin;
+ }
+
+ private void flushLoop() {
+ lastFlushTime = EnvironmentEdgeManager.currentTime();
+ while (!closed) {
+ flushLock.lock();
+ try {
+ while (!flushRequest) {
+ long waitTimeMs = lastFlushTime + flushIntervalMs - EnvironmentEdgeManager.currentTime();
+ if (waitTimeMs <= 0) {
+ flushRequest = true;
+ break;
+ }
+ flushCond.await(waitTimeMs, TimeUnit.MILLISECONDS);
+ if (closed) {
+ return;
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ continue;
+ } finally {
+ flushLock.unlock();
+ }
+ assert flushRequest;
+ changesAfterLastFlush.set(0);
+ try {
+ region.flush(true);
+ } catch (IOException e) {
+ LOG.error(HBaseMarkers.FATAL, "Failed to flush procedure store region, aborting...", e);
+ abortable.abort("Failed to flush procedure store region", e);
+ return;
+ }
+ compactLock.lock();
+ try {
+ if (!compactRequest && needCompaction()) {
+ compactRequest = true;
+ compactExecutor.execute(this::compact);
+ }
+ } finally {
+ compactLock.unlock();
+ }
+ flushLock.lock();
+ try {
+ // reset the flushRequest flag
+ if (!shouldFlush(changesAfterLastFlush.get())) {
+ flushRequest = false;
+ }
+ } finally {
+ flushLock.unlock();
+ }
+ }
+ }
+
+ private boolean shouldFlush(long changes) {
+ return region.getMemStoreHeapSize() + region.getMemStoreOffHeapSize() >= flushSize ||
+ changes > flushPerChanges;
+ }
+
+ void onUpdate() {
+ long changes = changesAfterLastFlush.incrementAndGet();
+ if (shouldFlush(changes)) {
+ requestFlush();
+ }
+ }
+
+ void requestFlush() {
+ flushLock.lock();
+ try {
+ if (flushRequest) {
+ return;
+ }
+ flushRequest = true;
+ flushCond.signalAll();
+ } finally {
+ flushLock.unlock();
+ }
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ flushThread.interrupt();
+ compactExecutor.shutdown();
+ }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
new file mode 100644
index 0000000..d96b356
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
+import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;
+import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
+/**
+ * A procedure store which uses a region to store all the procedures.
+ * <p/>
+ * FileSystem layout:
+ *
+ * <pre>
+ * hbase
+ * |
+ * --MasterProcs
+ * |
+ * --data
+ * | |
+ * | --/master/procedure/<encoded-region-name> <---- The region data
+ * | |
+ * | --replay <---- The edits to replay
+ * |
+ * --WALs
+ * |
+ * --<master-server-name> <---- The WAL dir for active master
+ * |
+ * --<master-server-name>-dead <---- The WAL dir for the dead master
+ * </pre>
+ *
+ * We use the p:d column to store the serialized protobuf format procedure, and when deleting we
+ * will first fill the p:d column with an empty byte array, and then actually delete them in
+ * the {@link #cleanup()} method. This is because we need to retain the max procedure id, so we
+ * can not directly delete a procedure row as we do not know if it is the one with the max procedure
+ * id.
+ */
+@InterfaceAudience.Private
+public class RegionProcedureStore extends ProcedureStoreBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStore.class);
+
+ static final String MAX_WALS_KEY = "hbase.procedure.store.region.maxwals";
+
+ private static final int DEFAULT_MAX_WALS = 10;
+
+ static final String USE_HSYNC_KEY = "hbase.procedure.store.region.wal.hsync";
+
+ static final String MASTER_PROCEDURE_DIR = "MasterProcs";
+
+ static final String LOGCLEANER_PLUGINS = "hbase.procedure.store.region.logcleaner.plugins";
+
+ private static final String DATA_DIR = "data";
+
+ private static final String REPLAY_EDITS_DIR = "replay";
+
+ private static final String DEAD_WAL_DIR_SUFFIX = "-dead";
+
+ private static final TableName TABLE_NAME = TableName.valueOf("master:procedure");
+
+ private static final byte[] FAMILY = Bytes.toBytes("p");
+
+ private static final byte[] PROC_QUALIFIER = Bytes.toBytes("d");
+
+ private static final int REGION_ID = 1;
+
+ private static final TableDescriptor TABLE_DESC = TableDescriptorBuilder.newBuilder(TABLE_NAME)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
+
+ private final Server server;
+
+ private final LeaseRecovery leaseRecovery;
+
+ private WALFactory walFactory;
+
+ @VisibleForTesting
+ HRegion region;
+
+ private RegionFlusherAndCompactor flusherAndCompactor;
+
+ @VisibleForTesting
+ RegionProcedureStoreWALRoller walRoller;
+
+ private int numThreads;
+
+ public RegionProcedureStore(Server server, LeaseRecovery leaseRecovery) {
+ this.server = server;
+ this.leaseRecovery = leaseRecovery;
+ }
+
+ @Override
+ public void start(int numThreads) throws IOException {
+ if (!setRunning(true)) {
+ return;
+ }
+ LOG.info("Starting the Region Procedure Store...");
+ this.numThreads = numThreads;
+ }
+
+ private void shutdownWAL() {
+ if (walFactory != null) {
+ try {
+ walFactory.shutdown();
+ } catch (IOException e) {
+ LOG.warn("Failed to shutdown WAL", e);
+ }
+ }
+ }
+
+ private void closeRegion(boolean abort) {
+ if (region != null) {
+ try {
+ region.close(abort);
+ } catch (IOException e) {
+ LOG.warn("Failed to close region", e);
+ }
+ }
+
+ }
+
+ @Override
+ public void stop(boolean abort) {
+ if (!setRunning(false)) {
+ return;
+ }
+ LOG.info("Stopping the Region Procedure Store, isAbort={}", abort);
+ if (flusherAndCompactor != null) {
+ flusherAndCompactor.close();
+ }
+ // if abort, we shutdown the wal first to fail the ongoing updates to the region, and then close
+ // the region, otherwise there will be a deadlock.
+ if (abort) {
+ shutdownWAL();
+ closeRegion(true);
+ } else {
+ closeRegion(false);
+ shutdownWAL();
+ }
+
+ if (walRoller != null) {
+ walRoller.close();
+ }
+ }
+
+ @Override
+ public int getNumThreads() {
+ return numThreads;
+ }
+
+ @Override
+ public int setRunningProcedureCount(int count) {
+ // useless for region based storage.
+ return count;
+ }
+
+ private WAL createWAL(FileSystem fs, Path rootDir, RegionInfo regionInfo) throws IOException {
+ String logName = AbstractFSWALProvider.getWALDirectoryName(server.getServerName().toString());
+ Path walDir = new Path(rootDir, logName);
+ LOG.debug("WALDir={}", walDir);
+ if (fs.exists(walDir)) {
+ throw new HBaseIOException(
+ "Master procedure store has already created directory at " + walDir);
+ }
+ if (!fs.mkdirs(walDir)) {
+ throw new IOException("Can not create master procedure wal directory " + walDir);
+ }
+ WAL wal = walFactory.getWAL(regionInfo);
+ walRoller.addWAL(wal);
+ return wal;
+ }
+
+ private HRegion bootstrap(Configuration conf, FileSystem fs, Path rootDir, Path dataDir)
+ throws IOException {
+ RegionInfo regionInfo = RegionInfoBuilder.newBuilder(TABLE_NAME).setRegionId(REGION_ID).build();
+ Path tmpDataDir = new Path(dataDir.getParent(), dataDir.getName() + "-tmp");
+ if (fs.exists(tmpDataDir) && !fs.delete(tmpDataDir, true)) {
+ throw new IOException("Can not delete partial created proc region " + tmpDataDir);
+ }
+ Path tableDir = CommonFSUtils.getTableDir(tmpDataDir, TABLE_NAME);
+ HRegion.createHRegion(conf, regionInfo, fs, tableDir, TABLE_DESC).close();
+ if (!fs.rename(tmpDataDir, dataDir)) {
+ throw new IOException("Can not rename " + tmpDataDir + " to " + dataDir);
+ }
+ WAL wal = createWAL(fs, rootDir, regionInfo);
+ return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, TABLE_DESC, wal, null,
+ null);
+ }
+
+ private HRegion open(Configuration conf, FileSystem fs, Path rootDir, Path dataDir)
+ throws IOException {
+ String factoryId = server.getServerName().toString();
+ Path tableDir = CommonFSUtils.getTableDir(dataDir, TABLE_NAME);
+ Path regionDir =
+ fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
+ .getPath();
+ Path replayEditsDir = new Path(regionDir, REPLAY_EDITS_DIR);
+ if (!fs.exists(replayEditsDir) && !fs.mkdirs(replayEditsDir)) {
+ throw new IOException("Failed to create replay directory: " + replayEditsDir);
+ }
+ Path walsDir = new Path(rootDir, HREGION_LOGDIR_NAME);
+ for (FileStatus walDir : fs.listStatus(walsDir)) {
+ if (!walDir.isDirectory()) {
+ continue;
+ }
+ if (walDir.getPath().getName().startsWith(factoryId)) {
+ LOG.warn("This should not happen in real production as we have not created our WAL " +
+ "directory yet, ignore if you are running a procedure related UT");
+ }
+ Path deadWALDir;
+ if (!walDir.getPath().getName().endsWith(DEAD_WAL_DIR_SUFFIX)) {
+ deadWALDir =
+ new Path(walDir.getPath().getParent(), walDir.getPath().getName() + DEAD_WAL_DIR_SUFFIX);
+ if (!fs.rename(walDir.getPath(), deadWALDir)) {
+ throw new IOException("Can not rename " + walDir + " to " + deadWALDir +
+ " when recovering lease of proc store");
+ }
+ LOG.info("Renamed {} to {} as it is dead", walDir.getPath(), deadWALDir);
+ } else {
+ deadWALDir = walDir.getPath();
+ LOG.info("{} is already marked as dead", deadWALDir);
+ }
+ for (FileStatus walFile : fs.listStatus(deadWALDir)) {
+ Path replayEditsFile = new Path(replayEditsDir, walFile.getPath().getName());
+ leaseRecovery.recoverFileLease(fs, walFile.getPath());
+ if (!fs.rename(walFile.getPath(), replayEditsFile)) {
+ throw new IOException("Can not rename " + walFile.getPath() + " to " + replayEditsFile +
+ " when recovering lease of proc store");
+ }
+ LOG.info("Renamed {} to {}", walFile.getPath(), replayEditsFile);
+ }
+ LOG.info("Delete empty proc wal dir {}", deadWALDir);
+ fs.delete(deadWALDir, true);
+ }
+ RegionInfo regionInfo = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+ WAL wal = createWAL(fs, rootDir, regionInfo);
+ conf.set(HRegion.SPECIAL_RECOVERED_EDITS_DIR,
+ replayEditsDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
+ return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, TABLE_DESC, wal, null,
+ null);
+ }
+
+ @SuppressWarnings("deprecation")
+ private void tryMigrate(FileSystem fs) throws IOException {
+ Configuration conf = server.getConfiguration();
+ Path procWALDir =
+ new Path(CommonFSUtils.getWALRootDir(conf), WALProcedureStore.MASTER_PROCEDURE_LOGDIR);
+ if (!fs.exists(procWALDir)) {
+ return;
+ }
+ LOG.info("The old procedure wal directory {} exists, start migrating", procWALDir);
+ WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery);
+ store.start(numThreads);
+ store.recoverLease();
+ MutableLong maxProcIdSet = new MutableLong(-1);
+ MutableLong maxProcIdFromProcs = new MutableLong(-1);
+ store.load(new ProcedureLoader() {
+
+ @Override
+ public void setMaxProcId(long maxProcId) {
+ maxProcIdSet.setValue(maxProcId);
+ }
+
+ @Override
+ public void load(ProcedureIterator procIter) throws IOException {
+ long procCount = 0;
+ while (procIter.hasNext()) {
+ Procedure<?> proc = procIter.next();
+ update(proc);
+ procCount++;
+ if (proc.getProcId() > maxProcIdFromProcs.longValue()) {
+ maxProcIdFromProcs.setValue(proc.getProcId());
+ }
+ }
+ LOG.info("Migrated {} procedures", procCount);
+ }
+
+ @Override
+ public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+ long corruptedCount = 0;
+ while (procIter.hasNext()) {
+ LOG.error("Corrupted procedure {}", procIter.next());
+ corruptedCount++;
+ }
+ if (corruptedCount > 0) {
+ throw new IOException("There are " + corruptedCount + " corrupted procedures when" +
+ " migrating from the old WAL based store to the new region based store, please" +
+ " fix them before upgrading again.");
+ }
+ }
+ });
+ LOG.info("The max pid is {}, and the max pid of all loaded procedures is {}",
+ maxProcIdSet.longValue(), maxProcIdFromProcs.longValue());
+ // Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but
+ // anyway, let's do a check here.
+ if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) {
+ if (maxProcIdSet.longValue() > 0) {
+ // let's add a fake row to retain the max proc id
+ region.put(new Put(Bytes.toBytes(maxProcIdSet.longValue())).addColumn(FAMILY,
+ PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
+ }
+ } else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
+ LOG.warn("The max pid is less than the max pid of all loaded procedures");
+ }
+ if (!fs.delete(procWALDir, true)) {
+ throw new IOException("Failed to delete the migrated proc wal directory " + procWALDir);
+ }
+ LOG.info("Migration finished");
+ }
+
+ @Override
+ public void recoverLease() throws IOException {
+ LOG.debug("Starting Region Procedure Store lease recovery...");
+ Configuration baseConf = server.getConfiguration();
+ FileSystem fs = CommonFSUtils.getWALFileSystem(baseConf);
+ Path globalWALRootDir = CommonFSUtils.getWALRootDir(baseConf);
+ Path rootDir = new Path(globalWALRootDir, MASTER_PROCEDURE_DIR);
+ // we will override some configurations so create a new one.
+ Configuration conf = new Configuration(baseConf);
+ CommonFSUtils.setRootDir(conf, rootDir);
+ CommonFSUtils.setWALRootDir(conf, rootDir);
+ RegionFlusherAndCompactor.setupConf(conf);
+
+ walRoller = RegionProcedureStoreWALRoller.create(conf, server, fs, rootDir, globalWALRootDir);
+ walRoller.start();
+ conf.setInt(AbstractFSWAL.MAX_LOGS, conf.getInt(MAX_WALS_KEY, DEFAULT_MAX_WALS));
+ if (conf.get(USE_HSYNC_KEY) != null) {
+ conf.set(HRegion.WAL_HSYNC_CONF_KEY, conf.get(USE_HSYNC_KEY));
+ }
+ walFactory = new WALFactory(conf, server.getServerName().toString());
+ Path dataDir = new Path(rootDir, DATA_DIR);
+ if (fs.exists(dataDir)) {
+ // load the existing region.
+ region = open(conf, fs, rootDir, dataDir);
+ } else {
+ // bootstrapping...
+ region = bootstrap(conf, fs, rootDir, dataDir);
+ }
+ flusherAndCompactor = new RegionFlusherAndCompactor(conf, server, region);
+ walRoller.setFlusherAndCompactor(flusherAndCompactor);
+ tryMigrate(fs);
+ }
+
+ @Override
+ public void load(ProcedureLoader loader) throws IOException {
+ List<ProcedureProtos.Procedure> procs = new ArrayList<>();
+ long maxProcId = 0;
+
+ try (RegionScanner scanner = region.getScanner(new Scan().addColumn(FAMILY, PROC_QUALIFIER))) {
+ List<Cell> cells = new ArrayList<>();
+ boolean moreRows;
+ do {
+ moreRows = scanner.next(cells);
+ if (cells.isEmpty()) {
+ continue;
+ }
+ Cell cell = cells.get(0);
+ cells.clear();
+ maxProcId = Math.max(maxProcId,
+ Bytes.toLong(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+ if (cell.getValueLength() > 0) {
+ ProcedureProtos.Procedure proto = ProcedureProtos.Procedure.parser()
+ .parseFrom(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ procs.add(proto);
+ }
+ } while (moreRows);
+ }
+ loader.setMaxProcId(maxProcId);
+ ProcedureTree tree = ProcedureTree.build(procs);
+ loader.load(tree.getValidProcs());
+ loader.handleCorrupted(tree.getCorruptedProcs());
+ }
+
+ private void serializePut(Procedure<?> proc, List<Mutation> mutations, List<byte[]> rowsToLock)
+ throws IOException {
+ ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
+ byte[] row = Bytes.toBytes(proc.getProcId());
+ mutations.add(new Put(row).addColumn(FAMILY, PROC_QUALIFIER, proto.toByteArray()));
+ rowsToLock.add(row);
+ }
+
+ // As we need to keep the max procedure id, here we can not simply delete the procedure, just fill
+ // the proc column with an empty array.
+ private void serializeDelete(long procId, List<Mutation> mutations, List<byte[]> rowsToLock) {
+ byte[] row = Bytes.toBytes(procId);
+ mutations.add(new Put(row).addColumn(FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
+ rowsToLock.add(row);
+ }
+
+ @Override
+ public void insert(Procedure<?> proc, Procedure<?>[] subProcs) {
+ if (subProcs == null || subProcs.length == 0) {
+ // same with update, just insert a single procedure
+ update(proc);
+ return;
+ }
+ List<Mutation> mutations = new ArrayList<>(subProcs.length + 1);
+ List<byte[]> rowsToLock = new ArrayList<>(subProcs.length + 1);
+ try {
+ serializePut(proc, mutations, rowsToLock);
+ for (Procedure<?> subProc : subProcs) {
+ serializePut(subProc, mutations, rowsToLock);
+ }
+ region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+ } catch (IOException e) {
+ LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
+ Arrays.toString(subProcs), e);
+ throw new UncheckedIOException(e);
+ }
+ flusherAndCompactor.onUpdate();
+ }
+
+ @Override
+ public void insert(Procedure<?>[] procs) {
+ List<Mutation> mutations = new ArrayList<>(procs.length);
+ List<byte[]> rowsToLock = new ArrayList<>(procs.length);
+ try {
+ for (Procedure<?> proc : procs) {
+ serializePut(proc, mutations, rowsToLock);
+ }
+ region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+ } catch (IOException e) {
+ LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
+ throw new UncheckedIOException(e);
+ }
+ flusherAndCompactor.onUpdate();
+ }
+
+ @Override
+ public void update(Procedure<?> proc) {
+ try {
+ ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
+ region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER,
+ proto.toByteArray()));
+ } catch (IOException e) {
+ LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
+ throw new UncheckedIOException(e);
+ }
+ flusherAndCompactor.onUpdate();
+ }
+
+ @Override
+ public void delete(long procId) {
+ try {
+ region
+ .put(new Put(Bytes.toBytes(procId)).addColumn(FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
+ } catch (IOException e) {
+ LOG.error(HBaseMarkers.FATAL, "Failed to delete pid={}", procId, e);
+ throw new UncheckedIOException(e);
+ }
+ flusherAndCompactor.onUpdate();
+ }
+
+ @Override
+ public void delete(Procedure<?> parentProc, long[] subProcIds) {
+ List<Mutation> mutations = new ArrayList<>(subProcIds.length + 1);
+ List<byte[]> rowsToLock = new ArrayList<>(subProcIds.length + 1);
+ try {
+ serializePut(parentProc, mutations, rowsToLock);
+ for (long subProcId : subProcIds) {
+ serializeDelete(subProcId, mutations, rowsToLock);
+ }
+ region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+ } catch (IOException e) {
+ LOG.error(HBaseMarkers.FATAL, "Failed to delete parent proc {}, sub pids={}", parentProc,
+ Arrays.toString(subProcIds), e);
+ throw new UncheckedIOException(e);
+ }
+ flusherAndCompactor.onUpdate();
+ }
+
+ @Override
+ public void delete(long[] procIds, int offset, int count) {
+ if (count == 0) {
+ return;
+ }
+ if (count == 1) {
+ delete(procIds[offset]);
+ return;
+ }
+ List<Mutation> mutations = new ArrayList<>(count);
+ List<byte[]> rowsToLock = new ArrayList<>(count);
+ for (int i = 0; i < count; i++) {
+ long procId = procIds[offset + i];
+ serializeDelete(procId, mutations, rowsToLock);
+ }
+ try {
+ region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+ } catch (IOException e) {
+ LOG.error(HBaseMarkers.FATAL, "Failed to delete pids={}", Arrays.toString(procIds), e);
+ throw new UncheckedIOException(e);
+ }
+ flusherAndCompactor.onUpdate();
+ }
+
+ @Override
+ public void cleanup() {
+ // actually delete the procedures if they are not the one with the max procedure id.
+ List<Cell> cells = new ArrayList<Cell>();
+ try (RegionScanner scanner =
+ region.getScanner(new Scan().addColumn(FAMILY, PROC_QUALIFIER).setReversed(true))) {
+ // skip the row with max procedure id
+ boolean moreRows = scanner.next(cells);
+ if (cells.isEmpty()) {
+ return;
+ }
+ cells.clear();
+ while (moreRows) {
+ moreRows = scanner.next(cells);
+ if (cells.isEmpty()) {
+ continue;
+ }
+ Cell cell = cells.get(0);
+ cells.clear();
+ if (cell.getValueLength() == 0) {
+ region.delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to clean up delete procedures", e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreWALRoller.java
new file mode 100644
index 0000000..fc84c27
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreWALRoller.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.apache.hadoop.hbase.HConstants.HREGION_OLDLOGDIR_NAME;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.AbstractWALRoller;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * As there is no RegionServerServices for the procedure store region, we need to implement the
+ * log rolling logic ourselves.
+ * <p/>
+ * We can reuse most of the code of the normal WAL roller; the only difference is that there is
+ * only one region, so in the {@link #scheduleFlush(String)} method we can just schedule a flush
+ * for the procedure store region.
+ */
+@InterfaceAudience.Private
+final class RegionProcedureStoreWALRoller extends AbstractWALRoller<Abortable> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStoreWALRoller.class);
+
+ static final String ROLL_PERIOD_MS_KEY = "hbase.procedure.store.region.walroll.period.ms";
+
+ private static final long DEFAULT_ROLL_PERIOD_MS = TimeUnit.MINUTES.toMillis(15);
+
+ private volatile RegionFlusherAndCompactor flusherAndCompactor;
+
+ private final FileSystem fs;
+
+ private final Path walArchiveDir;
+
+ private final Path globalWALArchiveDir;
+
+ private RegionProcedureStoreWALRoller(Configuration conf, Abortable abortable, FileSystem fs,
+ Path walRootDir, Path globalWALRootDir) {
+ super("RegionProcedureStoreWALRoller", conf, abortable);
+ this.fs = fs;
+ this.walArchiveDir = new Path(walRootDir, HREGION_OLDLOGDIR_NAME);
+ this.globalWALArchiveDir = new Path(globalWALRootDir, HREGION_OLDLOGDIR_NAME);
+ }
+
+ @Override
+ protected void afterRoll(WAL wal) {
+ // move the archived WAL files to the global archive path
+ try {
+ if (!fs.exists(globalWALArchiveDir) && !fs.mkdirs(globalWALArchiveDir)) {
+ LOG.warn("Failed to create global archive dir {}", globalWALArchiveDir);
+ return;
+ }
+ FileStatus[] archivedWALFiles = fs.listStatus(walArchiveDir);
+ if (archivedWALFiles == null) {
+ return;
+ }
+ for (FileStatus status : archivedWALFiles) {
+ Path file = status.getPath();
+ Path newFile = new Path(globalWALArchiveDir,
+ file.getName() + MasterProcedureUtil.ARCHIVED_PROC_WAL_SUFFIX);
+ if (fs.rename(file, newFile)) {
+ LOG.info("Successfully moved {} to {}", file, newFile);
+ } else {
+ LOG.warn("Failed to move archived wal from {} to global place {}", file, newFile);
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to move archived wals from {} to global dir {}", walArchiveDir,
+ globalWALArchiveDir, e);
+ }
+ }
+
+ @Override
+ protected void scheduleFlush(String encodedRegionName) {
+ RegionFlusherAndCompactor flusher = this.flusherAndCompactor;
+ if (flusher != null) {
+ flusher.requestFlush();
+ }
+ }
+
+ void setFlusherAndCompactor(RegionFlusherAndCompactor flusherAndCompactor) {
+ this.flusherAndCompactor = flusherAndCompactor;
+ }
+
+ static RegionProcedureStoreWALRoller create(Configuration conf, Abortable abortable,
+ FileSystem fs, Path walRootDir, Path globalWALRootDir) {
+ // we cannot run with the WAL disabled, so force it to true.
+ conf.setBoolean(WALFactory.WAL_ENABLED, true);
+ // we do not need this feature, so force disable it.
+ conf.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, false);
+ conf.setLong(WAL_ROLL_PERIOD_KEY, conf.getLong(ROLL_PERIOD_MS_KEY, DEFAULT_ROLL_PERIOD_MS));
+ return new RegionProcedureStoreWALRoller(conf, abortable, fs, walRootDir, globalWALRootDir);
+ }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index e7b3b23..067ce2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -43,6 +43,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet;
+import java.util.Objects;
import java.util.Optional;
import java.util.RandomAccess;
import java.util.Set;
@@ -71,6 +72,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -160,8 +162,6 @@ import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.CompressionTest;
-import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HashedBytes;
@@ -236,12 +236,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final String HBASE_MAX_CELL_SIZE_KEY = "hbase.server.keyvalue.maxsize";
public static final int DEFAULT_MAX_CELL_SIZE = 10485760;
- /**
- * This is the global default value for durability. All tables/mutations not
- * defining a durability or using USE_DEFAULT will default to this value.
- */
- private static final Durability DEFAULT_DURABILITY = Durability.SYNC_WAL;
-
public static final String HBASE_REGIONSERVER_MINIBATCH_SIZE =
"hbase.regionserver.minibatch.size";
public static final int DEFAULT_HBASE_REGIONSERVER_MINIBATCH_SIZE = 20000;
@@ -249,6 +243,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final String WAL_HSYNC_CONF_KEY = "hbase.wal.hsync";
public static final boolean DEFAULT_WAL_HSYNC = false;
+ /**
+ * This is for using HRegion as a local storage, where we may put the recovered edits in a
+ * special place. Once this is set, we will only replay the recovered edits under this directory
+ * and ignore the original replay directory configs.
+ */
+ public static final String SPECIAL_RECOVERED_EDITS_DIR =
+ "hbase.hregion.special.recovered.edits.dir";
+
final AtomicBoolean closed = new AtomicBoolean(false);
/* Closing can take some time; use the closing flag if there is stuff we don't
@@ -4556,6 +4558,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return size.getHeapSize() + size.getOffHeapSize() > getMemStoreFlushSize();
}
+ private void deleteRecoveredEdits(FileSystem fs, Iterable<Path> files) throws IOException {
+ for (Path file : files) {
+ if (!fs.delete(file, false)) {
+ LOG.error("Failed delete of {}", file);
+ } else {
+ LOG.debug("Deleted recovered.edits file={}", file);
+ }
+ }
+ }
+
/**
* Read the edits put under this region by wal splitting process. Put
* the recovered edits back up into this region.
@@ -4587,11 +4599,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* the maxSeqId for the store to be applied, else its skipped.
* @return the sequence id of the last edit added to this region out of the
* recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
- * @throws IOException
*/
- protected long replayRecoveredEditsIfAny(Map<byte[], Long> maxSeqIdInStores,
- final CancelableProgressable reporter, final MonitoredTask status)
- throws IOException {
+ @VisibleForTesting
+ long replayRecoveredEditsIfAny(Map<byte[], Long> maxSeqIdInStores,
+ final CancelableProgressable reporter, final MonitoredTask status) throws IOException {
long minSeqIdForTheRegion = -1;
for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
@@ -4599,63 +4610,74 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
long seqId = minSeqIdForTheRegion;
-
- FileSystem walFS = getWalFileSystem();
- FileSystem rootFS = getFilesystem();
- Path wrongRegionWALDir = FSUtils.getWrongWALRegionDir(conf, getRegionInfo().getTable(),
- getRegionInfo().getEncodedName());
- Path regionWALDir = getWALRegionDir();
- Path regionDir = FSUtils.getRegionDirFromRootDir(FSUtils.getRootDir(conf), getRegionInfo());
-
- // We made a mistake in HBASE-20734 so we need to do this dirty hack...
- NavigableSet<Path> filesUnderWrongRegionWALDir =
- WALSplitUtil.getSplitEditFilesSorted(walFS, wrongRegionWALDir);
- seqId = Math.max(seqId, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS,
- filesUnderWrongRegionWALDir, reporter, regionDir));
- // This is to ensure backwards compatability with HBASE-20723 where recovered edits can appear
- // under the root dir even if walDir is set.
- NavigableSet<Path> filesUnderRootDir = Collections.emptyNavigableSet();
- if (!regionWALDir.equals(regionDir)) {
- filesUnderRootDir = WALSplitUtil.getSplitEditFilesSorted(rootFS, regionDir);
- seqId = Math.max(seqId, replayRecoveredEditsForPaths(minSeqIdForTheRegion, rootFS,
- filesUnderRootDir, reporter, regionDir));
- }
-
- NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(walFS, regionWALDir);
- seqId = Math.max(seqId, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS,
- files, reporter, regionWALDir));
-
- if (seqId > minSeqIdForTheRegion) {
- // Then we added some edits to memory. Flush and cleanup split edit files.
- internalFlushcache(null, seqId, stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
- }
- // Now delete the content of recovered edits. We're done w/ them.
- if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
- // For debugging data loss issues!
- // If this flag is set, make use of the hfile archiving by making recovered.edits a fake
- // column family. Have to fake out file type too by casting our recovered.edits as storefiles
- String fakeFamilyName = WALSplitUtil.getRegionDirRecoveredEditsDir(regionWALDir).getName();
- Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size());
- for (Path file : files) {
- fakeStoreFiles.add(new HStoreFile(walFS, file, this.conf, null, null, true));
- }
- getRegionWALFileSystem().archiveRecoveredEdits(fakeFamilyName, fakeStoreFiles);
+ String specialRecoveredEditsDirStr = conf.get(SPECIAL_RECOVERED_EDITS_DIR);
+ if (org.apache.commons.lang3.StringUtils.isBlank(specialRecoveredEditsDirStr)) {
+ FileSystem walFS = getWalFileSystem();
+ FileSystem rootFS = getFilesystem();
+ Path wrongRegionWALDir = FSUtils.getWrongWALRegionDir(conf, getRegionInfo().getTable(),
+ getRegionInfo().getEncodedName());
+ Path regionWALDir = getWALRegionDir();
+ Path regionDir = FSUtils.getRegionDirFromRootDir(FSUtils.getRootDir(conf), getRegionInfo());
+
+ // We made a mistake in HBASE-20734 so we need to do this dirty hack...
+ NavigableSet<Path> filesUnderWrongRegionWALDir =
+ WALSplitUtil.getSplitEditFilesSorted(walFS, wrongRegionWALDir);
+ seqId = Math.max(seqId, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS,
+ filesUnderWrongRegionWALDir, reporter, regionDir));
+ // This is to ensure backwards compatibility with HBASE-20723 where recovered edits can appear
+ // under the root dir even if walDir is set.
+ NavigableSet<Path> filesUnderRootDir = Collections.emptyNavigableSet();
+ if (!regionWALDir.equals(regionDir)) {
+ filesUnderRootDir = WALSplitUtil.getSplitEditFilesSorted(rootFS, regionDir);
+ seqId = Math.max(seqId, replayRecoveredEditsForPaths(minSeqIdForTheRegion, rootFS,
+ filesUnderRootDir, reporter, regionDir));
+ }
+
+ NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(walFS, regionWALDir);
+ seqId = Math.max(seqId,
+ replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS, files, reporter, regionWALDir));
+ if (seqId > minSeqIdForTheRegion) {
+ // Then we added some edits to memory. Flush and cleanup split edit files.
+ internalFlushcache(null, seqId, stores.values(), status, false,
+ FlushLifeCycleTracker.DUMMY);
+ }
+ // Now delete the content of recovered edits. We're done w/ them.
+ if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
+ // For debugging data loss issues!
+ // If this flag is set, make use of the hfile archiving by making recovered.edits a fake
+ // column family. Have to fake out file type too by casting our recovered.edits as
+ // storefiles
+ String fakeFamilyName = WALSplitUtil.getRegionDirRecoveredEditsDir(regionWALDir).getName();
+ Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size());
+ for (Path file : files) {
+ fakeStoreFiles.add(new HStoreFile(walFS, file, this.conf, null, null, true));
+ }
+ getRegionWALFileSystem().archiveRecoveredEdits(fakeFamilyName, fakeStoreFiles);
+ } else {
+ deleteRecoveredEdits(walFS, Iterables.concat(files, filesUnderWrongRegionWALDir));
+ deleteRecoveredEdits(rootFS, filesUnderRootDir);
+ }
} else {
- for (Path file : Iterables.concat(files, filesUnderWrongRegionWALDir)) {
- if (!walFS.delete(file, false)) {
- LOG.error("Failed delete of {}", file);
- } else {
- LOG.debug("Deleted recovered.edits file={}", file);
+ Path recoveredEditsDir = new Path(specialRecoveredEditsDirStr);
+ FileSystem fs = recoveredEditsDir.getFileSystem(conf);
+ FileStatus[] files = fs.listStatus(recoveredEditsDir);
+ LOG.debug("Found {} recovered edits file(s) under {}", files == null ? 0 : files.length,
+ recoveredEditsDir);
+ if (files != null) {
+ for (FileStatus file : files) {
+ seqId =
+ Math.max(seqId, replayRecoveredEdits(file.getPath(), maxSeqIdInStores, reporter, fs));
}
}
- for (Path file : filesUnderRootDir) {
- if (!rootFS.delete(file, false)) {
- LOG.error("Failed delete of {}", file);
- } else {
- LOG.debug("Deleted recovered.edits file={}", file);
- }
+ if (seqId > minSeqIdForTheRegion) {
+ // Then we added some edits to memory. Flush and cleanup split edit files.
+ internalFlushcache(null, seqId, stores.values(), status, false,
+ FlushLifeCycleTracker.DUMMY);
}
+ deleteRecoveredEdits(fs,
+ Stream.of(files).map(FileStatus::getPath).collect(Collectors.toList()));
}
+
return seqId;
}
@@ -4720,18 +4742,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return seqid;
}
- /*
+ /**
* @param edits File of recovered edits.
- * @param maxSeqIdInStores Maximum sequenceid found in each store. Edits in wal
- * must be larger than this to be replayed for each store.
- * @param reporter
- * @return the sequence id of the last edit added to this region out of the
- * recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
- * @throws IOException
+ * @param maxSeqIdInStores Maximum sequenceid found in each store. Edits in wal must be larger
+ * than this to be replayed for each store.
+ * @return the sequence id of the last edit added to this region out of the recovered edits log or
+ * <code>minSeqId</code> if nothing added from editlogs.
*/
- private long replayRecoveredEdits(final Path edits,
- Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter, FileSystem fs)
- throws IOException {
+ private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdInStores,
+ final CancelableProgressable reporter, FileSystem fs) throws IOException {
String msg = "Replaying edits from " + edits;
LOG.info(msg);
MonitoredTask status = TaskMonitor.get().createStatus(msg);
@@ -7100,15 +7119,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param wal shared WAL
* @param initialize - true to initialize the region
* @return new HRegion
- * @throws IOException
*/
public static HRegion createHRegion(final RegionInfo info, final Path rootDir,
- final Configuration conf, final TableDescriptor hTableDescriptor,
- final WAL wal, final boolean initialize)
- throws IOException {
- LOG.info("creating " + info
- + ", tableDescriptor=" + (hTableDescriptor == null? "null": hTableDescriptor) +
- ", regionDir=" + rootDir);
+ final Configuration conf, final TableDescriptor hTableDescriptor, final WAL wal,
+ final boolean initialize) throws IOException {
+ LOG.info("creating " + info + ", tableDescriptor=" +
+ (hTableDescriptor == null ? "null" : hTableDescriptor) + ", regionDir=" + rootDir);
createRegionDir(conf, info, rootDir);
FileSystem fs = rootDir.getFileSystem(conf);
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
@@ -7120,6 +7136,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
/**
+ * Create a region under the given table directory.
+ */
+ public static HRegion createHRegion(Configuration conf, RegionInfo regionInfo, FileSystem fs,
+ Path tableDir, TableDescriptor tableDesc) throws IOException {
+ LOG.info("Creating {}, tableDescriptor={}, under table dir {}", regionInfo, tableDesc,
+ tableDir);
+ HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, regionInfo);
+ HRegion region = HRegion.newHRegion(tableDir, null, fs, conf, regionInfo, tableDesc, null);
+ return region;
+ }
+
+ /**
* Create the region directory in the filesystem.
*/
public static HRegionFileSystem createRegionDir(Configuration configuration, RegionInfo ri,
@@ -7266,18 +7294,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return new HRegion
*/
public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
- final Path rootDir, final RegionInfo info, final TableDescriptor htd, final WAL wal,
- final RegionServerServices rsServices, final CancelableProgressable reporter)
- throws IOException {
+ final Path rootDir, final RegionInfo info, final TableDescriptor htd, final WAL wal,
+ final RegionServerServices rsServices, final CancelableProgressable reporter)
+ throws IOException {
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
- return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
+ return openHRegionFromTableDir(conf, fs, tableDir, info, htd, wal, rsServices, reporter);
}
/**
* Open a Region.
* @param conf The Configuration object to use.
* @param fs Filesystem to use
- * @param rootDir Root directory for HBase instance
* @param info Info for region to be opened.
* @param htd the table descriptor
* @param wal WAL for region to use. This method will call
@@ -7288,15 +7315,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param reporter An interface we can report progress against.
* @return new HRegion
*/
- public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
- final Path rootDir, final Path tableDir, final RegionInfo info, final TableDescriptor htd,
- final WAL wal, final RegionServerServices rsServices,
- final CancelableProgressable reporter)
- throws IOException {
- if (info == null) throw new NullPointerException("Passed region info is null");
- if (LOG.isDebugEnabled()) {
- LOG.debug("Opening region: " + info);
- }
+ public static HRegion openHRegionFromTableDir(final Configuration conf, final FileSystem fs,
+ final Path tableDir, final RegionInfo info, final TableDescriptor htd, final WAL wal,
+ final RegionServerServices rsServices, final CancelableProgressable reporter)
+ throws IOException {
+ Objects.requireNonNull(info, "RegionInfo cannot be null");
+ LOG.debug("Opening region: {}", info);
HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
return r.openHRegion(reporter);
}
@@ -8819,4 +8843,4 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
}
-}
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 272925c..f5049c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -18,23 +18,9 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
-import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.HasThread;
+import java.util.Map;
+import org.apache.hadoop.hbase.wal.AbstractWALRoller;
import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,166 +38,21 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
*/
@InterfaceAudience.Private
@VisibleForTesting
-public class LogRoller extends HasThread implements Closeable {
+public class LogRoller extends AbstractWALRoller<RegionServerServices> {
private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
- private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
- protected final RegionServerServices services;
- private volatile long lastRollTime = System.currentTimeMillis();
- // Period to roll log.
- private final long rollPeriod;
- private final int threadWakeFrequency;
- // The interval to check low replication on hlog's pipeline
- private long checkLowReplicationInterval;
-
- private volatile boolean running = true;
-
- public void addWAL(WAL wal) {
- // check without lock first
- if (walNeedsRoll.containsKey(wal)) {
- return;
- }
- // this is to avoid race between addWAL and requestRollAll.
- synchronized (this) {
- if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) {
- wal.registerWALActionsListener(new WALActionsListener() {
- @Override
- public void logRollRequested(WALActionsListener.RollRequestReason reason) {
- // TODO logs will contend with each other here, replace with e.g. DelayedQueue
- synchronized (LogRoller.this) {
- walNeedsRoll.put(wal, Boolean.TRUE);
- LogRoller.this.notifyAll();
- }
- }
- });
- }
- }
- }
-
- public void requestRollAll() {
- synchronized (this) {
- List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet());
- for (WAL wal : wals) {
- walNeedsRoll.put(wal, Boolean.TRUE);
- }
- notifyAll();
- }
- }
public LogRoller(RegionServerServices services) {
- super("LogRoller");
- this.services = services;
- this.rollPeriod = this.services.getConfiguration().
- getLong("hbase.regionserver.logroll.period", 3600000);
- this.threadWakeFrequency = this.services.getConfiguration().
- getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
- this.checkLowReplicationInterval = this.services.getConfiguration().getLong(
- "hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
- }
-
- /**
- * we need to check low replication in period, see HBASE-18132
- */
- private void checkLowReplication(long now) {
- try {
- for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
- WAL wal = entry.getKey();
- boolean needRollAlready = entry.getValue();
- if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
- continue;
- }
- ((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval);
- }
- } catch (Throwable e) {
- LOG.warn("Failed checking low replication", e);
- }
- }
-
- private void abort(String reason, Throwable cause) {
- // close all WALs before calling abort on RS.
- // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we
- // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
- // is already broken.
- for (WAL wal : walNeedsRoll.keySet()) {
- // shutdown rather than close here since we are going to abort the RS and the wals need to be
- // split when recovery
- try {
- wal.shutdown();
- } catch (IOException e) {
- LOG.warn("Failed to shutdown wal", e);
- }
- }
- this.services.abort(reason, cause);
+ super("LogRoller", services.getConfiguration(), services);
}
- @Override
- public void run() {
- while (running) {
- boolean periodic = false;
- long now = System.currentTimeMillis();
- checkLowReplication(now);
- periodic = (now - this.lastRollTime) > this.rollPeriod;
- if (periodic) {
- // Time for periodic roll, fall through
- LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
- } else {
- synchronized (this) {
- if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
- // WAL roll requested, fall through
- LOG.debug("WAL roll requested");
- } else {
- try {
- wait(this.threadWakeFrequency);
- } catch (InterruptedException e) {
- // restore the interrupt state
- Thread.currentThread().interrupt();
- }
- // goto the beginning to check whether again whether we should fall through to roll
- // several WALs, and also check whether we should quit.
- continue;
- }
- }
- }
- try {
- this.lastRollTime = System.currentTimeMillis();
- for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
- .hasNext();) {
- Entry<WAL, Boolean> entry = iter.next();
- WAL wal = entry.getKey();
- // reset the flag in front to avoid missing roll request before we return from rollWriter.
- walNeedsRoll.put(wal, Boolean.FALSE);
- // Force the roll if the logroll.period is elapsed or if a roll was requested.
- // The returned value is an array of actual region names.
- byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
- if (regionsToFlush != null) {
- for (byte[] r : regionsToFlush) {
- scheduleFlush(Bytes.toString(r));
- }
- }
- }
- } catch (FailedLogCloseException | ConnectException e) {
- abort("Failed log close in log roller", e);
- } catch (IOException ex) {
- // Abort if we get here. We probably won't recover an IOE. HBASE-1132
- abort("IOE in log roller",
- ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
- } catch (Exception ex) {
- LOG.error("Log rolling failed", ex);
- abort("Log rolling failed", ex);
- }
- }
- LOG.info("LogRoller exiting.");
- }
-
- /**
- * @param encodedRegionName Encoded name of region to flush.
- */
- private void scheduleFlush(String encodedRegionName) {
- HRegion r = (HRegion) this.services.getRegion(encodedRegionName);
+ protected void scheduleFlush(String encodedRegionName) {
+ RegionServerServices services = this.abortable;
+ HRegion r = (HRegion) services.getRegion(encodedRegionName);
if (r == null) {
LOG.warn("Failed to schedule flush of {}, because it is not online on us", encodedRegionName);
return;
}
- FlushRequester requester = this.services.getFlushRequester();
+ FlushRequester requester = services.getFlushRequester();
if (requester == null) {
LOG.warn("Failed to schedule flush of {}, region={}, because FlushRequester is null",
encodedRegionName, r);
@@ -221,18 +62,8 @@ public class LogRoller extends HasThread implements Closeable {
requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
}
- /**
- * For testing only
- * @return true if all WAL roll finished
- */
@VisibleForTesting
- public boolean walRollFinished() {
- return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll);
- }
-
- @Override
- public void close() {
- running = false;
- interrupt();
+ Map<WAL, Boolean> getWalNeedsRoll() {
+ return this.walNeedsRoll;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 6e8443f..422c217 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -132,6 +132,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
+ public static final String MAX_LOGS = "hbase.regionserver.maxlogs";
+
/**
* file system instance
*/
@@ -434,8 +436,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
float multiplier = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.5f);
this.logrollsize = (long)(this.blocksize * multiplier);
- this.maxLogs = conf.getInt("hbase.regionserver.maxlogs",
- Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
+ this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
similarity index 76%
copy from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
copy to hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
index 272925c..cd4dc52 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.regionserver;
+package org.apache.hadoop.hbase.wal;
import java.io.Closeable;
import java.io.IOException;
@@ -27,35 +26,39 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
/**
* Runs periodically to determine if the WAL should be rolled.
- *
- * NOTE: This class extends Thread rather than Chore because the sleep time
- * can be interrupted when there is something to do, rather than the Chore
- * sleep time which is invariant.
- *
+ * <p/>
+ * NOTE: This class extends Thread rather than Chore because the sleep time can be interrupted when
+ * there is something to do, rather than the Chore sleep time which is invariant.
+ * <p/>
+ * The {@link #scheduleFlush(String)} is abstract here, as sometimes we hold a region without a
+ * region server but we still want to roll its WAL.
+ * <p/>
* TODO: change to a pool of threads
*/
@InterfaceAudience.Private
-@VisibleForTesting
-public class LogRoller extends HasThread implements Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
- private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
- protected final RegionServerServices services;
+public abstract class AbstractWALRoller<T extends Abortable> extends HasThread
+ implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractWALRoller.class);
+
+ protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";
+
+ protected final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
+ protected final T abortable;
private volatile long lastRollTime = System.currentTimeMillis();
// Period to roll log.
private final long rollPeriod;
@@ -77,9 +80,9 @@ public class LogRoller extends HasThread implements Closeable {
@Override
public void logRollRequested(WALActionsListener.RollRequestReason reason) {
// TODO logs will contend with each other here, replace with e.g. DelayedQueue
- synchronized (LogRoller.this) {
+ synchronized (AbstractWALRoller.this) {
walNeedsRoll.put(wal, Boolean.TRUE);
- LogRoller.this.notifyAll();
+ AbstractWALRoller.this.notifyAll();
}
}
});
@@ -97,15 +100,13 @@ public class LogRoller extends HasThread implements Closeable {
}
}
- public LogRoller(RegionServerServices services) {
- super("LogRoller");
- this.services = services;
- this.rollPeriod = this.services.getConfiguration().
- getLong("hbase.regionserver.logroll.period", 3600000);
- this.threadWakeFrequency = this.services.getConfiguration().
- getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
- this.checkLowReplicationInterval = this.services.getConfiguration().getLong(
- "hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
+ protected AbstractWALRoller(String name, Configuration conf, T abortable) {
+ super(name);
+ this.abortable = abortable;
+ this.rollPeriod = conf.getLong(WAL_ROLL_PERIOD_KEY, 3600000);
+ this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
+ this.checkLowReplicationInterval =
+ conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
}
/**
@@ -140,7 +141,7 @@ public class LogRoller extends HasThread implements Closeable {
LOG.warn("Failed to shutdown wal", e);
}
}
- this.services.abort(reason, cause);
+ abortable.abort(reason, cause);
}
@Override
@@ -174,7 +175,7 @@ public class LogRoller extends HasThread implements Closeable {
try {
this.lastRollTime = System.currentTimeMillis();
for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
- .hasNext();) {
+ .hasNext();) {
Entry<WAL, Boolean> entry = iter.next();
WAL wal = entry.getKey();
// reset the flag in front to avoid missing roll request before we return from rollWriter.
@@ -187,11 +188,12 @@ public class LogRoller extends HasThread implements Closeable {
scheduleFlush(Bytes.toString(r));
}
}
+ afterRoll(wal);
}
} catch (FailedLogCloseException | ConnectException e) {
abort("Failed log close in log roller", e);
} catch (IOException ex) {
- // Abort if we get here. We probably won't recover an IOE. HBASE-1132
+ // Abort if we get here. We probably won't recover an IOE. HBASE-1132
abort("IOE in log roller",
ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
} catch (Exception ex) {
@@ -203,31 +205,35 @@ public class LogRoller extends HasThread implements Closeable {
}
/**
+ * Called after we finish rolling the give {@code wal}.
+ */
+ protected void afterRoll(WAL wal) {
+ }
+
+ /**
* @param encodedRegionName Encoded name of region to flush.
*/
- private void scheduleFlush(String encodedRegionName) {
- HRegion r = (HRegion) this.services.getRegion(encodedRegionName);
- if (r == null) {
- LOG.warn("Failed to schedule flush of {}, because it is not online on us", encodedRegionName);
- return;
- }
- FlushRequester requester = this.services.getFlushRequester();
- if (requester == null) {
- LOG.warn("Failed to schedule flush of {}, region={}, because FlushRequester is null",
- encodedRegionName, r);
- return;
- }
- // force flushing all stores to clean old logs
- requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
+ protected abstract void scheduleFlush(String encodedRegionName);
+
+ private boolean isWaiting() {
+ Thread.State state = getThread().getState();
+ return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
}
/**
- * For testing only
* @return true if all WAL roll finished
*/
- @VisibleForTesting
public boolean walRollFinished() {
- return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll);
+ return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll) && isWaiting();
+ }
+
+ /**
+ * Wait until all wals have been rolled after calling {@link #requestRollAll()}.
+ */
+ public void waitUntilWalRollFinished() throws InterruptedException {
+ while (!walRollFinished()) {
+ Thread.sleep(100);
+ }
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index dab65f3..30bb77e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -83,6 +83,8 @@ public class WALFactory {
public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
+ public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
+
final String factoryId;
private final WALProvider provider;
// The meta updates are written to a different wal. If this
@@ -194,7 +196,7 @@ public class WALFactory {
this.conf = conf;
this.factoryId = factoryId;
// end required early initialization
- if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) {
+ if (conf.getBoolean(WAL_ENABLED, true)) {
provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null);
} else {
// special handling of existing configuration behavior.
diff --git a/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp b/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp
index ea252cf..5000482 100644
--- a/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp
+++ b/hbase-server/src/main/resources/hbase-webapps/master/procedures.jsp
@@ -30,21 +30,12 @@
import="org.apache.hadoop.hbase.procedure2.LockedResource"
import="org.apache.hadoop.hbase.procedure2.Procedure"
import="org.apache.hadoop.hbase.procedure2.ProcedureExecutor"
- import="org.apache.hadoop.hbase.procedure2.store.wal.ProcedureWALFile"
- import="org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore"
import="org.apache.hadoop.hbase.procedure2.util.StringUtils"
import="org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix"
%>
<%
HMaster master = (HMaster) getServletContext().getAttribute(HMaster.MASTER);
ProcedureExecutor<MasterProcedureEnv> procExecutor = master.getMasterProcedureExecutor();
- WALProcedureStore walStore = master.getWalProcedureStore();
-
- ArrayList<WALProcedureStore.SyncMetrics> syncMetricsBuff = walStore.getSyncMetrics();
- long millisToNextRoll = walStore.getMillisToNextPeriodicRoll();
- long millisFromLastRoll = walStore.getMillisFromLastRoll();
- ArrayList<ProcedureWALFile> procedureWALFiles = walStore.getActiveLogs();
- Set<ProcedureWALFile> corruptedWALFiles = walStore.getCorruptedLogs();
List<Procedure<MasterProcedureEnv>> procedures = procExecutor.getProcedures();
Collections.sort(procedures, new Comparator<Procedure>() {
@Override
@@ -159,109 +150,4 @@
<% } %>
</div>
<br />
-<div class="container-fluid content">
- <div class="row">
- <div class="page-header">
- <h2>Procedure WAL State</h2>
- </div>
- </div>
- <div class="tabbable">
- <ul class="nav nav-pills">
- <li class="active">
- <a href="#tab_WALFiles" data-toggle="tab">WAL files</a>
- </li>
- <li class="">
- <a href="#tab_WALFilesCorrupted" data-toggle="tab">Corrupted WAL files</a>
- </li>
- <li class="">
- <a href="#tab_WALRollTime" data-toggle="tab">WAL roll time</a>
- </li>
- <li class="">
- <a href="#tab_SyncStats" data-toggle="tab">Sync stats</a>
- </li>
- </ul>
- <div class="tab-content" style="padding-bottom: 9px; border-bottom: 1px solid #ddd;">
- <div class="tab-pane active" id="tab_WALFiles">
- <% if (procedureWALFiles != null && procedureWALFiles.size() > 0) { %>
- <table class="table table-striped">
- <tr>
- <th>LogID</th>
- <th>Size</th>
- <th>Timestamp</th>
- <th>Path</th>
- </tr>
- <% for (int i = procedureWALFiles.size() - 1; i >= 0; --i) { %>
- <% ProcedureWALFile pwf = procedureWALFiles.get(i); %>
- <tr>
- <td> <%= pwf.getLogId() %></td>
- <td> <%= TraditionalBinaryPrefix.long2String(pwf.getSize(), "B", 1) %> </td>
- <td> <%= new Date(pwf.getTimestamp()) %> </td>
- <td> <%= escapeXml(pwf.toString()) %> </td>
- </tr>
- <% } %>
- </table>
- <% } else {%>
- <p> No WAL files</p>
- <% } %>
- </div>
- <div class="tab-pane" id="tab_WALFilesCorrupted">
- <% if (corruptedWALFiles != null && corruptedWALFiles.size() > 0) { %>
- <table class="table table-striped">
- <tr>
- <th>LogID</th>
- <th>Size</th>
- <th>Timestamp</th>
- <th>Path</th>
- </tr>
- <% for (ProcedureWALFile cwf:corruptedWALFiles) { %>
- <tr>
- <td> <%= cwf.getLogId() %></td>
- <td> <%= TraditionalBinaryPrefix.long2String(cwf.getSize(), "B", 1) %> </td>
- <td> <%= new Date(cwf.getTimestamp()) %> </td>
- <td> <%= escapeXml(cwf.toString()) %> </td>
- </tr>
- <% } %>
- </table>
- <% } else {%>
- <p> No corrupted WAL files</p>
- <% } %>
- </div>
- <div class="tab-pane" id="tab_WALRollTime">
- <table class="table table-striped">
- <tr>
- <th> Milliseconds to next roll</th>
- <th> Milliseconds from last roll</th>
- </tr>
- <tr>
- <td> <%=StringUtils.humanTimeDiff(millisToNextRoll) %></td>
- <td> <%=StringUtils.humanTimeDiff(millisFromLastRoll) %></td>
- </tr>
- </table>
- </div>
- <div class="tab-pane" id="tab_SyncStats">
- <table class="table table-striped">
- <tr>
- <th> Time</th>
- <th> Sync Wait</th>
- <th> Last num of synced entries</th>
- <th> Total Synced</th>
- <th> Synced per second</th>
- </tr>
- <% for (int i = syncMetricsBuff.size() - 1; i >= 0; --i) { %>
- <% WALProcedureStore.SyncMetrics syncMetrics = syncMetricsBuff.get(i); %>
- <tr>
- <td> <%= new Date(syncMetrics.getTimestamp()) %></td>
- <td> <%= StringUtils.humanTimeDiff(syncMetrics.getSyncWaitMs()) %></td>
- <td> <%= syncMetrics.getSyncedEntries() %></td>
- <td> <%= TraditionalBinaryPrefix.long2String(syncMetrics.getTotalSyncedBytes(), "B", 1) %></td>
- <td> <%= TraditionalBinaryPrefix.long2String((long)syncMetrics.getSyncedPerSec(), "B", 1) %></td>
- </tr>
- <%} %>
- </table>
- </div>
- </div>
- </div>
-</div>
-<br />
-
<jsp:include page="footer.jsp" />
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestLoadProcedureError.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestLoadProcedureError.java
index 0a57dba..9dbc587 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestLoadProcedureError.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestLoadProcedureError.java
@@ -126,7 +126,7 @@ public class TestLoadProcedureError {
ARRIVE.await();
FAIL_LOAD = true;
// do not persist the store tracker
- UTIL.getMiniHBaseCluster().getMaster().getWalProcedureStore().stop(true);
+ UTIL.getMiniHBaseCluster().getMaster().getProcedureStore().stop(true);
UTIL.getMiniHBaseCluster().getMaster().abort("for testing");
waitNoMaster();
// restart twice, and should fail twice, as we will throw an exception in the afterReplay above
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java
index 395095e..10d56b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetricsWrapper.java
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.master;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
import java.util.AbstractMap.SimpleImmutableEntry;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -89,7 +89,9 @@ public class TestMasterMetricsWrapper {
}
assertEquals(regionServerCount - 1, info.getNumRegionServers());
assertEquals(1, info.getNumDeadRegionServers());
- assertEquals(1, info.getNumWALFiles());
+ // now we do not expose this information as WALProcedureStore is not the only ProcedureStore
+ // implementation any more.
+ assertEquals(0, info.getNumWALFiles());
}
@Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureWalLease.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureWalLease.java
deleted file mode 100644
index 37bc6de..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureWalLease.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.procedure;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.StartMiniClusterOption;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
-import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
-import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.util.ModifyRegionUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Category({MasterTests.class, LargeTests.class})
-@Ignore
-public class TestMasterProcedureWalLease {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestMasterProcedureWalLease.class);
-
- private static final Logger LOG = LoggerFactory.getLogger(TestMasterProcedureWalLease.class);
-
- @Rule
- public TestName name = new TestName();
-
- protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
- private static void setupConf(Configuration conf) {
- // don't waste time retrying with the roll, the test is already slow enough.
- conf.setInt(WALProcedureStore.MAX_RETRIES_BEFORE_ROLL_CONF_KEY, 1);
- conf.setInt(WALProcedureStore.WAIT_BEFORE_ROLL_CONF_KEY, 0);
- conf.setInt(WALProcedureStore.ROLL_RETRIES_CONF_KEY, 1);
- conf.setInt(WALProcedureStore.MAX_SYNC_FAILURE_ROLL_CONF_KEY, 1);
- }
-
- @Before
- public void setup() throws Exception {
- setupConf(UTIL.getConfiguration());
- StartMiniClusterOption option = StartMiniClusterOption.builder()
- .numMasters(2).numRegionServers(3).numDataNodes(3).build();
- UTIL.startMiniCluster(option);
- }
-
- @After
- public void tearDown() throws Exception {
- try {
- UTIL.shutdownMiniCluster();
- } catch (Exception e) {
- LOG.warn("failure shutting down cluster", e);
- }
- }
-
- @Test
- public void testWalRecoverLease() throws Exception {
- final ProcedureStore masterStore = getMasterProcedureExecutor().getStore();
- assertTrue("expected WALStore for this test", masterStore instanceof WALProcedureStore);
-
- HMaster firstMaster = UTIL.getHBaseCluster().getMaster();
- // Abort Latch for the master store
- final CountDownLatch masterStoreAbort = new CountDownLatch(1);
- masterStore.registerListener(new ProcedureStore.ProcedureStoreListener() {
- @Override
- public void postSync() {}
-
- @Override
- public void abortProcess() {
- LOG.debug("Abort store of Master");
- masterStoreAbort.countDown();
- }
- });
-
- // startup a fake master the new WAL store will take the lease
- // and the active master should abort.
- HMaster backupMaster3 = Mockito.mock(HMaster.class);
- Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
- Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
- final WALProcedureStore backupStore3 = new WALProcedureStore(firstMaster.getConfiguration(),
- ((WALProcedureStore)masterStore).getWALDir(),
- null,
- new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
- // Abort Latch for the test store
- final CountDownLatch backupStore3Abort = new CountDownLatch(1);
- backupStore3.registerListener(new ProcedureStore.ProcedureStoreListener() {
- @Override
- public void postSync() {}
-
- @Override
- public void abortProcess() {
- LOG.debug("Abort store of backupMaster3");
- backupStore3Abort.countDown();
- backupStore3.stop(true);
- }
- });
- backupStore3.start(1);
- backupStore3.recoverLease();
-
- // Try to trigger a command on the master (WAL lease expired on the active one)
- TableDescriptor htd = MasterProcedureTestingUtility.createHTD(TableName.valueOf(name.getMethodName()), "f");
- RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, null);
- LOG.debug("submit proc");
- try {
- getMasterProcedureExecutor().submitProcedure(
- new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions));
- fail("expected RuntimeException 'sync aborted'");
- } catch (RuntimeException e) {
- LOG.info("got " + e.getMessage());
- }
- LOG.debug("wait master store abort");
- masterStoreAbort.await();
-
- // Now the real backup master should start up
- LOG.debug("wait backup master to startup");
- MasterProcedureTestingUtility.waitBackupMaster(UTIL, firstMaster);
- assertEquals(true, firstMaster.isStopped());
-
- // wait the store in here to abort (the test will fail due to timeout if it doesn't)
- LOG.debug("wait the store to abort");
- backupStore3.getStoreTracker().setDeleted(1, false);
- try {
- backupStore3.delete(1);
- fail("expected RuntimeException 'sync aborted'");
- } catch (RuntimeException e) {
- LOG.info("got " + e.getMessage());
- }
- backupStore3Abort.await();
- }
-
- /**
- * Tests proper fencing in case the current WAL store is fenced
- */
- @Test
- public void testWALfencingWithoutWALRolling() throws IOException {
- testWALfencing(false);
- }
-
- /**
- * Tests proper fencing in case the current WAL store does not receive writes until after the
- * new WAL does a couple of WAL rolls.
- */
- @Test
- public void testWALfencingWithWALRolling() throws IOException {
- testWALfencing(true);
- }
-
- public void testWALfencing(boolean walRolls) throws IOException {
- final ProcedureStore procStore = getMasterProcedureExecutor().getStore();
- assertTrue("expected WALStore for this test", procStore instanceof WALProcedureStore);
-
- HMaster firstMaster = UTIL.getHBaseCluster().getMaster();
-
- // cause WAL rolling after a delete in WAL:
- firstMaster.getConfiguration().setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, 1);
-
- HMaster backupMaster3 = Mockito.mock(HMaster.class);
- Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
- Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
- final WALProcedureStore procStore2 = new WALProcedureStore(firstMaster.getConfiguration(),
- ((WALProcedureStore)procStore).getWALDir(),
- null,
- new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
-
- // start a second store which should fence the first one out
- LOG.info("Starting new WALProcedureStore");
- procStore2.start(1);
- procStore2.recoverLease();
-
- // before writing back to the WAL store, optionally do a couple of WAL rolls (which causes
- // to delete the old WAL files).
- if (walRolls) {
- LOG.info("Inserting into second WALProcedureStore, causing WAL rolls");
- for (int i = 0; i < 512; i++) {
- // insert something to the second store then delete it, causing a WAL roll(s)
- Procedure proc2 = new TestProcedure(i);
- procStore2.insert(proc2, null);
- procStore2.delete(proc2.getProcId()); // delete the procedure so that the WAL is removed later
- }
- }
-
- // Now, insert something to the first store, should fail.
- // If the store does a WAL roll and continue with another logId without checking higher logIds
- // it will incorrectly succeed.
- LOG.info("Inserting into first WALProcedureStore");
- try {
- procStore.insert(new TestProcedure(11), null);
- fail("Inserting into Procedure Store should have failed");
- } catch (Exception ex) {
- LOG.info("Received expected exception", ex);
- }
- }
-
- // ==========================================================================
- // Helpers
- // ==========================================================================
- private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
- return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
- }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
new file mode 100644
index 0000000..d550e7f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestHelper.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
+
+final class RegionProcedureStoreTestHelper {
+
+ private RegionProcedureStoreTestHelper() {
+ }
+
+ static RegionProcedureStore createStore(Configuration conf, ProcedureLoader loader)
+ throws IOException {
+ Server server = mock(Server.class);
+ when(server.getConfiguration()).thenReturn(conf);
+ when(server.getServerName())
+ .thenReturn(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
+ RegionProcedureStore store = new RegionProcedureStore(server, new LeaseRecovery() {
+
+ @Override
+ public void recoverFileLease(FileSystem fs, Path path) throws IOException {
+ }
+ });
+ store.start(1);
+ store.recoverLease();
+ store.load(loader);
+ return store;
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestProcedure.java
new file mode 100644
index 0000000..f81d193
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreTestProcedure.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
+public class RegionProcedureStoreTestProcedure extends NoopProcedure<Void> {
+ private static long SEQ_ID = 0;
+
+ public RegionProcedureStoreTestProcedure() {
+ setProcId(++SEQ_ID);
+ }
+
+ @Override
+ protected Procedure<Void>[] execute(Void env) {
+ return null;
+ }
+
+ @Override
+ protected void rollback(Void env) {
+ }
+
+ @Override
+ protected boolean abort(Void env) {
+ return false;
+ }
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ long procId = getProcId();
+ if (procId % 2 == 0) {
+ Int64Value.Builder builder = Int64Value.newBuilder().setValue(procId);
+ serializer.serialize(builder.build());
+ }
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+ long procId = getProcId();
+ if (procId % 2 == 0) {
+ Int64Value value = serializer.deserialize(Int64Value.class);
+ assertEquals(procId, value.getValue());
+ }
+ }
+
+ public void setParent(Procedure<?> proc) {
+ super.setParentProcId(proc.getProcId());
+ }
+
+ public void finish() {
+ setState(ProcedureState.SUCCESS);
+ }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
new file mode 100644
index 0000000..b3daa89
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestRegionProcedureStore {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRegionProcedureStore.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestRegionProcedureStore.class);
+
+ private HBaseCommonTestingUtility htu;
+
+ private RegionProcedureStore store;
+
+ @Before
+ public void setUp() throws IOException {
+ htu = new HBaseCommonTestingUtility();
+ htu.getConfiguration().setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
+ Path testDir = htu.getDataTestDir();
+ CommonFSUtils.setWALRootDir(htu.getConfiguration(), testDir);
+ store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), new LoadCounter());
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ store.stop(true);
+ htu.cleanupTestDir();
+ }
+
+ private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
+ LOG.debug("expected: " + procIds);
+ LoadCounter loader = new LoadCounter();
+ ProcedureTestingUtility.storeRestart(store, true, loader);
+ assertEquals(procIds.size(), loader.getLoadedCount());
+ assertEquals(0, loader.getCorruptedCount());
+ }
+
+ @Test
+ public void testLoad() throws Exception {
+ Set<Long> procIds = new HashSet<>();
+
+ // Insert something in the log
+ RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
+ procIds.add(proc1.getProcId());
+ store.insert(proc1, null);
+
+ RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
+ RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
+ proc3.setParent(proc2);
+ RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
+ proc4.setParent(proc2);
+
+ procIds.add(proc2.getProcId());
+ procIds.add(proc3.getProcId());
+ procIds.add(proc4.getProcId());
+ store.insert(proc2, new Procedure[] { proc3, proc4 });
+
+ // Verify that everything is there
+ verifyProcIdsOnRestart(procIds);
+
+ // Update and delete something
+ proc1.finish();
+ store.update(proc1);
+ proc4.finish();
+ store.update(proc4);
+ store.delete(proc4.getProcId());
+ procIds.remove(proc4.getProcId());
+
+ // Verify that everything is there
+ verifyProcIdsOnRestart(procIds);
+ }
+
+ @Test
+ public void testCleanup() throws Exception {
+ RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
+ store.insert(proc1, null);
+ RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
+ store.insert(proc2, null);
+ RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
+ store.insert(proc3, null);
+ LoadCounter loader = new LoadCounter();
+ store.load(loader);
+ assertEquals(proc3.getProcId(), loader.getMaxProcId());
+ assertEquals(3, loader.getRunnableCount());
+
+ store.delete(proc3.getProcId());
+ store.delete(proc2.getProcId());
+ loader = new LoadCounter();
+ store.load(loader);
+ assertEquals(proc3.getProcId(), loader.getMaxProcId());
+ assertEquals(1, loader.getRunnableCount());
+
+ // the row should still be there
+ assertTrue(store.region
+ .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
+ assertTrue(store.region
+ .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
+
+ // proc2 will be deleted after cleanup, but proc3 should still be there as it holds the max proc
+ // id
+ store.cleanup();
+ assertTrue(store.region
+ .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
+ assertFalse(store.region
+ .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
+
+ RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
+ store.insert(proc4, null);
+ store.cleanup();
+ // proc3 should also be deleted as now proc4 holds the max proc id
+ assertFalse(store.region
+ .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
+ }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
new file mode 100644
index 0000000..475ae59
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreMigration.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
+import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@SuppressWarnings("deprecation")
+@Category({ MasterTests.class, MediumTests.class })
+public class TestRegionProcedureStoreMigration {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRegionProcedureStoreMigration.class);
+
+ private HBaseCommonTestingUtility htu;
+
+ private RegionProcedureStore store;
+
+ private WALProcedureStore walStore;
+
+ @Before
+ public void setUp() throws IOException {
+ htu = new HBaseCommonTestingUtility();
+ Configuration conf = htu.getConfiguration();
+ conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
+ Path testDir = htu.getDataTestDir();
+ CommonFSUtils.setWALRootDir(conf, testDir);
+ walStore = new WALProcedureStore(conf, new LeaseRecovery() {
+
+ @Override
+ public void recoverFileLease(FileSystem fs, Path path) throws IOException {
+ }
+ });
+ walStore.start(1);
+ walStore.recoverLease();
+ walStore.load(new LoadCounter());
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (store != null) {
+ store.stop(true);
+ }
+ walStore.stop(true);
+ htu.cleanupTestDir();
+ }
+
+ @Test
+ public void test() throws IOException {
+ List<RegionProcedureStoreTestProcedure> procs = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ RegionProcedureStoreTestProcedure proc = new RegionProcedureStoreTestProcedure();
+ walStore.insert(proc, null);
+ procs.add(proc);
+ }
+ for (int i = 5; i < 10; i++) {
+ walStore.delete(procs.get(i).getProcId());
+ }
+ walStore.stop(true);
+ SortedSet<RegionProcedureStoreTestProcedure> loadedProcs =
+ new TreeSet<>((p1, p2) -> Long.compare(p1.getProcId(), p2.getProcId()));
+ MutableLong maxProcIdSet = new MutableLong(0);
+ store =
+ RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), new ProcedureLoader() {
+
+ @Override
+ public void setMaxProcId(long maxProcId) {
+ maxProcIdSet.setValue(maxProcId);
+ }
+
+ @Override
+ public void load(ProcedureIterator procIter) throws IOException {
+ while (procIter.hasNext()) {
+ RegionProcedureStoreTestProcedure proc =
+ (RegionProcedureStoreTestProcedure) procIter.next();
+ loadedProcs.add(proc);
+ }
+ }
+
+ @Override
+ public void handleCorrupted(ProcedureIterator procIter) throws IOException {
+ if (procIter.hasNext()) {
+ fail("Found corrupted procedures");
+ }
+ }
+ });
+ assertEquals(10, maxProcIdSet.longValue());
+ assertEquals(5, loadedProcs.size());
+ int procId = 1;
+ for (RegionProcedureStoreTestProcedure proc : loadedProcs) {
+ assertEquals(procId, proc.getProcId());
+ procId++;
+ }
+ Path testDir = htu.getDataTestDir();
+ FileSystem fs = testDir.getFileSystem(htu.getConfiguration());
+ Path oldProcWALDir = new Path(testDir, WALProcedureStore.MASTER_PROCEDURE_LOGDIR);
+ // make sure the old proc wal directory has been deleted.
+ assertFalse(fs.exists(oldProcWALDir));
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
new file mode 100644
index 0000000..826c763
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStoreWALCleaner.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
+import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
+import org.apache.hadoop.hbase.master.cleaner.TimeToLiveProcedureWALCleaner;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestRegionProcedureStoreWALCleaner {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRegionProcedureStoreWALCleaner.class);
+
+ private HBaseCommonTestingUtility htu;
+
+ private FileSystem fs;
+
+ private RegionProcedureStore store;
+
+ private ChoreService choreService;
+
+ private DirScanPool dirScanPool;
+
+ private LogCleaner logCleaner;
+
+ private Path globalWALArchiveDir;
+
+ @Before
+ public void setUp() throws IOException {
+ htu = new HBaseCommonTestingUtility();
+ Configuration conf = htu.getConfiguration();
+ conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
+ Path testDir = htu.getDataTestDir();
+ fs = testDir.getFileSystem(conf);
+ CommonFSUtils.setWALRootDir(conf, testDir);
+ globalWALArchiveDir = new Path(testDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ choreService = new ChoreService("Region-Procedure-Store");
+ dirScanPool = new DirScanPool(conf);
+ conf.setLong(TimeToLiveProcedureWALCleaner.TTL_CONF_KEY, 5000);
+ conf.setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 1000);
+ logCleaner = new LogCleaner(1000, new Stoppable() {
+
+ private volatile boolean stopped = false;
+
+ @Override
+ public void stop(String why) {
+ stopped = true;
+ }
+
+ @Override
+ public boolean isStopped() {
+ return stopped;
+ }
+ }, conf, fs, globalWALArchiveDir, dirScanPool);
+ choreService.scheduleChore(logCleaner);
+ store = RegionProcedureStoreTestHelper.createStore(conf, new LoadCounter());
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ store.stop(true);
+ logCleaner.cancel();
+ dirScanPool.shutdownNow();
+ choreService.shutdown();
+ htu.cleanupTestDir();
+ }
+
+ @Test
+ public void test() throws IOException, InterruptedException {
+ RegionProcedureStoreTestProcedure proc = new RegionProcedureStoreTestProcedure();
+ store.insert(proc, null);
+ store.region.flush(true);
+ // no archived wal files yet
+ assertFalse(fs.exists(globalWALArchiveDir));
+ store.walRoller.requestRollAll();
+ store.walRoller.waitUntilWalRollFinished();
+ // should have one
+ FileStatus[] files = fs.listStatus(globalWALArchiveDir);
+ assertEquals(1, files.length);
+ Thread.sleep(2000);
+ // should still be there
+ assertTrue(fs.exists(files[0].getPath()));
+ Thread.sleep(6000);
+ // should have been cleaned
+ assertEquals(0, fs.listStatus(globalWALArchiveDir).length);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerCrashDisableWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerCrashDisableWAL.java
index c4571ac..208fa98 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerCrashDisableWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerCrashDisableWAL.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -59,7 +60,7 @@ public class TestRegionServerCrashDisableWAL {
@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
- UTIL.getConfiguration().setBoolean("hbase.regionserver.hlog.enabled", false);
+ UTIL.getConfiguration().setBoolean(WALFactory.WAL_ENABLED, false);
UTIL.startMiniCluster(2);
UTIL.createTable(TABLE_NAME, CF);
UTIL.waitTableAvailable(TABLE_NAME);