You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2018/12/07 22:28:55 UTC

[12/51] [abbrv] hbase git commit: HBASE-21490 WALProcedure may remove proc wal files still with active procedures

HBASE-21490 WALProcedure may remove proc wal files still with active procedures

Signed-off-by: Allan Yang <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/405bf5e6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/405bf5e6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/405bf5e6

Branch: refs/heads/HBASE-20952
Commit: 405bf5e6383a09f435baadbac6c389e9f6c43ac6
Parents: 83dc38a
Author: Duo Zhang <zh...@apache.org>
Authored: Mon Nov 19 11:03:52 2018 +0800
Committer: stack <st...@apache.org>
Committed: Mon Nov 19 08:21:28 2018 -0800

----------------------------------------------------------------------
 .../procedure2/store/ProcedureStoreTracker.java |   3 +-
 .../store/wal/ProcedureWALFormat.java           |  33 ++--
 .../procedure2/store/wal/WALProcedureStore.java |  40 +++--
 .../hbase/master/TestLoadProcedureError.java    | 150 +++++++++++++++++++
 4 files changed, 192 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/405bf5e6/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index 25c9427..7d430d6 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -274,7 +274,8 @@ public class ProcedureStoreTracker {
     this.keepDeletes = false;
     this.partial = false;
     this.map.clear();
-    resetModified();
+    minModifiedProcId = Long.MAX_VALUE;
+    maxModifiedProcId = Long.MIN_VALUE;
   }
 
   public boolean isModified(long procId) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/405bf5e6/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
index 179c740..9686593 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -88,27 +88,24 @@ public final class ProcedureWALFormat {
       Loader loader) throws IOException {
     ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader);
     tracker.setKeepDeletes(true);
-    try {
-      // Ignore the last log which is current active log.
-      while (logs.hasNext()) {
-        ProcedureWALFile log = logs.next();
-        log.open();
-        try {
-          reader.read(log);
-        } finally {
-          log.close();
-        }
+    // Ignore the last log which is current active log.
+    while (logs.hasNext()) {
+      ProcedureWALFile log = logs.next();
+      log.open();
+      try {
+        reader.read(log);
+      } finally {
+        log.close();
       }
-      reader.finish();
+    }
+    reader.finish();
 
-      // The tracker is now updated with all the procedures read from the logs
-      if (tracker.isPartial()) {
-        tracker.setPartialFlag(false);
-      }
-      tracker.resetModified();
-    } finally {
-      tracker.setKeepDeletes(false);
+    // The tracker is now updated with all the procedures read from the logs
+    if (tracker.isPartial()) {
+      tracker.setPartialFlag(false);
     }
+    tracker.resetModified();
+    tracker.setKeepDeletes(false);
   }
 
   public static void writeHeader(OutputStream stream, ProcedureWALHeader header)

http://git-wip-us.apache.org/repos/asf/hbase/blob/405bf5e6/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index dbab6b7..82dc9df 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -448,13 +448,14 @@ public class WALProcedureStore extends ProcedureStoreBase {
     lock.lock();
     try {
       if (logs.isEmpty()) {
-        throw new RuntimeException("recoverLease() must be called before loading data");
+        throw new IllegalStateException("recoverLease() must be called before loading data");
       }
 
       // Nothing to do, If we have only the current log.
       if (logs.size() == 1) {
         LOG.debug("No state logs to replay.");
         loader.setMaxProcId(0);
+        loading.set(false);
         return;
       }
 
@@ -488,15 +489,20 @@ public class WALProcedureStore extends ProcedureStoreBase {
           // TODO: sideline corrupted log
         }
       });
+      // If loading fails, we must not persist the storeTracker later in the stop method. A
+      // failure may happen after we have built the modified and deleted bits but before we call
+      // resetModified; if the storeTracker were persisted then, the next restart would consider
+      // all procedures to be included in this file and delete all the previous files, which is
+      // wrong. So loading is set to false only after all procedures have been loaded
+      // successfully; while it is still true, closing skips persisting the store tracker. This
+      // also prevents the sync thread from doing periodicRoll, where old logs may also be
+      // cleaned.
+      loading.set(false);
+      // try to cleanup inactive wals and complete the operation
+      buildHoldingCleanupTracker();
+      tryCleanupLogsOnLoad();
     } finally {
-      try {
-        // try to cleanup inactive wals and complete the operation
-        buildHoldingCleanupTracker();
-        tryCleanupLogsOnLoad();
-        loading.set(false);
-      } finally {
-        lock.unlock();
-      }
+      lock.unlock();
     }
   }
 
