You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2016/12/27 22:58:16 UTC
geode git commit: Removing unused Collaboration lock class and its
unit tests
Repository: geode
Updated Branches:
refs/heads/develop 462ebb032 -> a76aaf0c0
Removing unused Collaboration lock class and its unit tests
This class isn't used in Geode and isn't a public API.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/a76aaf0c
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/a76aaf0c
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/a76aaf0c
Branch: refs/heads/develop
Commit: a76aaf0c02e2c4a32d7c001073f1ecc316470b60
Parents: 462ebb0
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Tue Dec 27 14:57:21 2016 -0800
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Tue Dec 27 14:57:21 2016 -0800
----------------------------------------------------------------------
.../internal/locks/Collaboration.java | 454 --------------
.../internal/locks/CollaborationJUnitTest.java | 615 -------------------
2 files changed, 1069 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/a76aaf0c/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/Collaboration.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/Collaboration.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/Collaboration.java
deleted file mode 100644
index ca9ef70..0000000
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/Collaboration.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.distributed.internal.locks;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.i18n.LocalizedStrings;
-import org.apache.geode.internal.logging.LogService;
-
-/**
- * Synchronization structure which allows multiple threads to lock the structure. Implementation is
- * fair: the next waiting thread will be serviced.
- * <p>
- * Collaborating threads may jointly synchronize on this structure if they all agree on the same
- * topic of collaboration.
- * <p>
- * Threads that want to change the topic will wait until the current topic has been released.
- *
- */
-public class Collaboration {
-
- private static final Logger logger = LogService.getLogger();
-
- private final static Object NULL_TOPIC = new Object(); // non-null sentinel standing in for a null topic
-
- /**
- * The current topic of collaboration
- *
- * guarded.By {@link #topicsQueue}
- */
- private volatile Topic currentTopic;
-
- /** Ordered queue of pending topics for collaboration */
- private final List topicsQueue = new ArrayList();
-
- /** Map of external topic to internal wrapper object (Topic) */
- private final Map topicsMap = new HashMap();
-
- private final CancelCriterion stopper;
-
- /**
- * Constructs new stoppable instance of Collaboration which will heed an interrupt request if it
- * is acceptable to the creator of the lock.
- */
- public Collaboration(CancelCriterion stopper) {
- this.stopper = stopper;
- }
-
- /**
- * Acquire permission to participate in the collaboration. Returns immediately if topic matches
- * the current topic. Otherwise, this will block until the Collaboration has been freed by the
- * threads working on the current topic. This call is interruptible.
- *
- * @param topicObject Object to collaborate on
- *
- * @throws InterruptedException if thread is interrupted
- */
- public void acquire(Object topicObject) throws InterruptedException {
- throw new UnsupportedOperationException(
- LocalizedStrings.Collaboration_NOT_IMPLEMENTED.toLocalizedString());
- }
-
- /**
- * Must be synchronized on this.topicsQueue... Asserts that thread is not reentering.
- */
- private void assertNotRecursingTopic(Object topicObject) {
- Assert.assertTrue(false, Thread.currentThread() + " attempting to lock topic " + topicObject
- + " while locking topic " + this.currentTopic);
- }
-
- /**
- * Acquire permission to participate in the collaboration. Returns immediately if topic matches
- * the current topic. Otherwise, this will block until the Collaboration has been freed by the
- * threads working on the current topic. This call is uninterruptible.
- *
- * @param topic Object to collaborate on
- */
- public void acquireUninterruptibly(final Object topic) {
- Object topicObject = topic;
- if (topicObject == null) {
- topicObject = NULL_TOPIC;
- }
-
- Topic pendingTopic = null;
- synchronized (this.topicsQueue) {
- // if no topic then setup and return
- if (this.currentTopic == null) {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "Collaboration.acquireUninterruptibly: {}: no current topic, setting topic to {}",
- this.toString(), topicObject);
- }
- setCurrentTopic(new Topic(topicObject));
- this.currentTopic.addThread(Thread.currentThread());
- this.topicsMap.put(topicObject, this.currentTopic);
- return;
- }
-
- else if (isCurrentTopic(topicObject)) {
- // assertNotRecursingTopic(topicObject);
- if (logger.isDebugEnabled()) {
- logger.debug("Collaboration.acquireUninterruptibly: {}: already current topic: {}",
- this.toString(), topicObject);
- }
- this.currentTopic.addThread(Thread.currentThread());
- return;
- }
-
- else if (hasCurrentTopic(Thread.currentThread())) {
- assertNotRecursingTopic(topicObject);
- }
-
- // if other topic then add to pending topics and then wait
- else {
- pendingTopic = (Topic) this.topicsMap.get(topicObject);
- if (pendingTopic == null) {
- pendingTopic = new Topic(topicObject);
- this.topicsMap.put(topicObject, pendingTopic);
- this.topicsQueue.add(pendingTopic);
- }
- pendingTopic.addThread(Thread.currentThread());
- if (logger.isDebugEnabled()) {
- logger.debug(
- "Collaboration.acquireUninterruptibly: {}: adding pendingTopic {}; current topic is {}",
- this.toString(), pendingTopic, this.currentTopic);
- }
- }
- } // synchronized
- // now await the topic change uninterruptibly...
- boolean interrupted = Thread.interrupted();
- try {
- awaitTopic(pendingTopic, false);
- } catch (InterruptedException e) { // LOST INTERRUPT
- interrupted = true;
- this.stopper.checkCancelInProgress(e);
- } finally {
- if (interrupted)
- Thread.currentThread().interrupt();
- }
- }
-
- private void setCurrentTopic(Topic topic) {
- synchronized (this.topicsQueue) {
- if (this.currentTopic != null) {
- synchronized (this.currentTopic) {
- this.currentTopic.setCurrentTopic(false);
- this.currentTopic.setOldTopic(true);
- }
- }
- if (topic != null) {
- synchronized (topic) {
- topic.setCurrentTopic(true);
- this.currentTopic = topic;
- if (logger.isDebugEnabled()) {
- logger.debug("Collaboration.setCurrentTopic: {}: new topic is {}", this.getIdentity(),
- topic);
- }
- this.currentTopic.notifyAll();
- }
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("Collaboration.setCurrentTopic: {} setting current topic to null",
- this.toString());
- }
- this.currentTopic = null;
- }
- } // synchronized
- }
-
- private void awaitTopic(Topic topic, boolean interruptible) throws InterruptedException {
- // wait while currentTopic exists and doesn't match my topic
- boolean isDebugEnabled = logger.isDebugEnabled();
- synchronized (topic) {
- while (!topic.isCurrentTopic()) {
- if (topic.isOldTopic()) {
- // warning: cannot call toString while under sync(topic)
- Assert.assertTrue(false,
- "[" + getIdentity() + ".awaitTopic] attempting to wait on old topic");
- }
- boolean interrupted = Thread.interrupted();
- try {
- // In order to examine the current topic, we would need to
- // lock the topicsQueue and then the topic, in that order.
- // No can do in this instance (wrong lock ordering) but we still want
- // a sense of why we did the wait.
- Topic sniff = this.currentTopic;
- if (isDebugEnabled) {
- logger.debug(
- "Collaboration.awaitTopic: {} waiting for topic {}; current topic probably {}, which may have a thread count of {}",
- getIdentity(), topic, sniff.toString(), sniff.threadCount());
- }
- topic.wait();
- } catch (InterruptedException e) {
- if (interruptible)
- throw e;
- interrupted = true;
- this.stopper.checkCancelInProgress(e);
- } finally {
- if (interrupted)
- Thread.currentThread().interrupt();
- }
- }
- }
-
- // remove this assertion after we're sure this class is working...
- /*
- * Assert.assertTrue(isCurrentTopic(topic.getTopicObject()), "Failed to make " + topic +
- * " the topic for " + this);
- */
- }
-
- /**
- * Acquire permission to participate in the collaboration without waiting.
- *
- * @param topicObject Object to collaborate on
- * @return true if participation in the collaboration was acquired
- */
- public boolean tryAcquire(Object topicObject) {
- throw new UnsupportedOperationException(
- LocalizedStrings.Collaboration_NOT_IMPLEMENTED.toLocalizedString());
- }
-
- /**
- * Acquire permission to participate in the collaboration; waits the specified timeout.
- *
- * @param topicObject Object to collaborate on
- * @param timeout the maximum time to wait for a permit
- * @param unit the time unit of the <tt>timeout</tt> argument.
- * @return true if participation in the collaboration was acquired
- *
- * @throws InterruptedException if thread is interrupted
- */
- public boolean tryAcquire(Object topicObject, long timeout, TimeUnit unit)
- throws InterruptedException {
- throw new UnsupportedOperationException(
- LocalizedStrings.Collaboration_NOT_IMPLEMENTED.toLocalizedString());
- }
-
- /**
- * Releases the current thread's participation in the collaboration. When the last thread involved
- * in the current topic has released, a new topic can be started by any waiting threads.
- * <p>
- * Throws IllegalStateException if no topic is current; asserts that the caller is a participant.
- */
- public void release() {
- final boolean isDebugEnabled = logger.isDebugEnabled();
- synchronized (this.topicsQueue) {
- Topic topic = this.currentTopic;
- if (topic == null) {
- throw new IllegalStateException(
- LocalizedStrings.Collaboration_COLLABORATION_HAS_NO_CURRENT_TOPIC.toLocalizedString());
- }
- if (isDebugEnabled) {
- logger.debug("Collaboration.release: {} releasing topic", this.toString());
- }
- if (topic.isEmptyAfterRemovingThread(Thread.currentThread())) {
- if (isDebugEnabled) {
- logger.debug("Collaboration.release: {} released old topic {}", this.toString(), topic);
- }
- // current topic is done... release it
- this.topicsMap.remove(topic.getTopicObject());
- if (!this.topicsQueue.isEmpty()) {
- // next topic becomes the current topic
- Topic nextTopic = (Topic) this.topicsQueue.remove(0);
- setCurrentTopic(nextTopic);
- } else {
- setCurrentTopic(null);
- }
- } else {
- if (isDebugEnabled) {
- logger.debug("Collaboration.release: {} released current topic ", this.toString());
- }
- }
- } // synchronized
- }
-
- /** Returns true if the given thread is participating in the current collaboration topic. */
- public boolean hasCurrentTopic(Thread thread) {
- synchronized (this.topicsQueue) {
- if (this.currentTopic == null)
- return false;
- return this.currentTopic.hasThread(thread);
- }
- }
-
- /** Returns true if a collaboration topic currently exists. */
- public boolean hasCurrentTopic() {
- synchronized (this.topicsQueue) {
- return (this.currentTopic != null);
- }
- }
-
- /** Returns true if topic matches the current collaboration topic. */
- public boolean isCurrentTopic(Object topicObject) {
- if (topicObject == null) {
- throw new IllegalArgumentException(
- LocalizedStrings.Collaboration_TOPIC_MUST_BE_SPECIFIED.toLocalizedString());
- }
- synchronized (this.topicsQueue) {
- if (this.currentTopic == null) {
- return false;
- }
- return this.currentTopic.getTopicObject().equals(topicObject);
- }
- }
-
- @Override
- public String toString() {
- synchronized (this.topicsQueue) {
- Topic topic = this.currentTopic;
- int threadCount = 0;
- if (topic != null) {
- threadCount = topic.threadCount();
- }
- return getIdentity() + ": topic=" + topic + " threadCount=" + threadCount;
- }
- }
-
- protected String getIdentity() {
- String me = super.toString();
- return me.substring(me.lastIndexOf(".") + 1);
- }
-
- /**
- * Blocking threads will wait on this wrapper object. As threads release, they will be removed
- * from the Topic. The last one removed will notifyAll on the next Topic in topicsQueue.
- */
- static public class Topic {
-
- private boolean isCurrentTopic = false;
-
- private boolean isOldTopic = false;
-
- private final Object topicObject;
-
- /**
- * guarded.By {@link Collaboration#topicsQueue} guarded.By this instance, <em>after</em>
- * acquiring the topicsQueue
- */
- private final List participatingThreads = new ArrayList();
-
- /** Constructs new Topic to wrap the internal topicObject. */
- public Topic(Object topicObject) {
- this.topicObject = topicObject;
- }
-
- public boolean isCurrentTopic() {
- synchronized (this) {
- return this.isCurrentTopic;
- }
- }
-
- public boolean isOldTopic() {
- synchronized (this) {
- return this.isOldTopic;
- }
- }
-
- public Object getTopicObject() {
- synchronized (this) {
- return this.topicObject;
- }
- }
-
- public void setOldTopic(boolean v) {
- synchronized (this) {
- this.isOldTopic = v;
- }
- }
-
- public void setCurrentTopic(boolean v) { // marks whether this topic is the current one
- synchronized (this) {
- this.isCurrentTopic = v; // fix: this used to assign isOldTopic, so isCurrentTopic() never became true
- }
- }
-
- /**
- * Atomically removes thread and returns true if there are no more participating threads.
- */
- public boolean isEmptyAfterRemovingThread(Thread thread) {
- synchronized (this) {
- boolean removed = this.participatingThreads.remove(thread);
- if (!removed) {
- Assert.assertTrue(false, "thread " + thread + " was not participating in " + this);
- }
- /*
- * if (Collaboration.this.debugEnabled()) { Collaboration.this.log.fine("[" +
- * Collaboration.this.getIdentity() + ".Topic] removed " + thread + " from " + this +
- * "; remaining threads: " + this.participatingThreads); }
- */
- return this.participatingThreads.isEmpty();
- }
- }
-
- /** Adds thread to list of threads participating in this topic. */
- public void addThread(Thread thread) {
- synchronized (this) {
- this.participatingThreads.add(thread);
- }
- }
-
- /** Returns true if the thread was removed from participating threads. */
- public boolean removeThread(Thread thread) {
- synchronized (this) {
- return this.participatingThreads.remove(thread);
- }
- }
-
- /** Returns count of threads participating in this topic. */
- public int threadCount() {
- synchronized (this) {
- return this.participatingThreads.size();
- }
- }
-
- /** Returns true if the thread is one of the participating threads. */
- public boolean hasThread(Thread thread) {
- synchronized (this) {
- return this.participatingThreads.contains(thread);
- }
- }
-
- @Override
- public String toString() {
- String nick = super.toString();
- nick = nick.substring(nick.lastIndexOf(".") + 1);
- return nick + ": " + topicObject;
- }
-
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/geode/blob/a76aaf0c/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/CollaborationJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/CollaborationJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/CollaborationJUnitTest.java
deleted file mode 100755
index f658dec..0000000
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/CollaborationJUnitTest.java
+++ /dev/null
@@ -1,615 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.distributed.internal.locks;
-
-import static org.junit.Assert.*;
-
-import java.util.*;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.LogWriter;
-import org.apache.geode.SystemFailure;
-import org.apache.geode.internal.logging.LocalLogWriter;
-import org.apache.geode.test.dunit.ThreadUtils;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.junit.categories.IntegrationTest;
-import org.apache.geode.internal.logging.InternalLogWriter;
-
-/**
- * Tests the Collaboration Lock used internally by dlock service.
- *
- * @since GemFire 4.1.1
- */
-@Category(IntegrationTest.class)
-@Ignore("Test is broken and was named CollaborationJUnitDisabledTest")
-public class CollaborationJUnitTest {
-
- protected LogWriter log = new LocalLogWriter(InternalLogWriter.INFO_LEVEL);
- protected Collaboration collaboration;
-
- @Before
- public void setUp() throws Exception {
- this.collaboration = new Collaboration(new CancelCriterion() {
- @Override
- public String cancelInProgress() {
- return null;
- }
-
- @Override
- public RuntimeException generateCancelledException(Throwable e) {
- return null;
- }
- });
- }
-
- @After
- public void tearDown() throws Exception {
- this.collaboration = null;
- }
-
- protected volatile boolean flagTestBlocksUntilRelease = false;
- protected volatile boolean threadBStartedTestBlocksUntilRelease = false;
-
- @Test
- public void testBlocksUntilRelease() throws Exception {
- this.log.info("[testBlocksUntilRelease]");
- Thread threadA = new Thread(group, new Runnable() {
- @Override
- public void run() {
- collaboration.acquireUninterruptibly("topicA");
- try {
- flagTestBlocksUntilRelease = true;
- while (flagTestBlocksUntilRelease) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException ignore) {
- fail("interrupted");
- }
- }
- } finally {
- collaboration.release();
- }
- }
- });
-
- // thread one acquires
- threadA.start();
- WaitCriterion ev = new WaitCriterion() {
- @Override
- public boolean done() {
- return CollaborationJUnitTest.this.flagTestBlocksUntilRelease;
- }
-
- @Override
- public String description() {
- return "waiting for thread";
- }
- };
- Wait.waitForCriterion(ev, 5 * 1000, 200, true);
- assertTrue(this.collaboration.hasCurrentTopic(threadA));
-
- // thread two blocks until one releases
- Thread threadB = new Thread(group, new Runnable() {
- @Override
- public void run() {
- threadBStartedTestBlocksUntilRelease = true;
- collaboration.acquireUninterruptibly("topicB");
- try {
- flagTestBlocksUntilRelease = true;
- WaitCriterion ev2 = new WaitCriterion() {
- @Override
- public boolean done() {
- return !flagTestBlocksUntilRelease;
- }
-
- @Override
- public String description() {
- return "waiting for release";
- }
- };
- Wait.waitForCriterion(ev2, 20 * 1000, 200, true);
- } finally {
- collaboration.release();
- }
- }
- });
-
- // start up threadB
- threadB.start();
- ev = new WaitCriterion() {
- @Override
- public boolean done() {
- return threadBStartedTestBlocksUntilRelease;
- }
-
- @Override
- public String description() {
- return "waiting for thread b";
- }
- };
- Wait.waitForCriterion(ev, 5 * 1000, 200, true);
-
- // threadA holds topic and threadB is waiting...
- assertTrue(this.collaboration.hasCurrentTopic(threadA));
- assertFalse(this.collaboration.hasCurrentTopic(threadB));
-
- // let threadA release so that threadB gets lock
- this.flagTestBlocksUntilRelease = false;
- ThreadUtils.join(threadA, 30 * 1000);
-
- // make sure threadB is doing what it's supposed to do...
- ev = new WaitCriterion() {
- @Override
- public boolean done() {
- return flagTestBlocksUntilRelease;
- }
-
- @Override
- public String description() {
- return "threadB";
- }
- };
- Wait.waitForCriterion(ev, 5 * 1000, 200, true);
- // threadB must have lock now... let threadB release
- assertTrue(this.collaboration.hasCurrentTopic(threadB));
- this.flagTestBlocksUntilRelease = false;
- ThreadUtils.join(threadB, 30 * 1000);
-
- // collaboration should be free now
- assertFalse(this.collaboration.hasCurrentTopic(threadA));
- assertFalse(this.collaboration.hasCurrentTopic(threadB));
- assertFalse(this.collaboration.hasCurrentTopic());
- }
-
- protected volatile boolean threadAFlag_TestLateComerJoinsIn = false;
- protected volatile boolean threadBFlag_TestLateComerJoinsIn = false;
- protected volatile boolean threadCFlag_TestLateComerJoinsIn = true;
- protected volatile boolean threadDFlag_TestLateComerJoinsIn = false;
-
- @Test
- public void testLateComerJoinsIn() throws Exception {
- this.log.info("[testLateComerJoinsIn]");
-
- final Object topicA = "topicA";
- final Object topicB = "topicB";
-
- // threads one and two acquire
- Thread threadA = new Thread(group, new Runnable() {
- @Override
- public void run() {
- collaboration.acquireUninterruptibly(topicA);
- try {
- threadAFlag_TestLateComerJoinsIn = true;
- WaitCriterion ev = new WaitCriterion() {
- @Override
- public boolean done() {
- return !threadAFlag_TestLateComerJoinsIn;
- }
-
- @Override
- public String description() {
- return null;
- }
- };
- Wait.waitForCriterion(ev, 60 * 1000, 200, true);
- } finally {
- collaboration.release();
- }
- }
- });
- threadA.start();
- WaitCriterion ev = new WaitCriterion() {
- @Override
- public boolean done() {
- return threadAFlag_TestLateComerJoinsIn;
- }
-
- @Override
- public String description() {
- return "wait for ThreadA";
- }
- };
- Wait.waitForCriterion(ev, 30 * 1000, 200, true);
- assertTrue(this.collaboration.hasCurrentTopic(threadA));
- assertTrue(this.collaboration.isCurrentTopic(topicA));
-
- Thread threadB = new Thread(group, new Runnable() {
- @Override
- public void run() {
- collaboration.acquireUninterruptibly(topicA);
- try {
- threadBFlag_TestLateComerJoinsIn = true;
- WaitCriterion ev2 = new WaitCriterion() {
- @Override
- public boolean done() {
- return !threadBFlag_TestLateComerJoinsIn;
- }
-
- @Override
- public String description() {
- return null;
- }
- };
- Wait.waitForCriterion(ev2, 60 * 1000, 200, true);
- } finally {
- collaboration.release();
- }
- }
- });
- threadB.start();
- ev = new WaitCriterion() {
- @Override
- public boolean done() {
- return threadBFlag_TestLateComerJoinsIn;
- }
-
- @Override
- public String description() {
- return "";
- }
- };
- Wait.waitForCriterion(ev, 60 * 1000, 200, true);
- assertTrue(this.collaboration.hasCurrentTopic(threadB));
-
- // thread three blocks for new topic
- Thread threadC = new Thread(group, new Runnable() {
- @Override
- public void run() {
- threadCFlag_TestLateComerJoinsIn = false;
- collaboration.acquireUninterruptibly(topicB);
- try {
- threadCFlag_TestLateComerJoinsIn = true;
- WaitCriterion ev2 = new WaitCriterion() {
- @Override
- public boolean done() {
- return !threadCFlag_TestLateComerJoinsIn;
- }
-
- @Override
- public String description() {
- return null;
- }
- };
- Wait.waitForCriterion(ev2, 60 * 1000, 200, true);
- } finally {
- collaboration.release();
- }
- }
- });
- threadC.start();
- ev = new WaitCriterion() {
- @Override
- public boolean done() {
- return threadCFlag_TestLateComerJoinsIn;
- }
-
- @Override
- public String description() {
- return null;
- }
- };
- Wait.waitForCriterion(ev, 60 * 1000, 200, true);
- assertFalse(this.collaboration.hasCurrentTopic(threadC));
- assertFalse(this.collaboration.isCurrentTopic(topicB));
-
- // thread four (lateComer) acquires current topic immediately
- Thread threadD = new Thread(group, new Runnable() {
- @Override
- public void run() {
- collaboration.acquireUninterruptibly(topicA);
- try {
- threadDFlag_TestLateComerJoinsIn = true;
- while (threadDFlag_TestLateComerJoinsIn) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException ignore) {
- fail("interrupted");
- }
- }
- } finally {
- collaboration.release();
- }
- }
- });
- threadD.start();
- ev = new WaitCriterion() {
- @Override
- public boolean done() {
- return threadDFlag_TestLateComerJoinsIn;
- }
-
- @Override
- public String description() {
- return null;
- }
- };
- Wait.waitForCriterion(ev, 60 * 1000, 200, true);
- assertTrue(this.collaboration.hasCurrentTopic(threadD));
-
- // release threadA
- this.threadAFlag_TestLateComerJoinsIn = false;
- ThreadUtils.join(threadA, 30 * 1000);
- assertFalse(this.collaboration.hasCurrentTopic(threadA));
- assertTrue(this.collaboration.hasCurrentTopic(threadB));
- assertFalse(this.collaboration.hasCurrentTopic(threadC));
- assertTrue(this.collaboration.hasCurrentTopic(threadD));
- assertTrue(this.collaboration.isCurrentTopic(topicA));
- assertFalse(this.collaboration.isCurrentTopic(topicB));
-
- // release threadB
- this.threadBFlag_TestLateComerJoinsIn = false;
- ThreadUtils.join(threadB, 30 * 1000);
- assertFalse(this.collaboration.hasCurrentTopic(threadB));
- assertFalse(this.collaboration.hasCurrentTopic(threadC));
- assertTrue(this.collaboration.hasCurrentTopic(threadD));
- assertTrue(this.collaboration.isCurrentTopic(topicA));
- assertFalse(this.collaboration.isCurrentTopic(topicB));
-
- // release threadD
- this.threadDFlag_TestLateComerJoinsIn = false;
- ThreadUtils.join(threadD, 30 * 1000);
- ev = new WaitCriterion() {
- @Override
- public boolean done() {
- return threadCFlag_TestLateComerJoinsIn;
- }
-
- @Override
- public String description() {
- return null;
- }
- };
- Wait.waitForCriterion(ev, 60 * 1000, 200, true);
- assertTrue(this.collaboration.hasCurrentTopic(threadC));
- assertFalse(this.collaboration.hasCurrentTopic(threadD));
- assertFalse(this.collaboration.isCurrentTopic(topicA));
- assertTrue(this.collaboration.isCurrentTopic(topicB));
-
- // release threadC
- this.threadCFlag_TestLateComerJoinsIn = false;
- ThreadUtils.join(threadC, 30 * 1000);
- assertFalse(this.collaboration.hasCurrentTopic(threadC));
- assertFalse(this.collaboration.isCurrentTopic(topicA));
- assertFalse(this.collaboration.isCurrentTopic(topicB));
- }
-
- protected List waitingList = Collections.synchronizedList(new ArrayList());
- protected List fairnessList = Collections.synchronizedList(new ArrayList());
- protected volatile boolean runTestFairnessStressfully = true;
-
- @Test
- public void testFairnessStressfully() throws Exception {
- this.log.info("[testFairnessStressfully]");
- final int numThreads = 20;
- Thread threads[] = new Thread[numThreads];
-
- Runnable run = new Runnable() {
- public void run() {
- boolean released = false;
- try {
- String uniqueTopic = Thread.currentThread().getName();
- while (runTestFairnessStressfully) {
- waitingList.add(uniqueTopic);
- collaboration.acquireUninterruptibly(uniqueTopic);
- try {
- released = false;
- fairnessList.add(uniqueTopic);
- waitingList.remove(uniqueTopic);
- } finally {
- // wait for the other threads to line up...
- WaitCriterion ev = new WaitCriterion() {
- @Override
- public boolean done() {
- return !runTestFairnessStressfully || waitingList.size() >= numThreads - 1;
- }
-
- @Override
- public String description() {
- return "other threads lining up";
- }
- };
- Wait.waitForCriterion(ev, 60 * 1000, 200, true);
- collaboration.release();
- released = true;
- }
- }
- } finally {
- if (!released) {
- collaboration.release();
- }
- }
- }
- };
-
- try {
- // many threads loop: acquire and release with unique topic
- for (int t = 0; t < threads.length; t++) {
- threads[t] = new Thread(group, run, String.valueOf(t));
- threads[t].start();
- }
-
- log.info("Started all threads... waiting for test to complete.");
-
- // wait for numThreads * 20 acquisitions
- WaitCriterion ev = new WaitCriterion() {
- @Override
- public boolean done() {
- return fairnessList.size() >= numThreads * 20;
- }
-
- @Override
- public String description() {
- return "waiting for numThreads * 10";
- }
- };
- Wait.waitForCriterion(ev, 5 * 60 * 1000, 200, true);
- } finally {
- if (this.runTestFairnessStressfully) {
- this.runTestFairnessStressfully = false;
- }
- }
-
- for (int t = 0; t < threads.length; t++) {
- ThreadUtils.join(threads[t], 30 * 1000);
- }
-
- // assert that all topics are acquired in order
- // count number of occurrences of each thread
- int count[] = new int[numThreads];
- for (int i = 0; i < count.length; i++) { // shouldn't be necessary
- count[i] = 0;
- }
- synchronized (this.fairnessList) {
- for (Iterator iter = this.fairnessList.iterator(); iter.hasNext();) {
- int id = Integer.valueOf((String) iter.next()).intValue();
- count[id] = count[id] + 1;
- }
- }
-
- int totalLocks = 0;
- int minLocks = Integer.MAX_VALUE;
- int maxLocks = 0;
- for (int i = 0; i < count.length; i++) {
- int locks = count[i];
- this.log.fine("testFairnessStressfully thread-" + i + " acquired topic " + locks + " times.");
- if (locks < minLocks)
- minLocks = locks;
- if (locks > maxLocks)
- maxLocks = locks;
- totalLocks = totalLocks + locks;
- }
-
- this.log.info("[testFairnessStressfully] totalLocks=" + totalLocks + " minLocks=" + minLocks
- + " maxLocks=" + maxLocks);
-
- int expectedLocks = (totalLocks / numThreads) + 1;
-
- // NOTE: if you turn on fine logs, this deviation may be too small...
- // slower machines may also fail depending on thread scheduling
- int deviation = (int) (expectedLocks * 0.25);
- int lowThreshold = expectedLocks - deviation;
- int highThreshold = expectedLocks + deviation;
-
- this.log.info("[testFairnessStressfully] deviation=" + deviation + " expectedLocks="
- + expectedLocks + " lowThreshold=" + lowThreshold + " highThreshold=" + highThreshold);
-
- // if these assertions keep failing we'll have to rewrite the test
- // to handle scheduling of the threads...
-
- assertTrue("minLocks is less than lowThreshold", minLocks >= lowThreshold);
- assertTrue("maxLocks is greater than highThreshold", maxLocks <= highThreshold);
- }
-
- @Test
- public void testHasCurrentTopic() throws Exception {
- this.log.info("[testHasCurrentTopic]");
- assertTrue(!this.collaboration.hasCurrentTopic());
- this.collaboration.acquireUninterruptibly("testHasCurrentTopic");
- try {
- assertTrue(this.collaboration.hasCurrentTopic());
- } finally {
- this.collaboration.release();
- }
- assertTrue(!this.collaboration.hasCurrentTopic());
- }
-
- protected volatile boolean flagTestThreadHasCurrentTopic = false;
-
- @Test
- public void testThreadHasCurrentTopic() throws Exception {
- this.log.info("[testThreadHasCurrentTopic]");
- Thread thread = new Thread(group, new Runnable() {
- @Override
- public void run() {
- collaboration.acquireUninterruptibly("testThreadHasCurrentTopic");
- try {
- flagTestThreadHasCurrentTopic = true;
- WaitCriterion ev = new WaitCriterion() {
- @Override
- public boolean done() {
- return !flagTestThreadHasCurrentTopic;
- }
-
- @Override
- public String description() {
- return null;
- }
- };
- Wait.waitForCriterion(ev, 60 * 1000, 200, true);
- } finally {
- collaboration.release();
- }
- }
- });
-
- // before starting thread, hasCurrentTopic(thread) returns false
- assertTrue(!this.collaboration.hasCurrentTopic(thread));
- thread.start();
- WaitCriterion ev = new WaitCriterion() {
- @Override
- public boolean done() {
- return flagTestThreadHasCurrentTopic;
- }
-
- @Override
- public String description() {
- return null;
- }
- };
- Wait.waitForCriterion(ev, 60 * 1000, 200, true);
-
- // after starting thread, hasCurrentTopic(thread) returns true
- assertTrue(this.collaboration.hasCurrentTopic(thread));
- this.flagTestThreadHasCurrentTopic = false;
- ThreadUtils.join(thread, 30 * 1000);
-
- // after thread finishes, hasCurrentTopic(thread) returns false
- assertTrue(!this.collaboration.hasCurrentTopic(thread));
- }
-
- @Test
- public void testIsCurrentTopic() throws Exception {
- this.log.info("[testIsCurrentTopic]");
- Object topic = "testIsCurrentTopic";
- assertTrue(!this.collaboration.isCurrentTopic(topic));
- this.collaboration.acquireUninterruptibly(topic);
- try {
- assertTrue(this.collaboration.isCurrentTopic(topic));
- } finally {
- this.collaboration.release();
- }
- assertTrue(!this.collaboration.isCurrentTopic(topic));
- }
-
- protected final ThreadGroup group = new ThreadGroup("CollaborationJUnitTest Threads") {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- if (e instanceof VirtualMachineError) {
- SystemFailure.setFailure((VirtualMachineError) e); // don't throw
- }
- String s = "Uncaught exception in thread " + t;
- log.error(s, e);
- fail(s);
- }
- };
-}
-