You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/04/10 09:51:42 UTC
[07/24] hbase git commit: HBASE-13203 Procedure v2 - master
create/delete table
http://git-wip-us.apache.org/repos/asf/hbase/blob/f6716045/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
new file mode 100644
index 0000000..903dbd3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.master.RegionStates;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureResult;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+
+/**
+ * Helper to synchronously wait on conditions.
+ * This will be removed in the future (mainly when the AssignmentManager will be
+ * replaced with a Procedure version) by using ProcedureYieldException,
+ * and the queue will handle waiting and scheduling based on events.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class ProcedureSyncWait {
+ private static final Log LOG = LogFactory.getLog(ProcedureSyncWait.class);
+
+ private ProcedureSyncWait() {}
+
+ @InterfaceAudience.Private
+ public interface Predicate<T> {
+ T evaluate() throws IOException;
+ }
+
+ public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec,
+ final Procedure proc) throws IOException {
+ long procId = procExec.submitProcedure(proc);
+ return waitForProcedureToComplete(procExec, procId);
+ }
+
+ public static byte[] waitForProcedureToComplete(ProcedureExecutor<MasterProcedureEnv> procExec,
+ final long procId) throws IOException {
+ while (!procExec.isFinished(procId) && procExec.isRunning()) {
+ // TODO: add a config to make it tunable
+ // Dev Consideration: are we waiting forever, or we can set up some timeout value?
+ Threads.sleepWithoutInterrupt(250);
+ }
+ ProcedureResult result = procExec.getResult(procId);
+ if (result != null) {
+ if (result.isFailed()) {
+ // If the procedure fails, we should always have an exception captured. Throw it.
+ throw result.getException().unwrapRemoteException();
+ }
+ return result.getResult();
+ } else {
+ if (procExec.isRunning()) {
+ throw new IOException("Procedure " + procId + "not found");
+ } else {
+ throw new IOException("The Master is Aborting");
+ }
+ }
+ }
+
+ public static <T> T waitFor(MasterProcedureEnv env, String purpose, Predicate<T> predicate)
+ throws IOException {
+ final Configuration conf = env.getMasterConfiguration();
+ final long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
+ final long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000);
+ return waitFor(env, waitTime, waitingTimeForEvents, purpose, predicate);
+ }
+
+ public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents,
+ String purpose, Predicate<T> predicate) throws IOException {
+ final long done = EnvironmentEdgeManager.currentTime() + waitTime;
+ do {
+ T result = predicate.evaluate();
+ if (result != null && !result.equals(Boolean.FALSE)) {
+ return result;
+ }
+ try {
+ Thread.sleep(waitingTimeForEvents);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while sleeping, waiting on " + purpose);
+ throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+ }
+ LOG.debug("Waiting on " + purpose);
+ } while (EnvironmentEdgeManager.currentTime() < done && env.isRunning());
+
+ throw new TimeoutIOException("Timed out while waiting on " + purpose);
+ }
+
+ protected static void waitMetaRegions(final MasterProcedureEnv env) throws IOException {
+ int timeout = env.getMasterConfiguration().getInt("hbase.client.catalog.timeout", 10000);
+ try {
+ if (env.getMasterServices().getMetaTableLocator().waitMetaRegionLocation(
+ env.getMasterServices().getZooKeeper(), timeout) == null) {
+ throw new NotAllMetaRegionsOnlineException();
+ }
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+ }
+ }
+
+ protected static void waitRegionServers(final MasterProcedureEnv env) throws IOException {
+ final ServerManager sm = env.getMasterServices().getServerManager();
+ ProcedureSyncWait.waitFor(env, "server to assign region(s)",
+ new ProcedureSyncWait.Predicate<Boolean>() {
+ @Override
+ public Boolean evaluate() throws IOException {
+ List<ServerName> servers = sm.createDestinationServersList();
+ return servers != null && !servers.isEmpty();
+ }
+ });
+ }
+
+ protected static List<HRegionInfo> getRegionsFromMeta(final MasterProcedureEnv env,
+ final TableName tableName) throws IOException {
+ return ProcedureSyncWait.waitFor(env, "regions of table=" + tableName + " from meta",
+ new ProcedureSyncWait.Predicate<List<HRegionInfo>>() {
+ @Override
+ public List<HRegionInfo> evaluate() throws IOException {
+ if (TableName.META_TABLE_NAME.equals(tableName)) {
+ return new MetaTableLocator().getMetaRegions(env.getMasterServices().getZooKeeper());
+ }
+ return MetaTableAccessor.getTableRegions(env.getMasterServices().getConnection(),tableName);
+ }
+ });
+ }
+
+ protected static void waitRegionInTransition(final MasterProcedureEnv env,
+ final List<HRegionInfo> regions) throws IOException, CoordinatedStateException {
+ final AssignmentManager am = env.getMasterServices().getAssignmentManager();
+ final RegionStates states = am.getRegionStates();
+ for (final HRegionInfo region : regions) {
+ ProcedureSyncWait.waitFor(env, "regions " + region.getRegionNameAsString() + " in transition",
+ new ProcedureSyncWait.Predicate<Boolean>() {
+ @Override
+ public Boolean evaluate() throws IOException {
+ if (states.isRegionInState(region, State.FAILED_OPEN)) {
+ am.regionOffline(region);
+ }
+ return !states.isRegionInTransition(region);
+ }
+ });
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f6716045/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
new file mode 100644
index 0000000..76ca094
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.TableName;
+
+/**
+ * Procedures that operates on a specific Table (e.g. create, delete, snapshot, ...)
+ * must implement this interface to allow the system handle the lock/concurrency problems.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface TableProcedureInterface {
+ public enum TableOperationType { CREATE, DELETE, EDIT, READ };
+
+ /**
+ * @return the name of the table the procedure is operating on
+ */
+ TableName getTableName();
+
+ /**
+ * Given an operation type we can take decisions about what to do with pending operations.
+ * e.g. if we get a delete and we have some table operation pending (e.g. add column)
+ * we can abort those operations.
+ * @return the operation type that the procedure is executing.
+ */
+ TableOperationType getTableOperationType();
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f6716045/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
index 9893fc8..5fe5f8c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
+import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.namespace.NamespaceAuditor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest;
@@ -444,14 +444,11 @@ public class MasterQuotaManager implements RegionStateListener {
new HRegionInfo(QuotaUtil.QUOTA_TABLE_NAME)
};
- masterServices.getExecutorService()
- .submit(new CreateTableHandler(masterServices,
- masterServices.getMasterFileSystem(),
- QuotaUtil.QUOTA_TABLE_DESC,
- masterServices.getConfiguration(),
- newRegions,
- masterServices)
- .prepare());
+ masterServices.getMasterProcedureExecutor()
+ .submitProcedure(new CreateTableProcedure(
+ masterServices.getMasterProcedureExecutor().getEnvironment(),
+ QuotaUtil.QUOTA_TABLE_DESC,
+ newRegions));
}
private static class NamedLock<T> {
http://git-wip-us.apache.org/repos/asf/hbase/blob/f6716045/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index a515f8e..f15eb1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -981,13 +981,14 @@ public class HRegionServer extends HasThread implements
// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
- if(this.hMemManager != null) this.hMemManager.stop();
+ if (this.hMemManager != null) this.hMemManager.stop();
if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
if (this.compactionChecker != null) this.compactionChecker.cancel(true);
if (this.healthCheckChore != null) this.healthCheckChore.cancel(true);
if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true);
if (this.storefileRefresher != null) this.storefileRefresher.cancel(true);
+ sendShutdownInterrupt();
// Stop the quota manager
if (rsQuotaManager != null) {
@@ -2073,6 +2074,12 @@ public class HRegionServer extends HasThread implements
}
/**
+ * Called on stop/abort before closing the cluster connection and meta locator.
+ */
+ protected void sendShutdownInterrupt() {
+ }
+
+ /**
* Wait on all threads to finish. Presumption is that all closes and stops
* have already been called.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/f6716045/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
index 95d8a17..347cad5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
@@ -67,6 +67,30 @@ public abstract class ModifyRegionUtils {
void editRegion(final HRegionInfo region) throws IOException;
}
+ public static HRegionInfo[] createHRegionInfos(HTableDescriptor hTableDescriptor,
+ byte[][] splitKeys) {
+ long regionId = System.currentTimeMillis();
+ HRegionInfo[] hRegionInfos = null;
+ if (splitKeys == null || splitKeys.length == 0) {
+ hRegionInfos = new HRegionInfo[]{
+ new HRegionInfo(hTableDescriptor.getTableName(), null, null, false, regionId)
+ };
+ } else {
+ int numRegions = splitKeys.length + 1;
+ hRegionInfos = new HRegionInfo[numRegions];
+ byte[] startKey = null;
+ byte[] endKey = null;
+ for (int i = 0; i < numRegions; i++) {
+ endKey = (i == splitKeys.length) ? null : splitKeys[i];
+ hRegionInfos[i] =
+ new HRegionInfo(hTableDescriptor.getTableName(), startKey, endKey,
+ false, regionId);
+ startKey = endKey;
+ }
+ }
+ return hRegionInfos;
+ }
+
/**
* Create new set of regions on the specified file-system.
* NOTE: that you should add the regions to hbase:meta after this operation.
http://git-wip-us.apache.org/repos/asf/hbase/blob/f6716045/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index 8ed49ff..2c13f39 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -63,6 +63,8 @@ import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLog
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.master.CatalogJanitor.SplitParentFirstComparator;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
@@ -261,6 +263,11 @@ public class TestCatalogJanitor {
}
@Override
+ public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
+ return null;
+ }
+
+ @Override
public ServerManager getServerManager() {
return null;
}
@@ -912,7 +919,7 @@ public class TestCatalogJanitor {
MasterServices services = new MockMasterServices(server);
// create the janitor
-
+
CatalogJanitor janitor = new CatalogJanitor(server, services);
// Create regions.
http://git-wip-us.apache.org/repos/asf/hbase/blob/f6716045/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java
new file mode 100644
index 0000000..d6c19e1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableDescriptor;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class MasterProcedureTestingUtility {
+ private static final Log LOG = LogFactory.getLog(MasterProcedureTestingUtility.class);
+
+ private MasterProcedureTestingUtility() {
+ }
+
+ public static HTableDescriptor createHTD(final TableName tableName, final String... family) {
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ for (int i = 0; i < family.length; ++i) {
+ htd.addFamily(new HColumnDescriptor(family[i]));
+ }
+ return htd;
+ }
+
+ public static HRegionInfo[] createTable(final ProcedureExecutor<MasterProcedureEnv> procExec,
+ final TableName tableName, final byte[][] splitKeys, String... family) throws IOException {
+ HTableDescriptor htd = createHTD(tableName, family);
+ HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, splitKeys);
+ long procId = ProcedureTestingUtility.submitAndWait(procExec,
+ new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
+ ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId));
+ return regions;
+ }
+
+ public static void validateTableCreation(final HMaster master, final TableName tableName,
+ final HRegionInfo[] regions, String... family) throws IOException {
+ validateTableCreation(master, tableName, regions, true, family);
+ }
+
+ public static void validateTableCreation(final HMaster master, final TableName tableName,
+ final HRegionInfo[] regions, boolean hasFamilyDirs, String... family) throws IOException {
+ // check filesystem
+ final FileSystem fs = master.getMasterFileSystem().getFileSystem();
+ final Path tableDir = FSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
+ assertTrue(fs.exists(tableDir));
+ List<Path> allRegionDirs = FSUtils.getRegionDirs(fs, tableDir);
+ for (int i = 0; i < regions.length; ++i) {
+ Path regionDir = new Path(tableDir, regions[i].getEncodedName());
+ assertTrue(regions[i] + " region dir does not exist", fs.exists(regionDir));
+ assertTrue(allRegionDirs.remove(regionDir));
+ List<Path> allFamilyDirs = FSUtils.getFamilyDirs(fs, regionDir);
+ for (int j = 0; j < family.length; ++j) {
+ final Path familyDir = new Path(regionDir, family[j]);
+ if (hasFamilyDirs) {
+ assertTrue(family[j] + " family dir does not exist", fs.exists(familyDir));
+ assertTrue(allFamilyDirs.remove(familyDir));
+ } else {
+ // TODO: WARN: Modify Table/Families does not create a family dir
+ if (!fs.exists(familyDir)) {
+ LOG.warn(family[j] + " family dir does not exist");
+ }
+ allFamilyDirs.remove(familyDir);
+ }
+ }
+ assertTrue("found extraneous families: " + allFamilyDirs, allFamilyDirs.isEmpty());
+ }
+ assertTrue("found extraneous regions: " + allRegionDirs, allRegionDirs.isEmpty());
+
+ // check meta
+ assertTrue(MetaTableAccessor.tableExists(master.getConnection(), tableName));
+ assertEquals(regions.length, countMetaRegions(master, tableName));
+
+ // check htd
+ TableDescriptor tableDesc = master.getTableDescriptors().getDescriptor(tableName);
+ assertTrue("table descriptor not found", tableDesc != null);
+ HTableDescriptor htd = tableDesc.getHTableDescriptor();
+ assertTrue("table descriptor not found", htd != null);
+ for (int i = 0; i < family.length; ++i) {
+ assertTrue("family not found " + family[i], htd.getFamily(Bytes.toBytes(family[i])) != null);
+ }
+ assertEquals(family.length, htd.getFamilies().size());
+ }
+
+ public static void validateTableDeletion(final HMaster master, final TableName tableName,
+ final HRegionInfo[] regions, String... family) throws IOException {
+ // check filesystem
+ final FileSystem fs = master.getMasterFileSystem().getFileSystem();
+ final Path tableDir = FSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
+ assertFalse(fs.exists(tableDir));
+
+ // check meta
+ assertFalse(MetaTableAccessor.tableExists(master.getConnection(), tableName));
+ assertEquals(0, countMetaRegions(master, tableName));
+
+ // check htd
+ assertTrue("found htd of deleted table",
+ master.getTableDescriptors().getDescriptor(tableName) == null);
+ }
+
+ private static int countMetaRegions(final HMaster master, final TableName tableName)
+ throws IOException {
+ final AtomicInteger actualRegCount = new AtomicInteger(0);
+ final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
+ @Override
+ public boolean visit(Result rowResult) throws IOException {
+ RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
+ if (list == null) {
+ LOG.warn("No serialized HRegionInfo in " + rowResult);
+ return true;
+ }
+ HRegionLocation l = list.getRegionLocation();
+ if (l == null) {
+ return true;
+ }
+ if (!l.getRegionInfo().getTable().equals(tableName)) {
+ return false;
+ }
+ if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
+ HRegionLocation[] locations = list.getRegionLocations();
+ for (HRegionLocation location : locations) {
+ if (location == null) continue;
+ ServerName serverName = location.getServerName();
+ // Make sure that regions are assigned to server
+ if (serverName != null && serverName.getHostAndPort() != null) {
+ actualRegCount.incrementAndGet();
+ }
+ }
+ return true;
+ }
+ };
+ MetaTableAccessor.scanMetaForTableRegions(master.getConnection(), visitor, tableName);
+ return actualRegCount.get();
+ }
+
+ public static <TState> void testRecoveryAndDoubleExecution(
+ final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
+ final int numSteps, final TState[] states) throws Exception {
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
+ assertEquals(false, procExec.isRunning());
+ // Restart the executor and execute the step twice
+ // execute step N - kill before store update
+ // restart executor/store
+ // execute step N - save on store
+ for (int i = 0; i < numSteps; ++i) {
+ LOG.info("Restart "+ i +" exec state: " + states[i]);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
+ ProcedureTestingUtility.restart(procExec);
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
+ }
+ assertEquals(true, procExec.isRunning());
+ ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
+ }
+
+ public static <TState> void testRollbackAndDoubleExecution(
+ final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
+ final int lastStep, final TState[] states) throws Exception {
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
+
+ // Restart the executor and execute the step twice
+ // execute step N - kill before store update
+ // restart executor/store
+ // execute step N - save on store
+ for (int i = 0; i < lastStep; ++i) {
+ LOG.info("Restart "+ i +" exec state: " + states[i]);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
+ ProcedureTestingUtility.restart(procExec);
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
+ }
+
+ // Restart the executor and rollback the step twice
+ // rollback step N - kill before store update
+ // restart executor/store
+ // rollback step N - save on store
+ MasterProcedureTestingUtility.InjectAbortOnLoadListener abortListener =
+ new MasterProcedureTestingUtility.InjectAbortOnLoadListener(procExec);
+ procExec.registerListener(abortListener);
+ try {
+ for (int i = lastStep + 1; i >= 0; --i) {
+ LOG.info("Restart " + i +" rollback state: "+ states[i]);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
+ ProcedureTestingUtility.restart(procExec);
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
+ }
+ } finally {
+ assertTrue(procExec.unregisterListener(abortListener));
+ }
+
+ ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
+ }
+
+ public static <TState> void testRollbackAndDoubleExecutionAfterPONR(
+ final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
+ final int lastStep, final TState[] states) throws Exception {
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
+
+ // Restart the executor and execute the step twice
+ // execute step N - kill before store update
+ // restart executor/store
+ // execute step N - save on store
+ for (int i = 0; i < lastStep; ++i) {
+ LOG.info("Restart "+ i +" exec state: " + states[i]);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
+ ProcedureTestingUtility.restart(procExec);
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
+ }
+
+ // try to inject the abort
+ ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
+ MasterProcedureTestingUtility.InjectAbortOnLoadListener abortListener =
+ new MasterProcedureTestingUtility.InjectAbortOnLoadListener(procExec);
+ procExec.registerListener(abortListener);
+ try {
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
+ ProcedureTestingUtility.restart(procExec);
+ LOG.info("Restart and execute");
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
+ } finally {
+ assertTrue(procExec.unregisterListener(abortListener));
+ }
+
+ assertEquals(true, procExec.isRunning());
+ ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
+ }
+
+ public static <TState> void testRollbackRetriableFailure(
+ final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
+ final int lastStep, final TState[] states) throws Exception {
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
+
+ // Restart the executor and execute the step twice
+ // execute step N - kill before store update
+ // restart executor/store
+ // execute step N - save on store
+ for (int i = 0; i < lastStep; ++i) {
+ LOG.info("Restart "+ i +" exec state: " + states[i]);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
+ ProcedureTestingUtility.restart(procExec);
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
+ }
+
+ // execute the rollback
+ ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
+ MasterProcedureTestingUtility.InjectAbortOnLoadListener abortListener =
+ new MasterProcedureTestingUtility.InjectAbortOnLoadListener(procExec);
+ procExec.registerListener(abortListener);
+ try {
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
+ ProcedureTestingUtility.restart(procExec);
+ LOG.info("Restart and rollback");
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
+ } finally {
+ assertTrue(procExec.unregisterListener(abortListener));
+ }
+
+ ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
+ }
+
+ public static class InjectAbortOnLoadListener
+ implements ProcedureExecutor.ProcedureExecutorListener {
+ private final ProcedureExecutor<MasterProcedureEnv> procExec;
+
+ public InjectAbortOnLoadListener(final ProcedureExecutor<MasterProcedureEnv> procExec) {
+ this.procExec = procExec;
+ }
+
+ @Override
+ public void procedureLoaded(long procId) {
+ procExec.abort(procId);
+ }
+
+ @Override
+ public void procedureAdded(long procId) { /* no-op */ }
+
+ @Override
+ public void procedureFinished(long procId) { /* no-op */ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f6716045/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java
new file mode 100644
index 0000000..7cd64b6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, MediumTests.class})
+public class TestCreateTableProcedure {
+ private static final Log LOG = LogFactory.getLog(TestCreateTableProcedure.class);
+
+ protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static void setupConf(Configuration conf) {
+ conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
+ }
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ setupConf(UTIL.getConfiguration());
+ UTIL.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void cleanupTest() throws Exception {
+ try {
+ UTIL.shutdownMiniCluster();
+ } catch (Exception e) {
+ LOG.warn("failure shutting down cluster", e);
+ }
+ }
+
+ @Before
+ public void setup() throws Exception {
+ resetProcExecutorTestingKillFlag();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ resetProcExecutorTestingKillFlag();
+ for (HTableDescriptor htd: UTIL.getHBaseAdmin().listTables()) {
+ LOG.info("Tear down, remove table=" + htd.getTableName());
+ UTIL.deleteTable(htd.getTableName());
+ }
+ }
+
+ private void resetProcExecutorTestingKillFlag() {
+ final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
+ assertTrue("expected executor to be running", procExec.isRunning());
+ }
+
+ @Test(timeout=60000)
+ public void testSimpleCreate() throws Exception {
+ final TableName tableName = TableName.valueOf("testSimpleCreate");
+ final byte[][] splitKeys = null;
+ testSimpleCreate(tableName, splitKeys);
+ }
+
+ @Test(timeout=60000)
+ public void testSimpleCreateWithSplits() throws Exception {
+ final TableName tableName = TableName.valueOf("testSimpleCreateWithSplits");
+ final byte[][] splitKeys = new byte[][] {
+ Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c")
+ };
+ testSimpleCreate(tableName, splitKeys);
+ }
+
+ private void testSimpleCreate(final TableName tableName, byte[][] splitKeys) throws Exception {
+ HRegionInfo[] regions = MasterProcedureTestingUtility.createTable(
+ getMasterProcedureExecutor(), tableName, splitKeys, "f1", "f2");
+ MasterProcedureTestingUtility.validateTableCreation(
+ UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
+ }
+
+ @Test(timeout=60000, expected=TableExistsException.class)
+ public void testCreateExisting() throws Exception {
+ final TableName tableName = TableName.valueOf("testCreateExisting");
+ final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ final HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, "f");
+ final HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, null);
+
+ // create the table
+ long procId1 = procExec.submitProcedure(
+ new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
+
+ // create another with the same name
+ ProcedurePrepareLatch latch2 = new ProcedurePrepareLatch.CompatibilityLatch();
+ long procId2 = procExec.submitProcedure(
+ new CreateTableProcedure(procExec.getEnvironment(), htd, regions, latch2));
+
+ ProcedureTestingUtility.waitProcedure(procExec, procId1);
+ ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId1));
+
+ ProcedureTestingUtility.waitProcedure(procExec, procId2);
+ latch2.await();
+ }
+
+ @Test(timeout=60000)
+ public void testRecoveryAndDoubleExecution() throws Exception {
+ final TableName tableName = TableName.valueOf("testRecoveryAndDoubleExecution");
+
+ // create the table
+ final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
+
+ // Start the Create procedure && kill the executor
+ byte[][] splitKeys = null;
+ HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, "f1", "f2");
+ HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, splitKeys);
+ long procId = procExec.submitProcedure(
+ new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
+
+ // Restart the executor and execute the step twice
+ // NOTE: the 6 (number of CreateTableState steps) is hardcoded,
+ // so you have to look at this test at least once when you add a new step.
+ MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(
+ procExec, procId, 6, CreateTableState.values());
+
+ MasterProcedureTestingUtility.validateTableCreation(
+ UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
+ }
+
+ @Test(timeout=90000)
+ public void testRollbackAndDoubleExecution() throws Exception {
+ final TableName tableName = TableName.valueOf("testRollbackAndDoubleExecution");
+
+ // create the table
+ final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
+
+ // Start the Create procedure && kill the executor
+ final byte[][] splitKeys = new byte[][] {
+ Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c")
+ };
+ HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, "f1", "f2");
+ htd.setRegionReplication(3);
+ HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, splitKeys);
+ long procId = procExec.submitProcedure(
+ new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
+
+ // NOTE: the 4 (number of CreateTableState steps) is hardcoded,
+ // so you have to look at this test at least once when you add a new step.
+ MasterProcedureTestingUtility.testRollbackAndDoubleExecution(
+ procExec, procId, 4, CreateTableState.values());
+
+ MasterProcedureTestingUtility.validateTableDeletion(
+ UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
+
+ // are we able to create the table after a rollback?
+ resetProcExecutorTestingKillFlag();
+ testSimpleCreate(tableName, splitKeys);
+ }
+
+ @Test(timeout=90000)
+ public void testRollbackRetriableFailure() throws Exception {
+ final TableName tableName = TableName.valueOf("testRollbackRetriableFailure");
+
+ // create the table
+ final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
+
+ // Start the Create procedure && kill the executor
+ final byte[][] splitKeys = new byte[][] {
+ Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c")
+ };
+ HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, "f1", "f2");
+ HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, splitKeys);
+ long procId = procExec.submitProcedure(
+ new FaultyCreateTableProcedure(procExec.getEnvironment(), htd, regions));
+
+ // NOTE: the 4 (number of CreateTableState steps) is hardcoded,
+ // so you have to look at this test at least once when you add a new step.
+ MasterProcedureTestingUtility.testRollbackRetriableFailure(
+ procExec, procId, 4, CreateTableState.values());
+
+ MasterProcedureTestingUtility.validateTableDeletion(
+ UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
+
+ // are we able to create the table after a rollback?
+ resetProcExecutorTestingKillFlag();
+ testSimpleCreate(tableName, splitKeys);
+ }
+
+ private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
+ return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
+ }
+
+ public static class FaultyCreateTableProcedure extends CreateTableProcedure {
+ private int retries = 0;
+
+ public FaultyCreateTableProcedure() {
+ // Required by the Procedure framework to create the procedure on replay
+ }
+
+ public FaultyCreateTableProcedure(final MasterProcedureEnv env,
+ final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions)
+ throws IOException {
+ super(env, hTableDescriptor, newRegions);
+ }
+
+ @Override
+ protected void rollbackState(final MasterProcedureEnv env, final CreateTableState state)
+ throws IOException {
+ if (retries++ < 3) {
+ LOG.info("inject rollback failure state=" + state);
+ throw new IOException("injected failure number " + retries);
+ } else {
+ super.rollbackState(env, state);
+ retries = 0;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f6716045/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteTableProcedure.java
new file mode 100644
index 0000000..6795b22
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteTableProcedure.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureResult;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, MediumTests.class})
+public class TestDeleteTableProcedure {
+ private static final Log LOG = LogFactory.getLog(TestDeleteTableProcedure.class);
+
+ protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static void setupConf(Configuration conf) {
+ conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
+ }
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ setupConf(UTIL.getConfiguration());
+ UTIL.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void cleanupTest() throws Exception {
+ try {
+ UTIL.shutdownMiniCluster();
+ } catch (Exception e) {
+ LOG.warn("failure shutting down cluster", e);
+ }
+ }
+
+ @Before
+ public void setup() throws Exception {
+ final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
+ assertTrue("expected executor to be running", procExec.isRunning());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false);
+ for (HTableDescriptor htd: UTIL.getHBaseAdmin().listTables()) {
+ LOG.info("Tear down, remove table=" + htd.getTableName());
+ UTIL.deleteTable(htd.getTableName());
+ }
+ }
+
+ @Test(timeout=60000, expected=TableNotFoundException.class)
+ public void testDeleteNotExistentTable() throws Exception {
+ final TableName tableName = TableName.valueOf("testDeleteNotExistentTable");
+
+ final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ ProcedurePrepareLatch latch = new ProcedurePrepareLatch.CompatibilityLatch();
+ long procId = ProcedureTestingUtility.submitAndWait(procExec,
+ new DeleteTableProcedure(procExec.getEnvironment(), tableName, latch));
+ latch.await();
+ }
+
+ @Test(timeout=60000, expected=TableNotDisabledException.class)
+ public void testDeleteNotDisabledTable() throws Exception {
+ final TableName tableName = TableName.valueOf("testDeleteNotDisabledTable");
+
+ final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ MasterProcedureTestingUtility.createTable(procExec, tableName, null, "f");
+
+ ProcedurePrepareLatch latch = new ProcedurePrepareLatch.CompatibilityLatch();
+ long procId = ProcedureTestingUtility.submitAndWait(procExec,
+ new DeleteTableProcedure(procExec.getEnvironment(), tableName, latch));
+ latch.await();
+ }
+
+ @Test(timeout=60000)
+ public void testDeleteDeletedTable() throws Exception {
+ final TableName tableName = TableName.valueOf("testDeleteDeletedTable");
+ final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+
+ HRegionInfo[] regions = MasterProcedureTestingUtility.createTable(
+ procExec, tableName, null, "f");
+ UTIL.getHBaseAdmin().disableTable(tableName);
+
+ // delete the table (that exists)
+ long procId1 = procExec.submitProcedure(
+ new DeleteTableProcedure(procExec.getEnvironment(), tableName));
+ // delete the table (that will no longer exist)
+ long procId2 = procExec.submitProcedure(
+ new DeleteTableProcedure(procExec.getEnvironment(), tableName));
+
+ // Wait the completion
+ ProcedureTestingUtility.waitProcedure(procExec, procId1);
+ ProcedureTestingUtility.waitProcedure(procExec, procId2);
+
+ // First delete should succeed
+ ProcedureTestingUtility.assertProcNotFailed(procExec, procId1);
+ MasterProcedureTestingUtility.validateTableDeletion(
+ UTIL.getHBaseCluster().getMaster(), tableName, regions, "f");
+
+ // Second delete should fail with TableNotFound
+ ProcedureResult result = procExec.getResult(procId2);
+ assertTrue(result.isFailed());
+ LOG.debug("Delete failed with exception: " + result.getException());
+ assertTrue(result.getException().getCause() instanceof TableNotFoundException);
+ }
+
+ @Test(timeout=60000)
+ public void testSimpleDelete() throws Exception {
+ final TableName tableName = TableName.valueOf("testSimpleDelete");
+ final byte[][] splitKeys = null;
+ testSimpleDelete(tableName, splitKeys);
+ }
+
+ @Test(timeout=60000)
+ public void testSimpleDeleteWithSplits() throws Exception {
+ final TableName tableName = TableName.valueOf("testSimpleDeleteWithSplits");
+ final byte[][] splitKeys = new byte[][] {
+ Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c")
+ };
+ testSimpleDelete(tableName, splitKeys);
+ }
+
+ private void testSimpleDelete(final TableName tableName, byte[][] splitKeys) throws Exception {
+ HRegionInfo[] regions = MasterProcedureTestingUtility.createTable(
+ getMasterProcedureExecutor(), tableName, splitKeys, "f1", "f2");
+ UTIL.getHBaseAdmin().disableTable(tableName);
+
+ // delete the table
+ final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ long procId = ProcedureTestingUtility.submitAndWait(procExec,
+ new DeleteTableProcedure(procExec.getEnvironment(), tableName));
+ ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
+ MasterProcedureTestingUtility.validateTableDeletion(
+ UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
+ }
+
+ @Test(timeout=60000)
+ public void testRecoveryAndDoubleExecution() throws Exception {
+ final TableName tableName = TableName.valueOf("testRecoveryAndDoubleExecution");
+
+ // create the table
+ byte[][] splitKeys = null;
+ HRegionInfo[] regions = MasterProcedureTestingUtility.createTable(
+ getMasterProcedureExecutor(), tableName, splitKeys, "f1", "f2");
+ UTIL.getHBaseAdmin().disableTable(tableName);
+
+ final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ ProcedureTestingUtility.waitNoProcedureRunning(procExec);
+ ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
+
+ // Start the Delete procedure && kill the executor
+ long procId = procExec.submitProcedure(
+ new DeleteTableProcedure(procExec.getEnvironment(), tableName));
+
+ // Restart the executor and execute the step twice
+ // NOTE: the 6 (number of DeleteTableState steps) is hardcoded,
+ // so you have to look at this test at least once when you add a new step.
+ MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(
+ procExec, procId, 6, DeleteTableState.values());
+
+ MasterProcedureTestingUtility.validateTableDeletion(
+ UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
+ }
+
+ private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
+ return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f6716045/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
new file mode 100644
index 0000000..faf7845
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, LargeTests.class})
+public class TestMasterFailoverWithProcedures {
+ private static final Log LOG = LogFactory.getLog(TestMasterFailoverWithProcedures.class);
+
+ protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static void setupConf(Configuration conf) {
+ }
+
+ @Before
+ public void setup() throws Exception {
+ setupConf(UTIL.getConfiguration());
+ UTIL.startMiniCluster(2, 1);
+
+ final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, false);
+ ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, false);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ UTIL.shutdownMiniCluster();
+ } catch (Exception e) {
+ LOG.warn("failure shutting down cluster", e);
+ }
+ }
+
+ @Test(timeout=60000)
+ public void testWalRecoverLease() throws Exception {
+ final ProcedureStore masterStore = getMasterProcedureExecutor().getStore();
+ assertTrue("expected WALStore for this test", masterStore instanceof WALProcedureStore);
+
+ HMaster firstMaster = UTIL.getHBaseCluster().getMaster();
+ // Abort Latch for the master store
+ final CountDownLatch masterStoreAbort = new CountDownLatch(1);
+ masterStore.registerListener(new ProcedureStore.ProcedureStoreListener() {
+ @Override
+ public void abortProcess() {
+ LOG.debug("Abort store of Master");
+ masterStoreAbort.countDown();
+ }
+ });
+
+ // startup a fake master the new WAL store will take the lease
+ // and the active master should abort.
+ HMaster backupMaster3 = Mockito.mock(HMaster.class);
+ Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
+ Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
+ final WALProcedureStore backupStore3 = new WALProcedureStore(firstMaster.getConfiguration(),
+ firstMaster.getMasterFileSystem().getFileSystem(),
+ ((WALProcedureStore)masterStore).getLogDir(),
+ new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
+ // Abort Latch for the test store
+ final CountDownLatch backupStore3Abort = new CountDownLatch(1);
+ backupStore3.registerListener(new ProcedureStore.ProcedureStoreListener() {
+ @Override
+ public void abortProcess() {
+ LOG.debug("Abort store of backupMaster3");
+ backupStore3Abort.countDown();
+ backupStore3.stop(true);
+ }
+ });
+ backupStore3.start(1);
+ backupStore3.recoverLease();
+
+ // Try to trigger a command on the master (WAL lease expired on the active one)
+ HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(TableName.valueOf("mtb"), "f");
+ HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, null);
+ LOG.debug("submit proc");
+ getMasterProcedureExecutor().submitProcedure(
+ new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions));
+ LOG.debug("wait master store abort");
+ masterStoreAbort.await();
+
+ // Now the real backup master should start up
+ LOG.debug("wait backup master to startup");
+ waitBackupMaster(UTIL, firstMaster);
+ assertEquals(true, firstMaster.isStopped());
+
+ // wait the store in here to abort (the test will fail due to timeout if it doesn't)
+ LOG.debug("wait the store to abort");
+ backupStore3.getStoreTracker().setDeleted(1, false);
+ backupStore3.delete(1);
+ backupStore3Abort.await();
+ }
+
+ // ==========================================================================
+ // Test Create Table
+ // ==========================================================================
+ @Test(timeout=60000)
+ public void testCreateWithFailover() throws Exception {
+ // TODO: Should we try every step? (master failover takes long time)
+ // It is already covered by TestCreateTableProcedure
+ // but without the master restart, only the executor/store is restarted.
+ // Without Master restart we may not find bug in the procedure code
+ // like missing "wait" for resources to be available (e.g. RS)
+ testCreateWithFailoverAtStep(CreateTableState.CREATE_TABLE_ASSIGN_REGIONS.ordinal());
+ }
+
+ private void testCreateWithFailoverAtStep(final int step) throws Exception {
+ final TableName tableName = TableName.valueOf("testCreateWithFailoverAtStep" + step);
+
+ // create the table
+ ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, true);
+ ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, true);
+
+ // Start the Create procedure && kill the executor
+ byte[][] splitKeys = null;
+ HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, "f1", "f2");
+ HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, splitKeys);
+ long procId = procExec.submitProcedure(
+ new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
+ testRecoveryAndDoubleExecution(UTIL, procId, step, CreateTableState.values());
+
+ MasterProcedureTestingUtility.validateTableCreation(
+ UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
+ }
+
+ // ==========================================================================
+ // Test Delete Table
+ // ==========================================================================
+ @Test(timeout=60000)
+ public void testDeleteWithFailover() throws Exception {
+ // TODO: Should we try every step? (master failover takes long time)
+ // It is already covered by TestDeleteTableProcedure
+ // but without the master restart, only the executor/store is restarted.
+ // Without Master restart we may not find bug in the procedure code
+ // like missing "wait" for resources to be available (e.g. RS)
+ testDeleteWithFailoverAtStep(DeleteTableState.DELETE_TABLE_UNASSIGN_REGIONS.ordinal());
+ }
+
+ private void testDeleteWithFailoverAtStep(final int step) throws Exception {
+ final TableName tableName = TableName.valueOf("testDeleteWithFailoverAtStep" + step);
+
+ // create the table
+ byte[][] splitKeys = null;
+ HRegionInfo[] regions = MasterProcedureTestingUtility.createTable(
+ getMasterProcedureExecutor(), tableName, splitKeys, "f1", "f2");
+ Path tableDir = FSUtils.getTableDir(getRootDir(), tableName);
+ MasterProcedureTestingUtility.validateTableCreation(
+ UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
+ UTIL.getHBaseAdmin().disableTable(tableName);
+
+ ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
+ ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, true);
+ ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, true);
+
+ // Start the Delete procedure && kill the executor
+ long procId = procExec.submitProcedure(
+ new DeleteTableProcedure(procExec.getEnvironment(), tableName));
+ testRecoveryAndDoubleExecution(UTIL, procId, step, DeleteTableState.values());
+
+ MasterProcedureTestingUtility.validateTableDeletion(
+ UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2");
+ }
+
+ // ==========================================================================
+ // Test Helpers
+ // ==========================================================================
+ public static <TState> void testRecoveryAndDoubleExecution(final HBaseTestingUtility testUtil,
+ final long procId, final int lastStepBeforeFailover, TState[] states) throws Exception {
+ ProcedureExecutor<MasterProcedureEnv> procExec =
+ testUtil.getHBaseCluster().getMaster().getMasterProcedureExecutor();
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
+
+ for (int i = 0; i < lastStepBeforeFailover; ++i) {
+ LOG.info("Restart "+ i +" exec state: " + states[i]);
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
+ ProcedureTestingUtility.restart(procExec);
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
+ }
+ ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
+
+ LOG.info("Trigger master failover");
+ masterFailover(testUtil);
+
+ procExec = testUtil.getHBaseCluster().getMaster().getMasterProcedureExecutor();
+ ProcedureTestingUtility.waitProcedure(procExec, procId);
+ ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
+ }
+
+ // ==========================================================================
+ // Master failover utils
+ // ==========================================================================
+ public static void masterFailover(final HBaseTestingUtility testUtil)
+ throws Exception {
+ MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster();
+
+ // Kill the master
+ HMaster oldMaster = cluster.getMaster();
+ cluster.killMaster(cluster.getMaster().getServerName());
+
+ // Wait the secondary
+ waitBackupMaster(testUtil, oldMaster);
+ }
+
+ public static void waitBackupMaster(final HBaseTestingUtility testUtil,
+ final HMaster oldMaster) throws Exception {
+ MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster();
+
+ HMaster newMaster = cluster.getMaster();
+ while (newMaster == null || newMaster == oldMaster) {
+ Thread.sleep(250);
+ newMaster = cluster.getMaster();
+ }
+
+ while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) {
+ Thread.sleep(250);
+ }
+ }
+
+ // ==========================================================================
+ // Helpers
+ // ==========================================================================
+ private MasterProcedureEnv getMasterProcedureEnv() {
+ return getMasterProcedureExecutor().getEnvironment();
+ }
+
+ private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
+ return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
+ }
+
+ private FileSystem getFileSystem() {
+ return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+ }
+
+ private Path getRootDir() {
+ return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+ }
+
+ private Path getTempDir() {
+ return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getTempDir();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f6716045/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java
new file mode 100644
index 0000000..d22930f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java
@@ -0,0 +1,433 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestMasterProcedureQueue {
+ private static final Log LOG = LogFactory.getLog(TestMasterProcedureQueue.class);
+
+ private MasterProcedureQueue queue;
+ private Configuration conf;
+
+ @Before
+ public void setUp() throws IOException {
+ conf = HBaseConfiguration.create();
+ queue = new MasterProcedureQueue(conf, new TableLockManager.NullTableLockManager());
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ assertEquals(0, queue.size());
+ }
+
+ /**
+ * Verify simple create/insert/fetch/delete of the table queue.
+ */
+ @Test
+ public void testSimpleTableOpsQueues() throws Exception {
+ final int NUM_TABLES = 10;
+ final int NUM_ITEMS = 10;
+
+ int count = 0;
+ for (int i = 1; i <= NUM_TABLES; ++i) {
+ TableName tableName = TableName.valueOf(String.format("test-%04d", i));
+ // insert items
+ for (int j = 1; j <= NUM_ITEMS; ++j) {
+ queue.addBack(new TestTableProcedure(i * 1000 + j, tableName,
+ TableProcedureInterface.TableOperationType.EDIT));
+ assertEquals(++count, queue.size());
+ }
+ }
+ assertEquals(NUM_TABLES * NUM_ITEMS, queue.size());
+
+ for (int j = 1; j <= NUM_ITEMS; ++j) {
+ for (int i = 1; i <= NUM_TABLES; ++i) {
+ Long procId = queue.poll();
+ assertEquals(--count, queue.size());
+ assertEquals(i * 1000 + j, procId.longValue());
+ }
+ }
+ assertEquals(0, queue.size());
+
+ for (int i = 1; i <= NUM_TABLES; ++i) {
+ TableName tableName = TableName.valueOf(String.format("test-%04d", i));
+ // complete the table deletion
+ assertTrue(queue.markTableAsDeleted(tableName));
+ }
+ }
+
+ /**
+ * Check that the table queue is not deletable until every procedure
+ * in-progress is completed (this is a special case for write-locks).
+ */
+ @Test
+ public void testCreateDeleteTableOperationsWithWriteLock() throws Exception {
+ TableName tableName = TableName.valueOf("testtb");
+
+ queue.addBack(new TestTableProcedure(1, tableName,
+ TableProcedureInterface.TableOperationType.EDIT));
+
+ // table can't be deleted because one item is in the queue
+ assertFalse(queue.markTableAsDeleted(tableName));
+
+ // fetch item and take a lock
+ assertEquals(1, queue.poll().longValue());
+ // take the xlock
+ assertTrue(queue.tryAcquireTableWrite(tableName, "write"));
+ // table can't be deleted because we have the lock
+ assertEquals(0, queue.size());
+ assertFalse(queue.markTableAsDeleted(tableName));
+ // release the xlock
+ queue.releaseTableWrite(tableName);
+ // complete the table deletion
+ assertTrue(queue.markTableAsDeleted(tableName));
+ }
+
+ /**
+ * Check that the table queue is not deletable until every procedure
+ * in-progress is completed (this is a special case for read-locks).
+ */
+ @Test
+ public void testCreateDeleteTableOperationsWithReadLock() throws Exception {
+ final TableName tableName = TableName.valueOf("testtb");
+ final int nitems = 2;
+
+ for (int i = 1; i <= nitems; ++i) {
+ queue.addBack(new TestTableProcedure(i, tableName,
+ TableProcedureInterface.TableOperationType.READ));
+ }
+
+ // table can't be deleted because one item is in the queue
+ assertFalse(queue.markTableAsDeleted(tableName));
+
+ for (int i = 1; i <= nitems; ++i) {
+ // fetch item and take a lock
+ assertEquals(i, queue.poll().longValue());
+ // take the rlock
+ assertTrue(queue.tryAcquireTableRead(tableName, "read " + i));
+ // table can't be deleted because we have locks and/or items in the queue
+ assertFalse(queue.markTableAsDeleted(tableName));
+ }
+
+ for (int i = 1; i <= nitems; ++i) {
+ // table can't be deleted because we have locks
+ assertFalse(queue.markTableAsDeleted(tableName));
+ // release the rlock
+ queue.releaseTableRead(tableName);
+ }
+
+ // there are no items and no lock in the queeu
+ assertEquals(0, queue.size());
+ // complete the table deletion
+ assertTrue(queue.markTableAsDeleted(tableName));
+ }
+
+ /**
+ * Verify the correct logic of RWLocks on the queue
+ */
+ @Test
+ public void testVerifyRwLocks() throws Exception {
+ TableName tableName = TableName.valueOf("testtb");
+ queue.addBack(new TestTableProcedure(1, tableName,
+ TableProcedureInterface.TableOperationType.EDIT));
+ queue.addBack(new TestTableProcedure(2, tableName,
+ TableProcedureInterface.TableOperationType.READ));
+ queue.addBack(new TestTableProcedure(3, tableName,
+ TableProcedureInterface.TableOperationType.EDIT));
+ queue.addBack(new TestTableProcedure(4, tableName,
+ TableProcedureInterface.TableOperationType.READ));
+ queue.addBack(new TestTableProcedure(5, tableName,
+ TableProcedureInterface.TableOperationType.READ));
+
+ // Fetch the 1st item and take the write lock
+ Long procId = queue.poll();
+ assertEquals(1, procId.longValue());
+ assertEquals(true, queue.tryAcquireTableWrite(tableName, "write " + procId));
+
+ // Fetch the 2nd item and verify that the lock can't be acquired
+ assertEquals(null, queue.poll());
+
+ // Release the write lock and acquire the read lock
+ queue.releaseTableWrite(tableName);
+
+ // Fetch the 2nd item and take the read lock
+ procId = queue.poll();
+ assertEquals(2, procId.longValue());
+ assertEquals(true, queue.tryAcquireTableRead(tableName, "read " + procId));
+
+ // Fetch the 3rd item and verify that the lock can't be acquired
+ procId = queue.poll();
+ assertEquals(3, procId.longValue());
+ assertEquals(false, queue.tryAcquireTableWrite(tableName, "write " + procId));
+
+ // release the rdlock of item 2 and take the wrlock for the 3d item
+ queue.releaseTableRead(tableName);
+ assertEquals(true, queue.tryAcquireTableWrite(tableName, "write " + procId));
+
+ // Fetch 4th item and verify that the lock can't be acquired
+ assertEquals(null, queue.poll());
+
+ // Release the write lock and acquire the read lock
+ queue.releaseTableWrite(tableName);
+
+ // Fetch the 4th item and take the read lock
+ procId = queue.poll();
+ assertEquals(4, procId.longValue());
+ assertEquals(true, queue.tryAcquireTableRead(tableName, "read " + procId));
+
+ // Fetch the 4th item and take the read lock
+ procId = queue.poll();
+ assertEquals(5, procId.longValue());
+ assertEquals(true, queue.tryAcquireTableRead(tableName, "read " + procId));
+
+ // Release 4th and 5th read-lock
+ queue.releaseTableRead(tableName);
+ queue.releaseTableRead(tableName);
+
+ // remove table queue
+ assertEquals(0, queue.size());
+ assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName));
+ }
+
+ /**
+ * Verify that "write" operations for a single table are serialized,
+ * but different tables can be executed in parallel.
+ */
+ @Test(timeout=90000)
+ public void testConcurrentWriteOps() throws Exception {
+ final TestTableProcSet procSet = new TestTableProcSet(queue);
+
+ final int NUM_ITEMS = 10;
+ final int NUM_TABLES = 4;
+ final AtomicInteger opsCount = new AtomicInteger(0);
+ for (int i = 0; i < NUM_TABLES; ++i) {
+ TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
+ for (int j = 1; j < NUM_ITEMS; ++j) {
+ procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
+ TableProcedureInterface.TableOperationType.EDIT));
+ opsCount.incrementAndGet();
+ }
+ }
+ assertEquals(opsCount.get(), queue.size());
+
+ final Thread[] threads = new Thread[NUM_TABLES * 2];
+ final HashSet<TableName> concurrentTables = new HashSet<TableName>();
+ final ArrayList<String> failures = new ArrayList<String>();
+ final AtomicInteger concurrentCount = new AtomicInteger(0);
+ for (int i = 0; i < threads.length; ++i) {
+ threads[i] = new Thread() {
+ @Override
+ public void run() {
+ while (opsCount.get() > 0) {
+ try {
+ TableProcedureInterface proc = procSet.acquire();
+ if (proc == null) {
+ queue.signalAll();
+ if (opsCount.get() > 0) {
+ continue;
+ }
+ break;
+ }
+ synchronized (concurrentTables) {
+ assertTrue("unexpected concurrency on " + proc.getTableName(),
+ concurrentTables.add(proc.getTableName()));
+ }
+ assertTrue(opsCount.decrementAndGet() >= 0);
+ try {
+ long procId = ((Procedure)proc).getProcId();
+ TableName tableId = proc.getTableName();
+ int concurrent = concurrentCount.incrementAndGet();
+ assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
+ concurrent >= 1 && concurrent <= NUM_TABLES);
+ LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
+ Thread.sleep(2000);
+ concurrent = concurrentCount.decrementAndGet();
+ LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
+ assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
+ } finally {
+ synchronized (concurrentTables) {
+ assertTrue(concurrentTables.remove(proc.getTableName()));
+ }
+ procSet.release(proc);
+ }
+ } catch (Throwable e) {
+ LOG.error("Failed " + e.getMessage(), e);
+ synchronized (failures) {
+ failures.add(e.getMessage());
+ }
+ } finally {
+ queue.signalAll();
+ }
+ }
+ }
+ };
+ threads[i].start();
+ }
+ for (int i = 0; i < threads.length; ++i) {
+ threads[i].join();
+ }
+ assertTrue(failures.toString(), failures.isEmpty());
+ assertEquals(0, opsCount.get());
+ assertEquals(0, queue.size());
+
+ for (int i = 1; i <= NUM_TABLES; ++i) {
+ TableName table = TableName.valueOf(String.format("testtb-%04d", i));
+ assertTrue("queue should be deleted, table=" + table, queue.markTableAsDeleted(table));
+ }
+ }
+
+ public static class TestTableProcSet {
+ private final MasterProcedureQueue queue;
+ private Map<Long, TableProcedureInterface> procsMap =
+ new ConcurrentHashMap<Long, TableProcedureInterface>();
+
+ public TestTableProcSet(final MasterProcedureQueue queue) {
+ this.queue = queue;
+ }
+
+ public void addBack(TableProcedureInterface tableProc) {
+ Procedure proc = (Procedure)tableProc;
+ procsMap.put(proc.getProcId(), tableProc);
+ queue.addBack(proc);
+ }
+
+ public void addFront(TableProcedureInterface tableProc) {
+ Procedure proc = (Procedure)tableProc;
+ procsMap.put(proc.getProcId(), tableProc);
+ queue.addFront(proc);
+ }
+
+ public TableProcedureInterface acquire() {
+ TableProcedureInterface proc = null;
+ boolean avail = false;
+ while (!avail) {
+ Long procId = queue.poll();
+ proc = procId != null ? procsMap.remove(procId) : null;
+ if (proc == null) break;
+ switch (proc.getTableOperationType()) {
+ case CREATE:
+ case DELETE:
+ case EDIT:
+ avail = queue.tryAcquireTableWrite(proc.getTableName(),
+ "op="+ proc.getTableOperationType());
+ break;
+ case READ:
+ avail = queue.tryAcquireTableRead(proc.getTableName(),
+ "op="+ proc.getTableOperationType());
+ break;
+ }
+ if (!avail) {
+ addFront(proc);
+ LOG.debug("yield procId=" + procId);
+ }
+ }
+ return proc;
+ }
+
+ public void release(TableProcedureInterface proc) {
+ switch (proc.getTableOperationType()) {
+ case CREATE:
+ case DELETE:
+ case EDIT:
+ queue.releaseTableWrite(proc.getTableName());
+ break;
+ case READ:
+ queue.releaseTableRead(proc.getTableName());
+ break;
+ }
+ }
+ }
+
+ public static class TestTableProcedure extends Procedure<Void>
+ implements TableProcedureInterface {
+ private final TableOperationType opType;
+ private final TableName tableName;
+
+ public TestTableProcedure() {
+ throw new UnsupportedOperationException("recovery should not be triggered here");
+ }
+
+ public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) {
+ this.tableName = tableName;
+ this.opType = opType;
+ setProcId(procId);
+ }
+
+ @Override
+ public TableName getTableName() {
+ return tableName;
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return opType;
+ }
+
+ @Override
+ protected Procedure[] execute(Void env) {
+ return null;
+ }
+
+ @Override
+ protected void rollback(Void env) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected boolean abort(Void env) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected void serializeStateData(final OutputStream stream) throws IOException {}
+
+ @Override
+ protected void deserializeStateData(final InputStream stream) throws IOException {}
+ }
+}