@@ -1133,11 +1139,15 @@ public class WALProcedureStore extends ProcedureStoreBase {
 
     try {
       ProcedureWALFile log = logs.getLast();
-      log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
-      log.updateLocalTracker(storeTracker);
-      if (!abort) {
-        long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
-        log.addToSize(trailerSize);
+      // If the loading flag is true, it usually means that we fail when loading procedures, so we
+      // should not persist the store tracker, as its state may not be correct.
+      if (!loading.get()) {
+        log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId());
+        log.updateLocalTracker(storeTracker);
+        if (!abort) {
+          long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker);
+          log.addToSize(trailerSize);
+        }
       }
     } catch (IOException e) {
       LOG.warn("Unable to write the trailer", e);
@@ -1193,7 +1203,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
       if (holdingCleanupTracker.isEmpty()) {
         break;
       }
-      iter.next();
+      tracker = iter.next().getTracker();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/405bf5e6/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestLoadProcedureError.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestLoadProcedureError.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestLoadProcedureError.java
new file mode 100644
index 0000000..0a57dba
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestLoadProcedureError.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertFalse;
+
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
+/**
+ * Testcase for HBASE-21490: an active procedure must survive master restarts that fail to load.
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestLoadProcedureError {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestLoadProcedureError.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static TableName NAME = TableName.valueOf("Load");
+
+  private static volatile CountDownLatch ARRIVE; // released by TestProcedure.execute
+
+  private static volatile boolean FINISH_PROC; // if true, TestProcedure.execute returns null
+
+  private static volatile boolean FAIL_LOAD; // if true, TestProcedure.afterReplay throws
+
+  public static final class TestProcedure extends NoopProcedure<MasterProcedureEnv>
+      implements TableProcedureInterface {
+
+    @Override
+    protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+        throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+      if (ARRIVE != null) { // tell the test that execute() has been reached (only once)
+        ARRIVE.countDown();
+        ARRIVE = null;
+      }
+      if (FINISH_PROC) {
+        return null;
+      }
+      setTimeout(1000);
+      setState(ProcedureState.WAITING_TIMEOUT);
+      throw new ProcedureSuspendedException();
+    }
+
+    @Override
+    protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+      setState(ProcedureState.RUNNABLE);
+      env.getProcedureScheduler().addBack(this);
+      return false;
+    }
+
+    @Override
+    protected void afterReplay(MasterProcedureEnv env) {
+      if (FAIL_LOAD) { // simulate a failure while the master loads this procedure
+        throw new RuntimeException("Inject error");
+      }
+    }
+
+    @Override
+    public TableName getTableName() {
+      return NAME;
+    }
+
+    @Override
+    public TableOperationType getTableOperationType() {
+      return TableOperationType.READ;
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  private void waitNoMaster() {
+    UTIL.waitFor(30000, () -> UTIL.getMiniHBaseCluster().getLiveMasterThreads().isEmpty());
+  }
+
+  @Test
+  public void testLoadError() throws Exception {
+    ProcedureExecutor<MasterProcedureEnv> procExec =
+      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    ARRIVE = new CountDownLatch(1);
+    long procId = procExec.submitProcedure(new TestProcedure());
+    ARRIVE.await();
+    FAIL_LOAD = true;
+    // stop(true) aborts the store, so the store tracker is not written out as a trailer
+    UTIL.getMiniHBaseCluster().getMaster().getWalProcedureStore().stop(true);
+    UTIL.getMiniHBaseCluster().getMaster().abort("for testing");
+    waitNoMaster();
+    // Restart twice; both starts should fail because afterReplay above throws. To reproduce the
+    // problem in HBASE-21490 reliably, wait until each master has fully exited before starting the
+    // next one. Otherwise the new master may start too early and call recoverLease on the proc WAL
+    // files, which would cause the store tracker to not be persisted when the previous master
+    // shuts down.
+    UTIL.getMiniHBaseCluster().startMaster();
+    waitNoMaster();
+    UTIL.getMiniHBaseCluster().startMaster();
+    waitNoMaster();
+    FAIL_LOAD = false; // the next master should load the procedure successfully
+    HMaster master = UTIL.getMiniHBaseCluster().startMaster().getMaster();
+    UTIL.waitFor(30000, () -> master.isActiveMaster() && master.isInitialized());
+    // assert the procedure is still there and not finished yet
+    TestProcedure proc = (TestProcedure) master.getMasterProcedureExecutor().getProcedure(procId);
+    assertFalse(proc.isFinished());
+    FINISH_PROC = true; // let the procedure complete on its next execute()
+    UTIL.waitFor(30000, () -> proc.isFinished());
+  }
+}