You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2016/01/23 02:16:31 UTC

hbase git commit: HBASE-15100 Master WALProcs are deleted out of order ending up with older wals not removed

Repository: hbase
Updated Branches:
  refs/heads/branch-1.1 aa0e492f8 -> 20d5577d2


HBASE-15100 Master WALProcs are deleted out of order ending up with older wals not removed


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/20d5577d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/20d5577d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/20d5577d

Branch: refs/heads/branch-1.1
Commit: 20d5577d2e1b8e395e66be88a66a70bec8665f4f
Parents: aa0e492
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Fri Jan 22 17:12:18 2016 -0800
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Fri Jan 22 17:12:18 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ProcedureInfo.java  | 20 +++++----
 .../hadoop/hbase/procedure2/Procedure.java      | 17 ++------
 .../hbase/procedure2/ProcedureExecutor.java     |  8 ++--
 .../hbase/procedure2/store/ProcedureStore.java  |  1 +
 .../procedure2/store/ProcedureStoreTracker.java | 10 +++--
 .../store/wal/ProcedureWALFormat.java           |  1 -
 .../store/wal/ProcedureWALFormatReader.java     |  9 ++--
 .../procedure2/store/wal/WALProcedureStore.java | 32 +++++++-------
 .../store/TestProcedureStoreTracker.java        |  4 +-
 .../store/wal/TestWALProcedureStore.java        | 44 ++++++++++++++++++++
 10 files changed, 91 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
