You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2016/12/08 22:50:03 UTC
geode git commit: GEODE-2175: wait for results from other members is
not needed in result collector. It's already handled by Function framework.
Repository: geode
Updated Branches:
refs/heads/develop f22809a44 -> 1d951e334
GEODE-2175: wait for results from other members is not needed in result collector. It's already handled by Function framework.
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/1d951e33
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/1d951e33
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/1d951e33
Branch: refs/heads/develop
Commit: 1d951e334334393c2052307988a21c1cd1f11985
Parents: f22809a
Author: zhouxh <gz...@pivotal.io>
Authored: Thu Dec 8 14:47:56 2016 -0800
Committer: zhouxh <gz...@pivotal.io>
Committed: Thu Dec 8 14:47:56 2016 -0800
----------------------------------------------------------------------
.../TopEntriesFunctionCollector.java | 43 +-----
.../TopEntriesFunctionCollectorJUnitTest.java | 137 +------------------
2 files changed, 4 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/1d951e33/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
index d4a7008..66c4c0a 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollector.java
@@ -47,9 +47,6 @@ public class TopEntriesFunctionCollector
// Use this instance to perform reduce operation
final CollectorManager<TopEntriesCollector> manager;
- // latch to wait till all results are collected
- private final CountDownLatch waitForResults = new CountDownLatch(1);
-
final String id;
// Instance of gemfire cache to check status and other utility methods
@@ -83,36 +80,11 @@ public class TopEntriesFunctionCollector
@Override
public TopEntries getResult() throws FunctionException {
- try {
- waitForResults.await();
- } catch (InterruptedException e) {
- logger.debug("Interrupted while waiting for result collection", e);
- Thread.currentThread().interrupt();
- if (cache != null) {
- cache.getCancelCriterion().checkCancelInProgress(e);
- }
- throw new FunctionException(e);
- }
-
return aggregateResults();
}
@Override
public TopEntries getResult(long timeout, TimeUnit unit) throws FunctionException {
- try {
- boolean result = waitForResults.await(timeout, unit);
- if (!result) {
- throw new FunctionException("Did not receive results from all members within wait time");
- }
- } catch (InterruptedException e) {
- logger.debug("Interrupted while waiting for result collection", e);
- Thread.currentThread().interrupt();
- if (cache != null) {
- cache.getCancelCriterion().checkCancelInProgress(e);
- }
- throw new FunctionException(e);
- }
-
return aggregateResults();
}
@@ -128,20 +100,11 @@ public class TopEntriesFunctionCollector
}
@Override
- public void endResults() {
- synchronized (subResults) {
- waitForResults.countDown();
- }
- }
+ public void endResults() {}
@Override
public void clearResults() {
synchronized (subResults) {
- if (waitForResults.getCount() == 0) {
- throw new IllegalStateException(
- "This collector is closed and cannot accept anymore results");
- }
-
subResults.clear();
}
}
@@ -149,10 +112,6 @@ public class TopEntriesFunctionCollector
@Override
public void addResult(DistributedMember memberID, TopEntriesCollector resultOfSingleExecution) {
synchronized (subResults) {
- if (waitForResults.getCount() == 0) {
- throw new IllegalStateException(
- "This collector is closed and cannot accept anymore results");
- }
subResults.add(resultOfSingleExecution);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/1d951e33/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java
index af868e2..bf08877 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/distributed/TopEntriesFunctionCollectorJUnitTest.java
@@ -63,25 +63,8 @@ public class TopEntriesFunctionCollectorJUnitTest {
@Test
public void testGetResultsBlocksTillEnd() throws Exception {
final TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector();
- final CountDownLatch insideThread = new CountDownLatch(1);
- final CountDownLatch resultReceived = new CountDownLatch(1);
- Thread resultClient = new Thread(new Runnable() {
- @Override
- public void run() {
- insideThread.countDown();
- collector.getResult();
- resultReceived.countDown();
- }
- });
- resultClient.start();
-
- insideThread.await(1, TimeUnit.SECONDS);
- assertEquals(0, insideThread.getCount());
- assertEquals(1, resultReceived.getCount());
-
- collector.endResults();
- resultReceived.await(1, TimeUnit.SECONDS);
- assertEquals(0, resultReceived.getCount());
+ TopEntries merged = collector.getResult();
+ assertEquals(0, merged.size());
}
@Test
@@ -94,111 +77,11 @@ public class TopEntriesFunctionCollectorJUnitTest {
final CountDownLatch resultReceived = new CountDownLatch(1);
final AtomicReference<TopEntries> result = new AtomicReference<>();
-
- Thread resultClient = new Thread(new Runnable() {
- @Override
- public void run() {
- insideThread.countDown();
- result.set(collector.getResult(1, TimeUnit.SECONDS));
- resultReceived.countDown();
- }
- });
- resultClient.start();
-
- insideThread.await(1, TimeUnit.SECONDS);
- assertEquals(0, insideThread.getCount());
- assertEquals(1, resultReceived.getCount());
-
- collector.endResults();
-
- resultReceived.await(1, TimeUnit.SECONDS);
- assertEquals(0, resultReceived.getCount());
-
- TopEntries merged = result.get();
+ TopEntries merged = collector.getResult(1, TimeUnit.SECONDS);
assertEquals(4, merged.size());
TopEntriesJUnitTest.verifyResultOrder(merged.getHits(), r1_1, r2_1, r1_2, r2_2);
}
- @Test(expected = FunctionException.class)
- public void testGetResultsWaitInterrupted() throws Exception {
- interruptWhileWaiting(false);
- }
-
- @Test(expected = FunctionException.class)
- public void testGetResultsTimedWaitInterrupted() throws Exception {
- interruptWhileWaiting(false);
- }
-
- private void interruptWhileWaiting(final boolean timedWait)
- throws InterruptedException, Exception {
- GemFireCacheImpl mockCache = mock(GemFireCacheImpl.class);
- final TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(null, mockCache);
-
- final CountDownLatch insideThread = new CountDownLatch(1);
- final CountDownLatch endGetResult = new CountDownLatch(1);
- final AtomicReference<Exception> exception = new AtomicReference<>();
-
- Thread resultClient = new Thread(new Runnable() {
- @Override
- public void run() {
- insideThread.countDown();
- try {
- if (timedWait) {
- collector.getResult(1, TimeUnit.SECONDS);
- } else {
- collector.getResult();
- }
- } catch (FunctionException e) {
- exception.set(e);
- endGetResult.countDown();
- }
- }
- });
- resultClient.start();
-
- insideThread.await(1, TimeUnit.SECONDS);
- assertEquals(0, insideThread.getCount());
- assertEquals(1, endGetResult.getCount());
-
- CancelCriterion mockCriterion = mock(CancelCriterion.class);
- when(mockCache.getCancelCriterion()).thenReturn(mockCriterion);
- resultClient.interrupt();
- endGetResult.await(1, TimeUnit.SECONDS);
- assertEquals(0, endGetResult.getCount());
- throw exception.get();
- }
-
- @Test(expected = FunctionException.class)
- public void expectErrorAfterWaitTime() throws Exception {
- final TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector(null);
-
- final CountDownLatch insideThread = new CountDownLatch(1);
- final CountDownLatch endGetResult = new CountDownLatch(1);
- final AtomicReference<Exception> exception = new AtomicReference<>();
-
- Thread resultClient = new Thread(new Runnable() {
- @Override
- public void run() {
- insideThread.countDown();
- try {
- collector.getResult(10, TimeUnit.MILLISECONDS);
- } catch (FunctionException e) {
- exception.set(e);
- endGetResult.countDown();
- }
- }
- });
- resultClient.start();
-
- insideThread.await(1, TimeUnit.SECONDS);
- assertEquals(0, insideThread.getCount());
- assertEquals(1, endGetResult.getCount());
-
- endGetResult.await(1, TimeUnit.SECONDS);
- assertEquals(0, endGetResult.getCount());
- throw exception.get();
- }
-
@Test
public void mergeShardAndLimitResults() throws Exception {
LuceneFunctionContext<TopEntriesCollector> context =
@@ -299,20 +182,6 @@ public class TopEntriesFunctionCollectorJUnitTest {
collector.getResult();
}
- @Test(expected = IllegalStateException.class)
- public void addResultDisallowedAfterEndResult() {
- TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector();
- collector.endResults();
- collector.addResult(null, new TopEntriesCollector(null));
- }
-
- @Test(expected = IllegalStateException.class)
- public void clearResultDisallowedAfterEndResult() {
- TopEntriesFunctionCollector collector = new TopEntriesFunctionCollector();
- collector.endResults();
- collector.clearResults();
- }
-
@Test
public void testCollectorName() {
GemFireCacheImpl mockCache = mock(GemFireCacheImpl.class);