You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2016/08/30 17:06:34 UTC
[1/2] hbase git commit: HBASE-16082 Procedure v2 - Move out helpers
from MasterProcedureScheduler
Repository: hbase
Updated Branches:
refs/heads/master 0f92e943a -> 4a4f8e704
HBASE-16082 Procedure v2 - Move out helpers from MasterProcedureScheduler
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2acd788d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2acd788d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2acd788d
Branch: refs/heads/master
Commit: 2acd788dced837b2daaf66906a862baa6e5b08fa
Parents: 0f92e94
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Tue Aug 30 09:44:35 2016 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Tue Aug 30 09:44:35 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/util/AvlUtil.java | 559 +++++++++++++++++++
.../apache/hadoop/hbase/util/TestAvlUtil.java | 261 +++++++++
.../procedure/MasterProcedureScheduler.java | 297 +++-------
3 files changed, 889 insertions(+), 228 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/2acd788d/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AvlUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AvlUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AvlUtil.java
new file mode 100644
index 0000000..260a8b2
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AvlUtil.java
@@ -0,0 +1,559 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Helper class that allows creating and manipulating an AvlTree.
+ * The main utility is in cases where over time we have a lot of add/remove of the same object,
+ * and we want to avoid all the allocations/deallocations of the "node" objects that the
+ * java containers will create.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class AvlUtil {
+ private AvlUtil() {}
+
+ /**
+ * This class represents a node that will be used in an AvlTree.
+ * Instead of creating another object for the tree node,
+ * like the TreeMap and the other java containers, here the node can be extended
+ * and the content can be embedded directly in the node itself.
+ * This is useful in cases where over time we have a lot of add/remove of the same object.
+ */
+ @InterfaceAudience.Private
+ public static abstract class AvlNode<TNode extends AvlNode> {
+ // left child: AvlTree.insert() places nodes whose compareTo() against this node is negative here
+ protected TNode avlLeft;
+ // right child: AvlTree.insert() places all the other nodes here
+ protected TNode avlRight;
+ // height of the subtree rooted at this node, maintained by AvlTree (a missing child counts as 0)
+ protected int avlHeight;
+
+ /**
+ * Ordering used by AvlTree.insert(root, node).
+ * @param other the node to compare against
+ * @return a negative value if this node must be placed to the left of other,
+ *         a positive value if it must be placed to the right.
+ *         Zero means that an equal node is already in the tree (insert asserts against it).
+ */
+ public abstract int compareTo(TNode other);
+ }
+
+ /**
+ * This class extends the AvlNode and adds two links that will be used in conjunction
+ * with the AvlIterableList class.
+ * This is useful in situations where your node must be in a map to have a quick lookup by key,
+ * but it is also required to be in something like a list/queue.
+ * This is useful in cases where over time we have a lot of add/remove of the same object.
+ */
+ @InterfaceAudience.Private
+ public static abstract class AvlLinkedNode<TNode extends AvlLinkedNode> extends AvlNode<TNode> {
+ // successor in the AvlIterableList; null while the node is not linked to any list
+ protected TNode iterNext = null;
+ // predecessor in the AvlIterableList; null while the node is not linked to any list
+ protected TNode iterPrev = null;
+ }
+
+ /**
+ * Callback used by AvlTree.insert(root, key, keyComparator, insertOrReplace)
+ * to obtain the node to store for a given key.
+ */
+ @InterfaceAudience.Private
+ public interface AvlInsertOrReplace<TNode extends AvlNode> {
+ /**
+ * Called when no node matching the key was found.
+ * @param searchKey the key passed to AvlTree.insert()
+ * @return the node to add to the tree
+ */
+ TNode insert(Object searchKey);
+
+ /**
+ * Called when a node matching the key is already in the tree.
+ * @param searchKey the key passed to AvlTree.insert()
+ * @param prevNode the node currently in the tree for that key
+ * @return the node that takes the place of prevNode;
+ *         its left/right links are overwritten with the ones of prevNode
+ */
+ TNode replace(Object searchKey, TNode prevNode);
+ }
+
+ /**
+ * The AvlTree allows looking up an object using a custom key.
+ * e.g. the java Map allows only to lookup by key using the Comparator
+ * specified in the constructor.
+ * In this case you can pass a specific comparator for every need.
+ */
+ @InterfaceAudience.Private
+ public static interface AvlKeyComparator<TNode extends AvlNode> {
+ /**
+ * @param node the tree node to compare
+ * @param key the key we are looking for
+ * @return zero if the node matches the key, a positive value if the node is greater than
+ *         the key (get/remove/seekTo continue on the left child), a negative value if the
+ *         node is smaller than the key (they continue on the right child)
+ */
+ int compareKey(TNode node, Object key);
+ }
+
+ /**
+ * Visitor that allows traversing a set of AvlNodes.
+ * If you don't like the callback style of the visitor you can always use the AvlTreeIterator.
+ */
+ @InterfaceAudience.Private
+ public static interface AvlNodeVisitor<TNode extends AvlNode> {
+ /**
+ * Called for each node handed to the visitor (AvlTree.visit() walks the tree in iterator order).
+ * @param node the node that we are currently visiting
+ * @return false to stop the iteration. true to continue.
+ */
+ boolean visitNode(TNode node);
+ }
+
+ /**
+ * Helper class that allows to create and manipulate an AVL Tree
+ */
+ @InterfaceAudience.Private
+ public static class AvlTree {
+    /**
+     * Looks up the node that matches the specified key.
+     * @param root the current root of the tree
+     * @param key the key for the node we are trying to find
+     * @param keyComparator the comparator to use to match node and key
+     * @return the matching node, or null if the tree has no node for the key
+     */
+    public static <TNode extends AvlNode> TNode get(TNode root, final Object key,
+        final AvlKeyComparator<TNode> keyComparator) {
+      TNode cursor = root;
+      while (cursor != null) {
+        final int order = keyComparator.compareKey(cursor, key);
+        if (order == 0) {
+          return cursor;
+        }
+        // order > 0: the node is greater than the key, the match can only be on the left
+        cursor = (TNode)(order > 0 ? cursor.avlLeft : cursor.avlRight);
+      }
+      return null;
+    }
+
+ /**
+ * @param root the current root of the tree
+ * @return the first (min) node of the tree
+ */
+ public static <TNode extends AvlNode> TNode getFirst(TNode root) {
+ if (root != null) {
+ while (root.avlLeft != null) {
+ root = (TNode)root.avlLeft;
+ }
+ }
+ return root;
+ }
+
+ /**
+ * @param root the current root of the tree
+ * @return the last (max) node of the tree
+ */
+ public static <TNode extends AvlNode> TNode getLast(TNode root) {
+ if (root != null) {
+ while (root.avlRight != null) {
+ root = (TNode)root.avlRight;
+ }
+ }
+ return root;
+ }
+
+    /**
+     * Insert a node into the tree. It uses the AvlNode.compareTo() for ordering.
+     * NOTE: The node must not be already in the tree.
+     * @param root the current root of the tree
+     * @param node the node to insert
+     * @return the new root of the tree
+     */
+    public static <TNode extends AvlNode> TNode insert(TNode root, TNode node) {
+      if (root == null) {
+        // The node becomes the root of this (sub)tree. Compute its height now: leaving it
+        // at 0 would make it indistinguishable from a missing child in balanceFactor(),
+        // and the ancestors would under-estimate how unbalanced they are.
+        fixHeight(node);
+        return node;
+      }
+
+      int cmp = node.compareTo(root);
+      assert cmp != 0 : "node already inserted: " + root;
+      if (cmp < 0) {
+        root.avlLeft = insert(root.avlLeft, node);
+      } else {
+        root.avlRight = insert(root.avlRight, node);
+      }
+      // re-compute the height of this node and rotate if the insertion unbalanced it
+      return balance(root);
+    }
+
+    /**
+     * Insert a node into the tree.
+     * This is useful when you want to create a new node or replace the content
+     * depending if the node already exists or not.
+     * Using AvlInsertOrReplace class you can return the node to add/replace.
+     *
+     * @param root the current root of the tree
+     * @param key the key for the node we are trying to insert
+     * @param keyComparator the comparator to use to match node and key
+     * @param insertOrReplace the class to use to insert or replace the node
+     * @return the new root of the tree
+     */
+    public static <TNode extends AvlNode> TNode insert(TNode root, Object key,
+        final AvlKeyComparator<TNode> keyComparator,
+        final AvlInsertOrReplace<TNode> insertOrReplace) {
+      if (root == null) {
+        TNode node = insertOrReplace.insert(key);
+        // a new leaf has height 1; leaving it at 0 would make it look like a missing child
+        if (node != null) fixHeight(node);
+        return node;
+      }
+
+      // Same convention used by get(), remove() and the iterator:
+      // cmp > 0 means "root is greater than key", so the key belongs to the left subtree.
+      int cmp = keyComparator.compareKey(root, key);
+      if (cmp > 0) {
+        root.avlLeft = insert((TNode)root.avlLeft, key, keyComparator, insertOrReplace);
+      } else if (cmp < 0) {
+        root.avlRight = insert((TNode)root.avlRight, key, keyComparator, insertOrReplace);
+      } else {
+        // The replacement takes the exact position of the previous node,
+        // so it must inherit both the children and the subtree height.
+        TNode left = (TNode)root.avlLeft;
+        TNode right = (TNode)root.avlRight;
+        int height = root.avlHeight;
+        root = insertOrReplace.replace(key, root);
+        root.avlLeft = left;
+        root.avlRight = right;
+        root.avlHeight = height;
+        return root;
+      }
+      return balance(root);
+    }
+
+    /**
+     * Unlinks the smallest node of the subtree rooted at p.
+     * @param p the root of the subtree (must not be null)
+     * @return the new root of the subtree, without its min node
+     */
+    private static <TNode extends AvlNode> TNode removeMin(TNode p) {
+      if (p.avlLeft != null) {
+        p.avlLeft = removeMin(p.avlLeft);
+        return balance(p);
+      }
+      // p is the min node: its right child (possibly null) takes its place
+      return (TNode)p.avlRight;
+    }
+
+    /**
+     * Removes the node matching the specified key from the tree,
+     * without reporting whether a node was found.
+     * @param root the current root of the tree
+     * @param key the key for the node we are trying to find
+     * @param keyComparator the comparator to use to match node and key
+     * @return the new root of the tree
+     */
+    public static <TNode extends AvlNode> TNode remove(TNode root, Object key,
+        final AvlKeyComparator<TNode> keyComparator) {
+      // no "removed" flag requested by the caller
+      final AtomicBoolean noFlag = null;
+      return remove(root, key, keyComparator, noFlag);
+    }
+
+    /**
+     * Removes the node matching the specified key from the tree
+     * @param root the current root of the tree
+     * @param key the key for the node we are trying to find
+     * @param keyComparator the comparator to use to match node and key
+     * @param removed will be set to true if the node was found and removed, otherwise false.
+     *                May be null if the caller is not interested in the outcome.
+     * @return the new root of the tree
+     */
+    public static <TNode extends AvlNode> TNode remove(TNode root, Object key,
+        final AvlKeyComparator<TNode> keyComparator, final AtomicBoolean removed) {
+      if (root == null) {
+        // we walked past a leaf: there is no node matching the key
+        if (removed != null) removed.set(false);
+        return null;
+      }
+
+      int cmp = keyComparator.compareKey(root, key);
+      if (cmp == 0) {
+        if (removed != null) removed.set(true);
+
+        TNode q = (TNode)root.avlLeft;
+        TNode r = (TNode)root.avlRight;
+        // no right subtree: the left child (possibly null) takes the place of the node
+        if (r == null) return q;
+        // otherwise the successor (min of the right subtree) takes the place of the node
+        TNode min = getFirst(r);
+        min.avlRight = removeMin(r);
+        min.avlLeft = q;
+        return balance(min);
+      } else if (cmp > 0) {
+        // pass the flag down, otherwise a match below the root would never be reported
+        root.avlLeft = remove((TNode)root.avlLeft, key, keyComparator, removed);
+      } else /* if (cmp < 0) */ {
+        root.avlRight = remove((TNode)root.avlRight, key, keyComparator, removed);
+      }
+      return balance(root);
+    }
+
+    /**
+     * Hands the nodes of the tree to the visitor, in iterator order,
+     * until the visitor asks to stop or there are no more nodes.
+     * @param root the current root of the tree
+     * @param visitor the AvlNodeVisitor instance
+     */
+    public static <TNode extends AvlNode> void visit(final TNode root,
+        final AvlNodeVisitor<TNode> visitor) {
+      if (root == null) {
+        return;
+      }
+
+      final AvlTreeIterator<TNode> nodes = new AvlTreeIterator<TNode>(root);
+      while (nodes.hasNext()) {
+        if (!visitor.visitNode(nodes.next())) {
+          // the visitor asked to stop
+          break;
+        }
+      }
+    }
+
+ private static <TNode extends AvlNode> TNode balance(TNode p) {
+ fixHeight(p);
+ int balance = balanceFactor(p);
+ if (balance == 2) {
+ if (balanceFactor(p.avlRight) < 0) {
+ p.avlRight = rotateRight(p.avlRight);
+ }
+ return rotateLeft(p);
+ } else if (balance == -2) {
+ if (balanceFactor(p.avlLeft) > 0) {
+ p.avlLeft = rotateLeft(p.avlLeft);
+ }
+ return rotateRight(p);
+ }
+ return p;
+ }
+
+ private static <TNode extends AvlNode> TNode rotateRight(TNode p) {
+ TNode q = (TNode)p.avlLeft;
+ p.avlLeft = q.avlRight;
+ q.avlRight = p;
+ fixHeight(p);
+ fixHeight(q);
+ return q;
+ }
+
+ private static <TNode extends AvlNode> TNode rotateLeft(TNode q) {
+ TNode p = (TNode)q.avlRight;
+ q.avlRight = p.avlLeft;
+ p.avlLeft = q;
+ fixHeight(q);
+ fixHeight(p);
+ return p;
+ }
+
+    /** Sets the height of the node to one more than the height of its taller child. */
+    private static <TNode extends AvlNode> void fixHeight(TNode node) {
+      node.avlHeight = Math.max(height(node.avlLeft), height(node.avlRight)) + 1;
+    }
+
+    /** @return the stored height of the node, or 0 for a missing (null) node */
+    private static <TNode extends AvlNode> int height(TNode node) {
+      if (node == null) {
+        return 0;
+      }
+      return node.avlHeight;
+    }
+
+ private static <TNode extends AvlNode> int balanceFactor(TNode node) {
+ return height(node.avlRight) - height(node.avlLeft);
+ }
+ }
+
+ /**
+ * Iterator for the AvlTree.
+ * The nodes are returned in ascending order, starting from the position selected
+ * with seekFirst() or seekTo(). The iterator keeps its own stack of ancestors,
+ * so the nodes do not need a link to their parent.
+ * remove() is not supported.
+ */
+ @InterfaceAudience.Private
+ public static class AvlTreeIterator<TNode extends AvlNode> implements Iterator<TNode> {
+ // Nodes we descended through to reach "current" (pushed before following a left or right link).
+ // Fixed capacity and no bounds check: a path longer than 64 nodes would overflow the array.
+ // NOTE(review): presumably ample for a balanced tree - confirm.
+ private final Object[] stack = new Object[64];
+
+ // the node that the next call to next() will return, null when the iteration is over
+ private TNode current = null;
+ // number of entries in use in the stack
+ private int height = 0;
+
+ /**
+ * Create an iterator that is not positioned on any tree: hasNext() returns false
+ * until seekFirst() or seekTo() is called.
+ */
+ public AvlTreeIterator() {
+ }
+
+ /**
+ * Create the iterator starting from the first (min) node of the tree
+ * @param root the current root of the tree
+ */
+ public AvlTreeIterator(final TNode root) {
+ seekFirst(root);
+ }
+
+ /**
+ * Create the iterator starting from the specified key
+ * @param root the current root of the tree
+ * @param key the key for the node we are trying to find
+ * @param keyComparator the comparator to use to match node and key
+ */
+ public AvlTreeIterator(final TNode root, final Object key,
+ final AvlKeyComparator<TNode> keyComparator) {
+ seekTo(root, key, keyComparator);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return current != null;
+ }
+
+ /**
+ * @return the current node, moving the iterator to its successor.
+ *         NOTE(review): when the iteration is over this returns null instead of throwing
+ *         NoSuchElementException as the Iterator contract asks; callers should check hasNext().
+ */
+ @Override
+ public TNode next() {
+ final TNode node = this.current;
+ seekNext();
+ return node;
+ }
+
+ /**
+ * Not supported.
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Reset the iterator, and seeks to the first (min) node of the tree
+ * @param root the current root of the tree
+ */
+ public void seekFirst(final TNode root) {
+ current = root;
+ height = 0;
+ if (root != null) {
+ // follow the left links down to the min node, remembering the ancestors
+ while (current.avlLeft != null) {
+ stack[height++] = current;
+ current = (TNode)current.avlLeft;
+ }
+ }
+ }
+
+ /**
+ * Reset the iterator, and seeks to the specified key.
+ * The iterator is positioned on the first node that is not smaller than the key
+ * (the matching node if there is one, otherwise the next greater one).
+ * If every node is smaller than the key, hasNext() returns false.
+ * @param root the current root of the tree
+ * @param key the key for the node we are trying to find
+ * @param keyComparator the comparator to use to match node and key
+ */
+ public void seekTo(final TNode root, final Object key,
+ final AvlKeyComparator<TNode> keyComparator) {
+ current = null;
+ height = 0;
+
+ TNode node = root;
+ while (node != null) {
+ if (keyComparator.compareKey(node, key) >= 0) {
+ // node >= key: it is a candidate, but the left subtree may hold a smaller one
+ if (node.avlLeft != null) {
+ stack[height++] = node;
+ node = (TNode)node.avlLeft;
+ } else {
+ current = node;
+ return;
+ }
+ } else {
+ // node < key: the result, if any, is on the right
+ if (node.avlRight != null) {
+ stack[height++] = node;
+ node = (TNode)node.avlRight;
+ } else {
+ // Dead end: everything in this subtree is smaller than the key.
+ // Climb back to the closest ancestor that we left through its left link,
+ // which is the smallest node greater than the key.
+ if (height > 0) {
+ TNode parent = (TNode)stack[--height];
+ while (node == parent.avlRight) {
+ if (height == 0) {
+ // we only ever went right: no node is >= key
+ current = null;
+ return;
+ }
+ node = parent;
+ parent = (TNode)stack[--height];
+ }
+ current = parent;
+ return;
+ }
+ current = null;
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * Moves current to its in-order successor, or to null if there is none.
+ */
+ private void seekNext() {
+ if (current == null) return;
+ if (current.avlRight != null) {
+ // the successor is the min node of the right subtree
+ stack[height++] = current;
+ current = (TNode)current.avlRight;
+ while (current.avlLeft != null) {
+ stack[height++] = current;
+ current = (TNode)current.avlLeft;
+ }
+ } else {
+ // no right subtree: pop the ancestors until one that we left through its left link.
+ // The ancestors reached from their right child were already returned.
+ TNode node;
+ do {
+ if (height == 0) {
+ current = null;
+ return;
+ }
+ node = current;
+ current = (TNode)stack[--height];
+ } while (current.avlRight == node);
+ }
+ }
+ }
+
+  /**
+   * Static helpers to build and edit a circular, doubly linked list of AvlLinkedNodes.
+   * The list is identified by its head node (null for an empty list); the tail is
+   * always reachable as the predecessor of the head.
+   */
+  @InterfaceAudience.Private
+  public static class AvlIterableList {
+    /**
+     * @param node the current node
+     * @return the successor of the current node
+     */
+    public static <TNode extends AvlLinkedNode> TNode readNext(TNode node) {
+      final TNode successor = (TNode)node.iterNext;
+      return successor;
+    }
+
+    /**
+     * @param node the current node
+     * @return the predecessor of the current node
+     */
+    public static <TNode extends AvlLinkedNode> TNode readPrev(TNode node) {
+      final TNode predecessor = (TNode)node.iterPrev;
+      return predecessor;
+    }
+
+    /**
+     * Links the node in front of the current head.
+     * @param head the head of the linked list
+     * @param node the node to add to the front of the list
+     * @return the new head of the list
+     */
+    public static <TNode extends AvlLinkedNode> TNode prepend(TNode head, TNode node) {
+      // The list is circular: linking the node between tail and head is exactly what
+      // append() does. The only difference is that here the node becomes the head.
+      append(head, node);
+      return node;
+    }
+
+    /**
+     * Links the node after the current tail.
+     * @param head the head of the linked list
+     * @param node the node to add to the tail of the list
+     * @return the new head of the list
+     */
+    public static <TNode extends AvlLinkedNode> TNode append(TNode head, TNode node) {
+      assert !isLinked(node) : node + " is already linked";
+      if (head == null) {
+        // first element: the node is both head and tail
+        node.iterPrev = node;
+        node.iterNext = node;
+        return node;
+      }
+
+      final TNode last = (TNode)head.iterPrev;
+      node.iterPrev = last;
+      node.iterNext = head;
+      last.iterNext = node;
+      head.iterPrev = node;
+      return head;
+    }
+
+    /**
+     * Concatenates two lists.
+     * @param head the head of the current linked list
+     * @param otherHead the head of the list to append to the current list
+     * @return the new head of the current list
+     */
+    public static <TNode extends AvlLinkedNode> TNode appendList(TNode head, TNode otherHead) {
+      if (head == null) {
+        return otherHead;
+      }
+      if (otherHead == null) {
+        return head;
+      }
+
+      final TNode last = (TNode)head.iterPrev;
+      final TNode otherLast = (TNode)otherHead.iterPrev;
+      // join the end of this list with the start of the other one
+      last.iterNext = otherHead;
+      otherHead.iterPrev = last;
+      // and close the circle: the tail of the other list becomes the tail of the result
+      otherLast.iterNext = head;
+      head.iterPrev = otherLast;
+      return head;
+    }
+
+    /**
+     * Unlinks the node from the list.
+     * @param head the head of the linked list
+     * @param node the node to remove from the list
+     * @return the new head of the list
+     */
+    public static <TNode extends AvlLinkedNode> TNode remove(TNode head, TNode node) {
+      assert isLinked(node) : node + " is not linked";
+      final TNode newHead;
+      if (node == node.iterNext) {
+        // the node was the only element
+        newHead = null;
+      } else {
+        node.iterPrev.iterNext = node.iterNext;
+        node.iterNext.iterPrev = node.iterPrev;
+        newHead = (head != node) ? head : (TNode)node.iterNext;
+      }
+      node.iterPrev = null;
+      node.iterNext = null;
+      return newHead;
+    }
+
+    /**
+     * @param node the node to check
+     * @return true if the node is linked to a list, false otherwise
+     */
+    public static <TNode extends AvlLinkedNode> boolean isLinked(TNode node) {
+      return !(node.iterPrev == null || node.iterNext == null);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2acd788d/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestAvlUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestAvlUtil.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestAvlUtil.java
new file mode 100644
index 0000000..3c7b680
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestAvlUtil.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.util.Random;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
+import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
+import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode;
+import org.apache.hadoop.hbase.util.AvlUtil.AvlNode;
+import org.apache.hadoop.hbase.util.AvlUtil.AvlNodeVisitor;
+import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
+import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@Category({MiscTests.class, SmallTests.class})
+public class TestAvlUtil {
+ private static final TestAvlKeyComparator KEY_COMPARATOR = new TestAvlKeyComparator();
+
+  /**
+   * Inserts and then removes random keys, using a TreeMap as reference:
+   * after every change each key of the reference must be found in the AvlTree.
+   */
+  @Test
+  public void testAvlTreeCrud() {
+    final int MAX_KEY = 99999999;
+    final int NELEM = 10000;
+
+    final TreeMap<Integer, Object> treeMap = new TreeMap<Integer, Object>();
+    TestAvlNode root = null;
+
+    final Random rand = new Random();
+    for (int i = 0; i < NELEM; ++i) {
+      int key = rand.nextInt(MAX_KEY);
+      if (AvlTree.get(root, key, KEY_COMPARATOR) != null) {
+        // duplicate key, try again with another one
+        i--;
+        continue;
+      }
+      root = AvlTree.insert(root, new TestAvlNode(key));
+      treeMap.put(key, null);
+      for (Integer keyX: treeMap.keySet()) {
+        TestAvlNode node = AvlTree.get(root, keyX, KEY_COMPARATOR);
+        assertNotNull(node);
+        assertEquals(keyX.intValue(), node.getKey());
+      }
+    }
+
+    for (int i = 0; i < NELEM; ++i) {
+      int key = rand.nextInt(MAX_KEY);
+      TestAvlNode node = AvlTree.get(root, key, KEY_COMPARATOR);
+      if (!treeMap.containsKey(key)) {
+        // use a junit assertion: the "assert" keyword is a no-op when the JVM runs without -ea
+        assertNull(node);
+        continue;
+      }
+      treeMap.remove(key);
+      assertNotNull(node);
+      assertEquals(key, node.getKey());
+      root = AvlTree.remove(root, key, KEY_COMPARATOR);
+      for (Integer keyX: treeMap.keySet()) {
+        node = AvlTree.get(root, keyX, KEY_COMPARATOR);
+        assertNotNull(node);
+        assertEquals(keyX.intValue(), node.getKey());
+      }
+    }
+  }
+
+  /**
+   * Verifies that AvlTree.visit() hands every node to the visitor, in ascending key order.
+   */
+  @Test
+  public void testAvlTreeVisitor() {
+    final int MIN_KEY = 0;
+    final int MAX_KEY = 50;
+
+    TestAvlNode root = null;
+    for (int i = MAX_KEY; i >= MIN_KEY; --i) {
+      root = AvlTree.insert(root, new TestAvlNode(i));
+    }
+
+    // Count the callbacks: without this, a visit() that never calls the visitor would pass.
+    final int[] visited = new int[1];
+    AvlTree.visit(root, new AvlNodeVisitor<TestAvlNode>() {
+      private int prevKey = -1;
+      @Override
+      public boolean visitNode(TestAvlNode node) {
+        assertEquals(prevKey, node.getKey() - 1);
+        assertTrue(node.getKey() >= MIN_KEY);
+        assertTrue(node.getKey() <= MAX_KEY);
+        prevKey = node.getKey();
+        visited[0]++;
+        return node.getKey() <= MAX_KEY;
+      }
+    });
+    assertEquals(MAX_KEY - MIN_KEY + 1, visited[0]);
+  }
+
+  /**
+   * Iterates from the first node and expects the keys MIN_KEY..MAX_KEY-1, one by one.
+   */
+  @Test
+  public void testAvlTreeIterSeekFirst() {
+    final int MIN_KEY = 1;
+    final int MAX_KEY = 50;
+
+    TestAvlNode root = null;
+    for (int key = MIN_KEY; key < MAX_KEY; ++key) {
+      root = AvlTree.insert(root, new TestAvlNode(key));
+    }
+
+    final AvlTreeIterator<TestAvlNode> iter = new AvlTreeIterator<TestAvlNode>(root);
+    assertTrue(iter.hasNext());
+    // the key that the next node is expected to have
+    long expectedKey = MIN_KEY;
+    while (iter.hasNext()) {
+      final TestAvlNode node = iter.next();
+      assertEquals(expectedKey, node.getKey());
+      expectedKey = node.getKey() + 1L;
+    }
+    // the last node seen must be MAX_KEY - 1
+    assertEquals(MAX_KEY, expectedKey);
+  }
+
+  /**
+   * The tree contains only the odd keys: seeking to an even key must land on the next odd one,
+   * seeking past the last key must produce an empty iterator.
+   */
+  @Test
+  public void testAvlTreeIterSeekTo() {
+    final int MIN_KEY = 1;
+    final int MAX_KEY = 50;
+
+    TestAvlNode root = null;
+    for (int key = MIN_KEY; key < MAX_KEY; key += 2) {
+      root = AvlTree.insert(root, new TestAvlNode(key));
+    }
+
+    for (int i = MIN_KEY - 1; i <= MAX_KEY; ++i) {
+      final AvlTreeIterator<TestAvlNode> iter =
+        new AvlTreeIterator<TestAvlNode>(root, i, KEY_COMPARATOR);
+      if (i >= MAX_KEY) {
+        // searching for something greater than the last node
+        assertFalse(iter.hasNext());
+        break;
+      }
+      assertTrue(iter.hasNext());
+
+      TestAvlNode node = iter.next();
+      final int expectedFirst = (i % 2 != 0) ? i : i + 1;
+      assertEquals(expectedFirst, node.getKey());
+
+      // the rest of the iteration must be strictly ascending
+      long lastKey = node.getKey();
+      while (iter.hasNext()) {
+        node = iter.next();
+        assertTrue(node.getKey() > lastKey);
+        lastKey = node.getKey();
+      }
+    }
+  }
+
+  /**
+   * Builds one list with prepend() and one with append(), then checks the order of the
+   * nodes walking them forward, backward and after joining them with appendList().
+   */
+  @Test
+  public void testAvlIterableListCrud() {
+    final int NITEMS = 10;
+    TestLinkedAvlNode prependHead = null;
+    TestLinkedAvlNode appendHead = null;
+
+    // prepend()/append(): a node is linked only after it has been added to a list
+    for (int key = 0; key <= NITEMS; ++key) {
+      final TestLinkedAvlNode prepended = new TestLinkedAvlNode(key);
+      assertFalse(AvlIterableList.isLinked(prepended));
+      prependHead = AvlIterableList.prepend(prependHead, prepended);
+      assertTrue(AvlIterableList.isLinked(prepended));
+
+      final TestLinkedAvlNode appended = new TestLinkedAvlNode(key);
+      assertFalse(AvlIterableList.isLinked(appended));
+      appendHead = AvlIterableList.append(appendHead, appended);
+      assertTrue(AvlIterableList.isLinked(appended));
+    }
+
+    // readNext(): the prepend list is in descending order, the append list in ascending order
+    TestLinkedAvlNode pCursor = prependHead;
+    TestLinkedAvlNode aCursor = appendHead;
+    for (int i = 0; i <= NITEMS; ++i) {
+      assertEquals(NITEMS - i, pCursor.getKey());
+      pCursor = AvlIterableList.readNext(pCursor);
+
+      assertEquals(i, aCursor.getKey());
+      aCursor = AvlIterableList.readNext(aCursor);
+    }
+
+    // readPrev(): starting from the tail (the predecessor of the head) the order is reversed
+    pCursor = AvlIterableList.readPrev(prependHead);
+    aCursor = AvlIterableList.readPrev(appendHead);
+    for (int i = 0; i <= NITEMS; ++i) {
+      assertEquals(i, pCursor.getKey());
+      pCursor = AvlIterableList.readPrev(pCursor);
+
+      assertEquals(NITEMS - i, aCursor.getKey());
+      aCursor = AvlIterableList.readPrev(aCursor);
+    }
+
+    // appendList(): first the whole prepend list (descending), then the append list (ascending)
+    TestLinkedAvlNode cursor = AvlIterableList.appendList(prependHead, appendHead);
+    for (int i = 0; i <= NITEMS; ++i) {
+      assertEquals(NITEMS - i, cursor.getKey());
+      cursor = AvlIterableList.readNext(cursor);
+    }
+    for (int i = 0; i <= NITEMS; ++i) {
+      assertEquals(i, cursor.getKey());
+      cursor = AvlIterableList.readNext(cursor);
+    }
+  }
+
+  /** Tree node with an int key, ordered by the natural order of the key. */
+  private static class TestAvlNode extends AvlNode<TestAvlNode> {
+    private final int key;
+
+    public TestAvlNode(int key) {
+      this.key = key;
+    }
+
+    public int getKey() {
+      return key;
+    }
+
+    @Override
+    public int compareTo(TestAvlNode other) {
+      // Integer.compare() instead of a subtraction, which overflows for distant keys
+      return Integer.compare(this.key, other.key);
+    }
+
+    @Override
+    public String toString() {
+      return String.format("TestAvlNode(%d)", key);
+    }
+  }
+
+  /** Linked tree node with an int key, ordered by the natural order of the key. */
+  private static class TestLinkedAvlNode extends AvlLinkedNode<TestLinkedAvlNode> {
+    private final int key;
+
+    public TestLinkedAvlNode(int key) {
+      this.key = key;
+    }
+
+    public int getKey() {
+      return key;
+    }
+
+    @Override
+    public int compareTo(TestLinkedAvlNode other) {
+      // Integer.compare() instead of a subtraction, which overflows for distant keys
+      return Integer.compare(this.key, other.key);
+    }
+
+    @Override
+    public String toString() {
+      return String.format("TestLinkedAvlNode(%d)", key);
+    }
+  }
+
+  /** Compares the int key of a TestAvlNode with a lookup key, which must be an Integer. */
+  private static class TestAvlKeyComparator implements AvlKeyComparator<TestAvlNode> {
+    @Override
+    public int compareKey(TestAvlNode node, Object key) {
+      // positive when the node is greater than the key; Integer.compare() cannot overflow
+      return Integer.compare(node.getKey(), (int)key);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/2acd788d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
index d368a82..84ecf22 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java
@@ -40,6 +40,10 @@ import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet;
+import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
+import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
+import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode;
+import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
/**
* ProcedureRunnableSet for the Master Procedures.
@@ -65,13 +69,20 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
private final ReentrantLock schedLock = new ReentrantLock();
private final Condition schedWaitCond = schedLock.newCondition();
+ private final static NamespaceQueueKeyComparator NAMESPACE_QUEUE_KEY_COMPARATOR =
+ new NamespaceQueueKeyComparator();
+ private final static ServerQueueKeyComparator SERVER_QUEUE_KEY_COMPARATOR =
+ new ServerQueueKeyComparator();
+ private final static TableQueueKeyComparator TABLE_QUEUE_KEY_COMPARATOR =
+ new TableQueueKeyComparator();
+
private final FairQueue<ServerName> serverRunQueue = new FairQueue<ServerName>();
private final FairQueue<TableName> tableRunQueue = new FairQueue<TableName>();
private int queueSize = 0;
- private final Object[] serverBuckets = new Object[128];
- private Queue<String> namespaceMap = null;
- private Queue<TableName> tableMap = null;
+ private final ServerQueue[] serverBuckets = new ServerQueue[128];
+ private NamespaceQueue namespaceMap = null;
+ private TableQueue tableMap = null;
private final int metaTablePriority;
private final int userTablePriority;
@@ -142,7 +153,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
// the queue is not suspended or removed from the fairq (run-queue)
// because someone has an xlock on it.
// so, if the queue is not-linked we should add it
- if (queue.size() == 1 && !IterableList.isLinked(queue)) {
+ if (queue.size() == 1 && !AvlIterableList.isLinked(queue)) {
fairq.add(queue);
}
queueSize++;
@@ -152,7 +163,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
// our (proc) parent has the xlock,
// so the queue is not in the fairq (run-queue)
// add it back to let the child run (inherit the lock)
- if (!IterableList.isLinked(queue)) {
+ if (!AvlIterableList.isLinked(queue)) {
fairq.add(queue);
}
queueSize++;
@@ -230,12 +241,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
try {
// Remove Servers
for (int i = 0; i < serverBuckets.length; ++i) {
- clear((ServerQueue)serverBuckets[i], serverRunQueue);
+ clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
serverBuckets[i] = null;
}
// Remove Tables
- clear(tableMap, tableRunQueue);
+ clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
tableMap = null;
assert queueSize == 0 : "expected queue size to be 0, got " + queueSize;
@@ -244,11 +255,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
}
- private <T extends Comparable<T>> void clear(Queue<T> treeMap, FairQueue<T> fairq) {
+ private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
+ final FairQueue<T> fairq, final AvlKeyComparator<TNode> comparator) {
while (treeMap != null) {
Queue<T> node = AvlTree.getFirst(treeMap);
assert !node.isSuspended() : "can't clear suspended " + node.getKey();
- treeMap = AvlTree.remove(treeMap, node.getKey());
+ treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
removeFromRunQueue(fairq, node);
}
}
@@ -302,7 +314,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
private <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue) {
- if (IterableList.isLinked(queue)) return;
+ if (AvlIterableList.isLinked(queue)) return;
if (!queue.isEmpty()) {
fairq.add(queue);
queueSize += queue.size();
@@ -310,7 +322,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
private <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq, Queue<T> queue) {
- if (!IterableList.isLinked(queue)) return;
+ if (!AvlIterableList.isLinked(queue)) return;
fairq.remove(queue);
queueSize -= queue.size();
}
@@ -507,11 +519,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
private void suspendTableQueue(Queue<TableName> queue) {
- waitingTables = IterableList.append(waitingTables, queue);
+ waitingTables = AvlIterableList.append(waitingTables, queue);
}
private void suspendServerQueue(Queue<ServerName> queue) {
- waitingServers = IterableList.append(waitingServers, queue);
+ waitingServers = AvlIterableList.append(waitingServers, queue);
}
private boolean hasWaitingTables() {
@@ -520,7 +532,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
private Queue<TableName> popWaitingTable() {
Queue<TableName> node = waitingTables;
- waitingTables = IterableList.remove(waitingTables, node);
+ waitingTables = AvlIterableList.remove(waitingTables, node);
node.setSuspended(false);
return node;
}
@@ -531,7 +543,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
private Queue<ServerName> popWaitingServer() {
Queue<ServerName> node = waitingServers;
- waitingServers = IterableList.remove(waitingServers, node);
+ waitingServers = AvlIterableList.remove(waitingServers, node);
node.setSuspended(false);
return node;
}
@@ -555,17 +567,17 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
private TableQueue getTableQueue(TableName tableName) {
- Queue<TableName> node = AvlTree.get(tableMap, tableName);
- if (node != null) return (TableQueue)node;
+ TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
+ if (node != null) return node;
NamespaceQueue nsQueue = getNamespaceQueue(tableName.getNamespaceAsString());
node = new TableQueue(tableName, nsQueue, getTablePriority(tableName));
tableMap = AvlTree.insert(tableMap, node);
- return (TableQueue)node;
+ return node;
}
private void removeTableQueue(TableName tableName) {
- tableMap = AvlTree.remove(tableMap, tableName);
+ tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
}
private int getTablePriority(TableName tableName) {
@@ -589,12 +601,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
// Namespace Queue Lookup Helpers
// ============================================================================
private NamespaceQueue getNamespaceQueue(String namespace) {
- Queue<String> node = AvlTree.get(namespaceMap, namespace);
+ NamespaceQueue node = AvlTree.get(namespaceMap, namespace, NAMESPACE_QUEUE_KEY_COMPARATOR);
if (node != null) return (NamespaceQueue)node;
node = new NamespaceQueue(namespace);
namespaceMap = AvlTree.insert(namespaceMap, node);
- return (NamespaceQueue)node;
+ return node;
}
// ============================================================================
@@ -610,24 +622,19 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
private ServerQueue getServerQueue(ServerName serverName) {
- int index = getBucketIndex(serverBuckets, serverName.hashCode());
- Queue<ServerName> root = getTreeRoot(serverBuckets, index);
- Queue<ServerName> node = AvlTree.get(root, serverName);
- if (node != null) return (ServerQueue)node;
+ final int index = getBucketIndex(serverBuckets, serverName.hashCode());
+ ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
+ if (node != null) return node;
node = new ServerQueue(serverName);
- serverBuckets[index] = AvlTree.insert(root, node);
+ serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
return (ServerQueue)node;
}
private void removeServerQueue(ServerName serverName) {
- int index = getBucketIndex(serverBuckets, serverName.hashCode());
- serverBuckets[index] = AvlTree.remove((ServerQueue)serverBuckets[index], serverName);
- }
-
- @SuppressWarnings("unchecked")
- private static <T extends Comparable<T>> Queue<T> getTreeRoot(Object[] buckets, int index) {
- return (Queue<T>) buckets[index];
+ final int index = getBucketIndex(serverBuckets, serverName.hashCode());
+ final ServerQueue root = serverBuckets[index];
+ serverBuckets[index] = AvlTree.remove(root, serverName, SERVER_QUEUE_KEY_COMPARATOR);
}
private static int getBucketIndex(Object[] buckets, int hashCode) {
@@ -645,6 +652,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
// ============================================================================
// Table and Server Queue Implementation
// ============================================================================
+ private static class ServerQueueKeyComparator implements AvlKeyComparator<ServerQueue> {
+ @Override
+ public int compareKey(ServerQueue node, Object key) {
+ return node.compareKey((ServerName)key);
+ }
+ }
+
public static class ServerQueue extends QueueImpl<ServerName> {
public ServerQueue(ServerName serverName) {
super(serverName);
@@ -699,6 +713,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
}
+ private static class TableQueueKeyComparator implements AvlKeyComparator<TableQueue> {
+ @Override
+ public int compareKey(TableQueue node, Object key) {
+ return node.compareKey((TableName)key);
+ }
+ }
+
public static class TableQueue extends QueueImpl<TableName> {
private final NamespaceQueue namespaceQueue;
@@ -852,6 +873,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
}
+ private static class NamespaceQueueKeyComparator implements AvlKeyComparator<NamespaceQueue> {
+ @Override
+ public int compareKey(NamespaceQueue node, Object key) {
+ return node.compareKey((String)key);
+ }
+ }
+
/**
* the namespace is currently used just as a rwlock, not as a queue.
* because ns operation are not frequent enough. so we want to avoid
@@ -1024,7 +1052,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (queue.isEmpty() && queue.tryExclusiveLock(0)) {
// remove the table from the run-queue and the map
- if (IterableList.isLinked(queue)) {
+ if (AvlIterableList.isLinked(queue)) {
tableRunQueue.remove(queue);
}
@@ -1268,13 +1296,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
boolean isSuspended();
}
- private static abstract class Queue<TKey extends Comparable<TKey>> implements QueueInterface {
- private Queue<TKey> avlRight = null;
- private Queue<TKey> avlLeft = null;
- private int avlHeight = 1;
-
- private Queue<TKey> iterNext = null;
- private Queue<TKey> iterPrev = null;
+ private static abstract class Queue<TKey extends Comparable<TKey>>
+ extends AvlLinkedNode<Queue<TKey>> implements QueueInterface {
private boolean suspended = false;
private long exclusiveLockProcIdOwner = Long.MIN_VALUE;
@@ -1366,6 +1389,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return key.compareTo(cmpKey);
}
+ @Override
public int compareTo(Queue<TKey> other) {
return compareKey(other.key);
}
@@ -1441,13 +1465,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
public void add(Queue<T> queue) {
- queueHead = IterableList.append(queueHead, queue);
+ queueHead = AvlIterableList.append(queueHead, queue);
if (currentQueue == null) setNextQueue(queueHead);
}
public void remove(Queue<T> queue) {
- Queue<T> nextQueue = queue.iterNext;
- queueHead = IterableList.remove(queueHead, queue);
+ Queue<T> nextQueue = AvlIterableList.readNext(queue);
+ queueHead = AvlIterableList.remove(queueHead, queue);
if (currentQueue == queue) {
setNextQueue(queueHead != null ? nextQueue : null);
}
@@ -1478,7 +1502,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
private boolean nextQueue() {
if (currentQueue == null) return false;
- currentQueue = currentQueue.iterNext;
+ currentQueue = AvlIterableList.readNext(currentQueue);
return currentQueue != null;
}
@@ -1495,187 +1519,4 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return Math.max(1, queue.getPriority() * quantum); // TODO
}
}
-
- private static class AvlTree {
- public static <T extends Comparable<T>> Queue<T> get(Queue<T> root, T key) {
- while (root != null) {
- int cmp = root.compareKey(key);
- if (cmp > 0) {
- root = root.avlLeft;
- } else if (cmp < 0) {
- root = root.avlRight;
- } else {
- return root;
- }
- }
- return null;
- }
-
- public static <T extends Comparable<T>> Queue<T> getFirst(Queue<T> root) {
- if (root != null) {
- while (root.avlLeft != null) {
- root = root.avlLeft;
- }
- }
- return root;
- }
-
- public static <T extends Comparable<T>> Queue<T> getLast(Queue<T> root) {
- if (root != null) {
- while (root.avlRight != null) {
- root = root.avlRight;
- }
- }
- return root;
- }
-
- public static <T extends Comparable<T>> Queue<T> insert(Queue<T> root, Queue<T> node) {
- if (root == null) return node;
- if (node.compareTo(root) < 0) {
- root.avlLeft = insert(root.avlLeft, node);
- } else {
- root.avlRight = insert(root.avlRight, node);
- }
- return balance(root);
- }
-
- private static <T extends Comparable<T>> Queue<T> removeMin(Queue<T> p) {
- if (p.avlLeft == null)
- return p.avlRight;
- p.avlLeft = removeMin(p.avlLeft);
- return balance(p);
- }
-
- public static <T extends Comparable<T>> Queue<T> remove(Queue<T> root, T key) {
- if (root == null) return null;
-
- int cmp = root.compareKey(key);
- if (cmp == 0) {
- Queue<T> q = root.avlLeft;
- Queue<T> r = root.avlRight;
- if (r == null) return q;
- Queue<T> min = getFirst(r);
- min.avlRight = removeMin(r);
- min.avlLeft = q;
- return balance(min);
- } else if (cmp > 0) {
- root.avlLeft = remove(root.avlLeft, key);
- } else /* if (cmp < 0) */ {
- root.avlRight = remove(root.avlRight, key);
- }
- return balance(root);
- }
-
- private static <T extends Comparable<T>> Queue<T> balance(Queue<T> p) {
- fixHeight(p);
- int balance = balanceFactor(p);
- if (balance == 2) {
- if (balanceFactor(p.avlRight) < 0) {
- p.avlRight = rotateRight(p.avlRight);
- }
- return rotateLeft(p);
- } else if (balance == -2) {
- if (balanceFactor(p.avlLeft) > 0) {
- p.avlLeft = rotateLeft(p.avlLeft);
- }
- return rotateRight(p);
- }
- return p;
- }
-
- private static <T extends Comparable<T>> Queue<T> rotateRight(Queue<T> p) {
- Queue<T> q = p.avlLeft;
- p.avlLeft = q.avlRight;
- q.avlRight = p;
- fixHeight(p);
- fixHeight(q);
- return q;
- }
-
- private static <T extends Comparable<T>> Queue<T> rotateLeft(Queue<T> q) {
- Queue<T> p = q.avlRight;
- q.avlRight = p.avlLeft;
- p.avlLeft = q;
- fixHeight(q);
- fixHeight(p);
- return p;
- }
-
- private static <T extends Comparable<T>> void fixHeight(Queue<T> node) {
- int heightLeft = height(node.avlLeft);
- int heightRight = height(node.avlRight);
- node.avlHeight = 1 + Math.max(heightLeft, heightRight);
- }
-
- private static <T extends Comparable<T>> int height(Queue<T> node) {
- return node != null ? node.avlHeight : 0;
- }
-
- private static <T extends Comparable<T>> int balanceFactor(Queue<T> node) {
- return height(node.avlRight) - height(node.avlLeft);
- }
- }
-
- private static class IterableList {
- public static <T extends Comparable<T>> Queue<T> prepend(Queue<T> head, Queue<T> node) {
- assert !isLinked(node) : node + " is already linked";
- if (head != null) {
- Queue<T> tail = head.iterPrev;
- tail.iterNext = node;
- head.iterPrev = node;
- node.iterNext = head;
- node.iterPrev = tail;
- } else {
- node.iterNext = node;
- node.iterPrev = node;
- }
- return node;
- }
-
- public static <T extends Comparable<T>> Queue<T> append(Queue<T> head, Queue<T> node) {
- assert !isLinked(node) : node + " is already linked";
- if (head != null) {
- Queue<T> tail = head.iterPrev;
- tail.iterNext = node;
- node.iterNext = head;
- node.iterPrev = tail;
- head.iterPrev = node;
- return head;
- }
- node.iterNext = node;
- node.iterPrev = node;
- return node;
- }
-
- public static <T extends Comparable<T>> Queue<T> appendList(Queue<T> head, Queue<T> otherHead) {
- if (head == null) return otherHead;
- if (otherHead == null) return head;
-
- Queue<T> tail = head.iterPrev;
- Queue<T> otherTail = otherHead.iterPrev;
- tail.iterNext = otherHead;
- otherHead.iterPrev = tail;
- otherTail.iterNext = head;
- head.iterPrev = otherTail;
- return head;
- }
-
- private static <T extends Comparable<T>> Queue<T> remove(Queue<T> head, Queue<T> node) {
- assert isLinked(node) : node + " is not linked";
- if (node != node.iterNext) {
- node.iterPrev.iterNext = node.iterNext;
- node.iterNext.iterPrev = node.iterPrev;
- head = (head == node) ? node.iterNext : head;
- } else {
- head = null;
- }
- node.iterNext = null;
- node.iterPrev = null;
- return head;
- }
-
- private static <T extends Comparable<T>> boolean isLinked(Queue<T> node) {
- return node.iterPrev != null && node.iterNext != null;
- }
- }
}
\ No newline at end of file
[2/2] hbase git commit: HBASE-16519 Procedure v2 - Avoid sync wait on
DDLs operation
Posted by mb...@apache.org.
HBASE-16519 Procedure v2 - Avoid sync wait on DDLs operation
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4a4f8e70
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4a4f8e70
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4a4f8e70
Branch: refs/heads/master
Commit: 4a4f8e704903f02dac38d584ab85a472a9f3d2ce
Parents: 2acd788
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Tue Aug 30 09:45:56 2016 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Tue Aug 30 09:45:56 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/client/Admin.java | 57 ++++++++++++++++----
.../apache/hadoop/hbase/client/HBaseAdmin.java | 38 ++++++++++---
.../org/apache/hadoop/hbase/master/HMaster.java | 31 +++++++----
.../procedure/AddColumnFamilyProcedure.java | 19 +++++--
.../procedure/DeleteColumnFamilyProcedure.java | 19 +++++--
.../procedure/ModifyColumnFamilyProcedure.java | 19 +++++--
.../master/procedure/ModifyTableProcedure.java | 20 +++++--
.../master/procedure/ProcedurePrepareLatch.java | 22 ++++++--
.../master/procedure/ProcedureSyncWait.java | 2 +-
.../procedure/TruncateTableProcedure.java | 15 ++++++
10 files changed, 198 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4f8e70/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index 0610517..321ea55 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -506,6 +506,16 @@ public interface Admin extends Abortable, Closeable {
throws IOException;
/**
+ * Add a column family to an existing table.
+ *
+ * @param tableName name of the table to add column family to
+ * @param columnFamily column family descriptor of column family to be added
+ * @throws IOException if a remote or network exception occurs
+ */
+ void addColumnFamily(final TableName tableName, final HColumnDescriptor columnFamily)
+ throws IOException;
+
+ /**
* Add a column family to an existing table. Asynchronous operation.
* You can use Future.get(long, TimeUnit) to wait on the operation to complete.
* It may throw ExecutionException if there was an error while executing the operation
@@ -518,7 +528,7 @@ public interface Admin extends Abortable, Closeable {
* @return the result of the async add column family. You can use Future.get(long, TimeUnit) to
* wait on the operation to complete.
*/
- Future<Void> addColumnFamily(final TableName tableName, final HColumnDescriptor columnFamily)
+ Future<Void> addColumnFamilyAsync(final TableName tableName, final HColumnDescriptor columnFamily)
throws IOException;
/**
@@ -537,6 +547,15 @@ public interface Admin extends Abortable, Closeable {
/**
* Delete a column family from a table.
+ *
+ * @param tableName name of table
+ * @param columnFamily name of column family to be deleted
+ * @throws IOException if a remote or network exception occurs
+ */
+ void deleteColumnFamily(final TableName tableName, final byte[] columnFamily) throws IOException;
+
+ /**
+ * Delete a column family from a table. Asynchronous operation.
* You can use Future.get(long, TimeUnit) to wait on the operation to complete.
* It may throw ExecutionException if there was an error while executing the operation
* or TimeoutException in case the wait timeout was not long enough to allow the
@@ -548,15 +567,11 @@ public interface Admin extends Abortable, Closeable {
* @return the result of the async delete column family. You can use Future.get(long, TimeUnit) to
* wait on the operation to complete.
*/
- Future<Void> deleteColumnFamily(final TableName tableName, final byte[] columnFamily)
+ Future<Void> deleteColumnFamilyAsync(final TableName tableName, final byte[] columnFamily)
throws IOException;
/**
- * Modify an existing column family on a table. Asynchronous operation.
- * You can use Future.get(long, TimeUnit) to wait on the operation to complete.
- * It may throw ExecutionException if there was an error while executing the operation
- * or TimeoutException in case the wait timeout was not long enough to allow the
- * operation to complete.
+ * Modify an existing column family on a table.
*
* @param tableName name of table
* @param columnFamily new column family descriptor to use
@@ -571,7 +586,21 @@ public interface Admin extends Abortable, Closeable {
throws IOException;
/**
+ * Modify an existing column family on a table.
+ *
+ * @param tableName name of table
+ * @param columnFamily new column family descriptor to use
+ * @throws IOException if a remote or network exception occurs
+ */
+ void modifyColumnFamily(final TableName tableName, final HColumnDescriptor columnFamily)
+ throws IOException;
+
+ /**
* Modify an existing column family on a table. Asynchronous operation.
+ * You can use Future.get(long, TimeUnit) to wait on the operation to complete.
+ * It may throw ExecutionException if there was an error while executing the operation
+ * or TimeoutException in case the wait timeout was not long enough to allow the
+ * operation to complete.
*
* @param tableName name of table
* @param columnFamily new column family descriptor to use
@@ -579,7 +608,7 @@ public interface Admin extends Abortable, Closeable {
* @return the result of the async modify column family. You can use Future.get(long, TimeUnit) to
* wait on the operation to complete.
*/
- Future<Void> modifyColumnFamily(final TableName tableName, final HColumnDescriptor columnFamily)
+ Future<Void> modifyColumnFamilyAsync(TableName tableName, HColumnDescriptor columnFamily)
throws IOException;
@@ -927,6 +956,16 @@ public interface Admin extends Abortable, Closeable {
throws IOException;
/**
+ * Modify an existing table, more IRB friendly version.
+ *
+ * @param tableName name of table.
+ * @param htd modified description of the table
+ * @throws IOException if a remote or network exception occurs
+ */
+ void modifyTable(final TableName tableName, final HTableDescriptor htd)
+ throws IOException;
+
+ /**
* Modify an existing table, more IRB friendly version. Asynchronous operation. This means that
* it may be a while before your schema change is updated across all of the table.
* You can use Future.get(long, TimeUnit) to wait on the operation to complete.
@@ -940,7 +979,7 @@ public interface Admin extends Abortable, Closeable {
* @return the result of the async modify. You can use Future.get(long, TimeUnit) to wait on the
* operation to complete
*/
- Future<Void> modifyTable(final TableName tableName, final HTableDescriptor htd)
+ Future<Void> modifyTableAsync(final TableName tableName, final HTableDescriptor htd)
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4f8e70/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 48a614f..6035895 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -863,7 +863,13 @@ public class HBaseAdmin implements Admin {
}
@Override
- public Future<Void> addColumnFamily(final TableName tableName,
+ public void addColumnFamily(final TableName tableName, final HColumnDescriptor columnFamily)
+ throws IOException {
+ get(addColumnFamilyAsync(tableName, columnFamily), syncWaitTimeout, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public Future<Void> addColumnFamilyAsync(final TableName tableName,
final HColumnDescriptor columnFamily) throws IOException {
AddColumnResponse response =
executeCallable(new MasterCallable<AddColumnResponse>(getConnection(),
@@ -906,7 +912,13 @@ public class HBaseAdmin implements Admin {
}
@Override
- public Future<Void> deleteColumnFamily(final TableName tableName, final byte[] columnFamily)
+ public void deleteColumnFamily(final TableName tableName, final byte[] columnFamily)
+ throws IOException {
+ get(deleteColumnFamilyAsync(tableName, columnFamily), syncWaitTimeout, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public Future<Void> deleteColumnFamilyAsync(final TableName tableName, final byte[] columnFamily)
throws IOException {
DeleteColumnResponse response =
executeCallable(new MasterCallable<DeleteColumnResponse>(getConnection(),
@@ -917,8 +929,7 @@ public class HBaseAdmin implements Admin {
DeleteColumnRequest req =
RequestConverter.buildDeleteColumnRequest(tableName, columnFamily,
ng.getNonceGroup(), ng.newNonce());
- master.deleteColumn(getRpcController(), req);
- return null;
+ return master.deleteColumn(getRpcController(), req);
}
});
return new DeleteColumnFamilyFuture(this, tableName, response);
@@ -950,7 +961,13 @@ public class HBaseAdmin implements Admin {
}
@Override
- public Future<Void> modifyColumnFamily(final TableName tableName,
+ public void modifyColumnFamily(final TableName tableName,
+ final HColumnDescriptor columnFamily) throws IOException {
+ get(modifyColumnFamilyAsync(tableName, columnFamily), syncWaitTimeout, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public Future<Void> modifyColumnFamilyAsync(final TableName tableName,
final HColumnDescriptor columnFamily) throws IOException {
ModifyColumnResponse response =
executeCallable(new MasterCallable<ModifyColumnResponse>(getConnection(),
@@ -961,8 +978,7 @@ public class HBaseAdmin implements Admin {
ModifyColumnRequest req =
RequestConverter.buildModifyColumnRequest(tableName, columnFamily,
ng.getNonceGroup(), ng.newNonce());
- master.modifyColumn(getRpcController(), req);
- return null;
+ return master.modifyColumn(getRpcController(), req);
}
});
return new ModifyColumnFamilyFuture(this, tableName, response);
@@ -1642,7 +1658,13 @@ public class HBaseAdmin implements Admin {
}
@Override
- public Future<Void> modifyTable(final TableName tableName, final HTableDescriptor htd)
+ public void modifyTable(final TableName tableName, final HTableDescriptor htd)
+ throws IOException {
+ get(modifyTableAsync(tableName, htd), syncWaitTimeout, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public Future<Void> modifyTableAsync(final TableName tableName, final HTableDescriptor htd)
throws IOException {
if (!tableName.equals(htd.getTableName())) {
throw new IllegalArgumentException("the specified table name '" + tableName +
http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4f8e70/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index f4c2c1c..c643fa8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1847,11 +1847,13 @@ public class HMaster extends HRegionServer implements MasterServices {
}
LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);
+ ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
long procId = this.procedureExecutor.submitProcedure(
- new TruncateTableProcedure(procedureExecutor.getEnvironment(), tableName, preserveSplits),
+ new TruncateTableProcedure(procedureExecutor.getEnvironment(), tableName,
+ preserveSplits, latch),
nonceGroup,
nonce);
- ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
+ latch.await();
if (cpHost != null) {
cpHost.postTruncateTable(tableName);
@@ -1876,11 +1878,14 @@ public class HMaster extends HRegionServer implements MasterServices {
}
}
// Execute the operation synchronously - wait for the operation to complete before continuing.
+ ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
long procId = this.procedureExecutor.submitProcedure(
- new AddColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, columnDescriptor),
+ new AddColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName,
+ columnDescriptor, latch),
nonceGroup,
nonce);
- ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
+ latch.await();
+
if (cpHost != null) {
cpHost.postAddColumn(tableName, columnDescriptor);
}
@@ -1906,11 +1911,13 @@ public class HMaster extends HRegionServer implements MasterServices {
LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
// Execute the operation synchronously - wait for the operation to complete before continuing.
+ ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
long procId = this.procedureExecutor.submitProcedure(
- new ModifyColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, descriptor),
+ new ModifyColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName,
+ descriptor, latch),
nonceGroup,
nonce);
- ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
+ latch.await();
if (cpHost != null) {
cpHost.postModifyColumn(tableName, descriptor);
@@ -1934,11 +1941,13 @@ public class HMaster extends HRegionServer implements MasterServices {
LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName));
// Execute the operation synchronously - wait for the operation to complete before continuing.
+ ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
long procId = this.procedureExecutor.submitProcedure(
- new DeleteColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, columnName),
+ new DeleteColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName,
+ columnName, latch),
nonceGroup,
nonce);
- ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
+ latch.await();
if (cpHost != null) {
cpHost.postDeleteColumn(tableName, columnName);
@@ -2060,12 +2069,12 @@ public class HMaster extends HRegionServer implements MasterServices {
LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
// Execute the operation synchronously - wait for the operation to complete before continuing.
+ ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
long procId = this.procedureExecutor.submitProcedure(
- new ModifyTableProcedure(procedureExecutor.getEnvironment(), descriptor),
+ new ModifyTableProcedure(procedureExecutor.getEnvironment(), descriptor, latch),
nonceGroup,
nonce);
-
- ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
+ latch.await();
if (cpHost != null) {
cpHost.postModifyTable(tableName, descriptor);
http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4f8e70/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
index 195f738..c9478cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java
@@ -59,16 +59,23 @@ public class AddColumnFamilyProcedure
private List<HRegionInfo> regionInfoList;
private Boolean traceEnabled;
+ // used for compatibility with old clients, until 2.0 the client had a sync behavior
+ private final ProcedurePrepareLatch syncLatch;
+
public AddColumnFamilyProcedure() {
this.unmodifiedHTableDescriptor = null;
this.regionInfoList = null;
this.traceEnabled = null;
+ this.syncLatch = null;
}
- public AddColumnFamilyProcedure(
- final MasterProcedureEnv env,
- final TableName tableName,
+ public AddColumnFamilyProcedure(final MasterProcedureEnv env, final TableName tableName,
final HColumnDescriptor cfDescriptor) throws IOException {
+ this(env, tableName, cfDescriptor, null);
+ }
+
+ public AddColumnFamilyProcedure(final MasterProcedureEnv env, final TableName tableName,
+ final HColumnDescriptor cfDescriptor, final ProcedurePrepareLatch latch) throws IOException {
this.tableName = tableName;
this.cfDescriptor = cfDescriptor;
this.user = env.getRequestUser();
@@ -76,6 +83,7 @@ public class AddColumnFamilyProcedure
this.unmodifiedHTableDescriptor = null;
this.regionInfoList = null;
this.traceEnabled = null;
+ this.syncLatch = latch;
}
@Override
@@ -152,6 +160,11 @@ public class AddColumnFamilyProcedure
}
@Override
+ protected void completionCleanup(final MasterProcedureEnv env) {
+ ProcedurePrepareLatch.releaseLatch(syncLatch, this);
+ }
+
+ @Override
protected AddColumnFamilyState getState(final int stateId) {
return AddColumnFamilyState.valueOf(stateId);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4f8e70/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
index 8bcbd82..da24cef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java
@@ -62,16 +62,23 @@ public class DeleteColumnFamilyProcedure
private List<HRegionInfo> regionInfoList;
private Boolean traceEnabled;
+ // used for compatibility with old clients, until 2.0 the client had a sync behavior
+ private final ProcedurePrepareLatch syncLatch;
+
public DeleteColumnFamilyProcedure() {
this.unmodifiedHTableDescriptor = null;
this.regionInfoList = null;
this.traceEnabled = null;
+ this.syncLatch = null;
}
- public DeleteColumnFamilyProcedure(
- final MasterProcedureEnv env,
- final TableName tableName,
+ public DeleteColumnFamilyProcedure(final MasterProcedureEnv env, final TableName tableName,
final byte[] familyName) throws IOException {
+ this(env, tableName, familyName, null);
+ }
+
+ public DeleteColumnFamilyProcedure(final MasterProcedureEnv env, final TableName tableName,
+ final byte[] familyName, final ProcedurePrepareLatch latch) throws IOException {
this.tableName = tableName;
this.familyName = familyName;
this.user = env.getRequestUser();
@@ -79,6 +86,7 @@ public class DeleteColumnFamilyProcedure
this.unmodifiedHTableDescriptor = null;
this.regionInfoList = null;
this.traceEnabled = null;
+ this.syncLatch = latch;
}
@Override
@@ -170,6 +178,11 @@ public class DeleteColumnFamilyProcedure
}
@Override
+ protected void completionCleanup(final MasterProcedureEnv env) {
+ ProcedurePrepareLatch.releaseLatch(syncLatch, this);
+ }
+
+ @Override
protected DeleteColumnFamilyState getState(final int stateId) {
return DeleteColumnFamilyState.valueOf(stateId);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4f8e70/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
index 6a408da..1769306 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java
@@ -59,21 +59,29 @@ public class ModifyColumnFamilyProcedure
private Boolean traceEnabled;
+ // used for compatibility with old clients; until 2.0 the client had a sync behavior
+ private final ProcedurePrepareLatch syncLatch;
+
public ModifyColumnFamilyProcedure() {
this.unmodifiedHTableDescriptor = null;
this.traceEnabled = null;
+ this.syncLatch = null;
}
- public ModifyColumnFamilyProcedure(
- final MasterProcedureEnv env,
- final TableName tableName,
+ public ModifyColumnFamilyProcedure(final MasterProcedureEnv env, final TableName tableName,
final HColumnDescriptor cfDescriptor) throws IOException {
+ this(env, tableName, cfDescriptor, null);
+ }
+
+ public ModifyColumnFamilyProcedure(final MasterProcedureEnv env, final TableName tableName,
+ final HColumnDescriptor cfDescriptor, final ProcedurePrepareLatch latch) throws IOException {
this.tableName = tableName;
this.cfDescriptor = cfDescriptor;
this.user = env.getRequestUser();
this.setOwner(this.user.getShortName());
this.unmodifiedHTableDescriptor = null;
this.traceEnabled = null;
+ this.syncLatch = latch;
}
@Override
@@ -150,6 +158,11 @@ public class ModifyColumnFamilyProcedure
}
@Override
+ protected void completionCleanup(final MasterProcedureEnv env) {
+ ProcedurePrepareLatch.releaseLatch(syncLatch, this);
+ }
+
+ @Override
protected ModifyColumnFamilyState getState(final int stateId) {
return ModifyColumnFamilyState.valueOf(stateId);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4f8e70/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index c523f23..8299bcc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -68,17 +68,26 @@ public class ModifyTableProcedure
private List<HRegionInfo> regionInfoList;
private Boolean traceEnabled = null;
+ // used for compatibility with old clients; until 2.0 the client had a sync behavior
+ private final ProcedurePrepareLatch syncLatch;
+
public ModifyTableProcedure() {
initilize();
+ this.syncLatch = null;
}
- public ModifyTableProcedure(
- final MasterProcedureEnv env,
- final HTableDescriptor htd) throws IOException {
+ public ModifyTableProcedure(final MasterProcedureEnv env, final HTableDescriptor htd)
+ throws IOException {
+ this(env, htd, null);
+ }
+
+ public ModifyTableProcedure(final MasterProcedureEnv env, final HTableDescriptor htd,
+ final ProcedurePrepareLatch latch) throws IOException {
initilize();
this.modifiedHTableDescriptor = htd;
this.user = env.getRequestUser();
this.setOwner(this.user.getShortName());
+ this.syncLatch = latch;
}
private void initilize() {
@@ -185,6 +194,11 @@ public class ModifyTableProcedure
}
@Override
+ protected void completionCleanup(final MasterProcedureEnv env) {
+ ProcedurePrepareLatch.releaseLatch(syncLatch, this);
+ }
+
+ @Override
protected ModifyTableState getState(final int stateId) {
return ModifyTableState.valueOf(stateId);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4f8e70/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
index b13e44d..eaeb9ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedurePrepareLatch.java
@@ -36,13 +36,29 @@ import org.apache.hadoop.hbase.procedure2.Procedure;
public abstract class ProcedurePrepareLatch {
private static final NoopLatch noopLatch = new NoopLatch();
+ /**
+ * Create a latch if the client does not have async proc support.
+ * This uses the default 1.1 version.
+ * @return a CompatibilityLatch or a NoopLatch if the client has async proc support
+ */
public static ProcedurePrepareLatch createLatch() {
+ // don't use the latch if we have procedure support (default 1.1)
+ return createLatch(1, 1);
+ }
+
+ /**
+ * Create a latch if the client does not have async proc support.
+ * @param major major version with async proc support
+ * @param minor minor version with async proc support
+ * @return a CompatibilityLatch or a NoopLatch if the client has async proc support
+ */
+ public static ProcedurePrepareLatch createLatch(int major, int minor) {
// don't use the latch if we have procedure support
- return hasProcedureSupport() ? noopLatch : new CompatibilityLatch();
+ return hasProcedureSupport(major, minor) ? noopLatch : new CompatibilityLatch();
}
- public static boolean hasProcedureSupport() {
- return VersionInfoUtil.currentClientHasMinimumVersion(1, 1);
+ private static boolean hasProcedureSupport(int major, int minor) {
+ return VersionInfoUtil.currentClientHasMinimumVersion(major, minor);
}
protected abstract void countDown(final Procedure proc);
http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4f8e70/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
index 8a5eb35..cf8fdd4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ProcedureSyncWait.java
@@ -71,7 +71,7 @@ public final class ProcedureSyncWait {
return waitForProcedureToComplete(procExec, procId);
}
- public static byte[] waitForProcedureToComplete(ProcedureExecutor<MasterProcedureEnv> procExec,
+ private static byte[] waitForProcedureToComplete(ProcedureExecutor<MasterProcedureEnv> procExec,
final long procId) throws IOException {
while (!procExec.isFinished(procId) && procExec.isRunning()) {
// TODO: add a config to make it tunable
http://git-wip-us.apache.org/repos/asf/hbase/blob/4a4f8e70/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
index 0b60cea..8e3ef3e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java
@@ -56,16 +56,26 @@ public class TruncateTableProcedure
private HTableDescriptor hTableDescriptor;
private TableName tableName;
+ // used for compatibility with old clients; until 2.0 the client had a sync behavior
+ private final ProcedurePrepareLatch syncLatch;
+
public TruncateTableProcedure() {
// Required by the Procedure framework to create the procedure on replay
+ syncLatch = null;
}
public TruncateTableProcedure(final MasterProcedureEnv env, final TableName tableName,
boolean preserveSplits) throws IOException {
+ this(env, tableName, preserveSplits, null);
+ }
+
+ public TruncateTableProcedure(final MasterProcedureEnv env, final TableName tableName,
+ boolean preserveSplits, ProcedurePrepareLatch latch) throws IOException {
this.tableName = tableName;
this.preserveSplits = preserveSplits;
this.user = env.getRequestUser();
this.setOwner(this.user.getShortName());
+ this.syncLatch = latch;
}
@Override
@@ -150,6 +160,11 @@ public class TruncateTableProcedure
}
@Override
+ protected void completionCleanup(final MasterProcedureEnv env) {
+ ProcedurePrepareLatch.releaseLatch(syncLatch, this);
+ }
+
+ @Override
protected TruncateTableState getState(final int stateId) {
return TruncateTableState.valueOf(stateId);
}