You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/06/26 01:46:57 UTC
[2/3] hbase git commit: HBASE-13950 Add a NoopProcedureStore for
testing
HBASE-13950 Add a NoopProcedureStore for testing
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/70f0ca3c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/70f0ca3c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/70f0ca3c
Branch: refs/heads/master
Commit: 70f0ca3c6d485d504d49652801fd841de2086157
Parents: db5dd1e
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu Jun 25 16:29:57 2015 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu Jun 25 16:44:58 2015 -0700
----------------------------------------------------------------------
.../procedure2/store/NoopProcedureStore.java | 73 ++++++++++++++++++++
.../procedure2/store/ProcedureStoreBase.java | 66 ++++++++++++++++++
.../procedure2/store/wal/WALProcedureStore.java | 44 +++---------
.../procedure2/ProcedureTestingUtility.java | 15 ++++
4 files changed, 162 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f0ca3c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
new file mode 100644
index 0000000..62448fb
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.procedure2.Procedure;
+
+/**
+ * An In-Memory store that does not keep track of the procedures inserted.
+ */
+public class NoopProcedureStore extends ProcedureStoreBase {
+ private int numThreads;
+
+ @Override
+ public void start(int numThreads) throws IOException {
+ if (!setRunning(true)) {
+ return;
+ }
+ this.numThreads = numThreads;
+ }
+
+ @Override
+ public void stop(boolean abort) {
+ setRunning(false);
+ }
+
+ @Override
+ public void recoverLease() throws IOException {
+ // no-op
+ }
+
+ @Override
+ public int getNumThreads() {
+ return numThreads;
+ }
+
+ @Override
+ public void load(final ProcedureLoader loader) throws IOException {
+ loader.setMaxProcId(0);
+ }
+
+ @Override
+ public void insert(Procedure proc, Procedure[] subprocs) {
+ // no-op
+ }
+
+ @Override
+ public void update(Procedure proc) {
+ // no-op
+ }
+
+ @Override
+ public void delete(long procId) {
+ // no-op
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f0ca3c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java
new file mode 100644
index 0000000..e5653b6
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreBase.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Base class for {@link ProcedureStore}s.
+ */
+public abstract class ProcedureStoreBase implements ProcedureStore {
+ private final CopyOnWriteArrayList<ProcedureStoreListener> listeners =
+ new CopyOnWriteArrayList<ProcedureStoreListener>();
+
+ private final AtomicBoolean running = new AtomicBoolean(false);
+
+ /**
+ * Change the state to 'isRunning',
+ * returns true if the store state was changed,
+ * false if the store was already in that state.
+ * @param isRunning the state to set.
+ * @return true if the store state was changed, otherwise false.
+ */
+ protected boolean setRunning(boolean isRunning) {
+ return running.getAndSet(isRunning) != isRunning;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return running.get();
+ }
+
+ @Override
+ public void registerListener(ProcedureStoreListener listener) {
+ listeners.add(listener);
+ }
+
+ @Override
+ public boolean unregisterListener(ProcedureStoreListener listener) {
+ return listeners.remove(listener);
+ }
+
+ protected void sendAbortProcessSignal() {
+ if (!this.listeners.isEmpty()) {
+ for (ProcedureStoreListener listener : this.listeners) {
+ listener.abortProcess();
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f0ca3c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index f4a52b1..54b53dc 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.LinkedTransferQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.Arrays;
import java.util.ArrayList;
@@ -47,7 +46,7 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.procedure2.Procedure;
-import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
@@ -58,7 +57,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALHe
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class WALProcedureStore implements ProcedureStore {
+public class WALProcedureStore extends ProcedureStoreBase {
private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
public interface LeaseRecovery {
@@ -76,12 +75,8 @@ public class WALProcedureStore implements ProcedureStore {
private static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
private static final long DEFAULT_ROLL_THRESHOLD = 32 * 1024 * 1024; // 32M
- private final CopyOnWriteArrayList<ProcedureStoreListener> listeners =
- new CopyOnWriteArrayList<ProcedureStoreListener>();
-
private final LinkedList<ProcedureWALFile> logs = new LinkedList<ProcedureWALFile>();
private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
- private final AtomicBoolean running = new AtomicBoolean(false);
private final ReentrantLock lock = new ReentrantLock();
private final Condition waitCond = lock.newCondition();
private final Condition slotCond = lock.newCondition();
@@ -117,7 +112,7 @@ public class WALProcedureStore implements ProcedureStore {
@Override
public void start(int numSlots) throws IOException {
- if (running.getAndSet(true)) {
+ if (!setRunning(true)) {
return;
}
@@ -137,7 +132,7 @@ public class WALProcedureStore implements ProcedureStore {
syncThread = new Thread("WALProcedureStoreSyncThread") {
@Override
public void run() {
- while (running.get()) {
+ while (isRunning()) {
try {
syncLoop();
} catch (IOException e) {
@@ -152,7 +147,7 @@ public class WALProcedureStore implements ProcedureStore {
@Override
public void stop(boolean abort) {
- if (!running.getAndSet(false)) {
+ if (!setRunning(false)) {
return;
}
@@ -186,11 +181,6 @@ public class WALProcedureStore implements ProcedureStore {
}
@Override
- public boolean isRunning() {
- return running.get();
- }
-
- @Override
public int getNumThreads() {
return slots == null ? 0 : slots.length;
}
@@ -200,20 +190,10 @@ public class WALProcedureStore implements ProcedureStore {
}
@Override
- public void registerListener(ProcedureStoreListener listener) {
- this.listeners.add(listener);
- }
-
- @Override
- public boolean unregisterListener(ProcedureStoreListener listener) {
- return this.listeners.remove(listener);
- }
-
- @Override
public void recoverLease() throws IOException {
LOG.info("Starting WAL Procedure Store lease recovery");
FileStatus[] oldLogs = getLogFiles();
- while (running.get()) {
+ while (isRunning()) {
// Get Log-MaxID and recover lease on old logs
flushLogId = initOldLogs(oldLogs);
@@ -462,7 +442,7 @@ public class WALProcedureStore implements ProcedureStore {
private void syncLoop() throws IOException {
inSync.set(false);
- while (running.get()) {
+ while (isRunning()) {
lock.lock();
try {
// Wait until new data is available
@@ -522,7 +502,7 @@ public class WALProcedureStore implements ProcedureStore {
sendAbortProcessSignal();
}
}
- } while (running.get());
+ } while (isRunning());
return totalSynced;
}
@@ -548,14 +528,6 @@ public class WALProcedureStore implements ProcedureStore {
return totalSynced;
}
- private void sendAbortProcessSignal() {
- if (!this.listeners.isEmpty()) {
- for (ProcedureStoreListener listener : this.listeners) {
- listener.abortProcess();
- }
- }
- }
-
private boolean rollWriterOrDie() {
try {
return rollWriter();
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f0ca3c/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index ddea9d2..a90e056 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import static org.junit.Assert.assertEquals;
@@ -109,6 +110,20 @@ public class ProcedureTestingUtility {
ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, value);
}
+ public static <TEnv> long submitAndWait(Configuration conf, TEnv env, Procedure<TEnv> proc)
+ throws IOException {
+ NoopProcedureStore procStore = new NoopProcedureStore();
+ ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<TEnv>(conf, env, procStore);
+ procStore.start(1);
+ procExecutor.start(1, false);
+ try {
+ return submitAndWait(procExecutor, proc);
+ } finally {
+ procStore.stop(false);
+ procExecutor.stop();
+ }
+ }
+
public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) {
long procId = procExecutor.submitProcedure(proc);
waitProcedure(procExecutor, procId);