You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/06/17 14:23:35 UTC
git commit: HBASE-11069 Decouple region merging from ZooKeeper
(Sergey Soldatov)
Repository: hbase
Updated Branches:
refs/heads/master 6304eb2cc -> e476947d3
HBASE-11069 Decouple region merging from ZooKeeper (Sergey Soldatov)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e476947d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e476947d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e476947d
Branch: refs/heads/master
Commit: e476947d3f87708acce630372fef30bc13959b4c
Parents: 6304eb2
Author: Michael Stack <st...@apache.org>
Authored: Tue Jun 17 07:23:04 2014 -0500
Committer: Michael Stack <st...@apache.org>
Committed: Tue Jun 17 07:23:04 2014 -0500
----------------------------------------------------------------------
.../BaseCoordinatedStateManager.java | 7 +-
.../coordination/RegionMergeCoordination.java | 106 ++++++
.../coordination/ZkCoordinatedStateManager.java | 7 +
.../coordination/ZkRegionMergeCoordination.java | 326 ++++++++++++++++++
.../hadoop/hbase/master/AssignmentManager.java | 14 +-
.../regionserver/RegionMergeTransaction.java | 341 ++++---------------
.../hadoop/hbase/master/TestMasterFailover.java | 5 +-
7 files changed, 525 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e476947d/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
index 6cee2df..9c9bfba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
@@ -67,4 +67,9 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan
* Method to retrieve coordination for opening region operations.
*/
public abstract OpenRegionCoordination getOpenRegionCoordination();
-}
+
+ /**
+ * Method to retrieve coordination for region merge transaction
+ */
+ public abstract RegionMergeCoordination getRegionMergeCoordination();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/e476947d/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/RegionMergeCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/RegionMergeCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/RegionMergeCoordination.java
new file mode 100644
index 0000000..b51dd9c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/RegionMergeCoordination.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coordination;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * Coordination operations for region merge transaction. The operation should be coordinated at the
+ * following stages:<br>
+ * 1. startRegionMergeTransaction - all preparation/initialization for merge region transaction<br>
+ * 2. waitForRegionMergeTransaction - wait until coordination complete all works related
+ * to merge<br>
+ * 3. confirmRegionMergeTransaction - confirm that the merge could be completed and none of merging
+ * regions moved somehow<br>
+ * 4. completeRegionMergeTransaction - all steps that are required to complete the transaction.
+ * Called after PONR (point of no return) <br>
+ */
+@InterfaceAudience.Private
+public interface RegionMergeCoordination {
+
+ RegionMergeDetails getDefaultDetails();
+
+ /**
+ * Dummy interface for region merge transaction details.
+ */
+ public static interface RegionMergeDetails {
+ }
+
+ /**
+ * Start the region merge transaction
+ * @param region region to be created as offline
+ * @param serverName server event originates from
+ * @throws IOException
+ */
+ void startRegionMergeTransaction(HRegionInfo region, ServerName serverName, HRegionInfo a,
+ HRegionInfo b) throws IOException;
+
+ /**
+ * Get everything ready for region merge
+ * @throws IOException
+ */
+ void waitForRegionMergeTransaction(RegionServerServices services, HRegionInfo mergedRegionInfo,
+ HRegion region_a, HRegion region_b, RegionMergeDetails details) throws IOException;
+
+ /**
+ * Confirm that the region merge can be performed
+ * @param merged region
+ * @param a merging region A
+ * @param b merging region B
+ * @param serverName server event originates from
+ * @param rmd region merge details
+ * @throws IOException If thrown, transaction failed.
+ */
+ void confirmRegionMergeTransaction(HRegionInfo merged, HRegionInfo a, HRegionInfo b,
+ ServerName serverName, RegionMergeDetails rmd) throws IOException;
+
+ /**
+ * @param merged region
+ * @param a merging region A
+ * @param b merging region B
+ * @param serverName server event originates from
+ * @param rmd region merge details
+ * @throws IOException
+ */
+ void processRegionMergeRequest(HRegionInfo merged, HRegionInfo a, HRegionInfo b,
+ ServerName serverName, RegionMergeDetails rmd) throws IOException;
+
+ /**
+ * Finish off merge transaction
+ * @param services Used to online/offline regions.
+ * @param merged region
+ * @param region_a merging region A
+ * @param region_b merging region B
+ * @param rmd region merge details
+ * @param mergedRegion
+ * @throws IOException If thrown, transaction failed. Call
+ * {@link RegionMergeTransaction#rollback(Server, RegionServerServices)}
+ */
+ void completeRegionMergeTransaction(RegionServerServices services, HRegionInfo merged,
+ HRegion region_a, HRegion region_b, RegionMergeDetails rmd, HRegion mergedRegion)
+ throws IOException;
+
+ /**
+ * This method is used during rollback
+ * @param merged region to be rolled back
+ */
+ void clean(HRegionInfo merged);
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e476947d/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
index 2c2fa44..a5492a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
@@ -38,6 +38,7 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
protected SplitTransactionCoordination splitTransactionCoordination;
protected CloseRegionCoordination closeRegionCoordination;
protected OpenRegionCoordination openRegionCoordination;
+ protected RegionMergeCoordination regionMergeCoordination;
@Override
public void initialize(Server server) {
@@ -47,6 +48,7 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
splitTransactionCoordination = new ZKSplitTransactionCoordination(this, watcher);
closeRegionCoordination = new ZkCloseRegionCoordination(this, watcher);
openRegionCoordination = new ZkOpenRegionCoordination(this, watcher);
+ regionMergeCoordination = new ZkRegionMergeCoordination(this, watcher);
}
@Override
@@ -78,4 +80,9 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
public OpenRegionCoordination getOpenRegionCoordination() {
return openRegionCoordination;
}
+
+ @Override
+ public RegionMergeCoordination getRegionMergeCoordination() {
+ return regionMergeCoordination;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e476947d/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkRegionMergeCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkRegionMergeCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkRegionMergeCoordination.java
new file mode 100644
index 0000000..8c18821
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkRegionMergeCoordination.java
@@ -0,0 +1,326 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coordination;
+
+import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGED;
+import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGING;
+import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_MERGE;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RegionTransition;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.data.Stat;
+
+public class ZkRegionMergeCoordination implements RegionMergeCoordination {
+
+ private CoordinatedStateManager manager;
+ private final ZooKeeperWatcher watcher;
+
+ private static final Log LOG = LogFactory.getLog(ZkRegionMergeCoordination.class);
+
+ public ZkRegionMergeCoordination(CoordinatedStateManager manager,
+ ZooKeeperWatcher watcher) {
+ this.manager = manager;
+ this.watcher = watcher;
+ }
+
+ /**
+ * ZK-based implementation. Has details about whether the state transition should be reflected in
+ * ZK, as well as expected version of znode.
+ */
+ public static class ZkRegionMergeDetails implements RegionMergeCoordination.RegionMergeDetails {
+ private int znodeVersion;
+
+ public ZkRegionMergeDetails() {
+ }
+
+ public int getZnodeVersion() {
+ return znodeVersion;
+ }
+
+ public void setZnodeVersion(int znodeVersion) {
+ this.znodeVersion = znodeVersion;
+ }
+ }
+
+ @Override
+ public RegionMergeDetails getDefaultDetails() {
+ ZkRegionMergeDetails zstd = new ZkRegionMergeDetails();
+ zstd.setZnodeVersion(-1);
+ return zstd;
+ }
+
+ /**
+ * Wait for the merging node to be transitioned from pending_merge
+ * to merging by master. That's how we are sure master has processed
+ * the event and is good with us to move on. If we don't get any update,
+ * we periodically transition the node so that master gets the callback.
+ * If the node is removed or is not in pending_merge state any more,
+ * we abort the merge.
+ * @throws IOException
+ */
+
+ @Override
+ public void waitForRegionMergeTransaction(RegionServerServices services,
+ HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails details)
+ throws IOException {
+ try {
+ int spins = 0;
+ Stat stat = new Stat();
+ ServerName expectedServer = manager.getServer().getServerName();
+ String node = mergedRegionInfo.getEncodedName();
+ ZkRegionMergeDetails zdetails = (ZkRegionMergeDetails) details;
+ while (!(manager.getServer().isStopped() || services.isStopping())) {
+ if (spins % 5 == 0) {
+ LOG.debug("Still waiting for master to process " + "the pending_merge for " + node);
+ ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) getDefaultDetails();
+ transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(),
+ region_b.getRegionInfo(), expectedServer, zrmd, RS_ZK_REQUEST_REGION_MERGE,
+ RS_ZK_REQUEST_REGION_MERGE);
+ }
+ Thread.sleep(100);
+ spins++;
+ byte[] data = ZKAssign.getDataNoWatch(watcher, node, stat);
+ if (data == null) {
+ throw new IOException("Data is null, merging node " + node + " no longer exists");
+ }
+ RegionTransition rt = RegionTransition.parseFrom(data);
+ EventType et = rt.getEventType();
+ if (et == RS_ZK_REGION_MERGING) {
+ ServerName serverName = rt.getServerName();
+ if (!serverName.equals(expectedServer)) {
+ throw new IOException("Merging node " + node + " is for " + serverName + ", not us "
+ + expectedServer);
+ }
+ byte[] payloadOfMerging = rt.getPayload();
+ List<HRegionInfo> mergingRegions =
+ HRegionInfo.parseDelimitedFrom(payloadOfMerging, 0, payloadOfMerging.length);
+ assert mergingRegions.size() == 3;
+ HRegionInfo a = mergingRegions.get(1);
+ HRegionInfo b = mergingRegions.get(2);
+ HRegionInfo hri_a = region_a.getRegionInfo();
+ HRegionInfo hri_b = region_b.getRegionInfo();
+ if (!(hri_a.equals(a) && hri_b.equals(b))) {
+ throw new IOException("Merging node " + node + " is for " + a + ", " + b
+ + ", not expected regions: " + hri_a + ", " + hri_b);
+ }
+ // Master has processed it.
+ zdetails.setZnodeVersion(stat.getVersion());
+ return;
+ }
+ if (et != RS_ZK_REQUEST_REGION_MERGE) {
+ throw new IOException("Merging node " + node + " moved out of merging to " + et);
+ }
+ }
+ // Server is stopping/stopped
+ throw new IOException("Server is " + (services.isStopping() ? "stopping" : "stopped"));
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ throw new IOException("Failed getting MERGING znode on "
+ + mergedRegionInfo.getRegionNameAsString(), e);
+ }
+ }
+
+ /**
+ * Creates a new ephemeral node in the PENDING_MERGE state for the merged region.
+ * Create it ephemeral in case regionserver dies mid-merge.
+ *
+ * <p>
+ * Does not transition nodes from other states. If a node already exists for
+ * this region, a {@link NodeExistsException} will be thrown.
+ *
+ * @param region region to be created as offline
+ * @param serverName server event originates from
+ * @throws IOException
+ */
+ @Override
+ public void startRegionMergeTransaction(final HRegionInfo region, final ServerName serverName,
+ final HRegionInfo a, final HRegionInfo b) throws IOException {
+ LOG.debug(watcher.prefix("Creating ephemeral node for " + region.getEncodedName()
+ + " in PENDING_MERGE state"));
+ byte[] payload = HRegionInfo.toDelimitedByteArray(region, a, b);
+ RegionTransition rt =
+ RegionTransition.createRegionTransition(RS_ZK_REQUEST_REGION_MERGE, region.getRegionName(),
+ serverName, payload);
+ String node = ZKAssign.getNodeName(watcher, region.getEncodedName());
+ try {
+ if (!ZKUtil.createEphemeralNodeAndWatch(watcher, node, rt.toByteArray())) {
+ throw new IOException("Failed create of ephemeral " + node);
+ }
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see
+ * org.apache.hadoop.hbase.regionserver.coordination.RegionMergeCoordination#clean(org.apache.hadoop
+ * .hbase.Server, org.apache.hadoop.hbase.HRegionInfo)
+ */
+ @Override
+ public void clean(final HRegionInfo hri) {
+ try {
+ // Only delete if its in expected state; could have been hijacked.
+ if (!ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REQUEST_REGION_MERGE, manager
+ .getServer().getServerName())) {
+ ZKAssign.deleteNode(watcher, hri.getEncodedName(), RS_ZK_REGION_MERGING, manager
+ .getServer().getServerName());
+ }
+ } catch (KeeperException.NoNodeException e) {
+ LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
+ } catch (KeeperException e) {
+ manager.getServer().abort("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
+ }
+ }
+
+ /*
+ * ZooKeeper implementation of finishRegionMergeTransaction
+ */
+ @Override
+ public void completeRegionMergeTransaction(final RegionServerServices services,
+ HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails rmd,
+ HRegion mergedRegion) throws IOException {
+ ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd;
+ if (manager.getServer() == null
+ || manager.getServer().getCoordinatedStateManager() == null) {
+ return;
+ }
+ // Tell master about merge by updating zk. If we fail, abort.
+ try {
+ transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
+ manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED);
+
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ int spins = 0;
+ // Now wait for the master to process the merge. We know it's done
+ // when the znode is deleted. The reason we keep tickling the znode is
+ // that it's possible for the master to miss an event.
+ do {
+ if (spins % 10 == 0) {
+ LOG.debug("Still waiting on the master to process the merge for "
+ + mergedRegionInfo.getEncodedName() + ", waited "
+ + (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms");
+ }
+ Thread.sleep(100);
+ // When this returns -1 it means the znode doesn't exist
+ transitionMergingNode(mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
+ manager.getServer().getServerName(), rmd, RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED);
+ spins++;
+ } while (zrmd.getZnodeVersion() != -1 && !manager.getServer().isStopped()
+ && !services.isStopping());
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ throw new IOException("Failed telling master about merge "
+ + mergedRegionInfo.getEncodedName(), e);
+ }
+ // Leaving here, the mergedir with its dross will be in place but since the
+ // merge was successful, just leave it; it'll be cleaned when region_a is
+ // cleaned up by CatalogJanitor on master
+ }
+
+ /*
+ * Zookeeper implementation of region merge confirmation
+ */
+ @Override
+ public void confirmRegionMergeTransaction(HRegionInfo merged, HRegionInfo a, HRegionInfo b,
+ ServerName serverName, RegionMergeDetails rmd) throws IOException {
+ transitionMergingNode(merged, a, b, serverName, rmd, RS_ZK_REGION_MERGING,
+ RS_ZK_REGION_MERGING);
+ }
+
+ /*
+ * Zookeeper implementation of region merge processing
+ */
+ @Override
+ public void processRegionMergeRequest(HRegionInfo p, HRegionInfo hri_a, HRegionInfo hri_b,
+ ServerName sn, RegionMergeDetails rmd) throws IOException {
+ transitionMergingNode(p, hri_a, hri_b, sn, rmd, EventType.RS_ZK_REQUEST_REGION_MERGE,
+ EventType.RS_ZK_REGION_MERGING);
+ }
+
+ /**
+ * Transitions an existing ephemeral node for the specified region which is
+ * currently in the begin state to be in the end state. Master cleans up the
+ * final MERGE znode when it reads it (or if we crash, zk will clean it up).
+ *
+ * <p>
+ * Does not transition nodes from other states. If for some reason the node
+ * could not be transitioned, the method returns -1. If the transition is
+ * successful, the version of the node after transition is updated in details.
+ *
+ * <p>
+ * This method can fail and return false for three different reasons:
+ * <ul>
+ * <li>Node for this region does not exist</li>
+ * <li>Node for this region is not in the begin state</li>
+ * <li>After verifying the begin state, update fails because of wrong version
+ * (this should never actually happen since an RS only does this transition
+ * following a transition to the begin state. If two RS are conflicting, one would
+ * fail the original transition to the begin state and not this transition)</li>
+ * </ul>
+ *
+ * <p>
+ * Does not set any watches.
+ *
+ * <p>
+ * This method should only be used by a RegionServer when merging two regions.
+ *
+ * @param merged region to be transitioned to opened
+ * @param a merging region A
+ * @param b merging region B
+ * @param serverName server event originates from
+ * @param rmd region merge details
+ * @param beginState the expected current state the node should be
+ * @param endState the state to be transition to
+ * @throws IOException
+ */
+ private void transitionMergingNode(HRegionInfo merged, HRegionInfo a, HRegionInfo b,
+ ServerName serverName, RegionMergeDetails rmd, final EventType beginState,
+ final EventType endState) throws IOException {
+ ZkRegionMergeDetails zrmd = (ZkRegionMergeDetails) rmd;
+ byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b);
+ try {
+ zrmd.setZnodeVersion(ZKAssign.transitionNode(watcher, merged, serverName, beginState,
+ endState, zrmd.getZnodeVersion(), payload));
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e476947d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 73b06b2..9f93e0b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -60,10 +60,12 @@ import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
+import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
@@ -82,7 +84,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
-import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -3538,11 +3539,14 @@ public class AssignmentManager extends ZooKeeperListener {
EventType et = rt.getEventType();
if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
try {
- if (RegionMergeTransaction.transitionMergingNode(watcher, p,
- hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_MERGE,
- EventType.RS_ZK_REGION_MERGING) == -1) {
+ RegionMergeCoordination.RegionMergeDetails std =
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getRegionMergeCoordination().getDefaultDetails();
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getRegionMergeCoordination().processRegionMergeRequest(p, hri_a, hri_b, sn, std);
+ if (((ZkRegionMergeCoordination.ZkRegionMergeDetails) std).getZnodeVersion() == -1) {
byte[] data = ZKAssign.getData(watcher, encodedName);
- EventType currentType = null;
+ EventType currentType = null;
if (data != null) {
RegionTransition newRt = RegionTransition.parseFrom(data);
currentType = newRt.getEventType();
http://git-wip-us.apache.org/repos/asf/hbase/blob/e476947d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
index eedba2b..bd29104 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java
@@ -13,15 +13,11 @@
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
+ * License for the specific language governing permissions and limitationsME
* under the License.
*/
package org.apache.hadoop.hbase.regionserver;
-import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGED;
-import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REGION_MERGING;
-import static org.apache.hadoop.hbase.executor.EventType.RS_ZK_REQUEST_REGION_MERGE;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -35,7 +31,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MetaMutationAnnotation;
-import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
@@ -44,19 +39,15 @@ import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
+import org.apache.hadoop.hbase.coordination.RegionMergeCoordination.RegionMergeDetails;
import org.apache.hadoop.hbase.regionserver.SplitTransaction.LoggingProgressable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConfigUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.data.Stat;
/**
* Executes region merge as a "transaction". It is similar with
@@ -97,10 +88,9 @@ public class RegionMergeTransaction {
private final HRegion region_b;
// merges dir is under region_a
private final Path mergesdir;
- private int znodeVersion = -1;
// We only merge adjacent regions if forcible is false
private final boolean forcible;
- private boolean useZKForAssignment;
+ private boolean useCoordinationForAssignment;
/**
* Types to add to the transaction journal. Each enum is a step in the merge
@@ -110,7 +100,7 @@ public class RegionMergeTransaction {
/**
* Set region as in transition, set it into MERGING state.
*/
- SET_MERGING_IN_ZK,
+ SET_MERGING,
/**
* We created the temporary merge data directory.
*/
@@ -152,6 +142,8 @@ public class RegionMergeTransaction {
private RegionServerCoprocessorHost rsCoprocessorHost = null;
+ private RegionMergeDetails rmd;
+
/**
* Constructor
* @param a region a to merge
@@ -230,8 +222,7 @@ public class RegionMergeTransaction {
/**
* Run the transaction.
- * @param server Hosting server instance. Can be null when testing (won't try
- * and update in zk if a null server)
+ * @param server Hosting server instance. Can be null when testing
* @param services Used to online/offline regions.
* @throws IOException If thrown, transaction failed. Call
* {@link #rollback(Server, RegionServerServices)}
@@ -240,9 +231,15 @@ public class RegionMergeTransaction {
* @see #rollback(Server, RegionServerServices)
*/
public HRegion execute(final Server server,
- final RegionServerServices services) throws IOException {
- useZKForAssignment = server == null ? true :
- ConfigUtil.useZKForAssignment(server.getConfiguration());
+ final RegionServerServices services) throws IOException {
+ useCoordinationForAssignment =
+ server == null ? true : ConfigUtil.useZKForAssignment(server.getConfiguration());
+ if (rmd == null) {
+ rmd =
+ server != null && server.getCoordinatedStateManager() != null ? ((BaseCoordinatedStateManager) server
+ .getCoordinatedStateManager()).getRegionMergeCoordination().getDefaultDetails()
+ : null;
+ }
if (rsCoprocessorHost == null) {
rsCoprocessorHost = server != null ?
((HRegionServer) server).getRegionServerCoprocessorHost() : null;
@@ -257,14 +254,20 @@ public class RegionMergeTransaction {
public HRegion stepsAfterPONR(final Server server, final RegionServerServices services,
HRegion mergedRegion) throws IOException {
openMergedRegion(server, services, mergedRegion);
- transitionZKNode(server, services, mergedRegion);
+ if (useCoordination(server)) {
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getRegionMergeCoordination().completeRegionMergeTransaction(services, mergedRegionInfo,
+ region_a, region_b, rmd, mergedRegion);
+ }
+ if (rsCoprocessorHost != null) {
+ rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion);
+ }
return mergedRegion;
}
/**
* Prepare the merged region and region files.
- * @param server Hosting server instance. Can be null when testing (won't try
- * and update in zk if a null server)
+ * @param server Hosting server instance. Can be null when testing
* @param services Used to online/offline regions.
* @return merged region
* @throws IOException If thrown, transaction failed. Call
@@ -286,7 +289,7 @@ public class RegionMergeTransaction {
}
}
- // If true, no cluster to write meta edits to or to update znodes in.
+ // If true, no cluster to write meta edits to or to use coordination.
boolean testing = server == null ? true : server.getConfiguration()
.getBoolean("hbase.testing.nocluster", false);
@@ -320,7 +323,7 @@ public class RegionMergeTransaction {
// will determine whether the region is merged or not in case of failures.
// If it is successful, master will roll-forward, if not, master will
// rollback
- if (!testing && useZKForAssignment) {
+ if (!testing && useCoordinationForAssignment) {
if (metaEntries.isEmpty()) {
MetaEditor.mergeRegions(server.getCatalogTracker(), mergedRegion.getRegionInfo(), region_a
.getRegionInfo(), region_b.getRegionInfo(), server.getServerName());
@@ -328,7 +331,7 @@ public class RegionMergeTransaction {
mergeRegionsAndPutMetaEntries(server.getCatalogTracker(), mergedRegion.getRegionInfo(),
region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), metaEntries);
}
- } else if (services != null && !useZKForAssignment) {
+ } else if (services != null && !useCoordinationForAssignment) {
if (!services.reportRegionTransition(TransitionCode.MERGE_PONR,
mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
// Passed PONR, let SSH clean it up
@@ -377,17 +380,24 @@ public class RegionMergeTransaction {
public HRegion stepsBeforePONR(final Server server, final RegionServerServices services,
boolean testing) throws IOException {
- // Set ephemeral MERGING znode up in zk. Mocked servers sometimes don't
- // have zookeeper so don't do zk stuff if server or zookeeper is null
- if (useZKAndZKIsSet(server)) {
+ if (rmd == null) {
+ rmd =
+ server != null && server.getCoordinatedStateManager() != null ? ((BaseCoordinatedStateManager) server
+ .getCoordinatedStateManager()).getRegionMergeCoordination().getDefaultDetails()
+ : null;
+ }
+
+ // If server doesn't have a coordination state manager, don't do coordination actions.
+ if (useCoordination(server)) {
try {
- createNodeMerging(server.getZooKeeper(), this.mergedRegionInfo,
- server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo());
- } catch (KeeperException e) {
- throw new IOException("Failed creating PENDING_MERGE znode on "
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getRegionMergeCoordination().startRegionMergeTransaction(mergedRegionInfo,
+ server.getServerName(), region_a.getRegionInfo(), region_b.getRegionInfo());
+ } catch (IOException e) {
+ throw new IOException("Failed to start region merge transaction for "
+ this.mergedRegionInfo.getRegionNameAsString(), e);
}
- } else if (services != null && !useZKForAssignment) {
+ } else if (services != null && !useCoordinationForAssignment) {
if (!services.reportRegionTransition(TransitionCode.READY_TO_MERGE,
mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
throw new IOException("Failed to get ok from master to merge "
@@ -395,12 +405,14 @@ public class RegionMergeTransaction {
+ region_b.getRegionInfo().getRegionNameAsString());
}
}
- this.journal.add(JournalEntry.SET_MERGING_IN_ZK);
- if (useZKAndZKIsSet(server)) {
+ this.journal.add(JournalEntry.SET_MERGING);
+ if (useCoordination(server)) {
// After creating the merge node, wait for master to transition it
// from PENDING_MERGE to MERGING so that we can move on. We want master
// knows about it and won't transition any region which is merging.
- znodeVersion = getZKNode(server, services);
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getRegionMergeCoordination().waitForRegionMergeTransaction(services, mergedRegionInfo,
+ region_a, region_b, rmd);
}
this.region_a.getRegionFileSystem().createMergesDir();
@@ -420,16 +432,15 @@ public class RegionMergeTransaction {
// clean this up.
mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB);
- if (server != null && useZKAndZKIsSet(server)) {
+ if (useCoordination(server)) {
try {
- // Do one more check on the merging znode (before it is too late) in case
- // any merging region is moved somehow. If so, the znode transition will fail.
- this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
- this.mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo(),
- server.getServerName(), this.znodeVersion,
- RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGING);
- } catch (KeeperException e) {
- throw new IOException("Failed setting MERGING znode on "
+ // Do the final check in case any merging region is moved somehow. If so, the transition
+ // will fail.
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getRegionMergeCoordination().confirmRegionMergeTransaction(this.mergedRegionInfo,
+ region_a.getRegionInfo(), region_b.getRegionInfo(), server.getServerName(), rmd);
+ } catch (IOException e) {
+ throw new IOException("Failed setting MERGING on "
+ this.mergedRegionInfo.getRegionNameAsString(), e);
}
}
@@ -545,8 +556,7 @@ public class RegionMergeTransaction {
/**
* Perform time consuming opening of the merged region.
- * @param server Hosting server instance. Can be null when testing (won't try
- * and update in zk if a null server)
+ * @param server Hosting server instance. Can be null when testing
* @param services Used to online/offline regions.
* @param merged the merged region
* @throws IOException If thrown, transaction failed. Call
@@ -569,7 +579,7 @@ public class RegionMergeTransaction {
if (services != null) {
try {
- if (useZKForAssignment) {
+ if (useCoordinationForAssignment) {
services.postOpenDeployTasks(merged, server.getCatalogTracker());
} else if (!services.reportRegionTransition(TransitionCode.MERGED,
mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
@@ -585,134 +595,6 @@ public class RegionMergeTransaction {
}
/**
- * Finish off merge transaction, transition the zknode
- * @param server Hosting server instance. Can be null when testing (won't try
- * and update in zk if a null server)
- * @param services Used to online/offline regions.
- * @throws IOException If thrown, transaction failed. Call
- * {@link #rollback(Server, RegionServerServices)}
- */
- void transitionZKNode(final Server server, final RegionServerServices services,
- HRegion mergedRegion) throws IOException {
- if (useZKAndZKIsSet(server)) {
- // Tell master about merge by updating zk. If we fail, abort.
- try {
- this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
- this.mergedRegionInfo, region_a.getRegionInfo(),
- region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
- RS_ZK_REGION_MERGING, RS_ZK_REGION_MERGED);
-
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
- int spins = 0;
- // Now wait for the master to process the merge. We know it's done
- // when the znode is deleted. The reason we keep tickling the znode is
- // that it's possible for the master to miss an event.
- do {
- if (spins % 10 == 0) {
- LOG.debug("Still waiting on the master to process the merge for "
- + this.mergedRegionInfo.getEncodedName() + ", waited "
- + (EnvironmentEdgeManager.currentTimeMillis() - startTime) + "ms");
- }
- Thread.sleep(100);
- // When this returns -1 it means the znode doesn't exist
- this.znodeVersion = transitionMergingNode(server.getZooKeeper(),
- this.mergedRegionInfo, region_a.getRegionInfo(),
- region_b.getRegionInfo(), server.getServerName(), this.znodeVersion,
- RS_ZK_REGION_MERGED, RS_ZK_REGION_MERGED);
- spins++;
- } while (this.znodeVersion != -1 && !server.isStopped()
- && !services.isStopping());
- } catch (Exception e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- throw new IOException("Failed telling master about merge "
- + mergedRegionInfo.getEncodedName(), e);
- }
- }
-
- if (rsCoprocessorHost != null) {
- rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion);
- }
-
- // Leaving here, the mergedir with its dross will be in place but since the
- // merge was successful, just leave it; it'll be cleaned when region_a is
- // cleaned up by CatalogJanitor on master
- }
-
- /**
- * Wait for the merging node to be transitioned from pending_merge
- * to merging by master. That's how we are sure master has processed
- * the event and is good with us to move on. If we don't get any update,
- * we periodically transition the node so that master gets the callback.
- * If the node is removed or is not in pending_merge state any more,
- * we abort the merge.
- */
- private int getZKNode(final Server server,
- final RegionServerServices services) throws IOException {
- // Wait for the master to process the pending_merge.
- try {
- int spins = 0;
- Stat stat = new Stat();
- ZooKeeperWatcher zkw = server.getZooKeeper();
- ServerName expectedServer = server.getServerName();
- String node = mergedRegionInfo.getEncodedName();
- while (!(server.isStopped() || services.isStopping())) {
- if (spins % 5 == 0) {
- LOG.debug("Still waiting for master to process "
- + "the pending_merge for " + node);
- transitionMergingNode(zkw, mergedRegionInfo, region_a.getRegionInfo(),
- region_b.getRegionInfo(), expectedServer, -1, RS_ZK_REQUEST_REGION_MERGE,
- RS_ZK_REQUEST_REGION_MERGE);
- }
- Thread.sleep(100);
- spins++;
- byte [] data = ZKAssign.getDataNoWatch(zkw, node, stat);
- if (data == null) {
- throw new IOException("Data is null, merging node "
- + node + " no longer exists");
- }
- RegionTransition rt = RegionTransition.parseFrom(data);
- EventType et = rt.getEventType();
- if (et == RS_ZK_REGION_MERGING) {
- ServerName serverName = rt.getServerName();
- if (!serverName.equals(expectedServer)) {
- throw new IOException("Merging node " + node + " is for "
- + serverName + ", not us " + expectedServer);
- }
- byte [] payloadOfMerging = rt.getPayload();
- List<HRegionInfo> mergingRegions = HRegionInfo.parseDelimitedFrom(
- payloadOfMerging, 0, payloadOfMerging.length);
- assert mergingRegions.size() == 3;
- HRegionInfo a = mergingRegions.get(1);
- HRegionInfo b = mergingRegions.get(2);
- HRegionInfo hri_a = region_a.getRegionInfo();
- HRegionInfo hri_b = region_b.getRegionInfo();
- if (!(hri_a.equals(a) && hri_b.equals(b))) {
- throw new IOException("Merging node " + node + " is for " + a + ", "
- + b + ", not expected regions: " + hri_a + ", " + hri_b);
- }
- // Master has processed it.
- return stat.getVersion();
- }
- if (et != RS_ZK_REQUEST_REGION_MERGE) {
- throw new IOException("Merging node " + node
- + " moved out of merging to " + et);
- }
- }
- // Server is stopping/stopped
- throw new IOException("Server is "
- + (services.isStopping() ? "stopping" : "stopped"));
- } catch (Exception e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- throw new IOException("Failed getting MERGING znode on "
- + mergedRegionInfo.getRegionNameAsString(), e);
- }
- }
-
- /**
* Create reference file(s) of merging regions under the region_a merges dir
* @param hstoreFilesOfRegionA
* @param hstoreFilesOfRegionB
@@ -769,14 +651,15 @@ public class RegionMergeTransaction {
JournalEntry je = iterator.previous();
switch (je) {
- case SET_MERGING_IN_ZK:
- if (useZKAndZKIsSet(server)) {
- cleanZK(server, this.mergedRegionInfo);
- } else if (services != null && !useZKForAssignment
+ case SET_MERGING:
+ if (useCoordination(server)) {
+ ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
+ .getRegionMergeCoordination().clean(this.mergedRegionInfo);
+ } else if (services != null && !useCoordinationForAssignment
&& !services.reportRegionTransition(TransitionCode.MERGE_REVERTED,
mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) {
return false;
- }
+ }
break;
case CREATED_MERGE_DIR:
@@ -851,100 +734,12 @@ public class RegionMergeTransaction {
return this.mergesdir;
}
- private boolean useZKAndZKIsSet(final Server server) {
- return server != null && useZKForAssignment && server.getZooKeeper() != null;
+ private boolean useCoordination(final Server server) {
+ return server != null && useCoordinationForAssignment
+ && server.getCoordinatedStateManager() != null;
}
- private static void cleanZK(final Server server, final HRegionInfo hri) {
- try {
- // Only delete if its in expected state; could have been hijacked.
- if (!ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
- RS_ZK_REQUEST_REGION_MERGE, server.getServerName())) {
- ZKAssign.deleteNode(server.getZooKeeper(), hri.getEncodedName(),
- RS_ZK_REGION_MERGING, server.getServerName());
- }
- } catch (KeeperException.NoNodeException e) {
- LOG.info("Failed cleanup zk node of " + hri.getRegionNameAsString(), e);
- } catch (KeeperException e) {
- server.abort("Failed cleanup zk node of " + hri.getRegionNameAsString(),e);
- }
- }
-
- /**
- * Creates a new ephemeral node in the PENDING_MERGE state for the merged region.
- * Create it ephemeral in case regionserver dies mid-merge.
- *
- * <p>
- * Does not transition nodes from other states. If a node already exists for
- * this region, a {@link NodeExistsException} will be thrown.
- *
- * @param zkw zk reference
- * @param region region to be created as offline
- * @param serverName server event originates from
- * @throws KeeperException
- * @throws IOException
- */
- public static void createNodeMerging(final ZooKeeperWatcher zkw, final HRegionInfo region,
- final ServerName serverName, final HRegionInfo a,
- final HRegionInfo b) throws KeeperException, IOException {
- LOG.debug(zkw.prefix("Creating ephemeral node for "
- + region.getEncodedName() + " in PENDING_MERGE state"));
- byte [] payload = HRegionInfo.toDelimitedByteArray(region, a, b);
- RegionTransition rt = RegionTransition.createRegionTransition(
- RS_ZK_REQUEST_REGION_MERGE, region.getRegionName(), serverName, payload);
- String node = ZKAssign.getNodeName(zkw, region.getEncodedName());
- if (!ZKUtil.createEphemeralNodeAndWatch(zkw, node, rt.toByteArray())) {
- throw new IOException("Failed create of ephemeral " + node);
- }
- }
- /**
- * Transitions an existing ephemeral node for the specified region which is
- * currently in the begin state to be in the end state. Master cleans up the
- * final MERGE znode when it reads it (or if we crash, zk will clean it up).
- *
- * <p>
- * Does not transition nodes from other states. If for some reason the node
- * could not be transitioned, the method returns -1. If the transition is
- * successful, the version of the node after transition is returned.
- *
- * <p>
- * This method can fail and return false for three different reasons:
- * <ul>
- * <li>Node for this region does not exist</li>
- * <li>Node for this region is not in the begin state</li>
- * <li>After verifying the begin state, update fails because of wrong version
- * (this should never actually happen since an RS only does this transition
- * following a transition to the begin state. If two RS are conflicting, one would
- * fail the original transition to the begin state and not this transition)</li>
- * </ul>
- *
- * <p>
- * Does not set any watches.
- *
- * <p>
- * This method should only be used by a RegionServer when merging two regions.
- *
- * @param zkw zk reference
- * @param merged region to be transitioned to opened
- * @param a merging region A
- * @param b merging region B
- * @param serverName server event originates from
- * @param znodeVersion expected version of data before modification
- * @param beginState the expected current state the znode should be
- * @param endState the state to be transition to
- * @return version of node after transition, -1 if unsuccessful transition
- * @throws KeeperException if unexpected zookeeper exception
- * @throws IOException
- */
- public static int transitionMergingNode(ZooKeeperWatcher zkw,
- HRegionInfo merged, HRegionInfo a, HRegionInfo b, ServerName serverName,
- final int znodeVersion, final EventType beginState,
- final EventType endState) throws KeeperException, IOException {
- byte[] payload = HRegionInfo.toDelimitedByteArray(merged, a, b);
- return ZKAssign.transitionNode(zkw, merged, serverName,
- beginState, endState, znodeVersion, payload);
- }
/**
* Checks if the given region has merge qualifier in hbase:meta
http://git-wip-us.apache.org/repos/asf/hbase/blob/e476947d/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
index de7ce1a..a8fbf54 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.RegionState.State;
@@ -389,8 +390,8 @@ public class TestMasterFailover {
// Regions of table of merging regions
// Cause: Master was down while merging was going on
- RegionMergeTransaction.createNodeMerging(
- zkw, newRegion, mergingServer, a, b);
+ ((BaseCoordinatedStateManager) hrs.getCoordinatedStateManager())
+ .getRegionMergeCoordination().startRegionMergeTransaction(newRegion, mergingServer, a, b);
/*
* ZK = NONE