You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2018/08/23 22:24:09 UTC

[geode] branch develop updated: GEODE-5440: when we need to re-evaluate an entry in an index, we need to pass in the outer value key in the equiJoin. (#2338)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new a1f747c  GEODE-5440: when we need to re-evaluate an entry in an index, we need to pass in the outer value key in the equiJoin. (#2338)
a1f747c is described below

commit a1f747ca126a85192ac3c36183b9bfc9ae5116f9
Author: jinmeiliao <ji...@pivotal.io>
AuthorDate: Thu Aug 23 15:24:01 2018 -0700

    GEODE-5440: when we need to re-evaluate an entry in an index, we need to pass in the outer value key in the equiJoin. (#2338)
    
    * rewrite the flaky distributed test as an integration test
    * pass in the value key when doing the equiJoin
    * code clean up.
---
 .../partitioned/PRColocatedEquiJoinDUnitTest.java  |  47 ---------
 .../query/partitioned/PRQueryDUnitHelper.java      |  86 +++-------------
 .../query/internal/IndexManagerJUnitTest.java      |  12 +--
 .../query/partitioned/PRColocatedEquiJoinTest.java | 110 +++++++++++++++++++++
 .../query/internal/IndexTrackingQueryObserver.java |   1 +
 .../geode/cache/query/internal/QueryUtils.java     |  10 +-
 .../cache/query/internal/index/IndexManager.java   |  30 +++---
 .../cache/query/internal/index/RangeIndex.java     |   7 +-
 .../geode/test/dunit/rules/ClusterStartupRule.java |   8 ++
 .../geode/test/junit/rules/ServerStarterRule.java  |  21 ++++
 10 files changed, 175 insertions(+), 157 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinDUnitTest.java
index 29ff500..586cafa 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinDUnitTest.java
@@ -828,53 +828,6 @@ public class PRColocatedEquiJoinDUnitTest extends CacheTestCase {
   }
 
   @Test
-  public void testRRPRLocalQueryingWithHetroIndexes() throws Exception {
-    Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    setCacheInVMs(vm0);
-
-    // Creating PR's on the participating VM's
-    // Creating DataStore node on the VM0.
-    vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRCreate(coloName, redundancy,
-        NewPortfolio.class));
-    vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRIndexCreate(coloName, "IdIndex1",
-        "r2.id", "/" + coloName + " r2", null));
-
-    // Creating Colocated Region DataStore node on the VM0.
-    vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForLocalRegionCreation(name,
-        Portfolio.class));
-
-    vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRIndexCreate(name, "IdIndex2",
-        "r1.ID", "/" + name + " r1, r1.positions.values pos1", null));
-
-    // Creating local region on vm0 to compare the results of query.
-    vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForLocalRegionCreation(localName,
-        Portfolio.class));
-
-    vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForLocalRegionCreation(coloLocalName,
-        NewPortfolio.class));
-
-    // Generating portfolio object array to be populated across the PR's & Local Regions
-    Portfolio[] portfolio = createPortfoliosAndPositions(cntDest);
-    NewPortfolio[] newPortfolio = createNewPortfoliosAndPositions(cntDest);
-
-    vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRPuts(localName, portfolio, cnt,
-        cntDest));
-    vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRPuts(coloLocalName, newPortfolio,
-        cnt, cntDest));
-
-    // Putting the data into the PR's created
-    vm0.invoke(
-        prQueryDUnitHelper.getCacheSerializableRunnableForPRPuts(name, portfolio, cnt, cntDest));
-    vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRPuts(coloName, newPortfolio, cnt,
-        cntDest));
-
-    // querying the VM for data and comparing the result with query result of local region.
-    vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForRRAndPRQueryAndCompareResults(name,
-        coloName, localName, coloLocalName));
-  }
-
-  @Test
   public void testPRRRCompactRangeAndNestedRangeIndexQuerying() throws Exception {
     Host host = Host.getHost(0);
     VM vm0 = host.getVM(0);
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java
index 2cd5dc1..edb5cfa 100755
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 
 import util.TestException;
 
@@ -49,7 +48,6 @@ import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.cache.execute.Function;
-import org.apache.geode.cache.execute.FunctionAdapter;
 import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.FunctionService;
@@ -76,8 +74,6 @@ import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.control.InternalResourceManager;
-import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserverAdapter;
 import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
@@ -845,7 +841,6 @@ public class PRQueryDUnitHelper implements Serializable {
             "p, pos from /REGION_NAME p, p.positions.values pos order by p.ID, pos.secId",};
 
         Object r[][] = new Object[queries.length][2];
-        Region local = cache.getRegion(localRegion);
         Region region = cache.getRegion(regionName);
         assertNotNull(region);
 
@@ -963,7 +958,6 @@ public class PRQueryDUnitHelper implements Serializable {
             "p.ID, pos.secId from /REGION_NAME p, p.positions.values pos order by p.ID desc, pos.secId",};
 
         Object r[][] = new Object[1][2];
-        Region local = cache.getRegion(localRegion);
         Region region = cache.getRegion(regionName);
         assertNotNull(region);
 
@@ -1588,66 +1582,6 @@ public class PRQueryDUnitHelper implements Serializable {
   }
 
   /**
-   * This function <br>
-   * 1. calls the cache.close on the VM <br>
-   * 2. creates the cache again & also the PR <br>
-   *
-   * @return cacheSerializable object
-   *
-   *         NOTE: Closing of the cache must be done from the test case rather than in
-   *         PRQueryDUintHelper
-   *
-   */
-
-  public CacheSerializableRunnable getCacheSerializableRunnableForCacheClose(
-      final String regionName, final int redundancy, final Class constraint) {
-    SerializableRunnable PrRegion = new CacheSerializableRunnable("cacheClose") {
-      @Override
-      public void run2() throws CacheException {
-        final String expectedCacheClosedException = CacheClosedException.class.getName();
-        final String expectedReplyException = ReplyException.class.getName();
-        getCache().getLogger().info("<ExpectedException action=add>" + expectedCacheClosedException
-            + "</ExpectedException>");
-        getCache().getLogger().info(
-            "<ExpectedException action=add>" + expectedReplyException + "</ExpectedException>");
-        Cache cache = getCache();
-        org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info(
-            "PROperationWithQueryDUnitTest#getCacheSerializableRunnableForCacheClose: Recreating the cache ");
-        AttributesFactory attr = new AttributesFactory();
-        attr.setValueConstraint(constraint);
-        PartitionAttributesFactory paf = new PartitionAttributesFactory();
-        PartitionAttributes prAttr = paf.setRedundantCopies(redundancy).create();
-        attr.setPartitionAttributes(prAttr);
-        final CountDownLatch cdl = new CountDownLatch(1);
-        ResourceObserverAdapter observer = new InternalResourceManager.ResourceObserverAdapter() {
-          @Override
-          public void recoveryFinished(Region region) {
-            cdl.countDown();
-          }
-        };
-        InternalResourceManager.setResourceObserver(observer);
-        try {
-          cache.createRegion(regionName, attr.create());
-          // Wait for recovery to finish
-          cdl.await();
-        } catch (InterruptedException e) {
-          Assert.fail("interupted", e);
-        } finally {
-          InternalResourceManager.setResourceObserver(null);
-        }
-        org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info(
-            "PROperationWithQueryDUnitTest#getCacheSerializableRunnableForCacheClose: cache Recreated on VM ");
-        getCache().getLogger().info(
-            "<ExpectedException action=remove>" + expectedReplyException + "</ExpectedException>");
-        getCache().getLogger().info("<ExpectedException action=remove>"
-            + expectedCacheClosedException + "</ExpectedException>");
-      }
-
-    };
-    return (CacheSerializableRunnable) PrRegion;
-  }
-
-  /**
    * This function creates a appropriate index on a PR given the name and other parameters.
    */
   public CacheSerializableRunnable getCacheSerializableRunnableForPRIndexCreate(
@@ -2046,7 +1980,7 @@ public class PRQueryDUnitHelper implements Serializable {
 
       }
     };
-    return (CacheSerializableRunnable) PrRegion;
+    return PrRegion;
 
   }
 
