You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by sa...@apache.org on 2016/03/29 20:25:46 UTC

[03/45] incubator-geode git commit: GEODE-1035 CI failure: DeltaPropagationWithCQDUnitTest.testFullValueRequestsWithCqWithoutRI

GEODE-1035 CI failure: DeltaPropagationWithCQDUnitTest.testFullValueRequestsWithCqWithoutRI

GEODE-1035 #close The test had verification-step problems


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7e5f16d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7e5f16d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7e5f16d6

Branch: refs/heads/feature/GEODE-52
Commit: 7e5f16d688c7be5b8a85657fed1ec041c8ffb535
Parents: 0e3a60b
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Mon Mar 28 15:13:06 2016 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Mon Mar 28 15:13:06 2016 -0700

----------------------------------------------------------------------
 .../DeltaPropagationWithCQDUnitTest.java        | 63 ++++++++++++++++----
 1 file changed, 50 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7e5f16d6/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DeltaPropagationWithCQDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DeltaPropagationWithCQDUnitTest.java b/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DeltaPropagationWithCQDUnitTest.java
index 6e5521a..1ea7970 100644
--- a/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DeltaPropagationWithCQDUnitTest.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DeltaPropagationWithCQDUnitTest.java
@@ -16,7 +16,11 @@
  */
 package com.gemstone.gemfire.internal.cache.tier.sockets;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 
 import com.gemstone.gemfire.DeltaTestImpl;
 import com.gemstone.gemfire.cache.AttributesFactory;
@@ -87,6 +91,8 @@ public class DeltaPropagationWithCQDUnitTest extends DistributedTestCase {
   private static long cqEvents = 0;
 
   private static long cqErrors = 0;
+  
+  private static long deltasFound = 0;
 
   /**
    * @param name
@@ -181,8 +187,10 @@ public class DeltaPropagationWithCQDUnitTest extends DistributedTestCase {
     client1.invoke(() -> DeltaPropagationWithCQDUnitTest.doPuts(numOfKeys, true));
     // verify client2's CQ listeners see above puts
     verifyCqListeners(numOfListeners * numOfKeys * numOfCQs * 2);
+    // verify number of deltas encountered in this client
+    assertEquals(numOfKeys, deltasFound);
     // verify full value requests at server 
-    server1.invoke(() -> DeltaPropagationWithCQDUnitTest.verifyFullValueRequestsFromClients(10L));
+    server1.invoke(() -> DeltaPropagationWithCQDUnitTest.verifyFullValueRequestsFromClients(numOfKeys*1l));
   }
 
   public static void verifyCqListeners(final Integer events) throws Exception {
@@ -193,6 +201,7 @@ public class DeltaPropagationWithCQDUnitTest extends DistributedTestCase {
       }
 
       public boolean done() {
+        System.out.println("verifyCqListeners: expected total="+events+"; cqEvents="+cqEvents+"; cqErrors="+cqErrors);
         return (cqEvents + cqErrors) == events;
       }
     };
@@ -201,19 +210,23 @@ public class DeltaPropagationWithCQDUnitTest extends DistributedTestCase {
 
   public static void verifyFullValueRequestsFromClients(Long expected)
       throws Exception {
-    Object[] proxies = ((CacheServerImpl)((GemFireCacheImpl)cache)
-        .getCacheServers().get(0)).getAcceptor().getCacheClientNotifier()
-        .getClientProxies().toArray();
-    long fullValueRequests = ((CacheClientProxy)proxies[0]).getStatistics()
-        .getDeltaFullMessagesSent();
-    if (fullValueRequests == 0) {
-      assertEquals("Full value requests, ", expected.longValue(),
-          ((CacheClientProxy)proxies[1]).getStatistics()
-          .getDeltaFullMessagesSent());
-    } else {
-      assertEquals("Full value requests, ", expected.longValue(),
-          fullValueRequests);
+    List<CacheServerImpl> servers = ((GemFireCacheImpl)cache).getCacheServers();
+    assertEquals("expected one server but found these: " + servers, 1, servers.size());
+
+    CacheClientProxy[] proxies = servers.get(0).getAcceptor().getCacheClientNotifier()
+        .getClientProxies().toArray(new CacheClientProxy[0]);
+    
+    // Find the proxy for the client that processed the CQs - it will have
+    // incremented its deltaFullMessagesSent statistic when the listener invoked
+    // getNewValue() on the event and caused a RequestEventValue command to be
+    // invoked on the server.
+    long fullValueRequests = 0;
+    for (int i=0; (i < proxies.length) && (fullValueRequests <= 0l); i++) {
+      CacheClientProxy proxy = proxies[i];
+      fullValueRequests = proxy.getStatistics().getDeltaFullMessagesSent();
     }
+    
+    assertEquals("Full value requests, ", expected.longValue(), fullValueRequests);
   }
 
   public static void doPut(Object key, Object value) throws Exception {
@@ -314,13 +327,37 @@ public class DeltaPropagationWithCQDUnitTest extends DistributedTestCase {
     for (int i = 0; i < numOfListeners; i++) {
       cqListeners[i] = new CqListenerAdapter() {
         public void onEvent(CqEvent event) {
+          System.out.println("CqListener.onEvent invoked.  Event="+event);
+          if (event.getDeltaValue() != null) {
+            deltasFound++;
+          }
+          // The first CQ event dispatched with a delta will not have a newValue.
+          // Attempting to access the newValue will cause an exception to be
+          // thrown, exiting this listener and causing the full value to be
+          // read from the server.  The listener is then invoked a second time,
+          // and getNewValue() will succeed.
           event.getNewValue();
+          if (event.getDeltaValue() != null) {
+            // if there's a newValue we should ignore the delta bytes
+            deltasFound--;
+          }
+          System.out.println("deltasFound="+deltasFound);
           cqEvents++;
+          System.out.println("cqEvents is now " + cqEvents);
         }
 
         public void onError(CqEvent event) {
+          System.out.println("CqListener.onError invoked.  Event="+event);
+          if (event.getDeltaValue() != null) {
+            deltasFound++;
+          }
           event.getNewValue();
+          if (event.getDeltaValue() != null) {
+            deltasFound--;
+          }
+          System.out.println("deltasFound="+deltasFound);
           cqErrors++;
+          System.out.println("cqErrors is now " + cqErrors);
         }
       };
       caf.addCqListener(cqListeners[i]);