You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2018/04/24 22:34:44 UTC

[geode] branch develop updated: Revert "GEODE-925: Replaced all Wait.pause with Awaitility. Replaced all ThreadUtils.join with an Awaitility. General code and lambda clean up"

This is an automated email from the ASF dual-hosted git repository.

jasonhuynh pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7c4c4e5  Revert "GEODE-925: Replaced all Wait.pause with Awaitility. Replaced all ThreadUtils.join with an Awaitility. General code and lambda clean up"
7c4c4e5 is described below

commit 7c4c4e57cefcf49e2ea2a69250176c63ffcffc44
Author: Jason Huynh <hu...@gmail.com>
AuthorDate: Tue Apr 24 15:33:47 2018 -0700

    Revert "GEODE-925: Replaced all Wait.pause with Awaitility. Replaced all ThreadUtils.join with an Awaitility. General code and lambda clean up"
    
    This reverts commit 9145fa352e84724dd21d598a855210c71340acfd.
    
    Reverting due to spotless issue
---
 .../dunit/QueryDataInconsistencyDUnitTest.java     | 745 ++++++++++++---------
 1 file changed, 421 insertions(+), 324 deletions(-)

diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java
index 3e6c192..20d3a78 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java
@@ -14,23 +14,10 @@
  */
 package org.apache.geode.cache.query.dunit;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.assertj.core.api.Assertions.fail;
+import static org.junit.Assert.*;
 
-import java.io.Serializable;
 import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.Logger;
-import org.assertj.core.api.Assertions;
-import org.awaitility.Awaitility;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
+
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -52,20 +39,18 @@ import org.apache.geode.cache.query.data.Position;
 import org.apache.geode.cache.query.internal.QueryObserverHolder;
 import org.apache.geode.cache.query.internal.index.IndexManager;
 import org.apache.geode.cache30.CacheSerializableRunnable;
-import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.cache.execute.PRClientServerTestBase;
-import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.Invoke;
+import org.apache.geode.test.dunit.LogWriterUtils;
 import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.ThreadUtils;
 import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.Wait;
 import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.dunit.rules.CacheRule;
-import org.apache.geode.test.dunit.rules.CleanupDUnitVMsRule;
-import org.apache.geode.test.dunit.rules.DistributedTestRule;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.apache.geode.test.junit.categories.FlakyTest;
 import org.apache.geode.test.junit.categories.OQLIndexTest;
@@ -74,418 +59,530 @@ import org.apache.geode.test.junit.categories.OQLIndexTest;
  * This tests the data inconsistency during update on an index and querying the same UNLOCKED index.
  */
 @Category({DistributedTest.class, OQLIndexTest.class})
