You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2013/02/22 01:15:53 UTC
svn commit: r1448867 [3/3] - in /hbase/trunk:
hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/
hbase-protocol/src/main/protobuf/
hbase-server/src/main/java/org/apache/hadoop/hbase/
hbase-server/src/main/java/org/apache/hadoop/hb...
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java?rev=1448867&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestTableLockManager.java Fri Feb 22 00:15:52 2013
@@ -0,0 +1,294 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.LockTimeoutException;
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests the default table lock manager
+ */
+@Category(MediumTests.class)
+public class TestTableLockManager {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestTableLockManager.class);
+
+ private static final byte[] TABLE_NAME = Bytes.toBytes("TestTableLevelLocks"); // table created by prepareMiniCluster()
+
+ private static final byte[] FAMILY = Bytes.toBytes("f1"); // family the table is created with
+
+ private static final byte[] NEW_FAMILY = Bytes.toBytes("f2"); // family the tests add through addColumn
+
+ private final HBaseTestingUtility TEST_UTIL = // instance field (not static); shut down in tearDown()
+ new HBaseTestingUtility();
+
+ private static final CountDownLatch deleteColumn = new CountDownLatch(1); // counted down by preDeleteColumnHandler
+ private static final CountDownLatch addColumn = new CountDownLatch(1); // counted down by preAddColumnHandler
+
+ /**
+  * Enables online schema updates in the test configuration, starts the mini
+  * cluster via {@code startMiniCluster(2)} and creates {@code TABLE_NAME} with
+  * the single family {@code FAMILY}.
+  *
+  * @throws Exception if the cluster cannot be started or the table cannot be created
+  */
+ public void prepareMiniCluster() throws Exception {
+   final Configuration clusterConf = TEST_UTIL.getConfiguration();
+   clusterConf.setBoolean("hbase.online.schema.update.enable", true);
+
+   final int nodeCount = 2;
+   TEST_UTIL.startMiniCluster(nodeCount);
+
+   TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+ }
+
+ /**
+  * Starts only a mini ZooKeeper cluster (one server requested); used by tests
+  * that talk to the lock manager directly and do not call
+  * {@link #prepareMiniCluster()}.
+  *
+  * @throws Exception if the ZooKeeper cluster cannot be started
+  */
+ public void prepareMiniZkCluster() throws Exception {
+   final int zkServerCount = 1;
+   TEST_UTIL.startMiniZKCluster(zkServerCount);
+ }
+
+ @After
+ public void tearDown() throws Exception { // runs after each test, whichever prepare* method that test used
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+  * Verifies that a schema change fails with {@link LockTimeoutException} when
+  * another schema change on the same table is still in progress for longer than
+  * the configured write-lock timeout. The timeout is set to 3000 ms, while
+  * {@link TestLockTimeoutExceptionMasterObserver} keeps the delete-column
+  * operation busy for 10000 ms.
+  *
+  * @throws Exception on cluster start-up failure or if the background
+  *           deleteColumn call fails
+  */
+ @Test(timeout = 600000)
+ public void testLockTimeoutException() throws Exception {
+   Configuration conf = TEST_UTIL.getConfiguration();
+   conf.setInt(TableLockManager.TABLE_WRITE_LOCK_TIMEOUT_MS, 3000);
+   prepareMiniCluster();
+   HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+   master.getCoprocessorHost().load(TestLockTimeoutExceptionMasterObserver.class,
+     0, TEST_UTIL.getConfiguration());
+
+   ExecutorService executor = Executors.newSingleThreadExecutor();
+   try {
+     Future<Object> shouldFinish = executor.submit(new Callable<Object>() {
+       @Override
+       public Object call() throws Exception {
+         HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+         admin.deleteColumn(TABLE_NAME, FAMILY);
+         return null;
+       }
+     });
+
+     // wait until the master has started handling the deleteColumn request
+     deleteColumn.await();
+
+     try {
+       HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+       admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY));
+       fail("Was expecting LockTimeoutException");
+     } catch (LockTimeoutException ex) {
+       //expected
+     }
+     // the first operation must still complete normally
+     shouldFinish.get();
+   } finally {
+     // do not leak the worker thread, whether the test passed or failed
+     executor.shutdownNow();
+   }
+ }
+
+ /**
+  * Master observer loaded by {@code testLockTimeoutException}. It signals the
+  * test when a delete-column handler starts, stalls after that handler for
+  * longer than the write-lock timeout the test configures, and fails if an
+  * add-column handler is ever reached.
+  */
+ public static class TestLockTimeoutExceptionMasterObserver extends BaseMasterObserver {
+   /** How long the delete-column post hook stalls, in milliseconds. */
+   private static final int POST_DELETE_STALL_MS = 10000;
+
+   /** Releases the test thread waiting on the {@code deleteColumn} latch. */
+   @Override
+   public void preDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> context,
+       byte[] table, byte[] columnFamily) throws IOException {
+     deleteColumn.countDown();
+   }
+
+   /** Stalls so that the concurrent addColumn request runs into its lock timeout. */
+   @Override
+   public void postDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> context,
+       byte[] table, byte[] columnFamily) throws IOException {
+     Threads.sleep(POST_DELETE_STALL_MS);
+   }
+
+   /** Must never run: the addColumn request is expected to time out beforehand. */
+   @Override
+   public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> context,
+       byte[] table, HColumnDescriptor newColumn) throws IOException {
+     fail("Add column should have timeouted out for acquiring the table lock");
+   }
+ }
+
+ /**
+  * Sends an alter-table (addColumn) request and, from a second thread, a
+  * disable followed by a delete of the same table.
+  * {@link TestAlterAndDisableMasterObserver} makes the disable wait until the
+  * addColumn handler has started and asserts that the table is still enabled
+  * once the alteration finishes.
+  *
+  * @throws Exception on cluster failure; an {@link AssertionError} raised in a
+  *           worker is unwrapped and rethrown as-is
+  */
+ @Test(timeout = 600000)
+ public void testAlterAndDisable() throws Exception {
+   prepareMiniCluster();
+   // Send a request to alter a table, then sleep during
+   // the alteration phase. In the mean time, from another
+   // thread, send a request to disable, and then delete a table.
+
+   HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+   master.getCoprocessorHost().load(TestAlterAndDisableMasterObserver.class,
+     0, TEST_UTIL.getConfiguration());
+
+   ExecutorService executor = Executors.newFixedThreadPool(2);
+   try {
+     Future<Object> alterTableFuture = executor.submit(new Callable<Object>() {
+       @Override
+       public Object call() throws Exception {
+         HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+         admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY));
+         LOG.info("Added new column family");
+         HTableDescriptor tableDesc = admin.getTableDescriptor(TABLE_NAME);
+         assertTrue(tableDesc.getFamiliesKeys().contains(NEW_FAMILY));
+         return null;
+       }
+     });
+     Future<Object> disableTableFuture = executor.submit(new Callable<Object>() {
+       @Override
+       public Object call() throws Exception {
+         HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+         admin.disableTable(TABLE_NAME);
+         assertTrue(admin.isTableDisabled(TABLE_NAME));
+         admin.deleteTable(TABLE_NAME);
+         assertFalse(admin.tableExists(TABLE_NAME));
+         return null;
+       }
+     });
+
+     try {
+       disableTableFuture.get();
+       alterTableFuture.get();
+     } catch (ExecutionException e) {
+       // surface assertion failures from the workers as real test failures
+       if (e.getCause() instanceof AssertionError) {
+         throw (AssertionError) e.getCause();
+       }
+       throw e;
+     }
+   } finally {
+     // do not leak the two worker threads, whether the test passed or failed
+     executor.shutdownNow();
+   }
+ }
+
+ /**
+  * Master observer loaded by {@code testAlterAndDisable}. It orders the two
+  * concurrent requests (disable waits for addColumn to start) and checks that
+  * the table is still enabled when the addColumn handler finishes.
+  */
+ public static class TestAlterAndDisableMasterObserver extends BaseMasterObserver {
+   /** Signals that the addColumn handler has started. */
+   @Override
+   public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
+       byte[] tableName, HColumnDescriptor column) throws IOException {
+     LOG.debug("addColumn called");
+     addColumn.countDown();
+   }
+
+   /**
+    * Stalls for 6000 ms, then expects {@code checkTableModifiable} to throw
+    * {@link TableNotDisabledException}, i.e. the table must still be enabled.
+    */
+   @Override
+   public void postAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
+       byte[] tableName, HColumnDescriptor column) throws IOException {
+     Threads.sleep(6000);
+     try {
+       ctx.getEnvironment().getMasterServices().checkTableModifiable(tableName);
+     } catch(TableNotDisabledException expected) {
+       //pass
+       return;
+     } catch(IOException ex) {
+       // not the expected outcome; record the cause before failing below
+       LOG.warn("Unexpected exception from checkTableModifiable", ex);
+     }
+     fail("was expecting the table to be enabled");
+   }
+
+   /** Blocks the disable request until the addColumn handler has started. */
+   @Override
+   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
+       byte[] tableName) throws IOException {
+     try {
+       LOG.debug("Waiting for addColumn to be processed first");
+       //wait for addColumn to be processed first
+       addColumn.await();
+       LOG.debug("addColumn started, we can continue");
+     } catch (InterruptedException ex) {
+       LOG.warn("Sleep interrupted while waiting for addColumn countdown");
+       // restore the interrupt flag so callers further up can still observe it
+       Thread.currentThread().interrupt();
+     }
+   }
+
+   /** Stalls for 3000 ms after the disable handler has run. */
+   @Override
+   public void postDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
+       byte[] tableName) throws IOException {
+     Threads.sleep(3000);
+   }
+ }
+
+ /**
+  * Disables and deletes the test table, then checks that no znode for the
+  * table remains under the table-lock parent znode.
+  *
+  * @throws Exception on cluster, admin or ZooKeeper failure
+  */
+ @Test(timeout = 600000)
+ public void testDelete() throws Exception {
+   prepareMiniCluster();
+
+   final HBaseAdmin hbaseAdmin = TEST_UTIL.getHBaseAdmin();
+   hbaseAdmin.disableTable(TABLE_NAME);
+   hbaseAdmin.deleteTable(TABLE_NAME);
+
+   // the lock znode of the table must be gone after the delete
+   final ZooKeeperWatcher watcher = TEST_UTIL.getZooKeeperWatcher();
+   final String tableLockNode =
+     ZKUtil.joinZNode(watcher.tableLockZNode, Bytes.toString(TABLE_NAME));
+   final int existsResult = ZKUtil.checkExists(watcher, tableLockNode);
+
+   assertTrue(existsResult < 0);
+ }
+
+
+ @Test(timeout = 600000)
+ public void testReapAllTableLocks() throws Exception {
+ prepareMiniZkCluster();
+ ServerName serverName = new ServerName("localhost:10000", 0);
+ final TableLockManager lockManager = TableLockManager.createTableLockManager(
+ TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName);
+
+ String tables[] = {"table1", "table2", "table3", "table4"};
+ ExecutorService executor = Executors.newFixedThreadPool(6);
+
+ final CountDownLatch writeLocksObtained = new CountDownLatch(4);
+ final CountDownLatch writeLocksAttempted = new CountDownLatch(10);
+ //TODO: read lock tables
+
+ //6 threads will be stuck waiting for the table lock
+ for (int i = 0; i < tables.length; i++) {
+ final String table = tables[i];
+ for (int j = 0; j < i+1; j++) { //i+1 write locks attempted for table[i]
+ executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ writeLocksAttempted.countDown();
+ lockManager.writeLock(Bytes.toBytes(table), "testReapAllTableLocks").acquire();
+ writeLocksObtained.countDown();
+ return null;
+ }
+ });
+ }
+ }
+
+ writeLocksObtained.await();
+ writeLocksAttempted.await();
+
+ //now reap all table locks
+ lockManager.reapAllTableWriteLocks();
+
+ TEST_UTIL.getConfiguration().setInt(TableLockManager.TABLE_WRITE_LOCK_TIMEOUT_MS, 0);
+ TableLockManager zeroTimeoutLockManager = TableLockManager.createTableLockManager(
+ TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName);
+
+ //should not throw table lock timeout exception
+ zeroTimeoutLockManager.writeLock(Bytes.toBytes(tables[tables.length -1]), "zero timeout")
+ .acquire();
+
+ executor.shutdownNow();
+ }
+
+}
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/lock/TestZKInterProcessReadWriteLock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/lock/TestZKInterProcessReadWriteLock.java?rev=1448867&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/lock/TestZKInterProcessReadWriteLock.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/lock/TestZKInterProcessReadWriteLock.java Fri Feb 22 00:15:52 2013
@@ -0,0 +1,359 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper.lock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.InterProcessLock;
+import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+@Category(MediumTests.class)
+public class TestZKInterProcessReadWriteLock {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestZKInterProcessReadWriteLock.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = // shared by all tests; mini ZK cluster started in beforeAllTests()
+ new HBaseTestingUtility();
+
+ private static final int NUM_THREADS = 10; // executor pool size and number of concurrent lockers per test
+
+ private static Configuration conf; // assigned from TEST_UTIL in beforeAllTests()
+
+ private final AtomicBoolean isLockHeld = new AtomicBoolean(false); // set by a worker while it is inside a lock
+ private final ExecutorService executor = // instance field; shut down in tearDown()
+ Executors.newFixedThreadPool(NUM_THREADS,
+ new DaemonThreadFactory("TestZKInterProcessReadWriteLock-"));
+
+ /**
+  * One-time setup: starts the mini ZooKeeper cluster, sets the ZooKeeper
+  * session timeout in the shared configuration to 1000, and creates the
+  * table-lock parent znode (with parents) that the tests place their lock
+  * znodes under.
+  *
+  * @throws Exception if ZooKeeper cannot be started or the znode cannot be created
+  */
+ @BeforeClass
+ public static void beforeAllTests() throws Exception {
+   conf = TEST_UTIL.getConfiguration();
+   TEST_UTIL.startMiniZKCluster();
+
+   final int sessionTimeout = 1000;
+   conf.setInt(HConstants.ZK_SESSION_TIMEOUT, sessionTimeout);
+
+   final ZooKeeperWatcher setupWatcher = getZooKeeperWatcher("setup");
+   final String lockParentZNode = setupWatcher.tableLockZNode;
+   ZKUtil.createWithParents(setupWatcher, lockParentZNode);
+ }
+
+ @AfterClass
+ public static void afterAllTests() throws Exception { // stops the mini ZK cluster started in beforeAllTests()
+ TEST_UTIL.shutdownMiniZKCluster();
+ }
+
+ @After
+ public void tearDown() { // per-test cleanup of this instance's executor
+ executor.shutdown();
+ }
+
+ private static ZooKeeperWatcher getZooKeeperWatcher(String desc)
+ throws IOException { // NOTE(review): desc is not used; every caller gets whatever TEST_UTIL returns
+ return TEST_UTIL.getZooKeeperWatcher();
+ }
+
+
+ /**
+  * Starts {@code NUM_THREADS} workers that each take the write lock on the
+  * same znode and, while holding it, assert through {@code isLockHeld} that
+  * no other worker is inside the critical section.
+  *
+  * @throws Exception if any worker fails
+  */
+ @Test(timeout = 30000)
+ public void testWriteLockExcludesWriters() throws Exception {
+   final String testName = "testWriteLockExcludesWriters";
+   final ZKInterProcessReadWriteLock sharedLock = getReadWriteLock(testName);
+   final List<Future<Void>> pending = Lists.newArrayList();
+
+   for (int worker = 0; worker < NUM_THREADS; ++worker) {
+     final String workerName = testName + worker;
+     final Callable<Void> exclusiveSection = new Callable<Void>() {
+       @Override
+       public Void call() throws IOException {
+         final ZKInterProcessWriteLock lock =
+           sharedLock.writeLock(Bytes.toBytes(workerName));
+         try {
+           lock.acquire();
+           try {
+             // No one else should hold the lock
+             assertTrue(isLockHeld.compareAndSet(false, true));
+             Thread.sleep(1000);
+             // No one else should have released the lock
+             assertTrue(isLockHeld.compareAndSet(true, false));
+           } finally {
+             isLockHeld.set(false);
+             lock.release();
+           }
+         } catch (InterruptedException e) {
+           LOG.warn(workerName + " interrupted", e);
+           Thread.currentThread().interrupt();
+           throw new InterruptedIOException();
+         }
+         return null;
+       }
+     };
+     pending.add(executor.submit(exclusiveSection));
+   }
+
+   MultithreadedTestUtil.assertOnFutures(pending);
+ }
+
+ /**
+  * Starts {@code NUM_THREADS} workers that each take a read lock on the same
+  * znode and hold it for one second. Once every worker has reported
+  * acquisition, all {@code NUM_THREADS} locks must be held at the same time.
+  *
+  * @throws Exception if any worker fails
+  */
+ @Test(timeout = 30000)
+ public void testReadLockDoesNotExcludeReaders() throws Exception {
+   final String testName = "testReadLockDoesNotExcludeReaders";
+   final ZKInterProcessReadWriteLock readWriteLock =
+     getReadWriteLock(testName);
+   final CountDownLatch locksAcquiredLatch = new CountDownLatch(NUM_THREADS);
+   final AtomicInteger locksHeld = new AtomicInteger(0);
+   List<Future<Void>> results = Lists.newArrayList();
+   for (int i = 0; i < NUM_THREADS; ++i) {
+     final String threadDesc = testName + i;
+     results.add(executor.submit(new Callable<Void>() {
+       @Override
+       public Void call() throws Exception {
+         ZKInterProcessReadLock readLock =
+           readWriteLock.readLock(Bytes.toBytes(threadDesc));
+         readLock.acquire();
+         try {
+           // count first, then signal, so the main thread sees a complete count
+           locksHeld.incrementAndGet();
+           locksAcquiredLatch.countDown();
+           Thread.sleep(1000);
+         } finally {
+           readLock.release();
+           locksHeld.decrementAndGet();
+         }
+         return null;
+       }
+     }));
+   }
+   locksAcquiredLatch.await();
+   // JUnit order is (expected, actual); keeps the failure message meaningful
+   assertEquals(NUM_THREADS, locksHeld.get());
+   MultithreadedTestUtil.assertOnFutures(results);
+ }
+
+ /**
+  * A reader takes the read lock and holds it for one second; a writer that
+  * asks for the write lock afterwards must only obtain it once the reader has
+  * left its critical section.
+  *
+  * @throws Exception if either worker fails
+  */
+ // Timeout aligned with the sibling tests: the reader alone sleeps for 1000 ms,
+ // so 3000 ms left very little room for the ZooKeeper round trips.
+ @Test(timeout = 30000)
+ public void testReadLockExcludesWriters() throws Exception {
+   // Submit a read lock request first
+   // Submit a write lock request second
+   final String testName = "testReadLockExcludesWriters";
+   List<Future<Void>> results = Lists.newArrayList();
+   final CountDownLatch readLockAcquiredLatch = new CountDownLatch(1);
+   Callable<Void> acquireReadLock = new Callable<Void>() {
+     @Override
+     public Void call() throws Exception {
+       final String threadDesc = testName + "-acquireReadLock";
+       ZKInterProcessReadLock readLock =
+         getReadWriteLock(testName).readLock(Bytes.toBytes(threadDesc));
+       readLock.acquire();
+       try {
+         assertTrue(isLockHeld.compareAndSet(false, true));
+         readLockAcquiredLatch.countDown();
+         Thread.sleep(1000);
+       } finally {
+         isLockHeld.set(false);
+         readLock.release();
+       }
+       return null;
+     }
+   };
+   Callable<Void> acquireWriteLock = new Callable<Void>() {
+     @Override
+     public Void call() throws Exception {
+       final String threadDesc = testName + "-acquireWriteLock";
+       ZKInterProcessWriteLock writeLock =
+         getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
+       // only compete for the lock once the reader is known to hold it
+       readLockAcquiredLatch.await();
+       assertTrue(isLockHeld.get());
+       writeLock.acquire();
+       try {
+         assertFalse(isLockHeld.get());
+       } finally {
+         writeLock.release();
+       }
+       return null;
+     }
+   };
+   results.add(executor.submit(acquireReadLock));
+   results.add(executor.submit(acquireWriteLock));
+   MultithreadedTestUtil.assertOnFutures(results);
+ }
+
+ private static ZKInterProcessReadWriteLock getReadWriteLock(String testName)
+ throws IOException {
+ MetadataHandler handler = new MetadataHandler() {
+ @Override
+ public void handleMetadata(byte[] ownerMetadata) {
+ LOG.info("Lock info: " + Bytes.toString(ownerMetadata));
+ }
+ };
+ ZooKeeperWatcher zkWatcher = getZooKeeperWatcher(testName);
+ String znode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, testName);
+
+ return new ZKInterProcessReadWriteLock(zkWatcher, znode, handler);
+ }
+
+ /**
+  * A writer takes the write lock and holds it for one second; a reader that
+  * asks for the read lock afterwards must only obtain it once the writer has
+  * left its critical section.
+  *
+  * @throws Exception if either worker fails
+  */
+ @Test(timeout = 30000)
+ public void testWriteLockExcludesReaders() throws Exception {
+   // Submit a write lock request first
+   // Submit a read lock request second
+   // Use this test's own name so its lock znode is not shared with
+   // testReadLockExcludesWriters.
+   final String testName = "testWriteLockExcludesReaders";
+   List<Future<Void>> results = Lists.newArrayList();
+   final CountDownLatch writeLockAcquiredLatch = new CountDownLatch(1);
+   Callable<Void> acquireWriteLock = new Callable<Void>() {
+     @Override
+     public Void call() throws Exception {
+       final String threadDesc = testName + "-acquireWriteLock";
+       ZKInterProcessWriteLock writeLock =
+         getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
+       writeLock.acquire();
+       try {
+         writeLockAcquiredLatch.countDown();
+         assertTrue(isLockHeld.compareAndSet(false, true));
+         Thread.sleep(1000);
+       } finally {
+         isLockHeld.set(false);
+         writeLock.release();
+       }
+       return null;
+     }
+   };
+   Callable<Void> acquireReadLock = new Callable<Void>() {
+     @Override
+     public Void call() throws Exception {
+       final String threadDesc = testName + "-acquireReadLock";
+       ZKInterProcessReadLock readLock =
+         getReadWriteLock(testName).readLock(Bytes.toBytes(threadDesc));
+       // only compete for the lock once the writer is known to hold it
+       writeLockAcquiredLatch.await();
+       readLock.acquire();
+       try {
+         assertFalse(isLockHeld.get());
+       } finally {
+         readLock.release();
+       }
+       return null;
+     }
+   };
+   results.add(executor.submit(acquireWriteLock));
+   results.add(executor.submit(acquireReadLock));
+   MultithreadedTestUtil.assertOnFutures(results);
+ }
+
+ /**
+  * One worker holds the write lock for 10 seconds. A second worker's
+  * {@code tryAcquire(5000)} must return false, while a third worker's
+  * {@code tryAcquire(30000)} must return true.
+  *
+  * @throws Exception if any worker fails
+  */
+ @Test(timeout = 60000)
+ public void testTimeout() throws Exception {
+   final String testName = "testTimeout";
+   final CountDownLatch lockAcquiredLatch = new CountDownLatch(1);
+   Callable<Void> shouldHog = new Callable<Void>() {
+     @Override
+     public Void call() throws Exception {
+       final String threadDesc = testName + "-shouldHog";
+       ZKInterProcessWriteLock lock =
+         getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
+       lock.acquire();
+       try {
+         lockAcquiredLatch.countDown();
+         Thread.sleep(10000);
+       } finally {
+         // release even if the sleep is interrupted
+         lock.release();
+       }
+       return null;
+     }
+   };
+   Callable<Void> shouldTimeout = new Callable<Void>() {
+     @Override
+     public Void call() throws Exception {
+       final String threadDesc = testName + "-shouldTimeout";
+       ZKInterProcessWriteLock lock =
+         getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
+       lockAcquiredLatch.await();
+       // 5000 ms is shorter than the 10000 ms the lock is hogged for
+       assertFalse(lock.tryAcquire(5000));
+       return null;
+     }
+   };
+   Callable<Void> shouldAcquireLock = new Callable<Void>() {
+     @Override
+     public Void call() throws Exception {
+       final String threadDesc = testName + "-shouldAcquireLock";
+       ZKInterProcessWriteLock lock =
+         getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
+       lockAcquiredLatch.await();
+       // 30000 ms is longer than the 10000 ms the lock is hogged for
+       assertTrue(lock.tryAcquire(30000));
+       lock.release();
+       return null;
+     }
+   };
+   List<Future<Void>> results = Lists.newArrayList();
+   results.add(executor.submit(shouldHog));
+   results.add(executor.submit(shouldTimeout));
+   results.add(executor.submit(shouldAcquireLock));
+   MultithreadedTestUtil.assertOnFutures(results);
+ }
+
+ /**
+  * Uses two separately constructed ZooKeeper watchers on the same lock znode:
+  * client 1 holds a read lock, so client 2's {@code tryAcquire(1000)} for the
+  * write lock must return false; after client 1 releases, client 2's
+  * {@code tryAcquire(5000)} must return true.
+  *
+  * @throws Exception on ZooKeeper or lock failure
+  */
+ @Test(timeout = 60000)
+ public void testMultipleClients() throws Exception {
+   //tests lock usage from multiple zookeeper clients with different sessions.
+   //acquire one read lock, then one write lock
+   final String testName = "testMultipleClients";
+
+   //different zookeeper sessions with separate identifiers
+   ZooKeeperWatcher zkWatcher1 = new ZooKeeperWatcher(conf, "testMultipleClients-1", null);
+   try {
+     ZooKeeperWatcher zkWatcher2 = new ZooKeeperWatcher(conf, "testMultipleClients-2", null);
+     try {
+       String znode = ZKUtil.joinZNode(zkWatcher1.tableLockZNode, testName);
+
+       ZKInterProcessReadWriteLock clientLock1
+         = new ZKInterProcessReadWriteLock(zkWatcher1, znode, null);
+       ZKInterProcessReadWriteLock clientLock2
+         = new ZKInterProcessReadWriteLock(zkWatcher2, znode, null);
+
+       InterProcessLock lock1 = clientLock1.readLock(Bytes.toBytes("client1"));
+       lock1.acquire();
+
+       //try to acquire, but it will timeout. We are testing whether this will cause any problems
+       //due to the read lock being from another client
+       InterProcessLock lock2 = clientLock2.writeLock(Bytes.toBytes("client2"));
+       assertFalse(lock2.tryAcquire(1000));
+
+       lock1.release();
+
+       //this time it will acquire
+       assertTrue(lock2.tryAcquire(5000));
+       lock2.release();
+     } finally {
+       // close the watchers even when an assertion above fails
+       zkWatcher2.close();
+     }
+   } finally {
+     zkWatcher1.close();
+   }
+ }
+}