You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by sa...@apache.org on 2016/03/29 20:25:46 UTC
[03/45] incubator-geode git commit: GEODE-1035 CI failure:
DeltaPropagationWithCQDUnitTest.testFullValueRequestsWithCqWithoutRI
GEODE-1035 CI failure: DeltaPropagationWithCQDUnitTest.testFullValueRequestsWithCqWithoutRI
GEODE-1035 #close The test had verification-step problems
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7e5f16d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7e5f16d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7e5f16d6
Branch: refs/heads/feature/GEODE-52
Commit: 7e5f16d688c7be5b8a85657fed1ec041c8ffb535
Parents: 0e3a60b
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Mon Mar 28 15:13:06 2016 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Mon Mar 28 15:13:06 2016 -0700
----------------------------------------------------------------------
.../DeltaPropagationWithCQDUnitTest.java | 63 ++++++++++++++++----
1 file changed, 50 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7e5f16d6/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DeltaPropagationWithCQDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DeltaPropagationWithCQDUnitTest.java b/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DeltaPropagationWithCQDUnitTest.java
index 6e5521a..1ea7970 100644
--- a/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DeltaPropagationWithCQDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DeltaPropagationWithCQDUnitTest.java
@@ -16,7 +16,11 @@
*/
package com.gemstone.gemfire.internal.cache.tier.sockets;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
import java.util.Properties;
+import java.util.Set;
import com.gemstone.gemfire.DeltaTestImpl;
import com.gemstone.gemfire.cache.AttributesFactory;
@@ -87,6 +91,8 @@ public class DeltaPropagationWithCQDUnitTest extends DistributedTestCase {
private static long cqEvents = 0;
private static long cqErrors = 0;
+
+ private static long deltasFound = 0;
/**
* @param name
@@ -181,8 +187,10 @@ public class DeltaPropagationWithCQDUnitTest extends DistributedTestCase {
client1.invoke(() -> DeltaPropagationWithCQDUnitTest.doPuts(numOfKeys, true));
// verify client2's CQ listeners see above puts
verifyCqListeners(numOfListeners * numOfKeys * numOfCQs * 2);
+ // verify number of deltas encountered in this client
+ assertEquals(numOfKeys, deltasFound);
// verify full value requests at server
- server1.invoke(() -> DeltaPropagationWithCQDUnitTest.verifyFullValueRequestsFromClients(10L));
+ server1.invoke(() -> DeltaPropagationWithCQDUnitTest.verifyFullValueRequestsFromClients(numOfKeys*1l));
}
public static void verifyCqListeners(final Integer events) throws Exception {
@@ -193,6 +201,7 @@ public class DeltaPropagationWithCQDUnitTest extends DistributedTestCase {
}
public boolean done() {
+ System.out.println("verifyCqListeners: expected total="+events+"; cqEvents="+cqEvents+"; cqErrors="+cqErrors);
return (cqEvents + cqErrors) == events;
}
};
@@ -201,19 +210,23 @@ public class DeltaPropagationWithCQDUnitTest extends DistributedTestCase {
public static void verifyFullValueRequestsFromClients(Long expected)
throws Exception {
- Object[] proxies = ((CacheServerImpl)((GemFireCacheImpl)cache)
- .getCacheServers().get(0)).getAcceptor().getCacheClientNotifier()
- .getClientProxies().toArray();
- long fullValueRequests = ((CacheClientProxy)proxies[0]).getStatistics()
- .getDeltaFullMessagesSent();
- if (fullValueRequests == 0) {
- assertEquals("Full value requests, ", expected.longValue(),
- ((CacheClientProxy)proxies[1]).getStatistics()
- .getDeltaFullMessagesSent());
- } else {
- assertEquals("Full value requests, ", expected.longValue(),
- fullValueRequests);
+ List<CacheServerImpl> servers = ((GemFireCacheImpl)cache).getCacheServers();
+ assertEquals("expected one server but found these: " + servers, 1, servers.size());
+
+ CacheClientProxy[] proxies = servers.get(0).getAcceptor().getCacheClientNotifier()
+ .getClientProxies().toArray(new CacheClientProxy[0]);
+
+ // find the proxy for the client that processed the CQs - it will have
+ // incremented its deltaFullMessagesSent statistic when the listener invoked
+ // getValue() on the event and caused a RequestEventValue command to be
+ // invoked on the server
+ long fullValueRequests = 0;
+ for (int i=0; (i < proxies.length) && (fullValueRequests <= 0l); i++) {
+ CacheClientProxy proxy = proxies[i];
+ fullValueRequests = proxy.getStatistics().getDeltaFullMessagesSent();
}
+
+ assertEquals("Full value requests, ", expected.longValue(), fullValueRequests);
}
public static void doPut(Object key, Object value) throws Exception {
@@ -314,13 +327,37 @@ public class DeltaPropagationWithCQDUnitTest extends DistributedTestCase {
for (int i = 0; i < numOfListeners; i++) {
cqListeners[i] = new CqListenerAdapter() {
public void onEvent(CqEvent event) {
+ System.out.println("CqListener.onEvent invoked. Event="+event);
+ if (event.getDeltaValue() != null) {
+ deltasFound++;
+ }
+ // The first CQ event dispatched with a delta will not have a newValue.
+ // Attempting to access the newValue will cause an exception to be
+ // thrown, exiting this listener and causing the full value to be
+ // read from the server. The listener is then invoked a second time
+ // and getNewValue will succeed
event.getNewValue();
+ if (event.getDeltaValue() != null) {
+ // if there's a newValue we should ignore the delta bytes
+ deltasFound--;
+ }
+ System.out.println("deltasFound="+deltasFound);
cqEvents++;
+ System.out.println("cqEvents is now " + cqEvents);
}
public void onError(CqEvent event) {
+ System.out.println("CqListener.onError invoked. Event="+event);
+ if (event.getDeltaValue() != null) {
+ deltasFound++;
+ }
event.getNewValue();
+ if (event.getDeltaValue() != null) {
+ deltasFound--;
+ }
+ System.out.println("deltasFound="+deltasFound);
cqErrors++;
+ System.out.println("cqErrors is now " + cqErrors);
}
};
caf.addCqListener(cqListeners[i]);