@@ -2160,7 +2094,7 @@ public class PRQueryDUnitHelper implements Serializable {
 
       }
     };
-    return (CacheSerializableRunnable) PrRegion;
+    return PrRegion;
 
   }
 
@@ -2272,7 +2206,7 @@ public class PRQueryDUnitHelper implements Serializable {
 
       }
     };
-    return (CacheSerializableRunnable) PrRegion;
+    return PrRegion;
 
   }
 
@@ -2288,11 +2222,14 @@ public class PRQueryDUnitHelper implements Serializable {
         Cache cache = getCache();
         // Querying the PR region
 
-        String[] queries = new String[] {"r1.ID = r2.id", "r1.ID = r2.id AND r1.ID > 5",
+        String[] queries = new String[] {"r1.ID = r2.id",
+            "r1.ID = r2.id AND r1.ID > 5",
             "r1.ID = r2.id AND r1.status = 'active'",
             // "r1.ID = r2.id LIMIT 10",
-            "r1.ID = r2.id ORDER BY r1.ID", "r1.ID = r2.id ORDER BY r2.id",
-            "r1.ID = r2.id ORDER BY r2.status", "r1.ID = r2.id AND r1.status != r2.status",
+            "r1.ID = r2.id ORDER BY r1.ID",
+            "r1.ID = r2.id ORDER BY r2.id",
+            "r1.ID = r2.id ORDER BY r2.status",
+            "r1.ID = r2.id AND r1.status != r2.status",
             "r1.ID = r2.id AND r1.status = r2.status",
             "r1.ID = r2.id AND r1.positions.size = r2.positions.size",
             "r1.ID = r2.id AND r1.positions.size > r2.positions.size",
@@ -2323,7 +2260,6 @@ public class PRQueryDUnitHelper implements Serializable {
         }
 
         QueryService qs = getCache().getQueryService();
-        Object[] params;
         try {
           for (int j = 0; j < queries.length; j++) {
             getCache().getLogger().info("About to execute local query: " + queries[j]);
@@ -2386,12 +2322,12 @@ public class PRQueryDUnitHelper implements Serializable {
 
       }
     };
-    return (CacheSerializableRunnable) PrRegion;
+    return PrRegion;
 
   }
 
   // Helper classes and function
-  class TestQueryFunction extends FunctionAdapter {
+  public static class TestQueryFunction implements Function {
 
     @Override
     public boolean hasResult() {
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/IndexManagerJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/IndexManagerJUnitTest.java
index c82f87e..0d0790c 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/IndexManagerJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/IndexManagerJUnitTest.java
@@ -66,7 +66,7 @@ public class IndexManagerJUnitTest {
     IndexManager.resetIndexBufferTime();
     // fake entry update at LMT of 0 and actual time of 10
     // safe query time set in index manager is going to be 20
-    assertTrue(IndexManager.setIndexBufferTime(0, 10));
+    IndexManager.setIndexBufferTime(0, 10);
 
     // fake query start at actual time of 9, 10, 11 and using the fake LMT of 0
     assertTrue(IndexManager.needsRecalculation(9, 0));
@@ -84,13 +84,13 @@ public class IndexManagerJUnitTest {
 
     // Now let's assume a new update has occurred, this update delta and time combo still is not
     // larger
-    assertFalse(IndexManager.setIndexBufferTime(0, 9));
-    assertFalse(IndexManager.setIndexBufferTime(1, 10));
+    IndexManager.setIndexBufferTime(0, 9);
+    IndexManager.setIndexBufferTime(1, 10);
 
     // Now let's assume a new update has occurred where the time is larger (enough to roll off the
     // large delta)
     // but the delta is smaller
-    assertTrue(IndexManager.setIndexBufferTime(30, 30));
+    IndexManager.setIndexBufferTime(30, 30);
 
     // Now that we have a small delta, let's see if a query that was "stuck" would reevaluate
     // appropriately
@@ -103,7 +103,7 @@ public class IndexManagerJUnitTest {
     IndexManager.resetIndexBufferTime();
     // fake entry update at LMT of 0 and actual time of 10
     // safe query time set in index manager is going to be -10
-    assertTrue(IndexManager.setIndexBufferTime(210, 200));
+    IndexManager.setIndexBufferTime(210, 200);
 
     assertFalse(IndexManager.needsRecalculation(200, 190));
     assertFalse(IndexManager.needsRecalculation(200, 200));
@@ -113,7 +113,7 @@ public class IndexManagerJUnitTest {
     assertTrue(IndexManager.needsRecalculation(200, 221));
 
     // now lets say an entry updates with no delta
-    assertTrue(IndexManager.setIndexBufferTime(210, 210));
+    IndexManager.setIndexBufferTime(210, 210);
 
     assertTrue(IndexManager.needsRecalculation(200, 190));
     assertTrue(IndexManager.needsRecalculation(200, 200));
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinTest.java
new file mode 100644
index 0000000..4fc5f35
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.partitioned;
+
+import static org.apache.geode.cache.query.Utils.createNewPortfoliosAndPositions;
+import static org.apache.geode.cache.query.Utils.createPortfoliosAndPositions;
+
+import java.util.ArrayList;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import parReg.query.unittest.NewPortfolio;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.data.Portfolio;
+import org.apache.geode.cache.query.functional.StructSetOrResultsSet;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.junit.categories.OQLQueryTest;
+import org.apache.geode.test.junit.rules.ServerStarterRule;
+
+@Category({OQLQueryTest.class})
+public class PRColocatedEquiJoinTest {
+  private static final int count = 100;
+
+  @Rule
+  public ServerStarterRule server = new ServerStarterRule().withAutoStart();
+
+  @Test
+  public void prQueryWithHeteroIndex() throws Exception {
+    InternalCache cache = server.getCache();
+    QueryService qs = cache.getQueryService();
+
+    // create a local and Partition region for 1st select query
+    Region r1 = server.createRegion(RegionShortcut.LOCAL, "region1",
+        rf -> rf.setValueConstraint(Portfolio.class));
+    qs.createIndex("IdIndex1", "r.ID", "/region1 r, r.positions.values pos");
+    Region r2 = server.createRegion(RegionShortcut.PARTITION, "region2",
+        rf -> rf.setValueConstraint(NewPortfolio.class));
+    qs.createIndex("IdIndex2", "r.id", "/region2 r");
+
+    // create two local regions for 2nd select query to compare the result set
+    Region r3 = server.createRegion(RegionShortcut.LOCAL, "region3",
+        rf -> rf.setValueConstraint(Portfolio.class));
+    Region r4 = server.createRegion(RegionShortcut.LOCAL, "region4",
+        rf -> rf.setValueConstraint(NewPortfolio.class));
+
+    Portfolio[] portfolio = createPortfoliosAndPositions(count);
+    NewPortfolio[] newPortfolio = createNewPortfoliosAndPositions(count);
+
+    for (int i = 0; i < count; i++) {
+      r1.put(new Integer(i), portfolio[i]);
+      r2.put(new Integer(i), newPortfolio[i]);
+      r3.put(new Integer(i), portfolio[i]);
+      r4.put(new Integer(i), newPortfolio[i]);
+    }
+
+    ArrayList results[][] = new ArrayList[whereClauses.length][2];
+    for (int i = 0; i < whereClauses.length; i++) {
+      // issue the first select on region 1 and region 2
+      SelectResults selectResults = (SelectResults) qs.newQuery("<trace> Select "
+          + (whereClauses[i].contains("ORDER BY") ? "DISTINCT" : "")
+          + "* from /region1 r1, /region2 r2 where " + whereClauses[i])
+          .execute();
+      results[i][0] = (ArrayList) selectResults.asList();
+
+      // issue the second select on region 3 and region 4
+      SelectResults queryResult = (SelectResults) qs.newQuery("<trace> Select "
+          + (whereClauses[i].contains("ORDER BY") ? "DISTINCT" : "")
+          + "* from /region3 r1, /region4 r2 where " + whereClauses[i])
+          .execute();
+      results[i][1] = (ArrayList) queryResult.asList();
+    }
+
+    // compare the resultsets and expect them to be equal
+    StructSetOrResultsSet ssORrs = new StructSetOrResultsSet();
+    ssORrs.CompareQueryResultsAsListWithoutAndWithIndexes(results, whereClauses.length, false,
+        false,
+        whereClauses);
+  }
+
+  private static String[] whereClauses = new String[] {"r2.ID = r1.id",
+      "r1.ID = r2.id AND r1.ID > 5",
+      "r1.ID = r2.id AND r1.status = 'active'",
+      "r1.ID = r2.id ORDER BY r1.ID", "r1.ID = r2.id ORDER BY r2.id",
+      "r1.ID = r2.id ORDER BY r2.status", "r1.ID = r2.id AND r1.status != r2.status",
+      "r1.ID = r2.id AND r1.status = r2.status",
+      "r1.ID = r2.id AND r1.positions.size = r2.positions.size",
+      "r1.ID = r2.id AND r1.positions.size > r2.positions.size",
+      "r1.ID = r2.id AND r1.positions.size < r2.positions.size",
+      "r1.ID = r2.id AND r1.positions.size = r2.positions.size AND r2.positions.size > 0",
+      "r1.ID = r2.id AND (r1.positions.size > r2.positions.size OR r2.positions.size > 0)",
+      "r1.ID = r2.id AND (r1.positions.size < r2.positions.size OR r1.positions.size > 0)",
+  };
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexTrackingQueryObserver.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexTrackingQueryObserver.java
index 9e88df3..5c715ad 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexTrackingQueryObserver.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexTrackingQueryObserver.java
@@ -183,6 +183,7 @@ public class IndexTrackingQueryObserver extends QueryObserverAdapter {
       return results.keySet();
     }
 
+    // initial result of index in the observer. 0 means it's not updated yet.
     public void addRegionId(String regionId) {
       this.results.put(regionId, 0);
     }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java
index ec7086d..ef23565 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java
@@ -482,7 +482,7 @@ public class QueryUtils {
   private static void mergeAndExpandCutDownRelationshipIndexResults(Object[][] values,
       SelectResults result, RuntimeIterator[][] indexFieldToItrsMapping,
       ListIterator expansionListIterator, List finalItrs, ExecutionContext context,
-      List[] checkList, CompiledValue iterOps, IndexCutDownExpansionHelper[] icdeh, int level)
+      CompiledValue iterOps, IndexCutDownExpansionHelper[] icdeh, int level)
       throws FunctionDomainException, TypeMismatchException, NameResolutionException,
       QueryInvocationTargetException {
 
@@ -506,7 +506,7 @@ public class QueryUtils {
           }
         } else {
           mergeAndExpandCutDownRelationshipIndexResults(values, result, indexFieldToItrsMapping,
-              expansionListIterator, finalItrs, context, checkList, iterOps, icdeh, level + 1);
+              expansionListIterator, finalItrs, context, iterOps, icdeh, level + 1);
           if (icdeh[level + 1].cutDownNeeded) {
             icdeh[level + 1].checkSet.clear();
           }
@@ -715,7 +715,6 @@ public class QueryUtils {
       // be a Compiled Region otherwise it will be a CompiledPath that
       // we can extract the id from. In the end the result will be the alias which is used as a
       // prefix
-      CompiledValue collectionExpression = currentLevel.getCmpIteratorDefn().getCollectionExpr();
       String key = null;
       boolean useDerivedResults = true;
       if (currentLevel.getCmpIteratorDefn().getCollectionExpr()
@@ -1250,7 +1249,7 @@ public class QueryUtils {
                   maxCartesianDepth);
             } else {
               mergeAndExpandCutDownRelationshipIndexResults(values, returnSet, mappings,
-                  expansionListIterator, finalList, context, totalCheckList, iterOperands, icdeh,
+                  expansionListIterator, finalList, context, iterOperands, icdeh,
                   0);
             }
             if (icdeh[0].cutDownNeeded)
@@ -1472,7 +1471,6 @@ public class QueryUtils {
     RuntimeIterator[][] mappings = new RuntimeIterator[2][];
     mappings[0] = ich1.indexFieldToItrsMapping;
     mappings[1] = ich2.indexFieldToItrsMapping;
-    List[] totalCheckList = new List[] {ich1.checkList, ich2.checkList};
     Iterator dataItr = data.iterator();
     IndexCutDownExpansionHelper[] icdeh =
         new IndexCutDownExpansionHelper[] {new IndexCutDownExpansionHelper(ich1.checkList, context),
@@ -1494,7 +1492,7 @@ public class QueryUtils {
           // skip the similar row of a set , even when the row in its entirety is unique ( made by
           // different data in the other set)
           mergeAndExpandCutDownRelationshipIndexResults(values, returnSet, mappings,
-              expansionListIterator, totalFinalList, context, totalCheckList, iterOperands, icdeh,
+              expansionListIterator, totalFinalList, context, iterOperands, icdeh,
               0 /* Level */);
           if (icdeh[0].cutDownNeeded)
             icdeh[0].checkSet.clear();
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
index de9c709..e3c33e8 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
@@ -181,11 +181,15 @@ public class IndexManager {
    * aggressively. But the large hiccup will eventually be rolled off as time is always increasing
    * This is a fix for #47475
    *
-   * @param operationTime the last modified time from version tag
+   * @param lastModifiedTime the last modified time from version tag
    */
-  public static boolean setIndexBufferTime(long operationTime, long currentCacheTime) {
-    long timeDifference = currentCacheTime - operationTime;
-    return setNewLargestValue(SAFE_QUERY_TIME, currentCacheTime + timeDifference);
+  public static void setIndexBufferTime(long lastModifiedTime, long currentCacheTime) {
+    long oldValue = SAFE_QUERY_TIME.get();
+    long newValue = currentCacheTime + currentCacheTime - lastModifiedTime;
+    if (oldValue < newValue) {
+      // use compare and set in case the value has been changed since we got the old value
+      SAFE_QUERY_TIME.compareAndSet(oldValue, newValue);
+    }
   }
 
   /**
@@ -213,21 +217,9 @@ public class IndexManager {
    * Small amounts of false positives are ok as it will have a slight impact on performance
    */
   public static boolean needsRecalculation(long queryStartTime, long lastModifiedTime) {
-    return ENABLE_UPDATE_IN_PROGRESS_INDEX_CALCULATION
-        && queryStartTime <= SAFE_QUERY_TIME.get() - queryStartTime + lastModifiedTime;
-  }
-
-  private static boolean setNewLargestValue(AtomicLong value, long newValue) {
-    boolean done = false;
-    while (!done) {
-      long oldValue = value.get();
-      if (oldValue < newValue) {
-        return value.compareAndSet(oldValue, newValue);
-      } else {
-        done = true;
-      }
-    }
-    return false;
+    boolean needsRecalculate =
+        (queryStartTime <= (lastModifiedTime + (SAFE_QUERY_TIME.get() - queryStartTime)));
+    return ENABLE_UPDATE_IN_PROGRESS_INDEX_CALCULATION && needsRecalculate;
   }
 
   /** Test Hook */
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/RangeIndex.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/RangeIndex.java
index 28d693d..f7e5b1c 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/RangeIndex.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/RangeIndex.java
@@ -584,12 +584,9 @@ public class RangeIndex extends AbstractIndex {
       Object innerEntry = null;
       Object outerKey = null;
       Object innerKey = null;
-      // boolean incrementOuter = true;
       boolean incrementInner = true;
       outer: while (outer.hasNext()) {
-        // if (incrementOuter) {
         outerEntry = (Map.Entry) outer.next();
-        // }
         outerKey = outerEntry.getKey();
         while (!incrementInner || inner.hasNext()) {
           if (incrementInner) {
@@ -611,7 +608,9 @@ public class RangeIndex extends AbstractIndex {
             } else {
               innerValue = ((Map.Entry) innerEntry).getValue();
             }
-            populateListForEquiJoin(data, outerEntry.getValue(), innerValue, context, null);
+            // GEODE-5440: need to pass in the key value to do EquiJoin
+            populateListForEquiJoin(data, outerEntry.getValue(), innerValue, context,
+                outerEntry.getKey());
             incrementInner = true;
             continue outer;
           } else if (compare < 0) {
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java
index 234d90e..de71e7b 100644
--- a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java
@@ -42,6 +42,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.security.templates.UserPasswordAuthInit;
+import org.apache.geode.test.dunit.DUnitEnv;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.RMIException;
 import org.apache.geode.test.dunit.SerializableConsumerIF;
@@ -113,6 +114,13 @@ public class ClusterStartupRule extends ExternalResource implements Serializable
     this.vmCount = vmCount;
   }
 
+  /**
+   * Returns the port that the standard dunit locator is listening on.
+   */
+  public static int getDUnitLocatorPort() {
+    return DUnitEnv.get().getLocatorPort();
+  }
+
   public static ClientCache getClientCache() {
     if (clientCacheRule == null) {
       return null;
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
index e1e7397..1b5d99f 100644
--- a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
@@ -22,8 +22,12 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.function.Consumer;
 
 import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.internal.DistributionConfig;
@@ -167,6 +171,23 @@ public class ServerStarterRule extends MemberStarterRule<ServerStarterRule> impl
     return this;
   }
 
+  public Region createRegion(RegionShortcut type, String name,
+      Consumer<RegionFactory> regionFactoryConsumer) {
+    RegionFactory factory = getCache().createRegionFactory(type);
+    regionFactoryConsumer.accept(factory);
+    return factory.create(name);
+  }
+
+  public Region createPRRegion(String name, Consumer<RegionFactory> regionFactoryConsumer,
+      Consumer<PartitionAttributesFactory> prAttributesFactory) {
+    return createRegion(RegionShortcut.PARTITION, name, rf -> {
+      regionFactoryConsumer.accept(rf);
+      PartitionAttributesFactory factory = new PartitionAttributesFactory();
+      prAttributesFactory.accept(factory);
+      rf.setPartitionAttributes(factory.create());
+    });
+  }
+
   public void startServer(Properties properties, int locatorPort) {
     withProperties(properties).withConnectionToLocator(locatorPort).startServer();
   }