You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/06/26 01:46:58 UTC

[3/3] hbase git commit: HBASE-13950 Add a NoopProcedureStore for testing

HBASE-13950 Add a NoopProcedureStore for testing


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b7a82d83
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b7a82d83
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b7a82d83

Branch: refs/heads/branch-1
Commit: b7a82d83112faabb30c418030acaaac3d3e9da10
Parents: f248f86
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu Jun 25 16:40:20 2015 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu Jun 25 16:46:22 2015 -0700

----------------------------------------------------------------------
 .../procedure2/store/NoopProcedureStore.java    | 73 ++++++++++++++++++++
 .../procedure2/store/ProcedureStoreBase.java    | 66 ++++++++++++++++++
 .../procedure2/store/wal/WALProcedureStore.java | 44 +++---------
 .../procedure2/ProcedureTestingUtility.java     | 15 ++++
 .../hbase/master/TestAssignmentManager.java     | 30 +++-----
 5 files changed, 171 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b7a82d83/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
new file mode 100644
index 0000000..62448fb
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.procedure2.Procedure;
+
+/**
+ * An In-Memory store that does not keep track of the procedures inserted.
+ */
+public class NoopProcedureStore extends ProcedureStoreBase {
+  private int numThreads;
+
+  @Override
+  public void start(int numThreads) throws IOException {
+    if (!setRunning(true)) {
+      return;
+    }
+    this.numThreads = numThreads;
+  }
+
+  @Override
+  public void stop(boolean abort) {
+    setRunning(false);
+  }
+
+  @Override
+  public void recoverLease() throws IOException {
+    // no-op
+  }
+
+  @Override
+  public int getNumThreads() {
+    return numThreads;
+  }
+
+  @Override
+  public void load(final ProcedureLoader loader) throws IOException {
+    loader.setMaxProcId(0);
+  }
+
+  @Override
+  public void insert(Procedure proc, Procedure[] subprocs) {
+    // no-op
+  }
+
+  @Override
+  public void update(Procedure proc) {
+    // no-op
+  }
+
+  @Override
+  public void delete(long procId) {
+    // no-op
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7a82d83/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java
new file mode 100644
index 0000000..e5653b6
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Base class for {@link ProcedureStore}s.
+ */
+public abstract class ProcedureStoreBase implements ProcedureStore {
+  private final CopyOnWriteArrayList<ProcedureStoreListener> listeners =
+      new CopyOnWriteArrayList<ProcedureStoreListener>();
+
+  private final AtomicBoolean running = new AtomicBoolean(false);
+
+  /**
+   * Change the state to 'isRunning',
+   * returns true if the store state was changed,
+   * false if the store was already in that state.
+   * @param isRunning the state to set.
+   * @return true if the store state was changed, otherwise false.
+   */
+  protected boolean setRunning(boolean isRunning) {
+    return running.getAndSet(isRunning) != isRunning;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return running.get();
+  }
+
+  @Override
+  public void registerListener(ProcedureStoreListener listener) {
+    listeners.add(listener);
+  }
+
+  @Override
+  public boolean unregisterListener(ProcedureStoreListener listener) {
+    return listeners.remove(listener);
+  }
+
+  protected void sendAbortProcessSignal() {
+    if (!this.listeners.isEmpty()) {
+      for (ProcedureStoreListener listener : this.listeners) {
+        listener.abortProcess();
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7a82d83/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index f4a52b1..54b53dc 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.LinkedTransferQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.Arrays;
 import java.util.ArrayList;
@@ -47,7 +46,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
 import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
@@ -58,7 +57,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHe
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class WALProcedureStore implements ProcedureStore {
+public class WALProcedureStore extends ProcedureStoreBase {
   private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
 
   public interface LeaseRecovery {
@@ -76,12 +75,8 @@ public class WALProcedureStore implements ProcedureStore {
   private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
   private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M
 
-  private final CopyOnWriteArrayList<ProcedureStoreListener> listeners =
-    new CopyOnWriteArrayList<ProcedureStoreListener>();
-
   private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
   private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
-  private final AtomicBoolean running = new AtomicBoolean(false);
   private final ReentrantLock lock = new ReentrantLock();
   private final Condition waitCond = lock.newCondition();
   private final Condition slotCond = lock.newCondition();
@@ -117,7 +112,7 @@ public class WALProcedureStore implements ProcedureStore {
 
   @Override
   public void start(int numSlots) throws IOException {
-    if (running.getAndSet(true)) {
+    if (!setRunning(true)) {
       return;
     }
 
@@ -137,7 +132,7 @@ public class WALProcedureStore implements ProcedureStore {
     syncThread = new Thread("WALProcedureStoreSyncThread") {
       @Override
       public void run() {
-        while (running.get()) {
+        while (isRunning()) {
           try {
             syncLoop();
           } catch (IOException e) {
@@ -152,7 +147,7 @@ public class WALProcedureStore implements ProcedureStore {
 
   @Override
   public void stop(boolean abort) {
-    if (!running.getAndSet(false)) {
+    if (!setRunning(false)) {
       return;
     }
 
@@ -186,11 +181,6 @@ public class WALProcedureStore implements ProcedureStore {
   }
 
   @Override
-  public boolean isRunning() {
-    return running.get();
-  }
-
-  @Override
   public int getNumThreads() {
     return slots == null ? 0 : slots.length;
   }
@@ -200,20 +190,10 @@ public class WALProcedureStore implements ProcedureStore {
   }
 
   @Override
-  public void registerListener(ProcedureStoreListener listener) {
-    this.listeners.add(listener);
-  }
-
-  @Override
-  public boolean unregisterListener(ProcedureStoreListener listener) {
-    return this.listeners.remove(listener);
-  }
-
-  @Override
   public void recoverLease() throws IOException {
     LOG.info("Starting WAL Procedure Store lease recovery");
     FileStatus[] oldLogs = getLogFiles();
-    while (running.get()) {
+    while (isRunning()) {
       // Get Log-MaxID and recover lease on old logs
       flushLogId = initOldLogs(oldLogs);
 
@@ -462,7 +442,7 @@ public class WALProcedureStore implements ProcedureStore {
 
   private void syncLoop() throws IOException {
     inSync.set(false);
-    while (running.get()) {
+    while (isRunning()) {
       lock.lock();
       try {
         // Wait until new data is available
@@ -522,7 +502,7 @@ public class WALProcedureStore implements ProcedureStore {
           sendAbortProcessSignal();
         }
       }
-    } while (running.get());
+    } while (isRunning());
     return totalSynced;
   }
 
@@ -548,14 +528,6 @@ public class WALProcedureStore implements ProcedureStore {
     return totalSynced;
   }
 
-  private void sendAbortProcessSignal() {
-    if (!this.listeners.isEmpty()) {
-      for (ProcedureStoreListener listener : this.listeners) {
-        listener.abortProcess();
-      }
-    }
-  }
-
   private boolean rollWriterOrDie() {
     try {
       return rollWriter();

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7a82d83/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index ddea9d2..a90e056 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 
 import static org.junit.Assert.assertEquals;
@@ -109,6 +110,20 @@ public class ProcedureTestingUtility {
     ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value);
   }
 
+  public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc)
+      throws IOException {
+    NoopProcedureStore procStore = new NoopProcedureStore();
+    ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<TEnv>(conf, env, procStore);
+    procStore.start(1);
+    procExecutor.start(1, false);
+    try {
+      return submitAndWait(procExecutor, proc);
+    } finally {
+      procStore.stop(false);
+      procExecutor.stop();
+    }
+  }
+
   public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
     long procId = procExecutor.submitProcedure(proc);
     waitProcedure(procExecutor, procId);

http://git-wip-us.apache.org/repos/asf/hbase/blob/b7a82d83/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
index 16ba50a..162b114 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
 import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
@@ -111,7 +112,7 @@ import com.google.protobuf.ServiceException;
 
 /**
  * Test {@link AssignmentManager}
- * 
+ *
  * TODO: This test suite has rotted. It is too fragile. The smallest change throws it off. It is
  * too brittle mocking up partial states in mockito trying to ensure we walk the right codepath
  * to obtain expected result. Redo.
@@ -476,7 +477,7 @@ public class TestAssignmentManager {
    * Run a simple server shutdown handler.
    * @throws KeeperException
    * @throws IOException
-   * @throws InterruptedException 
+   * @throws InterruptedException
    */
   @Test (timeout=180000)
   public void testShutdownHandler()
@@ -507,7 +508,7 @@ public class TestAssignmentManager {
    * @throws KeeperException
    * @throws IOException
    * @throws ServiceException
-   * @throws InterruptedException 
+   * @throws InterruptedException
    */
   @Test (timeout=180000)
   public void testSSHWhenDisablingTableInProgress() throws KeeperException, IOException,
@@ -523,7 +524,7 @@ public class TestAssignmentManager {
    * @throws KeeperException
    * @throws IOException
    * @throws ServiceException
-   * @throws InterruptedException 
+   * @throws InterruptedException
    */
   @Test (timeout=180000)
   public void testSSHWhenDisabledTableInProgress() throws KeeperException, IOException,
@@ -745,26 +746,13 @@ public class TestAssignmentManager {
       Mockito.when(services.getMetaTableLocator()).thenReturn(mtl);
       Configuration conf = server.getConfiguration();
       Mockito.when(services.getConfiguration()).thenReturn(conf);
+      Mockito.when(services.isServerCrashProcessingEnabled()).thenReturn(true);
+
       MasterProcedureEnv env = new MasterProcedureEnv(services);
       ServerCrashProcedure procedure = new ServerCrashProcedure(SERVERNAME_DEAD, true, false);
       am.failoverCleanupDone.set(true);
       clearRITInBackground(am, REGIONINFO, SERVERNAME_LIVE);
-      Method protectedExecuteMethod = null;
-        try {
-          protectedExecuteMethod =
-            procedure.getClass().getSuperclass().getDeclaredMethod("execute", Object.class);
-          protectedExecuteMethod.setAccessible(true);
-          Procedure [] procedures = new Procedure [] {procedure};
-          do {
-            // We know that ServerCrashProcedure does not return more than a single Procedure as
-            // result; it does not make children so the procedures[0] is safe.
-            procedures = (Procedure [])protectedExecuteMethod.invoke(procedures[0], env);
-          } while(procedures != null);
-        } catch (NoSuchMethodException | SecurityException | IllegalAccessException |
-            IllegalArgumentException | InvocationTargetException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
-        }
+      ProcedureTestingUtility.submitAndWait(conf, env, procedure);
       // The region in r will have been assigned.  It'll be up in zk as unassigned.
     } finally {
       if (connection != null) connection.close();
@@ -1182,7 +1170,7 @@ public class TestAssignmentManager {
    * When a region is in transition, if the region server opening the region goes down,
    * the region assignment takes a long time normally (waiting for timeout monitor to trigger
    * assign). This test is to make sure SSH reassigns it right away.
-   * @throws InterruptedException 
+   * @throws InterruptedException
    */
   @Test (timeout=180000)
   public void testSSHTimesOutOpeningRegionTransition()