You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2016/12/27 22:58:16 UTC

geode git commit: Removing unused Collaboration lock class and its unit tests

Repository: geode
Updated Branches:
  refs/heads/develop 462ebb032 -> a76aaf0c0


Removing unused Collaboration lock class and its unit tests

This class isn't used in Geode and isn't a public API.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/a76aaf0c
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/a76aaf0c
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/a76aaf0c

Branch: refs/heads/develop
Commit: a76aaf0c02e2c4a32d7c001073f1ecc316470b60
Parents: 462ebb0
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Tue Dec 27 14:57:21 2016 -0800
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Tue Dec 27 14:57:21 2016 -0800

----------------------------------------------------------------------
 .../internal/locks/Collaboration.java           | 454 --------------
 .../internal/locks/CollaborationJUnitTest.java  | 615 -------------------
 2 files changed, 1069 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/a76aaf0c/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/Collaboration.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/Collaboration.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/Collaboration.java
deleted file mode 100644
index ca9ef70..0000000
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/Collaboration.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.distributed.internal.locks;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.i18n.LocalizedStrings;
-import org.apache.geode.internal.logging.LogService;
-
-/**
- * Synchronization structure which allows multiple threads to lock the structure. Implementation is
- * fair: the next waiting thread will be serviced.
- * <p>
- * Collaborating threads may jointly synchronize on this structure if they all agree on the same
- * topic of collaboration.
- * <p>
- * Threads that want to change the topic will wait until the current topic has been released.
- *
- */
-public class Collaboration {
-
-  private static final Logger logger = LogService.getLogger();
-
-  private final static Object NULL_TOPIC = null;
-
-  /**
-   * The current topic of collaboration
-   *
-   * guarded.By {@link #topicsQueue}
-   */
-  private volatile Topic currentTopic;
-
-  /** Ordered queue of pending topics for collaboration */
-  private final List topicsQueue = new ArrayList();
-
-  /** Map of external topic to internal wrapper object (Topic) */
-  private final Map topicsMap = new HashMap();
-
-  private final CancelCriterion stopper;
-
-  /**
-   * Constructs new stoppable instance of Collaboration which will heed an interrupt request if it
-   * is acceptable to the creator of the lock.
-   */
-  public Collaboration(CancelCriterion stopper) {
-    this.stopper = stopper;
-  }
-
-  /**
-   * Acquire permission to participate in the collaboration. Returns immediately if topic matches
-   * the current topic. Otherwise, this will block until the Collaboration has been freed by the
-   * threads working on the current topic. This call is interruptible.
-   *
-   * @param topicObject Object to collaborate on
-   *
-   * @throws InterruptedException if thread is interrupted
-   */
-  public void acquire(Object topicObject) throws InterruptedException {
-    throw new UnsupportedOperationException(
-        LocalizedStrings.Collaboration_NOT_IMPLEMENTED.toLocalizedString());
-  }
-
-  /**
-   * Must be synchronized on this.topicsQueue... Asserts that thread is not reentering.
-   */
-  private void assertNotRecursingTopic(Object topicObject) {
-    Assert.assertTrue(false, Thread.currentThread() + " attempting to lock topic " + topicObject
-        + " while locking topic " + this.currentTopic);
-  }
-
-  /**
-   * Acquire permission to participate in the collaboration. Returns immediately if topic matches
-   * the current topic. Otherwise, this will block until the Collaboration has been freed by the
-   * threads working on the current topic. This call is uninterruptible.
-   *
-   * @param topic Object to collaborate on
-   */
-  public void acquireUninterruptibly(final Object topic) {
-    Object topicObject = topic;
-    if (topicObject == null) {
-      topicObject = NULL_TOPIC;
-    }
-
-    Topic pendingTopic = null;
-    synchronized (this.topicsQueue) {
-      // if no topic then setup and return
-      if (this.currentTopic == null) {
-        if (logger.isDebugEnabled()) {
-          logger.debug(
-              "Collaboration.acquireUninterruptibly: {}: no current topic, setting topic to {}",
-              this.toString(), topicObject);
-        }
-        setCurrentTopic(new Topic(topicObject));
-        this.currentTopic.addThread(Thread.currentThread());
-        this.topicsMap.put(topicObject, this.currentTopic);
-        return;
-      }
-
-      else if (isCurrentTopic(topicObject)) {
-        // assertNotRecursingTopic(topicObject);
-        if (logger.isDebugEnabled()) {
-          logger.debug("Collaboration.acquireUninterruptibly: {}: already current topic: {}",
-              this.toString(), topicObject);
-        }
-        this.currentTopic.addThread(Thread.currentThread());
-        return;
-      }
-
-      else if (hasCurrentTopic(Thread.currentThread())) {
-        assertNotRecursingTopic(topicObject);
-      }
-
-      // if other topic then add to pending topics and then wait
-      else {
-        pendingTopic = (Topic) this.topicsMap.get(topicObject);
-        if (pendingTopic == null) {
-          pendingTopic = new Topic(topicObject);
-          this.topicsMap.put(topicObject, pendingTopic);
-          this.topicsQueue.add(pendingTopic);
-        }
-        pendingTopic.addThread(Thread.currentThread());
-        if (logger.isDebugEnabled()) {
-          logger.debug(
-              "Collaboration.acquireUninterruptibly: {}: adding pendingTopic {}; current topic is {}",
-              this.toString(), pendingTopic, this.currentTopic);
-        }
-      }
-    } // synchronized
-    // now await the topic change uninterruptibly...
-    boolean interrupted = Thread.interrupted();
-    try {
-      awaitTopic(pendingTopic, false);
-    } catch (InterruptedException e) { // LOST INTERRUPT
-      interrupted = true;
-      this.stopper.checkCancelInProgress(e);
-    } finally {
-      if (interrupted)
-        Thread.currentThread().interrupt();
-    }
-  }
-
-  private void setCurrentTopic(Topic topic) {
-    synchronized (this.topicsQueue) {
-      if (this.currentTopic != null) {
-        synchronized (this.currentTopic) {
-          this.currentTopic.setCurrentTopic(false);
-          this.currentTopic.setOldTopic(true);
-        }
-      }
-      if (topic != null) {
-        synchronized (topic) {
-          topic.setCurrentTopic(true);
-          this.currentTopic = topic;
-          if (logger.isDebugEnabled()) {
-            logger.debug("Collaboration.setCurrentTopic: {}: new topic is {}", this.getIdentity(),
-                topic);
-          }
-          this.currentTopic.notifyAll();
-        }
-      } else {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Collaboration.setCurrentTopic: {} setting current topic to null",
-              this.toString());
-        }
-        this.currentTopic = null;
-      }
-    } // synchronized
-  }
-
-  private void awaitTopic(Topic topic, boolean interruptible) throws InterruptedException {
-    // wait while currentTopic exists and doesn't match my topic
-    boolean isDebugEnabled = logger.isDebugEnabled();
-    synchronized (topic) {
-      while (!topic.isCurrentTopic()) {
-        if (topic.isOldTopic()) {
-          // warning: cannot call toString while under sync(topic)
-          Assert.assertTrue(false,
-              "[" + getIdentity() + ".awaitTopic] attempting to wait on old topic");
-        }
-        boolean interrupted = Thread.interrupted();
-        try {
-          // In order to examine the current topic, we would need to
-          // lock the topicsQueue and then the topic, in that order.
-          // No can do in this instance (wrong lock ordering) but we still want
-          // a sense of why we did the wait.
-          Topic sniff = this.currentTopic;
-          if (isDebugEnabled) {
-            logger.debug(
-                "Collaboration.awaitTopic: {} waiting for topic {}; current topic probably {}, which may have a thread count of {}",
-                getIdentity(), topic, sniff.toString(), sniff.threadCount());
-          }
-          topic.wait();
-        } catch (InterruptedException e) {
-          if (interruptible)
-            throw e;
-          interrupted = true;
-          this.stopper.checkCancelInProgress(e);
-        } finally {
-          if (interrupted)
-            Thread.currentThread().interrupt();
-        }
-      }
-    }
-
-    // remove this assertion after we're sure this class is working...
-    /*
-     * Assert.assertTrue(isCurrentTopic(topic.getTopicObject()), "Failed to make " + topic +
-     * " the topic for " + this);
-     */
-  }
-
-  /**
-   * Acquire permission to participate in the collaboration without waiting.
-   *
-   * @param topicObject Object to collaborate on
-   * @return true if participation in the collaboration was acquired
-   */
-  public boolean tryAcquire(Object topicObject) {
-    throw new UnsupportedOperationException(
-        LocalizedStrings.Collaboration_NOT_IMPLEMENTED.toLocalizedString());
-  }
-
-  /**
-   * Acquire permission to participate in the collaboration; waits the specified timeout.
-   *
-   * @param topicObject Object to collaborate on
-   * @param timeout the maximum time to wait for a permit
-   * @param unit the time unit of the <tt>timeout</tt> argument.
-   * @return true if participation in the collaboration was acquired
-   *
-   * @throws InterruptedException if thread is interrupted
-   */
-  public boolean tryAcquire(Object topicObject, long timeout, TimeUnit unit)
-      throws InterruptedException {
-    throw new UnsupportedOperationException(
-        LocalizedStrings.Collaboration_NOT_IMPLEMENTED.toLocalizedString());
-  }
-
-  /**
-   * Releases the current thread's participation in the collaboration. When the last thread involved
-   * in the current topic has released, a new topic can be started by any waiting threads.
-   * <p>
-   * Nothing happens if the calling thread is not participating in the current topic.
-   */
-  public void release() {
-    final boolean isDebugEnabled = logger.isDebugEnabled();
-    synchronized (this.topicsQueue) {
-      Topic topic = this.currentTopic;
-      if (topic == null) {
-        throw new IllegalStateException(
-            LocalizedStrings.Collaboration_COLLABORATION_HAS_NO_CURRENT_TOPIC.toLocalizedString());
-      }
-      if (isDebugEnabled) {
-        logger.debug("Collaboration.release: {} releasing topic", this.toString());
-      }
-      if (topic.isEmptyAfterRemovingThread(Thread.currentThread())) {
-        if (isDebugEnabled) {
-          logger.debug("Collaboration.release: {} released old topic {}", this.toString(), topic);
-        }
-        // current topic is done... release it
-        this.topicsMap.remove(topic.getTopicObject());
-        if (!this.topicsQueue.isEmpty()) {
-          // next topic becomes the current topic
-          Topic nextTopic = (Topic) this.topicsQueue.remove(0);
-          setCurrentTopic(nextTopic);
-        } else {
-          setCurrentTopic(null);
-        }
-      } else {
-        if (isDebugEnabled) {
-          logger.debug("Collaboration.release: {} released current topic ", this.toString());
-        }
-      }
-    } // synchronized
-  }
-
-  /** Returns true if a collaboration topic currently exists. */
-  public boolean hasCurrentTopic(Thread thread) {
-    synchronized (this.topicsQueue) {
-      if (this.currentTopic == null)
-        return false;
-      return this.currentTopic.hasThread(thread);
-    }
-  }
-
-  /** Returns true if a collaboration topic currently exists. */
-  public boolean hasCurrentTopic() {
-    synchronized (this.topicsQueue) {
-      return (this.currentTopic != null);
-    }
-  }
-
-  /** Returns true if topic matches the current collaboration topic. */
-  public boolean isCurrentTopic(Object topicObject) {
-    if (topicObject == null) {
-      throw new IllegalArgumentException(
-          LocalizedStrings.Collaboration_TOPIC_MUST_BE_SPECIFIED.toLocalizedString());
-    }
-    synchronized (this.topicsQueue) {
-      if (this.currentTopic == null) {
-        return false;
-      }
-      return this.currentTopic.getTopicObject().equals(topicObject);
-    }
-  }
-
-  @Override
-  public String toString() {
-    synchronized (this.topicsQueue) {
-      Topic topic = this.currentTopic;
-      int threadCount = 0;
-      if (topic != null) {
-        threadCount = topic.threadCount();
-      }
-      return getIdentity() + ": topic=" + topic + " threadCount=" + threadCount;
-    }
-  }
-
-  protected String getIdentity() {
-    String me = super.toString();
-    return me.substring(me.lastIndexOf(".") + 1);
-  }
-
-  /**
-   * Blocking threads will wait on this wrapper object. As threads release, they will be removed
-   * from the Topic. The last one removed will notifyAll on the next Topic in topicsQueue.
-   */
-  static public class Topic {
-
-    private boolean isCurrentTopic = false;
-
-    private boolean isOldTopic = false;
-
-    private final Object topicObject;
-
-    /**
-     * guarded.By {@link Collaboration#topicsQueue} guarded.By this instance, <em>after</em>
-     * acquiring the topicsQueue
-     */
-    private final List participatingThreads = new ArrayList();
-
-    /** Constructs new Topic to wrap the internal topicObject. */
-    public Topic(Object topicObject) {
-      this.topicObject = topicObject;
-    }
-
-    public boolean isCurrentTopic() {
-      synchronized (this) {
-        return this.isCurrentTopic;
-      }
-    }
-
-    public boolean isOldTopic() {
-      synchronized (this) {
-        return this.isOldTopic;
-      }
-    }
-
-    public Object getTopicObject() {
-      synchronized (this) {
-        return this.topicObject;
-      }
-    }
-
-    public void setOldTopic(boolean v) {
-      synchronized (this) {
-        this.isOldTopic = v;
-      }
-    }
-
-    public void setCurrentTopic(boolean v) {
-      synchronized (this) {
-        this.isOldTopic = v;
-      }
-    }
-
-    /**
-     * Atomically removes thread and returns true if there are no more participating threads.
-     */
-    public boolean isEmptyAfterRemovingThread(Thread thread) {
-      synchronized (this) {
-        boolean removed = this.participatingThreads.remove(thread);
-        if (!removed) {
-          Assert.assertTrue(false, "thread " + thread + " was not participating in " + this);
-        }
-        /*
-         * if (Collaboration.this.debugEnabled()) { Collaboration.this.log.fine("[" +
-         * Collaboration.this.getIdentity() + ".Topic] removed " + thread + " from " + this +
-         * "; remaining threads: " + this.participatingThreads); }
-         */
-        return this.participatingThreads.isEmpty();
-      }
-    }
-
-    /** Adds thread to list of threads participating in this topic. */
-    public void addThread(Thread thread) {
-      synchronized (this) {
-        this.participatingThreads.add(thread);
-      }
-    }
-
-    /** Returns true if the thread was removed from participating threads. */
-    public boolean removeThread(Thread thread) {
-      synchronized (this) {
-        return this.participatingThreads.remove(thread);
-      }
-    }
-
-    /** Returns count of threads participating in this topic. */
-    public int threadCount() {
-      synchronized (this) {
-        return this.participatingThreads.size();
-      }
-    }
-
-    /** Returns true if the thread is one of the participating threads. */
-    public boolean hasThread(Thread thread) {
-      synchronized (this) {
-        return this.participatingThreads.contains(thread);
-      }
-    }
-
-    @Override
-    public String toString() {
-      String nick = super.toString();
-      nick = nick.substring(nick.lastIndexOf(".") + 1);
-      return nick + ": " + topicObject;
-    }
-
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/geode/blob/a76aaf0c/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/CollaborationJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/CollaborationJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/CollaborationJUnitTest.java
deleted file mode 100755
index f658dec..0000000
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/CollaborationJUnitTest.java
+++ /dev/null
@@ -1,615 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.distributed.internal.locks;
-
-import static org.junit.Assert.*;
-
-import java.util.*;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.CancelCriterion;
-import org.apache.geode.LogWriter;
-import org.apache.geode.SystemFailure;
-import org.apache.geode.internal.logging.LocalLogWriter;
-import org.apache.geode.test.dunit.ThreadUtils;
-import org.apache.geode.test.dunit.Wait;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.junit.categories.IntegrationTest;
-import org.apache.geode.internal.logging.InternalLogWriter;
-
-/**
- * Tests the Collaboration Lock used internally by dlock service.
- *
- * @since GemFire 4.1.1
- */
-@Category(IntegrationTest.class)
-@Ignore("Test is broken and was named CollaborationJUnitDisabledTest")
-public class CollaborationJUnitTest {
-
-  protected LogWriter log = new LocalLogWriter(InternalLogWriter.INFO_LEVEL);
-  protected Collaboration collaboration;
-
-  @Before
-  public void setUp() throws Exception {
-    this.collaboration = new Collaboration(new CancelCriterion() {
-      @Override
-      public String cancelInProgress() {
-        return null;
-      }
-
-      @Override
-      public RuntimeException generateCancelledException(Throwable e) {
-        return null;
-      }
-    });
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    this.collaboration = null;
-  }
-
-  protected volatile boolean flagTestBlocksUntilRelease = false;
-  protected volatile boolean threadBStartedTestBlocksUntilRelease = false;
-
-  @Test
-  public void testBlocksUntilRelease() throws Exception {
-    this.log.info("[testBlocksUntilRelease]");
-    Thread threadA = new Thread(group, new Runnable() {
-      @Override
-      public void run() {
-        collaboration.acquireUninterruptibly("topicA");
-        try {
-          flagTestBlocksUntilRelease = true;
-          while (flagTestBlocksUntilRelease) {
-            try {
-              Thread.sleep(10);
-            } catch (InterruptedException ignore) {
-              fail("interrupted");
-            }
-          }
-        } finally {
-          collaboration.release();
-        }
-      }
-    });
-
-    // thread one acquires
-    threadA.start();
-    WaitCriterion ev = new WaitCriterion() {
-      @Override
-      public boolean done() {
-        return CollaborationJUnitTest.this.flagTestBlocksUntilRelease;
-      }
-
-      @Override
-      public String description() {
-        return "waiting for thread";
-      }
-    };
-    Wait.waitForCriterion(ev, 5 * 1000, 200, true);
-    assertTrue(this.collaboration.hasCurrentTopic(threadA));
-
-    // thread two blocks until one releeases
-    Thread threadB = new Thread(group, new Runnable() {
-      @Override
-      public void run() {
-        threadBStartedTestBlocksUntilRelease = true;
-        collaboration.acquireUninterruptibly("topicB");
-        try {
-          flagTestBlocksUntilRelease = true;
-          WaitCriterion ev2 = new WaitCriterion() {
-            @Override
-            public boolean done() {
-              return !flagTestBlocksUntilRelease;
-            }
-
-            @Override
-            public String description() {
-              return "waiting for release";
-            }
-          };
-          Wait.waitForCriterion(ev2, 20 * 1000, 200, true);
-        } finally {
-          collaboration.release();
-        }
-      }
-    });
-
-    // start up threadB
-    threadB.start();
-    ev = new WaitCriterion() {
-      @Override
-      public boolean done() {
-        return threadBStartedTestBlocksUntilRelease;
-      }
-
-      @Override
-      public String description() {
-        return "waiting for thread b";
-      }
-    };
-    Wait.waitForCriterion(ev, 5 * 1000, 200, true);
-
-    // threadA holds topic and threadB is waiting...
-    assertTrue(this.collaboration.hasCurrentTopic(threadA));
-    assertFalse(this.collaboration.hasCurrentTopic(threadB));
-
-    // let threadA release so that threadB gets lock
-    this.flagTestBlocksUntilRelease = false;
-    ThreadUtils.join(threadA, 30 * 1000);
-
-    // make sure threadB is doing what it's supposed to do...
-    ev = new WaitCriterion() {
-      @Override
-      public boolean done() {
-        return flagTestBlocksUntilRelease;
-      }
-
-      @Override
-      public String description() {
-        return "threadB";
-      }
-    };
-    Wait.waitForCriterion(ev, 5 * 1000, 200, true);
-    // threadB must have lock now... let threadB release
-    assertTrue(this.collaboration.hasCurrentTopic(threadB));
-    this.flagTestBlocksUntilRelease = false;
-    ThreadUtils.join(threadB, 30 * 1000);
-
-    // collaboration should be free now
-    assertFalse(this.collaboration.hasCurrentTopic(threadA));
-    assertFalse(this.collaboration.hasCurrentTopic(threadB));
-    assertFalse(this.collaboration.hasCurrentTopic());
-  }
-
-  protected volatile boolean threadAFlag_TestLateComerJoinsIn = false;
-  protected volatile boolean threadBFlag_TestLateComerJoinsIn = false;
-  protected volatile boolean threadCFlag_TestLateComerJoinsIn = true;
-  protected volatile boolean threadDFlag_TestLateComerJoinsIn = false;
-
-  @Test
-  public void testLateComerJoinsIn() throws Exception {
-    this.log.info("[testLateComerJoinsIn]");
-
-    final Object topicA = "topicA";
-    final Object topicB = "topicB";
-
-    // threads one and two acquire
-    Thread threadA = new Thread(group, new Runnable() {
-      @Override
-      public void run() {
-        collaboration.acquireUninterruptibly(topicA);
-        try {
-          threadAFlag_TestLateComerJoinsIn = true;
-          WaitCriterion ev = new WaitCriterion() {
-            @Override
-            public boolean done() {
-              return !threadAFlag_TestLateComerJoinsIn;
-            }
-
-            @Override
-            public String description() {
-              return null;
-            }
-          };
-          Wait.waitForCriterion(ev, 60 * 1000, 200, true);
-        } finally {
-          collaboration.release();
-        }
-      }
-    });
-    threadA.start();
-    WaitCriterion ev = new WaitCriterion() {
-      @Override
-      public boolean done() {
-        return threadAFlag_TestLateComerJoinsIn;
-      }
-
-      @Override
-      public String description() {
-        return "wait for ThreadA";
-      }
-    };
-    Wait.waitForCriterion(ev, 30 * 1000, 200, true);
-    assertTrue(this.collaboration.hasCurrentTopic(threadA));
-    assertTrue(this.collaboration.isCurrentTopic(topicA));
-
-    Thread threadB = new Thread(group, new Runnable() {
-      @Override
-      public void run() {
-        collaboration.acquireUninterruptibly(topicA);
-        try {
-          threadBFlag_TestLateComerJoinsIn = true;
-          WaitCriterion ev2 = new WaitCriterion() {
-            @Override
-            public boolean done() {
-              return !threadBFlag_TestLateComerJoinsIn;
-            }
-
-            @Override
-            public String description() {
-              return null;
-            }
-          };
-          Wait.waitForCriterion(ev2, 60 * 1000, 200, true);
-        } finally {
-          collaboration.release();
-        }
-      }
-    });
-    threadB.start();
-    ev = new WaitCriterion() {
-      @Override
-      public boolean done() {
-        return threadBFlag_TestLateComerJoinsIn;
-      }
-
-      @Override
-      public String description() {
-        return "";
-      }
-    };
-    Wait.waitForCriterion(ev, 60 * 1000, 200, true);
-    assertTrue(this.collaboration.hasCurrentTopic(threadB));
-
-    // thread three blocks for new topic
-    Thread threadC = new Thread(group, new Runnable() {
-      @Override
-      public void run() {
-        threadCFlag_TestLateComerJoinsIn = false;
-        collaboration.acquireUninterruptibly(topicB);
-        try {
-          threadCFlag_TestLateComerJoinsIn = true;
-          WaitCriterion ev2 = new WaitCriterion() {
-            @Override
-            public boolean done() {
-              return !threadCFlag_TestLateComerJoinsIn;
-            }
-
-            @Override
-            public String description() {
-              return null;
-            }
-          };
-          Wait.waitForCriterion(ev2, 60 * 1000, 200, true);
-        } finally {
-          collaboration.release();
-        }
-      }
-    });
-    threadC.start();
-    ev = new WaitCriterion() {
-      @Override
-      public boolean done() {
-        return threadCFlag_TestLateComerJoinsIn;
-      }
-
-      @Override
-      public String description() {
-        return null;
-      }
-    };
-    Wait.waitForCriterion(ev, 60 * 1000, 200, true);
-    assertFalse(this.collaboration.hasCurrentTopic(threadC));
-    assertFalse(this.collaboration.isCurrentTopic(topicB));
-
-    // thread four (lateComer) acquires current topic immediately
-    Thread threadD = new Thread(group, new Runnable() {
-      @Override
-      public void run() {
-        collaboration.acquireUninterruptibly(topicA);
-        try {
-          threadDFlag_TestLateComerJoinsIn = true;
-          while (threadDFlag_TestLateComerJoinsIn) {
-            try {
-              Thread.sleep(10);
-            } catch (InterruptedException ignore) {
-              fail("interrupted");
-            }
-          }
-        } finally {
-          collaboration.release();
-        }
-      }
-    });
-    threadD.start();
-    ev = new WaitCriterion() {
-      @Override
-      public boolean done() {
-        return threadDFlag_TestLateComerJoinsIn;
-      }
-
-      @Override
-      public String description() {
-        return null;
-      }
-    };
-    Wait.waitForCriterion(ev, 60 * 1000, 200, true);
-    assertTrue(this.collaboration.hasCurrentTopic(threadD));
-
-    // release threadA
-    this.threadAFlag_TestLateComerJoinsIn = false;
-    ThreadUtils.join(threadA, 30 * 1000);
-    assertFalse(this.collaboration.hasCurrentTopic(threadA));
-    assertTrue(this.collaboration.hasCurrentTopic(threadB));
-    assertFalse(this.collaboration.hasCurrentTopic(threadC));
-    assertTrue(this.collaboration.hasCurrentTopic(threadD));
-    assertTrue(this.collaboration.isCurrentTopic(topicA));
-    assertFalse(this.collaboration.isCurrentTopic(topicB));
-
-    // release threadB
-    this.threadBFlag_TestLateComerJoinsIn = false;
-    ThreadUtils.join(threadB, 30 * 1000);
-    assertFalse(this.collaboration.hasCurrentTopic(threadB));
-    assertFalse(this.collaboration.hasCurrentTopic(threadC));
-    assertTrue(this.collaboration.hasCurrentTopic(threadD));
-    assertTrue(this.collaboration.isCurrentTopic(topicA));
-    assertFalse(this.collaboration.isCurrentTopic(topicB));
-
-    // release threadD
-    this.threadDFlag_TestLateComerJoinsIn = false;
-    ThreadUtils.join(threadD, 30 * 1000);
-    ev = new WaitCriterion() {
-      @Override
-      public boolean done() {
-        return threadCFlag_TestLateComerJoinsIn;
-      }
-
-      @Override
-      public String description() {
-        return null;
-      }
-    };
-    Wait.waitForCriterion(ev, 60 * 1000, 200, true);
-    assertTrue(this.collaboration.hasCurrentTopic(threadC));
-    assertFalse(this.collaboration.hasCurrentTopic(threadD));
-    assertFalse(this.collaboration.isCurrentTopic(topicA));
-    assertTrue(this.collaboration.isCurrentTopic(topicB));
-
-    // release threadC
-    this.threadCFlag_TestLateComerJoinsIn = false;
-    ThreadUtils.join(threadC, 30 * 1000);
-    assertFalse(this.collaboration.hasCurrentTopic(threadC));
-    assertFalse(this.collaboration.isCurrentTopic(topicA));
-    assertFalse(this.collaboration.isCurrentTopic(topicB));
-  }
-
-  protected List waitingList = Collections.synchronizedList(new ArrayList());
-  protected List fairnessList = Collections.synchronizedList(new ArrayList());
-  protected volatile boolean runTestFairnessStressfully = true;
-
-  @Test
-  public void testFairnessStressfully() throws Exception {
-    this.log.info("[testFairnessStressfully]");
-    final int numThreads = 20;
-    Thread threads[] = new Thread[numThreads];
-
-    Runnable run = new Runnable() {
-      public void run() {
-        boolean released = false;
-        try {
-          String uniqueTopic = Thread.currentThread().getName();
-          while (runTestFairnessStressfully) {
-            waitingList.add(uniqueTopic);
-            collaboration.acquireUninterruptibly(uniqueTopic);
-            try {
-              released = false;
-              fairnessList.add(uniqueTopic);
-              waitingList.remove(uniqueTopic);
-            } finally {
-              // wait for the other threads to line up...
-              WaitCriterion ev = new WaitCriterion() {
-                @Override
-                public boolean done() {
-                  return !runTestFairnessStressfully || waitingList.size() >= numThreads - 1;
-                }
-
-                @Override
-                public String description() {
-                  return "other threads lining up";
-                }
-              };
-              Wait.waitForCriterion(ev, 60 * 1000, 200, true);
-              collaboration.release();
-              released = true;
-            }
-          }
-        } finally {
-          if (!released) {
-            collaboration.release();
-          }
-        }
-      }
-    };
-
-    try {
-      // many threads loop: acquire and release with unique topic
-      for (int t = 0; t < threads.length; t++) {
-        threads[t] = new Thread(group, run, String.valueOf(t));
-        threads[t].start();
-      }
-
-      log.info("Started all threads... waiting for test to complete.");
-
-      // wait for numThreads * 10
-      WaitCriterion ev = new WaitCriterion() {
-        @Override
-        public boolean done() {
-          return fairnessList.size() >= numThreads * 20;
-        }
-
-        @Override
-        public String description() {
-          return "waiting for numThreads * 10";
-        }
-      };
-      Wait.waitForCriterion(ev, 5 * 60 * 1000, 200, true);
-    } finally {
-      if (this.runTestFairnessStressfully) {
-        this.runTestFairnessStressfully = false;
-      }
-    }
-
-    for (int t = 0; t < threads.length; t++) {
-      ThreadUtils.join(threads[t], 30 * 1000);
-    }
-
-    // assert that all topics are acquired in order
-    // count number of occurrences of each thread
-    int count[] = new int[numThreads];
-    for (int i = 0; i < count.length; i++) { // shouldn't be necessary
-      count[i] = 0;
-    }
-    synchronized (this.fairnessList) {
-      for (Iterator iter = this.fairnessList.iterator(); iter.hasNext();) {
-        int id = Integer.valueOf((String) iter.next()).intValue();
-        count[id] = count[id] + 1;
-      }
-    }
-
-    int totalLocks = 0;
-    int minLocks = Integer.MAX_VALUE;
-    int maxLocks = 0;
-    for (int i = 0; i < count.length; i++) {
-      int locks = count[i];
-      this.log.fine("testFairnessStressfully thread-" + i + " acquired topic " + locks + " times.");
-      if (locks < minLocks)
-        minLocks = locks;
-      if (locks > maxLocks)
-        maxLocks = locks;
-      totalLocks = totalLocks + locks;
-    }
-
-    this.log.info("[testFairnessStressfully] totalLocks=" + totalLocks + " minLocks=" + minLocks
-        + " maxLocks=" + maxLocks);
-
-    int expectedLocks = (totalLocks / numThreads) + 1;
-
-    // NOTE: if you turn on fine logs, this deviation may be too small...
-    // slower machines may also fail depending on thread scheduling
-    int deviation = (int) (expectedLocks * 0.25);
-    int lowThreshold = expectedLocks - deviation;
-    int highThreshold = expectedLocks + deviation;
-
-    this.log.info("[testFairnessStressfully] deviation=" + deviation + " expectedLocks="
-        + expectedLocks + " lowThreshold=" + lowThreshold + " highThreshold=" + highThreshold);
-
-    // if these assertions keep failing we'll have to rewrite the test
-    // to handle scheduling of the threads...
-
-    assertTrue("minLocks is less than lowThreshold", minLocks >= lowThreshold);
-    assertTrue("maxLocks is greater than highThreshold", maxLocks <= highThreshold);
-  }
-
-  @Test
-  public void testHasCurrentTopic() throws Exception {
-    this.log.info("[testHasCurrentTopic]");
-    assertTrue(!this.collaboration.hasCurrentTopic());
-    this.collaboration.acquireUninterruptibly("testHasCurrentTopic");
-    try {
-      assertTrue(this.collaboration.hasCurrentTopic());
-    } finally {
-      this.collaboration.release();
-    }
-    assertTrue(!this.collaboration.hasCurrentTopic());
-  }
-
-  protected volatile boolean flagTestThreadHasCurrentTopic = false;
-
-  @Test
-  public void testThreadHasCurrentTopic() throws Exception {
-    this.log.info("[testThreadHasCurrentTopic]");
-    Thread thread = new Thread(group, new Runnable() {
-      @Override
-      public void run() {
-        collaboration.acquireUninterruptibly("testThreadHasCurrentTopic");
-        try {
-          flagTestThreadHasCurrentTopic = true;
-          WaitCriterion ev = new WaitCriterion() {
-            @Override
-            public boolean done() {
-              return !flagTestThreadHasCurrentTopic;
-            }
-
-            @Override
-            public String description() {
-              return null;
-            }
-          };
-          Wait.waitForCriterion(ev, 60 * 1000, 200, true);
-        } finally {
-          collaboration.release();
-        }
-      }
-    });
-
-    // before starting thread, hasCurrentTopic(thread) returns false
-    assertTrue(!this.collaboration.hasCurrentTopic(thread));
-    thread.start();
-    WaitCriterion ev = new WaitCriterion() {
-      @Override
-      public boolean done() {
-        return flagTestThreadHasCurrentTopic;
-      }
-
-      @Override
-      public String description() {
-        return null;
-      }
-    };
-    Wait.waitForCriterion(ev, 60 * 1000, 200, true);
-
-    // after starting thread, hasCurrentTopic(thread) returns true
-    assertTrue(this.collaboration.hasCurrentTopic(thread));
-    this.flagTestThreadHasCurrentTopic = false;
-    ThreadUtils.join(thread, 30 * 1000);
-
-    // after thread finishes, hasCurrentTopic(thread) returns false
-    assertTrue(!this.collaboration.hasCurrentTopic(thread));
-  }
-
-  @Test
-  public void testIsCurrentTopic() throws Exception {
-    this.log.info("[testIsCurrentTopic]");
-    Object topic = "testIsCurrentTopic";
-    assertTrue(!this.collaboration.isCurrentTopic(topic));
-    this.collaboration.acquireUninterruptibly(topic);
-    try {
-      assertTrue(this.collaboration.isCurrentTopic(topic));
-    } finally {
-      this.collaboration.release();
-    }
-    assertTrue(!this.collaboration.isCurrentTopic(topic));
-  }
-
-  protected final ThreadGroup group = new ThreadGroup("CollaborationJUnitTest Threads") {
-    @Override
-    public void uncaughtException(Thread t, Throwable e) {
-      if (e instanceof VirtualMachineError) {
-        SystemFailure.setFailure((VirtualMachineError) e); // don't throw
-      }
-      String s = "Uncaught exception in thread " + t;
-      log.error(s, e);
-      fail(s);
-    }
-  };
-}
-