You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/07/27 09:28:18 UTC
hbase git commit: HBASE-20939 There will be race when we call
suspendIfNotReady and then throw ProcedureSuspendedException
Repository: hbase
Updated Branches:
refs/heads/master 80b40a3b5 -> 7178a9825
HBASE-20939 There will be race when we call suspendIfNotReady and then throw ProcedureSuspendedException
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7178a982
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7178a982
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7178a982
Branch: refs/heads/master
Commit: 7178a98258dbb28496c2c4f3fbbf8e552ead8bdb
Parents: 80b40a3
Author: zhangduo <zh...@apache.org>
Authored: Fri Jul 27 17:26:40 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Jul 27 17:27:12 2018 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/util/IdLock.java | 137 ++++++++++++++++++
.../hbase/procedure2/ProcedureExecutor.java | 20 +--
.../procedure2/ProcedureSuspendedException.java | 6 +-
.../org/apache/hadoop/hbase/util/IdLock.java | 138 -------------------
4 files changed, 152 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/7178a982/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
new file mode 100644
index 0000000..269bf83
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Allows multiple concurrent clients to lock on a numeric id with a minimal
+ * memory overhead. The intended usage is as follows:
+ *
+ * <pre>
+ * IdLock.Entry lockEntry = idLock.getLockEntry(id);
+ * try {
+ * // User code.
+ * } finally {
+ * idLock.releaseLockEntry(lockEntry);
+ * }</pre>
+ */
+@InterfaceAudience.Private
+public class IdLock {
+
+ /** An entry returned to the client as a lock object */
+ public static final class Entry {
+ private final long id;
+ private int numWaiters;
+ private boolean locked = true;
+
+ private Entry(long id) {
+ this.id = id;
+ }
+
+ @Override
+ public String toString() {
+ return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked="
+ + locked;
+ }
+ }
+
+ private ConcurrentMap<Long, Entry> map = new ConcurrentHashMap<>();
+
+ /**
+ * Blocks until the lock corresponding to the given id is acquired.
+ *
+ * @param id an arbitrary number to lock on
+ * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
+ * the lock
+ * @throws IOException if interrupted
+ */
+ public Entry getLockEntry(long id) throws IOException {
+ Entry entry = new Entry(id);
+ Entry existing;
+ while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
+ synchronized (existing) {
+ if (existing.locked) {
+ ++existing.numWaiters; // Add ourselves to waiters.
+ while (existing.locked) {
+ try {
+ existing.wait();
+ } catch (InterruptedException e) {
+ --existing.numWaiters; // Remove ourselves from waiters.
+ throw new InterruptedIOException(
+ "Interrupted waiting to acquire sparse lock");
+ }
+ }
+
+ --existing.numWaiters; // Remove ourselves from waiters.
+ existing.locked = true;
+ return existing;
+ }
+ // If the entry is not locked, it might already be deleted from the
+ // map, so we cannot return it. We need to get our entry into the map
+ // or get someone else's locked entry.
+ }
+ }
+ return entry;
+ }
+
+ /**
+ * Must be called in a finally block to decrease the internal counter and
+ * remove the monitor object for the given id if the caller is the last
+ * client.
+ *
+ * @param entry the return value of {@link #getLockEntry(long)}
+ */
+ public void releaseLockEntry(Entry entry) {
+ synchronized (entry) {
+ entry.locked = false;
+ if (entry.numWaiters > 0) {
+ entry.notify();
+ } else {
+ map.remove(entry.id);
+ }
+ }
+ }
+
+ /** For testing */
+ void assertMapEmpty() {
+ assert map.isEmpty();
+ }
+
+ @VisibleForTesting
+ public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
+ for (Entry entry;;) {
+ entry = map.get(id);
+ if (entry != null) {
+ synchronized (entry) {
+ if (entry.numWaiters >= numWaiters) {
+ return;
+ }
+ }
+ }
+ Thread.sleep(100);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7178a982/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index f1bec72..e2215c6 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
@@ -313,6 +314,14 @@ public class ProcedureExecutor<TEnvironment> {
private final boolean checkOwnerSet;
+ // To prevent concurrent execution of the same procedure.
+ // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the
+ // procedure is woken up before we finish the suspend which causes the same procedures to be
+ // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also
+ // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent
+ // execution of the same procedure.
+ private final IdLock procExecutionLock = new IdLock();
+
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
final ProcedureStore store) {
this(conf, environment, store, new SimpleProcedureScheduler());
@@ -1496,14 +1505,7 @@ public class ProcedureExecutor<TEnvironment> {
// Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
// The exception is caught below and then we hurry to the exit without disturbing state. The
// idea is that the processing of this procedure will be unsuspended later by an external event
- // such the report of a region open. TODO: Currently, its possible for two worker threads
- // to be working on the same procedure concurrently (locking in procedures is NOT about
- // concurrency but about tying an entity to a procedure; i.e. a region to a particular
- // procedure instance). This can make for issues if both threads are changing state.
- // See env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
- // in RegionTransitionProcedure#reportTransition for example of Procedure putting
- // itself back on the scheduler making it possible for two threads running against
- // the one Procedure. Might be ok if they are both doing different, idempotent sections.
+ // such the report of a region open.
boolean suspended = false;
// Whether to 're-' -execute; run through the loop again.
@@ -1798,12 +1800,14 @@ public class ProcedureExecutor<TEnvironment> {
LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),
runningCount, activeCount);
executionStartTime.set(EnvironmentEdgeManager.currentTime());
+ IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
try {
executeProcedure(proc);
} catch (AssertionError e) {
LOG.info("ASSERT pid=" + proc.getProcId(), e);
throw e;
} finally {
+ procExecutionLock.releaseLockEntry(lockEntry);
activeCount = activeExecutorCount.decrementAndGet();
runningCount = store.setRunningProcedureCount(activeCount);
LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),
http://git-wip-us.apache.org/repos/asf/hbase/blob/7178a982/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSuspendedException.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSuspendedException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSuspendedException.java
index 5090fb1..9f52121 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSuspendedException.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSuspendedException.java
@@ -15,15 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.procedure2;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
@InterfaceAudience.Private
-@InterfaceStability.Stable
public class ProcedureSuspendedException extends ProcedureException {
+
+ private static final long serialVersionUID = -8328419627678496269L;
+
/** default constructor */
public ProcedureSuspendedException() {
super();
http://git-wip-us.apache.org/repos/asf/hbase/blob/7178a982/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
deleted file mode 100644
index eba9acd..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.util;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
-/**
- * Allows multiple concurrent clients to lock on a numeric id with a minimal
- * memory overhead. The intended usage is as follows:
- *
- * <pre>
- * IdLock.Entry lockEntry = idLock.getLockEntry(id);
- * try {
- * // User code.
- * } finally {
- * idLock.releaseLockEntry(lockEntry);
- * }</pre>
- */
-@InterfaceAudience.Private
-public class IdLock {
-
- /** An entry returned to the client as a lock object */
- public static class Entry {
- private final long id;
- private int numWaiters;
- private boolean locked = true;
-
- private Entry(long id) {
- this.id = id;
- }
-
- @Override
- public String toString() {
- return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked="
- + locked;
- }
- }
-
- private ConcurrentMap<Long, Entry> map = new ConcurrentHashMap<>();
-
- /**
- * Blocks until the lock corresponding to the given id is acquired.
- *
- * @param id an arbitrary number to lock on
- * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release
- * the lock
- * @throws IOException if interrupted
- */
- public Entry getLockEntry(long id) throws IOException {
- Entry entry = new Entry(id);
- Entry existing;
- while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
- synchronized (existing) {
- if (existing.locked) {
- ++existing.numWaiters; // Add ourselves to waiters.
- while (existing.locked) {
- try {
- existing.wait();
- } catch (InterruptedException e) {
- --existing.numWaiters; // Remove ourselves from waiters.
- throw new InterruptedIOException(
- "Interrupted waiting to acquire sparse lock");
- }
- }
-
- --existing.numWaiters; // Remove ourselves from waiters.
- existing.locked = true;
- return existing;
- }
- // If the entry is not locked, it might already be deleted from the
- // map, so we cannot return it. We need to get our entry into the map
- // or get someone else's locked entry.
- }
- }
- return entry;
- }
-
- /**
- * Must be called in a finally block to decrease the internal counter and
- * remove the monitor object for the given id if the caller is the last
- * client.
- *
- * @param entry the return value of {@link #getLockEntry(long)}
- */
- public void releaseLockEntry(Entry entry) {
- synchronized (entry) {
- entry.locked = false;
- if (entry.numWaiters > 0) {
- entry.notify();
- } else {
- map.remove(entry.id);
- }
- }
- }
-
- /** For testing */
- void assertMapEmpty() {
- assert map.isEmpty();
- }
-
- @VisibleForTesting
- public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
- for (Entry entry;;) {
- entry = map.get(id);
- if (entry != null) {
- synchronized (entry) {
- if (entry.numWaiters >= numWaiters) {
- return;
- }
- }
- }
- Thread.sleep(100);
- }
- }
-}