index 4a15857..c79ea98 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
@@ -41,12 +41,12 @@ public class ProcedureInfo {
   private final String procOwner;
   private final ProcedureState procState;
   private final long parentId;
+  private final NonceKey nonceKey;
   private final ForeignExceptionMessage exception;
   private final long lastUpdate;
   private final long startTime;
   private final byte[] result;
 
-  private NonceKey nonceKey = null;
   private long clientAckTime = -1;
 
   public ProcedureInfo(
@@ -55,6 +55,7 @@ public class ProcedureInfo {
       final String procOwner,
       final ProcedureState procState,
       final long parentId,
+      final NonceKey nonceKey,
       final ForeignExceptionMessage exception,
       final long lastUpdate,
       final long startTime,
@@ -64,6 +65,7 @@ public class ProcedureInfo {
     this.procOwner = procOwner;
     this.procState = procState;
     this.parentId = parentId;
+    this.nonceKey = nonceKey;
     this.lastUpdate = lastUpdate;
     this.startTime = startTime;
 
@@ -73,8 +75,8 @@ public class ProcedureInfo {
   }
 
   public ProcedureInfo clone() {
-    return new ProcedureInfo(
-      procId, procName, procOwner, procState, parentId, exception, lastUpdate, startTime, result);
+    return new ProcedureInfo(procId, procName, procOwner, procState, parentId, nonceKey,
+      exception, lastUpdate, startTime, result);
   }
 
   public long getProcId() {
@@ -105,10 +107,6 @@ public class ProcedureInfo {
     return nonceKey;
   }
 
-  public void setNonceKey(NonceKey nonceKey) {
-    this.nonceKey = nonceKey;
-  }
-
   public boolean isFailed() {
     return exception != null;
   }
@@ -216,13 +214,19 @@ public class ProcedureInfo {
    */
   @InterfaceAudience.Private
   public static ProcedureInfo convert(final ProcedureProtos.Procedure procProto) {
+    NonceKey nonceKey = null;
+    if (procProto.getNonce() != HConstants.NO_NONCE) {
+      nonceKey = new NonceKey(procProto.getNonceGroup(), procProto.getNonce());
+    }
+
     return new ProcedureInfo(
       procProto.getProcId(),
       procProto.getClassName(),
       procProto.getOwner(),
       procProto.getState(),
       procProto.hasParentId() ? procProto.getParentId() : -1,
-          procProto.getState() == ProcedureState.ROLLEDBACK ? procProto.getException() : null,
+      nonceKey,
+      procProto.hasException() ? procProto.getException() : null,
       procProto.getLastUpdate(),
       procProto.getStartTime(),
       procProto.getState() == ProcedureState.FINISHED ? procProto.getResult().toByteArray() : null);

http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index b9306c4..304c225 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -600,30 +600,19 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
    */
   @InterfaceAudience.Private
   public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) {
-    RemoteProcedureException exception;
-
-    if (proc.hasException()) {
-      exception = proc.getException();
-    } else {
-      exception = null;
-    }
-    ProcedureInfo procInfo = new ProcedureInfo(
+    RemoteProcedureException exception = proc.hasException() ? proc.getException() : null;
+    return new ProcedureInfo(
       proc.getProcId(),
       proc.toStringClass(),
       proc.getOwner(),
       proc.getState(),
       proc.hasParent() ? proc.getParentProcId() : -1,
+      nonceKey,
       exception != null ?
           RemoteProcedureException.toProto(exception.getSource(), exception.getCause()) : null,
       proc.getLastUpdate(),
       proc.getStartTime(),
       proc.getResult());
-
-    if (nonceKey != null) {
-      procInfo.setNonceKey(nonceKey);
-    }
-
-    return procInfo;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index dd439fd..5277fa2 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -813,13 +813,15 @@ public class ProcedureExecutor<TEnvironment> {
         break;
       }
 
-      if (proc.getProcId() == rootProcId && proc.isSuccess()) {
-        // Finalize the procedure state
+      if (proc.isSuccess()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Procedure completed in " +
               StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
         }
-        procedureFinished(proc);
+        // Finalize the procedure state
+        if (proc.getProcId() == rootProcId) {
+          procedureFinished(proc);
+        }
         break;
       }
     } while (procStack.isFailed());

http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 83933ff..e49475d 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ProcedureInfo;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index 6823288..fe2904b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -414,7 +414,9 @@ public class ProcedureStoreTracker {
     node.updateState(procId, isDeleted);
   }
 
-  public void clear() {
+  public void reset() {
+    this.keepDeletes = false;
+    this.partial = false;
     this.map.clear();
     resetUpdates();
   }
@@ -579,11 +581,11 @@ public class ProcedureStoreTracker {
   }
 
   public void readFrom(final InputStream stream) throws IOException {
-    ProcedureProtos.ProcedureStoreTracker data =
+    reset();
+    final ProcedureProtos.ProcedureStoreTracker data =
         ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream);
-    map.clear();
     for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: data.getNodeList()) {
-      BitSetNode node = BitSetNode.convert(protoNode);
+      final BitSetNode node = BitSetNode.convert(protoNode);
       map.put(node.getStart(), node);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
index 17432ac..0df4046 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -64,7 +64,6 @@ public final class ProcedureWALFormat {
   }
 
   interface Loader {
-    void removeLog(ProcedureWALFile log);
     void markCorruptedWAL(ProcedureWALFile log, IOException e);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index 4dbd929..7d9a57b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -25,9 +25,10 @@ import java.util.HashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.ProcedureInfo;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
@@ -91,10 +92,7 @@ public class ProcedureWALFormatReader {
       loader.markCorruptedWAL(log, e);
     }
 
-    if (localProcedures.isEmpty()) {
-      LOG.info("No active entry found in state log " + log + ". removing it");
-      loader.removeLog(log);
-    } else {
+    if (!localProcedures.isEmpty()) {
       Iterator<Map.Entry<Long, ProcedureProtos.Procedure>> itd =
         localProcedures.entrySet().iterator();
       long minProcId = Long.MAX_VALUE;
@@ -160,6 +158,7 @@ public class ProcedureWALFormatReader {
     }
     maxProcId = Math.max(maxProcId, entry.getProcId());
     localProcedures.remove(entry.getProcId());
+    assert !procedures.containsKey(entry.getProcId());
     tracker.setDeleted(entry.getProcId(), true);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index e5d6869..0089760 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -247,8 +247,13 @@ public class WALProcedureStore implements ProcedureStore {
     return storeTracker;
   }
 
-  public LinkedList<ProcedureWALFile> getActiveLogs() {
-    return logs;
+  public ArrayList<ProcedureWALFile> getActiveLogs() {
+    lock.lock();
+    try {
+      return new ArrayList<ProcedureWALFile>(logs);
+    } finally {
+      lock.unlock();
+    }
   }
 
   public Set<ProcedureWALFile> getCorruptedLogs() {
@@ -316,17 +321,11 @@ public class WALProcedureStore implements ProcedureStore {
     }
 
     // Load the old logs
-    final ArrayList<ProcedureWALFile> toRemove = new ArrayList<ProcedureWALFile>();
     Iterator<ProcedureWALFile> it = logs.descendingIterator();
     it.next(); // Skip the current log
     try {
       return ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() {
         @Override
-        public void removeLog(ProcedureWALFile log) {
-          toRemove.add(log);
-        }
-
-        @Override
         public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
           if (corruptedLogs == null) {
             corruptedLogs = new HashSet<ProcedureWALFile>();
@@ -336,11 +335,6 @@ public class WALProcedureStore implements ProcedureStore {
         }
       });
     } finally {
-      if (!toRemove.isEmpty()) {
-        for (ProcedureWALFile log: toRemove) {
-          removeLogFile(log);
-        }
-      }
       loading.set(false);
     }
   }
@@ -589,6 +583,7 @@ public class WALProcedureStore implements ProcedureStore {
         totalSynced = syncSlots(stream, slots, 0, slotIndex);
         break;
       } catch (Throwable e) {
+        LOG.warn("unable to sync slots, retry=" + retry);
         if (++retry >= maxRetriesBeforeRoll) {
           if (logRolled >= maxSyncFailureRoll) {
             LOG.error("Sync slots after log roll failed, abort.", e);
@@ -648,14 +643,15 @@ public class WALProcedureStore implements ProcedureStore {
   }
 
   private boolean rollWriterOrDie() {
-    for (int i = 1; i <= rollRetries; ++i) {
+    for (int i = 0; i < rollRetries; ++i) {
+      if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
+
       try {
         if (rollWriter()) {
           return true;
         }
       } catch (IOException e) {
-        LOG.warn("Unable to roll the log, attempt=" + i, e);
-        Threads.sleepWithoutInterrupt(waitBeforeRoll);
+        LOG.warn("Unable to roll the log, attempt=" + (i + 1), e);
       }
     }
     LOG.fatal("Unable to roll the log");
@@ -902,7 +898,7 @@ public class WALProcedureStore implements ProcedureStore {
     }
   }
 
-  private long getMaxLogId(final FileStatus[] logFiles) {
+  private static long getMaxLogId(final FileStatus[] logFiles) {
     long maxLogId = 0;
     if (logFiles != null && logFiles.length > 0) {
       for (int i = 0; i < logFiles.length; ++i) {
@@ -945,7 +941,7 @@ public class WALProcedureStore implements ProcedureStore {
       } catch (IOException e) {
         LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage());
         // try the next one...
-        storeTracker.clear();
+        storeTracker.reset();
         storeTracker.setPartialFlag(true);
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
index 26a94d4..6bc5d36 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java
@@ -192,7 +192,7 @@ public class TestProcedureStoreTracker {
         count++;
       }
 
-      tracker.clear();
+      tracker.reset();
     }
   }
 
@@ -212,7 +212,7 @@ public class TestProcedureStoreTracker {
           tracker.setDeleted(i, false);
         }
 
-        tracker.clear();
+        tracker.reset();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/20d5577d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 4fea6de..a33f334 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -151,6 +151,50 @@ public class TestWALProcedureStore {
   }
 
   @Test
+  public void testNoTrailerDoubleRestart() throws Exception {
+    // log-0001: proc 0, 1 and 2 are inserted
+    Procedure proc0 = new TestSequentialProcedure();
+    procStore.insert(proc0, null);
+    Procedure proc1 = new TestSequentialProcedure();
+    procStore.insert(proc1, null);
+    Procedure proc2 = new TestSequentialProcedure();
+    procStore.insert(proc2, null);
+    procStore.rollWriterForTesting();
+
+    // log-0002: proc 1 deleted
+    procStore.delete(proc1.getProcId());
+    procStore.rollWriterForTesting();
+
+    // log-0003: proc 2 is updated
+    procStore.update(proc2);
+    procStore.rollWriterForTesting();
+
+    // log-0004: proc 2 deleted
+    procStore.delete(proc2.getProcId());
+
+    // stop the store and remove the trailer
+    procStore.stop(false);
+    FileStatus[] logs = fs.listStatus(logDir);
+    assertEquals(4, logs.length);
+    for (int i = 0; i < logs.length; ++i) {
+      corruptLog(logs[i], 4);
+    }
+
+    // Test Load 1
+    assertEquals(1, countProcedures(storeRestart()));
+
+    // Test Load 2
+    assertEquals(5, fs.listStatus(logDir).length);
+    assertEquals(1, countProcedures(storeRestart()));
+
+    // remove proc-0
+    procStore.delete(proc0.getProcId());
+    procStore.periodicRollForTesting();
+    assertEquals(1, fs.listStatus(logDir).length);
+    assertEquals(0, countProcedures(storeRestart()));
+  }
+
+  @Test
   public void testCorruptedTrailer() throws Exception {
     // Insert something
     for (int i = 0; i < 100; ++i) {