-public class QueryDataInconsistencyDUnitTest implements Serializable {
+public class QueryDataInconsistencyDUnitTest extends JUnit4CacheTestCase {
 
   private static final int cnt = 0;
+
   private static final int cntDest = 10;
-  private static VM server = null;
-  private static String PartitionedRegionName1 = "TestPartitionedRegion1"; // default name
-  private static String repRegionName = "TestRepRegion"; // default name
-  private static volatile boolean hooked = false;
 
-  private Logger logger = LogService.getLogger(QueryDataInconsistencyDUnitTest.class.getName());
+  static VM server = null;
+
+  static VM client = null;
+
+  static Cache cache = null;
+
+  static String PartitionedRegionName1 = "TestPartitionedRegion1"; // default
+                                                                   // name
+  static String repRegionName = "TestRepRegion"; // default name
+
+  static Integer serverPort1 = null;
+
+  public static int numOfBuckets = 20;
+
+  public static String[] queries =
+      new String[] {"select * from /" + PartitionedRegionName1 + " where ID=1",};
 
-  @ClassRule
-  public static DistributedTestRule distributedTestRule = new DistributedTestRule(1);
+  public static String[] queriesForRR =
+      new String[] {"<trace> select * from /" + repRegionName + " where ID=1"};
 
-  @Rule
-  public CacheRule cacheRule = CacheRule.builder().createCacheInAll().disconnectAfter().build();
+  public static volatile boolean hooked = false;
 
-  @Before
-  public void initialize()
-  {
-    server = Host.getHost(0).getVM(0);
+  public QueryDataInconsistencyDUnitTest() {
+    super();
   }
 
-  @After
-  public final void postTearDownCacheTestCase() {
+  @Override
+  public final void postTearDownCacheTestCase() throws Exception {
+    Invoke.invokeInEveryVM(JUnit4DistributedTestCase::disconnectFromDS);
     Invoke.invokeInEveryVM(QueryObserverHolder::reset);
   }
 
-  @BeforeClass
-  public static final void postSetUp() {
-    Awaitility.setDefaultPollInterval(200, TimeUnit.MILLISECONDS);
-    Awaitility.waitAtMost(30, TimeUnit.SECONDS);
+  @Override
+  public final void postSetUp() throws Exception {
+    Host host = Host.getHost(0);
+    server = host.getVM(0);
   }
 
   @Test
   public void testCompactRangeIndex() {
     // Create caches
-    server.invoke("create indexes", () -> {
-      Cache cache = cacheRule.getCache();
-      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
+    Properties props = new Properties();
+    server.invoke(() -> PRClientServerTestBase.createCacheInVm(props));
 
-      // Create common Portflios and NewPortfolios
-      for (int j = cnt; j < cntDest; j++) {
-        region.put(new Integer(j), new Portfolio(j));
-      }
+    server.invoke(new CacheSerializableRunnable("create indexes") {
+
+      @Override
+      public void run2() throws CacheException {
+        cache = CacheFactory.getAnyInstance();
+        Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
+
+        // Create common Portfolios and NewPortfolios
+        for (int j = cnt; j < cntDest; j++) {
+          region.put(new Integer(j), new Portfolio(j));
+        }
 
-      QueryService queryService = cache.getQueryService();
-      try {
-        Index index = queryService.createIndex("idIndex", "ID", "/" + repRegionName);
-        assertEquals(10, index.getStatistics().getNumberOfKeys());
-      } catch (Exception e) {
-        logger.error(e);
-        fail("Index creation failed");
+        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+        try {
+          Index index = qs.createIndex("idIndex", "ID", "/" + repRegionName);
+          assertEquals(10, index.getStatistics().getNumberOfKeys());
+        } catch (Exception e) {
+          fail("Index creation failed");
+        }
       }
     });
     // Invoke update from client and stop in updateIndex
-    // firesultSett before updating the RegionEntry and second after updating
+    // first before updating the RegionEntry and second after updating
     // the RegionEntry.
-    AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
-      Region repRegion = cacheRule.getCache().getRegion(repRegionName);
-      IndexManager.testHook = new IndexManagerTestHook();
-      repRegion.put(new Integer("1"), new Portfolio(cntDest + 1));
-      // above call must be hooked in BEFORE_UPDATE_OP call.
-    });
-    server.invoke("query on server", () -> {
-      QueryService queryService = cacheRule.getCache().getQueryService();
-      Awaitility.await().until(() -> hooked);
-      Object resultSet = null;
-      try {
-        resultSet =
-            queryService.newQuery("<trace> select * from /" + repRegionName + " where ID = 1")
-                .execute();
-      } catch (Exception e) {
-        logger.error(e);
-        fail("Query execution failed on server.");
-        IndexManager.testHook = null;
-      }
-      assertTrue(resultSet instanceof SelectResults);
-      assertEquals(1, ((SelectResults) resultSet).size());
-      Portfolio p1 = (Portfolio) ((SelectResults) resultSet).asList().get(0);
-      if (p1.getID() != 1) {
-        fail("Query thread did not verify index results even when RE is under update");
-        IndexManager.testHook = null;
+    AsyncInvocation putThread =
+        server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
+
+          @Override
+          public void run2() throws CacheException {
+            Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
+            IndexManager.testHook = new IndexManagerTestHook();
+            repRegion.put(new Integer("1"), new Portfolio(cntDest + 1));
+            // above call must be hooked in BEFORE_UPDATE_OP call.
+          }
+        });
+    server.invoke(new CacheSerializableRunnable("query on server") {
+
+      @Override
+      public void run2() throws CacheException {
+        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+        while (!hooked) {
+          Wait.pause(100);
+        }
+        Object rs = null;
+        try {
+          rs = qs.newQuery("<trace> select * from /" + repRegionName + " where ID = 1").execute();
+        } catch (Exception e) {
+          e.printStackTrace();
+          fail("Query execution failed on server.");
+          IndexManager.testHook = null;
+        }
+        assertTrue(rs instanceof SelectResults);
+        assertEquals(1, ((SelectResults) rs).size());
+        Portfolio p1 = (Portfolio) ((SelectResults) rs).asList().get(0);
+        if (p1.getID() != 1) {
+          fail("Query thread did not verify index results even when RE is under update");
+          IndexManager.testHook = null;
+        }
+        hooked = false;// Let client put go further.
       }
-      hooked = false;// Let client put go further.
     });
 
     // Client put is again hooked in AFTER_UPDATE_OP call in updateIndex.
-    server.invoke("query on server", () -> {
-      QueryService queryService = cacheRule.getCache().getQueryService();
-      Awaitility.await().until(() -> hooked);
-      Object resultSet = null;
-      try {
-        resultSet =
-            queryService.newQuery("<trace> select * from /" + repRegionName + " where ID = 1")
-                .execute();
-      } catch (Exception e) {
-        logger.error(e);
-        fail("Query execution failed on server." + e.getMessage());
-      } finally {
-        IndexManager.testHook = null;
-      }
-      assertTrue(resultSet instanceof SelectResults);
-      if (((SelectResults) resultSet).size() > 0) {
-        Portfolio p1 = (Portfolio) ((SelectResults) resultSet).iterator().next();
-        if (p1.getID() != 1) {
-          fail("Query thread did not verify index results even when RE is under update and "
-              + "RegionEntry value has been modified before releasing the lock");
+    server.invoke(new CacheSerializableRunnable("query on server") {
+
+      @Override
+      public void run2() throws CacheException {
+        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+        while (!hooked) {
+          Wait.pause(100);
+        }
+        Object rs = null;
+        try {
+          rs = qs.newQuery("<trace> select * from /" + repRegionName + " where ID = 1").execute();
+        } catch (Exception e) {
+          e.printStackTrace();
+          fail("Query execution failed on server." + e.getMessage());
+        } finally {
           IndexManager.testHook = null;
         }
+        assertTrue(rs instanceof SelectResults);
+        if (((SelectResults) rs).size() > 0) {
+          Portfolio p1 = (Portfolio) ((SelectResults) rs).iterator().next();
+          if (p1.getID() != 1) {
+            fail("Query thread did not verify index results even when RE is under update and "
+                + "RegionEntry value has been modified before releasing the lock");
+            IndexManager.testHook = null;
+          }
+        }
+        hooked = false;// Let client put go further.
       }
-      hooked = false;// Let client put go further.
     });
-    Awaitility.await().until(joinThread(putThread));
-    // ThreadUtils.join(putThread, 200);
+    ThreadUtils.join(putThread, 200);
   }
 
   @Test
   public void testRangeIndex() {
     // Create caches
-    server.invoke("create indexes", () -> {
-      Cache cache = cacheRule.getCache();
-      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
-      IndexManager.testHook = null;
-      // Create common Portfolios and NewPortfolios
-      Position.cnt = 0;
-      for (int j = cnt; j < cntDest; j++) {
-        Portfolio p = new Portfolio(j);
-        cache.getLogger().fine("Shobhit: portfolio " + j + " : " + p);
-        region.put(j, p);
-      }
+    Properties props = new Properties();
+    server.invoke(() -> PRClientServerTestBase.createCacheInVm(props));
+
+    server.invoke(new CacheSerializableRunnable("create indexes") {
 
-      QueryService queryService = cache.getQueryService();
-      try {
-        Index index = queryService.createIndex("posIndex", "pos.secId",
-            "/" + repRegionName + " p, p.positions.values pos");
-        assertEquals(12, index.getStatistics().getNumberOfKeys());
-      } catch (Exception e) {
-        logger.error(e);
-        fail("Index creation failed");
+      @Override
+      public void run2() throws CacheException {
+        cache = CacheFactory.getAnyInstance();
+        Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
+        IndexManager.testHook = null;
+        // Create common Portfolios and NewPortfolios
+        Position.cnt = 0;
+        for (int j = cnt; j < cntDest; j++) {
+          Portfolio p = new Portfolio(j);
+          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: portfolio " + j + " : " + p);
+          region.put(new Integer(j), p);
+        }
+
+        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+        try {
+          Index index = qs.createIndex("posIndex", "pos.secId",
+              "/" + repRegionName + " p, p.positions.values pos");
+          assertEquals(12, index.getStatistics().getNumberOfKeys());
+        } catch (Exception e) {
+          fail("Index creation failed");
+        }
       }
     });
     // Invoke update from client and stop in updateIndex
-    // firesultSett before updating the RegionEntry and second after updating
+    // first before updating the RegionEntry and second after updating
     // the RegionEntry.
-    AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
-      Cache cache = cacheRule.getCache();
-      Region repRegion = cache.getRegion(repRegionName);
-      IndexManager.testHook = new IndexManagerTestHook();
-      Portfolio newPort = new Portfolio(cntDest + 1);
-      cache.getLogger().fine("Shobhit: New Portfolio" + newPort);
-      repRegion.put(new Integer("1"), newPort);
-      // above call must be hooked in BEFORE_UPDATE_OP call.
-    });
+    AsyncInvocation putThread =
+        server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
 
-    server.invoke("query on server", () -> {
-      Cache cache = cacheRule.getCache();
-      QueryService queryService = cache.getQueryService();
-      Position pos1 = null;
-      Awaitility.await().until(() -> hooked);
-      try {
-        Object resultSet = queryService.newQuery("<trace> select pos from /" + repRegionName
-            + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
-        cache.getLogger().fine("Shobhit: " + resultSet);
-        assertTrue(resultSet instanceof SelectResults);
-        pos1 = (Position) ((SelectResults) resultSet).iterator().next();
-        if (!pos1.secId.equals("APPL")) {
-          fail("Query thread did not verify index results even when RE is under update");
+          @Override
+          public void run2() throws CacheException {
+            Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
+            IndexManager.testHook = new IndexManagerTestHook();
+            Portfolio newPort = new Portfolio(cntDest + 1);
+            CacheFactory.getAnyInstance().getLogger().fine("Shobhit: New Portfolio" + newPort);
+            repRegion.put(new Integer("1"), newPort);
+            // above call must be hooked in BEFORE_UPDATE_OP call.
+          }
+        });
+
+    server.invoke(new CacheSerializableRunnable("query on server") {
+
+      @Override
+      public void run2() throws CacheException {
+        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+        Position pos1 = null;
+        while (!hooked) {
+          Wait.pause(100);
+        }
+        try {
+          Object rs = qs.newQuery("<trace> select pos from /" + repRegionName
+              + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
+          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
+          assertTrue(rs instanceof SelectResults);
+          pos1 = (Position) ((SelectResults) rs).iterator().next();
+          if (!pos1.secId.equals("APPL")) {
+            fail("Query thread did not verify index results even when RE is under update");
+            IndexManager.testHook = null;
+          }
+        } catch (Exception e) {
+          e.printStackTrace();
+          Assert.fail("Query execution failed on server.", e);
           IndexManager.testHook = null;
+        } finally {
+          hooked = false;// Let client put go further.
         }
-      } catch (Exception e) {
-        logger.error(e);
-        fail("Query execution failed on server.", e);
-        IndexManager.testHook = null;
-      } finally {
-        hooked = false;// Let client put go further.
-      }
-      Awaitility.await().until(() -> hooked);
-      try {
-        Object resultSet = queryService.newQuery("<trace> select pos from /" + repRegionName
-            + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
-        cache.getLogger().fine("Shobhit: " + resultSet);
-        assertTrue(resultSet instanceof SelectResults);
-        if (((SelectResults) resultSet).size() > 0) {
-          Position pos2 = (Position) ((SelectResults) resultSet).iterator().next();
-          if (pos2.equals(pos1)) {
-            fail("Query thread did not verify index results even when RE is under update and "
-                + "RegionEntry value has been modified before releasing the lock");
+        while (!hooked) {
+          Wait.pause(100);
+        }
+        try {
+          Object rs = qs.newQuery("<trace> select pos from /" + repRegionName
+              + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
+          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
+          assertTrue(rs instanceof SelectResults);
+          if (((SelectResults) rs).size() > 0) {
+            Position pos2 = (Position) ((SelectResults) rs).iterator().next();
+            if (pos2.equals(pos1)) {
+              fail("Query thread did not verify index results even when RE is under update and "
+                  + "RegionEntry value has been modified before releasing the lock");
+            }
           }
+        } catch (Exception e) {
+          e.printStackTrace();
+          fail("Query execution failed on server.");
+        } finally {
+          hooked = false;// Let client put go further.
+          IndexManager.testHook = null;
         }
-      } catch (Exception e) {
-        logger.error(e);
-        fail("Query execution failed on server.");
-      } finally {
-        hooked = false;// Let client put go further.
-        IndexManager.testHook = null;
       }
     });
-    Awaitility.await().until(joinThread(putThread));
+    ThreadUtils.join(putThread, 200);
   }
 
   @Category(FlakyTest.class) // GEODE-925: time sensitive, async actions, short timeouts
   @Test
-  public void testRangeIndexWithIndexAndQueryFromClauseMisMatch() {
+  public void testRangeIndexWithIndexAndQueryFromCluaseMisMatch() { // TODO: fix misspelling
     // Create caches
-    server.invoke("create indexes", () -> {
-      Cache cache = cacheRule.getCache();
-      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
-      IndexManager.testHook = null;
-      // Create common Portfolios and NewPortfolios
-      Position.cnt = 0;
-      for (int j = cnt; j < cntDest; j++) {
-        region.put(j, new Portfolio(j));
-      }
+    Properties props = new Properties();
+    server.invoke(() -> PRClientServerTestBase.createCacheInVm(props));
+
+    server.invoke(new CacheSerializableRunnable("create indexes") {
+
+      @Override
+      public void run2() throws CacheException {
+        cache = CacheFactory.getAnyInstance();
+        Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
+        IndexManager.testHook = null;
+        // Create common Portfolios and NewPortfolios
+        Position.cnt = 0;
+        for (int j = cnt; j < cntDest; j++) {
+          region.put(new Integer(j), new Portfolio(j));
+        }
 
-      QueryService queryService = cache.getQueryService();
-      try {
-        Index index = queryService.createIndex("posIndex", "pos.secId",
-            "/" + repRegionName + " p, p.collectionHolderMap.values coll, p.positions.values pos");
-        assertEquals(12, index.getStatistics().getNumberOfKeys());
-      } catch (Exception e) {
-        logger.error(e);
-        fail("Index creation failed");
+        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+        try {
+          Index index = qs.createIndex("posIndex", "pos.secId", "/" + repRegionName
+              + " p, p.collectionHolderMap.values coll, p.positions.values pos");
+          assertEquals(12, index.getStatistics().getNumberOfKeys());
+        } catch (Exception e) {
+          fail("Index creation failed");
+        }
       }
     });
     // Invoke update from client and stop in updateIndex
-    // firesultSett before updating the RegionEntry and second after updating
+    // first before updating the RegionEntry and second after updating
     // the RegionEntry.
-    AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
-      Region repRegion = cacheRule.getCache().getRegion(repRegionName);
-      IndexManager.testHook = new IndexManagerTestHook();
-      // This portfolio with same ID must have different positions.
-      repRegion.put(new Integer("1"), new Portfolio(1));
-      // above call must be hooked in BEFORE_UPDATE_OP call.
-    });
+    AsyncInvocation putThread =
+        server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
 
-    server.invoke("query on server", () -> {
-      Cache cache = cacheRule.getCache();
-      QueryService queryService = cache.getQueryService();
-      Position pos1 = null;
-      Awaitility.await().until(() -> hooked);
-      try {
-        Object resultSet = queryService.newQuery("<trace> select pos from /" + repRegionName
-            + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
-        cache.getLogger().fine("Shobhit: " + resultSet);
-        assertTrue(resultSet instanceof SelectResults);
-        pos1 = (Position) ((SelectResults) resultSet).iterator().next();
-        if (!pos1.secId.equals("APPL")) {
-          fail("Query thread did not verify index results even when RE is under update");
+          @Override
+          public void run2() throws CacheException {
+            Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
+            IndexManager.testHook = new IndexManagerTestHook();
+            // This portfolio with same ID must have different positions.
+            repRegion.put(new Integer("1"), new Portfolio(1));
+            // above call must be hooked in BEFORE_UPDATE_OP call.
+          }
+        });
+
+    server.invoke(new CacheSerializableRunnable("query on server") {
+
+      @Override
+      public void run2() throws CacheException {
+        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+        Position pos1 = null;
+        while (!hooked) {
+          Wait.pause(100);
+        }
+        try {
+          Object rs = qs.newQuery("<trace> select pos from /" + repRegionName
+              + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
+          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
+          assertTrue(rs instanceof SelectResults);
+          pos1 = (Position) ((SelectResults) rs).iterator().next();
+          if (!pos1.secId.equals("APPL")) {
+            fail("Query thread did not verify index results even when RE is under update");
+            IndexManager.testHook = null;
+          }
+        } catch (Exception e) {
+          e.printStackTrace();
+          Assert.fail("Query execution failed on server.", e);
           IndexManager.testHook = null;
+        } finally {
+          hooked = false;// Let client put go further.
         }
-      } catch (Exception e) {
-        logger.error(e);
-        fail("Query execution failed on server.");
-        IndexManager.testHook = null;
-      } finally {
-        hooked = false;// Let client put go further.
-      }
-      Awaitility.await().until(() -> hooked);
-      try {
-        Object resultSet = queryService.newQuery("select pos from /" + repRegionName
-            + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
-        assertTrue(resultSet instanceof SelectResults);
-        if (((SelectResults) resultSet).size() > 0) {
-          Position pos2 = (Position) ((SelectResults) resultSet).iterator().next();
-          if (pos2.equals(pos1)) {
-            fail("Query thread did not verify index results even when RE is under update and "
-                + "RegionEntry value has been modified before releasing the lock");
+        while (!hooked) {
+          Wait.pause(100);
+        }
+        try {
+          Object rs = qs.newQuery("select pos from /" + repRegionName
+              + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
+          assertTrue(rs instanceof SelectResults);
+          if (((SelectResults) rs).size() > 0) {
+            Position pos2 = (Position) ((SelectResults) rs).iterator().next();
+            if (pos2.equals(pos1)) {
+              fail("Query thread did not verify index results even when RE is under update and "
+                  + "RegionEntry value has been modified before releasing the lock");
+            }
           }
+        } catch (Exception e) {
+          e.printStackTrace();
+          fail("Query execution failed on server.");
+        } finally {
+          hooked = false;// Let client put go further.
+          IndexManager.testHook = null;
         }
-      } catch (Exception e) {
-        logger.error(e);
-        fail("Query execution failed on server.");
-      } finally {
-        hooked = false;// Let client put go further.
-        IndexManager.testHook = null;
       }
     });
-    Awaitility.await().until(joinThread(putThread));
-    // ThreadUtils.join(putThread, 200); // GEODE-925 occuresultSet here and this is very short join 200
-    // millis
+    ThreadUtils.join(putThread, 200); // GEODE-925 occurs here and this is very short join 200
+                                      // millis
   }
 
   @Test
-  public void testRangeIndexWithIndexAndQueryFromClauseMisMatch2() {
+  public void testRangeIndexWithIndexAndQueryFromCluaseMisMatch2() {
     // Create caches
-    server.invoke("create indexes", () -> {
-      Cache cache = cacheRule.getCache();
-      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
-      IndexManager.testHook = null;
-      // Create common Portfolios and NewPortfolios
-      Position.cnt = 0;
-      for (int j = cnt; j < cntDest; j++) {
-        region.put(new Integer(j), new Portfolio(j));
-      }
+    Properties props = new Properties();
+    server.invoke(() -> PRClientServerTestBase.createCacheInVm(props));
 
-      QueryService queryService = cache.getQueryService();
-      try {
-        Index index = queryService.createIndex("posIndex", "pos.secId",
-            "/" + repRegionName + " p, p.positions.values pos");
-        assertEquals(12, index.getStatistics().getNumberOfKeys());
-      } catch (Exception e) {
-        logger.error(e);
-        fail("Index creation failed");
+    server.invoke(new CacheSerializableRunnable("create indexes") {
+
+      @Override
+      public void run2() throws CacheException {
+        cache = CacheFactory.getAnyInstance();
+        Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
+        IndexManager.testHook = null;
+        // Create common Portfolios and NewPortfolios
+        Position.cnt = 0;
+        for (int j = cnt; j < cntDest; j++) {
+          region.put(new Integer(j), new Portfolio(j));
+        }
+
+        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+        try {
+          Index index = qs.createIndex("posIndex", "pos.secId",
+              "/" + repRegionName + " p, p.positions.values pos");
+          assertEquals(12, index.getStatistics().getNumberOfKeys());
+        } catch (Exception e) {
+          fail("Index creation failed");
+        }
       }
     });
     // Invoke update from client and stop in updateIndex
-    // firesultSett before updating the RegionEntry and second after updating
+    // first before updating the RegionEntry and second after updating
     // the RegionEntry.
-    AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
-      Cache cache = cacheRule.getCache();
-      Region repRegion = cache.getRegion(repRegionName);
-      IndexManager.testHook = new IndexManagerTestHook();
-      // This portfolio with same ID must have different positions.
-      repRegion.put(new Integer("1"), new Portfolio(1));
-      // above call must be hooked in BEFORE_UPDATE_OP call.
-    });
+    AsyncInvocation putThread =
+        server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
 
-    server.invoke("query on server", () -> {
-      Cache cache = cacheRule.getCache();
-      QueryService queryService = cache.getQueryService();
-      Position pos1 = null;
-      Awaitility.await().until(() -> hooked);
-      try {
-        Object resultSet = queryService
-            .newQuery("<trace> select pos from /" + repRegionName
-                + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1")
-            .execute();
-        cache.getLogger().fine("Shobhit: " + resultSet);
-        assertTrue(resultSet instanceof SelectResults);
-        pos1 = (Position) ((SelectResults) resultSet).iterator().next();
-        if (!pos1.secId.equals("APPL")) {
-          fail("Query thread did not verify index results even when RE is under update");
+          @Override
+          public void run2() throws CacheException {
+            Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
+            IndexManager.testHook = new IndexManagerTestHook();
+            // This portfolio with same ID must have different positions.
+            repRegion.put(new Integer("1"), new Portfolio(1));
+            // above call must be hooked in BEFORE_UPDATE_OP call.
+          }
+        });
+
+    server.invoke(new CacheSerializableRunnable("query on server") {
+
+      @Override
+      public void run2() throws CacheException {
+        QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+        Position pos1 = null;
+        while (!hooked) {
+          Wait.pause(100);
+        }
+        try {
+          Object rs = qs
+              .newQuery("<trace> select pos from /" + repRegionName
+                  + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1")
+              .execute();
+          CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
+          assertTrue(rs instanceof SelectResults);
+          pos1 = (Position) ((SelectResults) rs).iterator().next();
+          if (!pos1.secId.equals("APPL")) {
+            fail("Query thread did not verify index results even when RE is under update");
+            IndexManager.testHook = null;
+          }
+        } catch (Exception e) {
+          e.printStackTrace();
+          Assert.fail("Query execution failed on server.", e);
           IndexManager.testHook = null;
+        } finally {
+          hooked = false;// Let client put go further.
         }
-      } catch (Exception e) {
-        logger.error(e);
-        Assertions.fail("Query execution failed on server.", e);
-        IndexManager.testHook = null;
-      } finally {
-        hooked = false;// Let client put go further.
-      }
-      Awaitility.await().until(() -> hooked);
-
-      try {
-        Object resultSet = queryService
-            .newQuery("select pos from /" + repRegionName
-                + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1")
-            .execute();
-        assertTrue(resultSet instanceof SelectResults);
-        if (((SelectResults) resultSet).size() > 0) {
-          Position pos2 = (Position) ((SelectResults) resultSet).iterator().next();
-          if (pos2.equals(pos1)) {
-            fail("Query thread did not verify index results even when RE is under update and "
-                + "RegionEntry value has been modified before releasing the lock");
+        while (!hooked) {
+          Wait.pause(100);
+        }
+        try {
+          Object rs = qs
+              .newQuery("select pos from /" + repRegionName
+                  + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1")
+              .execute();
+          assertTrue(rs instanceof SelectResults);
+          if (((SelectResults) rs).size() > 0) {
+            Position pos2 = (Position) ((SelectResults) rs).iterator().next();
+            if (pos2.equals(pos1)) {
+              fail("Query thread did not verify index results even when RE is under update and "
+                  + "RegionEntry value has been modified before releasing the lock");
+            }
           }
+        } catch (Exception e) {
+          e.printStackTrace();
+          fail("Query execution failed on server.");
+        } finally {
+          IndexManager.testHook = null;
+          hooked = false;// Let client put go further.
         }
-      } catch (Exception e) {
-        logger.error(e);
-        fail("Query execution failed on server.");
-      } finally {
-        IndexManager.testHook = null;
-        hooked = false;// Let client put go further.
       }
     });
-    Awaitility.await().until(joinThread(putThread));
+    ThreadUtils.join(putThread, 200);
   }
 
-  private Callable<Boolean> joinThread(AsyncInvocation thread) {
-    return () -> {
-      try {
-        thread.join(100L);
-      } catch (InterruptedException e) {
-        return false;
-      }
-      if (thread.isAlive()) {
-        return false;
-      }
-      return true;
-    };
+  /**
+   * Static entry point that creates a new {@code QueryDataInconsistencyDUnitTest} instance and
+   * calls {@code createProxyRegs()} on it.
+   */
+  public static void createProxyRegions() {
+    new QueryDataInconsistencyDUnitTest().createProxyRegs();
   }
 
   private void createProxyRegs() {
-    ClientCache cache = (ClientCache) cacheRule.getCache();
+    ClientCache cache = (ClientCache) CacheFactory.getAnyInstance();
     cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(repRegionName);
+
+    /*
+     * cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create( PartitionedRegionName1);
+     */
+  }
+
+  /**
+   * Static entry point that creates a new {@code QueryDataInconsistencyDUnitTest} instance and
+   * calls {@code createPR()} on it.
+   */
+  public static void createNewPR() {
+    new QueryDataInconsistencyDUnitTest().createPR();
   }
 
   public void createPR() {
     PartitionResolver testKeyBasedResolver = new QueryAPITestPartitionResolver();
-    Cache cache = cacheRule.getCache();
-    int numOfBuckets = 20;
+    cache = CacheFactory.getAnyInstance();
     cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT)
         .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(numOfBuckets)
             .setPartitionResolver(testKeyBasedResolver).create())
         .create(PartitionedRegionName1);
   }
 
+  /**
+   * Static wrapper: builds a throwaway test instance and delegates to
+   * {@code createCacheClientWithoutReg(host, port1)} on it.
+   *
+   * @param host host name forwarded unchanged to the delegate
+   * @param port1 port forwarded unchanged to the delegate
+   */
+  public static void createCacheClientWithoutRegion(String host, Integer port1) {
+    QueryDataInconsistencyDUnitTest delegate = new QueryDataInconsistencyDUnitTest();
+    delegate.createCacheClientWithoutReg(host, port1);
+  }
+
+  /**
+   * Calls {@code disconnectFromDS()} and then creates a client cache from a
+   * {@code ClientCacheFactory} to which {@code (host, port1)} has been added as a pool server.
+   * No region is created by this method.
+   *
+   * @param host host name passed to {@code addPoolServer}
+   * @param port1 port passed to {@code addPoolServer}
+   */
+  private void createCacheClientWithoutReg(String host, Integer port1) {
+    disconnectFromDS();
+    ClientCacheFactory clientFactory = new ClientCacheFactory().addPoolServer(host, port1);
+    clientFactory.create();
+  }
+
+  /**
+   * Builds a runnable that puts portfolio objects into the region looked up by
+   * {@code repRegionName}, using the keys {@code from} (inclusive) up to {@code to} (exclusive).
+   *
+   * @param regionName name reported in the log message. NOTE(review): the region that is
+   *        actually written is looked up via {@code repRegionName}, not this parameter; confirm
+   *        whether that is intended
+   * @param portfolio values to insert; element {@code j} is stored under key {@code j}
+   * @param from first key/index to insert (inclusive)
+   * @param to end of the key/index range (exclusive)
+   * @return the runnable that performs the puts
+   */
+  public CacheSerializableRunnable getCacheSerializableRunnableForPRPuts(final String regionName,
+      final Object[] portfolio, final int from, final int to) {
+    // Returned with its concrete type, so no widening assignment followed by a cast back.
+    return new CacheSerializableRunnable("Region Puts") {
+      @Override
+      public void run2() throws CacheException {
+        Cache cache = CacheFactory.getAnyInstance();
+        Region region = cache.getRegion(repRegionName);
+        for (int j = from; j < to; j++) {
+          // valueOf avoids the deprecated Integer(int) constructor; the key is equal either way.
+          region.put(Integer.valueOf(j), portfolio[j]);
+        }
+        LogWriterUtils.getLogWriter().info(
+            "PRQueryDUnitHelper#getCacheSerializableRunnableForPRPuts: Inserted Portfolio data on Region "
+                + regionName);
+      }
+    };
+  }
+
   public class IndexManagerTestHook
       implements org.apache.geode.cache.query.internal.index.IndexManager.TestHook {
     public void hook(final int spot) throws RuntimeException {
       switch (spot) {
         case 9: // Before Index update and after region entry lock.
           hooked = true;
-          logger
+          LogWriterUtils.getLogWriter()
               .info("QueryDataInconsistency.IndexManagerTestHook is hooked in Update Index Entry.");
-          Awaitility.await().until(() -> !hooked);
+          while (hooked) {
+            Wait.pause(100);
+          }
+          assertEquals(hooked, false);
           break;
         case 10: // Before Region update and after Index Remove call.
           hooked = true;
-          logger
+          LogWriterUtils.getLogWriter()
               .info("QueryDataInconsistency.IndexManagerTestHook is hooked in Remove Index Entry.");
-          Awaitility.await().until(() -> !hooked);
+          while (hooked) {
+            Wait.pause(100);
+          }
+          assertEquals(hooked, false);
           break;
         default:
           break;

-- 
To stop receiving notification emails like this one, please contact
jasonhuynh@apache.org.