You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2016/01/14 18:26:03 UTC

[7/9] hbase git commit: HBASE-14837 Procedure v2 - Procedure Queue Improvement

http://git-wip-us.apache.org/repos/asf/hbase/blob/3c2229a9/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
new file mode 100644
index 0000000..2b92e52
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java
@@ -0,0 +1,487 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.master.TableLockManager;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestMasterProcedureScheduler {
+  private static final Log LOG = LogFactory.getLog(TestMasterProcedureScheduler.class);
+
+  private MasterProcedureScheduler queue;
+  private Configuration conf;
+
+  @Before
+  public void setUp() throws IOException {
+    conf = HBaseConfiguration.create();
+    queue = new MasterProcedureScheduler(conf, new TableLockManager.NullTableLockManager());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    assertEquals(0, queue.size());
+  }
+
+  @Test
+  public void testConcurrentCreateDelete() throws Exception {
+    final MasterProcedureScheduler procQueue = queue;
+    final TableName table = TableName.valueOf("testtb");
+    final AtomicBoolean running = new AtomicBoolean(true);
+    final AtomicBoolean failure = new AtomicBoolean(false);
+    Thread createThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          while (running.get() && !failure.get()) {
+            if (procQueue.tryAcquireTableExclusiveLock(table, "create")) {
+              procQueue.releaseTableExclusiveLock(table);
+            }
+          }
+        } catch (Throwable e) {
+          LOG.error("create failed", e);
+          failure.set(true);
+        }
+      }
+    };
+
+    Thread deleteThread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          while (running.get() && !failure.get()) {
+            if (procQueue.tryAcquireTableExclusiveLock(table, "delete")) {
+              procQueue.releaseTableExclusiveLock(table);
+            }
+            procQueue.markTableAsDeleted(table);
+          }
+        } catch (Throwable e) {
+          LOG.error("delete failed", e);
+          failure.set(true);
+        }
+      }
+    };
+
+    createThread.start();
+    deleteThread.start();
+    for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
+      Thread.sleep(100);
+    }
+    running.set(false);
+    createThread.join();
+    deleteThread.join();
+    assertEquals(false, failure.get());
+  }
+
+  /**
+   * Verify simple create/insert/fetch/delete of the table queue.
+   */
+  @Test
+  public void testSimpleTableOpsQueues() throws Exception {
+    final int NUM_TABLES = 10;
+    final int NUM_ITEMS = 10;
+
+    int count = 0;
+    for (int i = 1; i <= NUM_TABLES; ++i) {
+      TableName tableName = TableName.valueOf(String.format("test-%04d", i));
+      // insert items
+      for (int j = 1; j <= NUM_ITEMS; ++j) {
+        queue.addBack(new TestTableProcedure(i * 1000 + j, tableName,
+          TableProcedureInterface.TableOperationType.EDIT));
+        assertEquals(++count, queue.size());
+      }
+    }
+    assertEquals(NUM_TABLES * NUM_ITEMS, queue.size());
+
+    for (int j = 1; j <= NUM_ITEMS; ++j) {
+      for (int i = 1; i <= NUM_TABLES; ++i) {
+        Procedure proc = queue.poll();
+        assertTrue(proc != null);
+        TableName tableName = ((TestTableProcedure)proc).getTableName();
+        queue.tryAcquireTableExclusiveLock(tableName, "test");
+        queue.releaseTableExclusiveLock(tableName);
+        queue.completionCleanup(proc);
+        assertEquals(--count, queue.size());
+        assertEquals(i * 1000 + j, proc.getProcId());
+      }
+    }
+    assertEquals(0, queue.size());
+
+    for (int i = 1; i <= NUM_TABLES; ++i) {
+      TableName tableName = TableName.valueOf(String.format("test-%04d", i));
+      // complete the table deletion
+      assertTrue(queue.markTableAsDeleted(tableName));
+    }
+  }
+
+  /**
+   * Check that the table queue is not deletable until every procedure
+   * in-progress is completed (this is a special case for write-locks).
+   */
+  @Test
+  public void testCreateDeleteTableOperationsWithWriteLock() throws Exception {
+    TableName tableName = TableName.valueOf("testtb");
+
+    queue.addBack(new TestTableProcedure(1, tableName,
+          TableProcedureInterface.TableOperationType.EDIT));
+
+    // table can't be deleted because one item is in the queue
+    assertFalse(queue.markTableAsDeleted(tableName));
+
+    // fetch item and take a lock
+    assertEquals(1, queue.poll().getProcId());
+    // take the xlock
+    assertTrue(queue.tryAcquireTableExclusiveLock(tableName, "write"));
+    // table can't be deleted because we have the lock
+    assertEquals(0, queue.size());
+    assertFalse(queue.markTableAsDeleted(tableName));
+    // release the xlock
+    queue.releaseTableExclusiveLock(tableName);
+    // complete the table deletion
+    assertTrue(queue.markTableAsDeleted(tableName));
+  }
+
+  /**
+   * Check that the table queue is not deletable until every procedure
+   * in-progress is completed (this is a special case for read-locks).
+   */
+  @Test
+  public void testCreateDeleteTableOperationsWithReadLock() throws Exception {
+    final TableName tableName = TableName.valueOf("testtb");
+    final int nitems = 2;
+
+    for (int i = 1; i <= nitems; ++i) {
+      queue.addBack(new TestTableProcedure(i, tableName,
+            TableProcedureInterface.TableOperationType.READ));
+    }
+
+    // table can't be deleted because one item is in the queue
+    assertFalse(queue.markTableAsDeleted(tableName));
+
+    for (int i = 1; i <= nitems; ++i) {
+      // fetch item and take a lock
+      assertEquals(i, queue.poll().getProcId());
+      // take the rlock
+      assertTrue(queue.tryAcquireTableSharedLock(tableName, "read " + i));
+      // table can't be deleted because we have locks and/or items in the queue
+      assertFalse(queue.markTableAsDeleted(tableName));
+    }
+
+    for (int i = 1; i <= nitems; ++i) {
+      // table can't be deleted because we have locks
+      assertFalse(queue.markTableAsDeleted(tableName));
+      // release the rlock
+      queue.releaseTableSharedLock(tableName);
+    }
+
+    // there are no items and no lock in the queeu
+    assertEquals(0, queue.size());
+    // complete the table deletion
+    assertTrue(queue.markTableAsDeleted(tableName));
+  }
+
+  /**
+   * Verify the correct logic of RWLocks on the queue
+   */
+  @Test
+  public void testVerifyRwLocks() throws Exception {
+    TableName tableName = TableName.valueOf("testtb");
+    queue.addBack(new TestTableProcedure(1, tableName,
+          TableProcedureInterface.TableOperationType.EDIT));
+    queue.addBack(new TestTableProcedure(2, tableName,
+          TableProcedureInterface.TableOperationType.READ));
+    queue.addBack(new TestTableProcedure(3, tableName,
+          TableProcedureInterface.TableOperationType.EDIT));
+    queue.addBack(new TestTableProcedure(4, tableName,
+          TableProcedureInterface.TableOperationType.READ));
+    queue.addBack(new TestTableProcedure(5, tableName,
+          TableProcedureInterface.TableOperationType.READ));
+
+    // Fetch the 1st item and take the write lock
+    long procId = queue.poll().getProcId();
+    assertEquals(1, procId);
+    assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
+
+    // Fetch the 2nd item and verify that the lock can't be acquired
+    assertEquals(null, queue.poll(0));
+
+    // Release the write lock and acquire the read lock
+    queue.releaseTableExclusiveLock(tableName);
+
+    // Fetch the 2nd item and take the read lock
+    procId = queue.poll().getProcId();
+    assertEquals(2, procId);
+    assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
+
+    // Fetch the 3rd item and verify that the lock can't be acquired
+    procId = queue.poll().getProcId();
+    assertEquals(3, procId);
+    assertEquals(false, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
+
+    // release the rdlock of item 2 and take the wrlock for the 3d item
+    queue.releaseTableSharedLock(tableName);
+    assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId));
+
+    // Fetch 4th item and verify that the lock can't be acquired
+    assertEquals(null, queue.poll(0));
+
+    // Release the write lock and acquire the read lock
+    queue.releaseTableExclusiveLock(tableName);
+
+    // Fetch the 4th item and take the read lock
+    procId = queue.poll().getProcId();
+    assertEquals(4, procId);
+    assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
+
+    // Fetch the 4th item and take the read lock
+    procId = queue.poll().getProcId();
+    assertEquals(5, procId);
+    assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId));
+
+    // Release 4th and 5th read-lock
+    queue.releaseTableSharedLock(tableName);
+    queue.releaseTableSharedLock(tableName);
+
+    // remove table queue
+    assertEquals(0, queue.size());
+    assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName));
+  }
+
+  /**
+   * Verify that "write" operations for a single table are serialized,
+   * but different tables can be executed in parallel.
+   */
+  @Test(timeout=90000)
+  public void testConcurrentWriteOps() throws Exception {
+    final TestTableProcSet procSet = new TestTableProcSet(queue);
+
+    final int NUM_ITEMS = 10;
+    final int NUM_TABLES = 4;
+    final AtomicInteger opsCount = new AtomicInteger(0);
+    for (int i = 0; i < NUM_TABLES; ++i) {
+      TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
+      for (int j = 1; j < NUM_ITEMS; ++j) {
+        procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
+          TableProcedureInterface.TableOperationType.EDIT));
+        opsCount.incrementAndGet();
+      }
+    }
+    assertEquals(opsCount.get(), queue.size());
+
+    final Thread[] threads = new Thread[NUM_TABLES * 2];
+    final HashSet<TableName> concurrentTables = new HashSet<TableName>();
+    final ArrayList<String> failures = new ArrayList<String>();
+    final AtomicInteger concurrentCount = new AtomicInteger(0);
+    for (int i = 0; i < threads.length; ++i) {
+      threads[i] = new Thread() {
+        @Override
+        public void run() {
+          while (opsCount.get() > 0) {
+            try {
+              TableProcedureInterface proc = procSet.acquire();
+              if (proc == null) {
+                queue.signalAll();
+                if (opsCount.get() > 0) {
+                  continue;
+                }
+                break;
+              }
+              synchronized (concurrentTables) {
+                assertTrue("unexpected concurrency on " + proc.getTableName(),
+                  concurrentTables.add(proc.getTableName()));
+              }
+              assertTrue(opsCount.decrementAndGet() >= 0);
+              try {
+                long procId = ((Procedure)proc).getProcId();
+                TableName tableId = proc.getTableName();
+                int concurrent = concurrentCount.incrementAndGet();
+                assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
+                  concurrent >= 1 && concurrent <= NUM_TABLES);
+                LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
+                Thread.sleep(2000);
+                concurrent = concurrentCount.decrementAndGet();
+                LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
+                assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
+              } finally {
+                synchronized (concurrentTables) {
+                  assertTrue(concurrentTables.remove(proc.getTableName()));
+                }
+                procSet.release(proc);
+              }
+            } catch (Throwable e) {
+              LOG.error("Failed " + e.getMessage(), e);
+              synchronized (failures) {
+                failures.add(e.getMessage());
+              }
+            } finally {
+              queue.signalAll();
+            }
+          }
+        }
+      };
+      threads[i].start();
+    }
+    for (int i = 0; i < threads.length; ++i) {
+      threads[i].join();
+    }
+    assertTrue(failures.toString(), failures.isEmpty());
+    assertEquals(0, opsCount.get());
+    assertEquals(0, queue.size());
+
+    for (int i = 1; i <= NUM_TABLES; ++i) {
+      TableName table = TableName.valueOf(String.format("testtb-%04d", i));
+      assertTrue("queue should be deleted, table=" + table, queue.markTableAsDeleted(table));
+    }
+  }
+
+  public static class TestTableProcSet {
+    private final MasterProcedureScheduler queue;
+    private Map<Long, TableProcedureInterface> procsMap =
+      new ConcurrentHashMap<Long, TableProcedureInterface>();
+
+    public TestTableProcSet(final MasterProcedureScheduler queue) {
+      this.queue = queue;
+    }
+
+    public void addBack(TableProcedureInterface tableProc) {
+      Procedure proc = (Procedure)tableProc;
+      procsMap.put(proc.getProcId(), tableProc);
+      queue.addBack(proc);
+    }
+
+    public void addFront(TableProcedureInterface tableProc) {
+      Procedure proc = (Procedure)tableProc;
+      procsMap.put(proc.getProcId(), tableProc);
+      queue.addFront(proc);
+    }
+
+    public TableProcedureInterface acquire() {
+      TableProcedureInterface proc = null;
+      boolean avail = false;
+      while (!avail) {
+        Procedure xProc = queue.poll();
+        proc = xProc != null ? procsMap.remove(xProc.getProcId()) : null;
+        if (proc == null) break;
+        switch (proc.getTableOperationType()) {
+          case CREATE:
+          case DELETE:
+          case EDIT:
+            avail = queue.tryAcquireTableExclusiveLock(proc.getTableName(),
+              "op="+ proc.getTableOperationType());
+            break;
+          case READ:
+            avail = queue.tryAcquireTableSharedLock(proc.getTableName(),
+              "op="+ proc.getTableOperationType());
+            break;
+        }
+        if (!avail) {
+          addFront(proc);
+          LOG.debug("yield procId=" + proc);
+        }
+      }
+      return proc;
+    }
+
+    public void release(TableProcedureInterface proc) {
+      switch (proc.getTableOperationType()) {
+        case CREATE:
+        case DELETE:
+        case EDIT:
+          queue.releaseTableExclusiveLock(proc.getTableName());
+          break;
+        case READ:
+          queue.releaseTableSharedLock(proc.getTableName());
+          break;
+      }
+    }
+  }
+
+  public static class TestTableProcedure extends Procedure<Void>
+      implements TableProcedureInterface {
+    private final TableOperationType opType;
+    private final TableName tableName;
+
+    public TestTableProcedure() {
+      throw new UnsupportedOperationException("recovery should not be triggered here");
+    }
+
+    public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) {
+      this.tableName = tableName;
+      this.opType = opType;
+      setProcId(procId);
+    }
+
+    @Override
+    public TableName getTableName() {
+      return tableName;
+    }
+
+    @Override
+    public TableOperationType getTableOperationType() {
+      return opType;
+    }
+
+    @Override
+    protected Procedure[] execute(Void env) {
+      return null;
+    }
+
+    @Override
+    protected void rollback(Void env) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected boolean abort(Void env) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected void serializeStateData(final OutputStream stream) throws IOException {}
+
+    @Override
+    protected void deserializeStateData(final InputStream stream) throws IOException {}
+  }
+}