You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2018/08/23 22:24:09 UTC
[geode] branch develop updated: GEODE-5440: when we need
re-evaluate a entry in a index,
we need to pass in the outer value key in the equiJoin. (#2338)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new a1f747c GEODE-5440: when we need re-evaluate a entry in a index, we need to pass in the outer value key in the equiJoin. (#2338)
a1f747c is described below
commit a1f747ca126a85192ac3c36183b9bfc9ae5116f9
Author: jinmeiliao <ji...@pivotal.io>
AuthorDate: Thu Aug 23 15:24:01 2018 -0700
GEODE-5440: when we need re-evaluate a entry in a index, we need to pass in the outer value key in the equiJoin. (#2338)
* rewrite the flaky test with an integration test
* pass in the value key when doing the equiJoin
* code clean up.
---
.../partitioned/PRColocatedEquiJoinDUnitTest.java | 47 ---------
.../query/partitioned/PRQueryDUnitHelper.java | 86 +++-------------
.../query/internal/IndexManagerJUnitTest.java | 12 +--
.../query/partitioned/PRColocatedEquiJoinTest.java | 110 +++++++++++++++++++++
.../query/internal/IndexTrackingQueryObserver.java | 1 +
.../geode/cache/query/internal/QueryUtils.java | 10 +-
.../cache/query/internal/index/IndexManager.java | 30 +++---
.../cache/query/internal/index/RangeIndex.java | 7 +-
.../geode/test/dunit/rules/ClusterStartupRule.java | 8 ++
.../geode/test/junit/rules/ServerStarterRule.java | 21 ++++
10 files changed, 175 insertions(+), 157 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinDUnitTest.java
index 29ff500..586cafa 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinDUnitTest.java
@@ -828,53 +828,6 @@ public class PRColocatedEquiJoinDUnitTest extends CacheTestCase {
}
@Test
- public void testRRPRLocalQueryingWithHetroIndexes() throws Exception {
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- setCacheInVMs(vm0);
-
- // Creating PR's on the participating VM's
- // Creating DataStore node on the VM0.
- vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRCreate(coloName, redundancy,
- NewPortfolio.class));
- vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRIndexCreate(coloName, "IdIndex1",
- "r2.id", "/" + coloName + " r2", null));
-
- // Creating Colocated Region DataStore node on the VM0.
- vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForLocalRegionCreation(name,
- Portfolio.class));
-
- vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRIndexCreate(name, "IdIndex2",
- "r1.ID", "/" + name + " r1, r1.positions.values pos1", null));
-
- // Creating local region on vm0 to compare the results of query.
- vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForLocalRegionCreation(localName,
- Portfolio.class));
-
- vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForLocalRegionCreation(coloLocalName,
- NewPortfolio.class));
-
- // Generating portfolio object array to be populated across the PR's & Local Regions
- Portfolio[] portfolio = createPortfoliosAndPositions(cntDest);
- NewPortfolio[] newPortfolio = createNewPortfoliosAndPositions(cntDest);
-
- vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRPuts(localName, portfolio, cnt,
- cntDest));
- vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRPuts(coloLocalName, newPortfolio,
- cnt, cntDest));
-
- // Putting the data into the PR's created
- vm0.invoke(
- prQueryDUnitHelper.getCacheSerializableRunnableForPRPuts(name, portfolio, cnt, cntDest));
- vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForPRPuts(coloName, newPortfolio, cnt,
- cntDest));
-
- // querying the VM for data and comparing the result with query result of local region.
- vm0.invoke(prQueryDUnitHelper.getCacheSerializableRunnableForRRAndPRQueryAndCompareResults(name,
- coloName, localName, coloLocalName));
- }
-
- @Test
public void testPRRRCompactRangeAndNestedRangeIndexQuerying() throws Exception {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java
index 2cd5dc1..edb5cfa 100755
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRQueryDUnitHelper.java
@@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
import util.TestException;
@@ -49,7 +48,6 @@ import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.execute.Function;
-import org.apache.geode.cache.execute.FunctionAdapter;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
@@ -76,8 +74,6 @@ import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.control.InternalResourceManager;
-import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserverAdapter;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.SerializableRunnableIF;
@@ -845,7 +841,6 @@ public class PRQueryDUnitHelper implements Serializable {
"p, pos from /REGION_NAME p, p.positions.values pos order by p.ID, pos.secId",};
Object r[][] = new Object[queries.length][2];
- Region local = cache.getRegion(localRegion);
Region region = cache.getRegion(regionName);
assertNotNull(region);
@@ -963,7 +958,6 @@ public class PRQueryDUnitHelper implements Serializable {
"p.ID, pos.secId from /REGION_NAME p, p.positions.values pos order by p.ID desc, pos.secId",};
Object r[][] = new Object[1][2];
- Region local = cache.getRegion(localRegion);
Region region = cache.getRegion(regionName);
assertNotNull(region);
@@ -1588,66 +1582,6 @@ public class PRQueryDUnitHelper implements Serializable {
}
/**
- * This function <br>
- * 1. calls the cache.close on the VM <br>
- * 2. creates the cache again & also the PR <br>
- *
- * @return cacheSerializable object
- *
- * NOTE: Closing of the cache must be done from the test case rather than in
- * PRQueryDUintHelper
- *
- */
-
- public CacheSerializableRunnable getCacheSerializableRunnableForCacheClose(
- final String regionName, final int redundancy, final Class constraint) {
- SerializableRunnable PrRegion = new CacheSerializableRunnable("cacheClose") {
- @Override
- public void run2() throws CacheException {
- final String expectedCacheClosedException = CacheClosedException.class.getName();
- final String expectedReplyException = ReplyException.class.getName();
- getCache().getLogger().info("<ExpectedException action=add>" + expectedCacheClosedException
- + "</ExpectedException>");
- getCache().getLogger().info(
- "<ExpectedException action=add>" + expectedReplyException + "</ExpectedException>");
- Cache cache = getCache();
- org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info(
- "PROperationWithQueryDUnitTest#getCacheSerializableRunnableForCacheClose: Recreating the cache ");
- AttributesFactory attr = new AttributesFactory();
- attr.setValueConstraint(constraint);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- PartitionAttributes prAttr = paf.setRedundantCopies(redundancy).create();
- attr.setPartitionAttributes(prAttr);
- final CountDownLatch cdl = new CountDownLatch(1);
- ResourceObserverAdapter observer = new InternalResourceManager.ResourceObserverAdapter() {
- @Override
- public void recoveryFinished(Region region) {
- cdl.countDown();
- }
- };
- InternalResourceManager.setResourceObserver(observer);
- try {
- cache.createRegion(regionName, attr.create());
- // Wait for recovery to finish
- cdl.await();
- } catch (InterruptedException e) {
- Assert.fail("interupted", e);
- } finally {
- InternalResourceManager.setResourceObserver(null);
- }
- org.apache.geode.test.dunit.LogWriterUtils.getLogWriter().info(
- "PROperationWithQueryDUnitTest#getCacheSerializableRunnableForCacheClose: cache Recreated on VM ");
- getCache().getLogger().info(
- "<ExpectedException action=remove>" + expectedReplyException + "</ExpectedException>");
- getCache().getLogger().info("<ExpectedException action=remove>"
- + expectedCacheClosedException + "</ExpectedException>");
- }
-
- };
- return (CacheSerializableRunnable) PrRegion;
- }
-
- /**
* This function creates a appropriate index on a PR given the name and other parameters.
*/
public CacheSerializableRunnable getCacheSerializableRunnableForPRIndexCreate(
@@ -2046,7 +1980,7 @@ public class PRQueryDUnitHelper implements Serializable {
}
};
- return (CacheSerializableRunnable) PrRegion;
+ return PrRegion;
}
@@ -2160,7 +2094,7 @@ public class PRQueryDUnitHelper implements Serializable {
}
};
- return (CacheSerializableRunnable) PrRegion;
+ return PrRegion;
}
@@ -2272,7 +2206,7 @@ public class PRQueryDUnitHelper implements Serializable {
}
};
- return (CacheSerializableRunnable) PrRegion;
+ return PrRegion;
}
@@ -2288,11 +2222,14 @@ public class PRQueryDUnitHelper implements Serializable {
Cache cache = getCache();
// Querying the PR region
- String[] queries = new String[] {"r1.ID = r2.id", "r1.ID = r2.id AND r1.ID > 5",
+ String[] queries = new String[] {"r1.ID = r2.id",
+ "r1.ID = r2.id AND r1.ID > 5",
"r1.ID = r2.id AND r1.status = 'active'",
// "r1.ID = r2.id LIMIT 10",
- "r1.ID = r2.id ORDER BY r1.ID", "r1.ID = r2.id ORDER BY r2.id",
- "r1.ID = r2.id ORDER BY r2.status", "r1.ID = r2.id AND r1.status != r2.status",
+ "r1.ID = r2.id ORDER BY r1.ID",
+ "r1.ID = r2.id ORDER BY r2.id",
+ "r1.ID = r2.id ORDER BY r2.status",
+ "r1.ID = r2.id AND r1.status != r2.status",
"r1.ID = r2.id AND r1.status = r2.status",
"r1.ID = r2.id AND r1.positions.size = r2.positions.size",
"r1.ID = r2.id AND r1.positions.size > r2.positions.size",
@@ -2323,7 +2260,6 @@ public class PRQueryDUnitHelper implements Serializable {
}
QueryService qs = getCache().getQueryService();
- Object[] params;
try {
for (int j = 0; j < queries.length; j++) {
getCache().getLogger().info("About to execute local query: " + queries[j]);
@@ -2386,12 +2322,12 @@ public class PRQueryDUnitHelper implements Serializable {
}
};
- return (CacheSerializableRunnable) PrRegion;
+ return PrRegion;
}
// Helper classes and function
- class TestQueryFunction extends FunctionAdapter {
+ public static class TestQueryFunction implements Function {
@Override
public boolean hasResult() {
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/IndexManagerJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/IndexManagerJUnitTest.java
index c82f87e..0d0790c 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/IndexManagerJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/internal/IndexManagerJUnitTest.java
@@ -66,7 +66,7 @@ public class IndexManagerJUnitTest {
IndexManager.resetIndexBufferTime();
// fake entry update at LMT of 0 and actual time of 10
// safe query time set in index manager is going to be 20
- assertTrue(IndexManager.setIndexBufferTime(0, 10));
+ IndexManager.setIndexBufferTime(0, 10);
// fake query start at actual time of 9, 10, 11 and using the fake LMT of 0
assertTrue(IndexManager.needsRecalculation(9, 0));
@@ -84,13 +84,13 @@ public class IndexManagerJUnitTest {
// Now let's assume a new update has occurred, this update delta and time combo still is not
// larger
- assertFalse(IndexManager.setIndexBufferTime(0, 9));
- assertFalse(IndexManager.setIndexBufferTime(1, 10));
+ IndexManager.setIndexBufferTime(0, 9);
+ IndexManager.setIndexBufferTime(1, 10);
// Now let's assume a new update has occurred where the time is larger (enough to roll off the
// large delta)
// but the delta is smaller
- assertTrue(IndexManager.setIndexBufferTime(30, 30));
+ IndexManager.setIndexBufferTime(30, 30);
// Now that we have a small delta, let's see if a query that was "stuck" would reevaluate
// appropriately
@@ -103,7 +103,7 @@ public class IndexManagerJUnitTest {
IndexManager.resetIndexBufferTime();
// fake entry update at LMT of 0 and actual time of 10
// safe query time set in index manager is going to be -10
- assertTrue(IndexManager.setIndexBufferTime(210, 200));
+ IndexManager.setIndexBufferTime(210, 200);
assertFalse(IndexManager.needsRecalculation(200, 190));
assertFalse(IndexManager.needsRecalculation(200, 200));
@@ -113,7 +113,7 @@ public class IndexManagerJUnitTest {
assertTrue(IndexManager.needsRecalculation(200, 221));
// now lets say an entry updates with no delta
- assertTrue(IndexManager.setIndexBufferTime(210, 210));
+ IndexManager.setIndexBufferTime(210, 210);
assertTrue(IndexManager.needsRecalculation(200, 190));
assertTrue(IndexManager.needsRecalculation(200, 200));
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinTest.java
new file mode 100644
index 0000000..4fc5f35
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/query/partitioned/PRColocatedEquiJoinTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.partitioned;
+
+import static org.apache.geode.cache.query.Utils.createNewPortfoliosAndPositions;
+import static org.apache.geode.cache.query.Utils.createPortfoliosAndPositions;
+
+import java.util.ArrayList;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import parReg.query.unittest.NewPortfolio;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.data.Portfolio;
+import org.apache.geode.cache.query.functional.StructSetOrResultsSet;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.junit.categories.OQLQueryTest;
+import org.apache.geode.test.junit.rules.ServerStarterRule;
+
+@Category({OQLQueryTest.class})
+public class PRColocatedEquiJoinTest {
+ private static final int count = 100;
+
+ @Rule
+ public ServerStarterRule server = new ServerStarterRule().withAutoStart();
+
+ @Test
+ public void prQueryWithHeteroIndex() throws Exception {
+ InternalCache cache = server.getCache();
+ QueryService qs = cache.getQueryService();
+
+ // create a local and Partition region for 1st select query
+ Region r1 = server.createRegion(RegionShortcut.LOCAL, "region1",
+ rf -> rf.setValueConstraint(Portfolio.class));
+ qs.createIndex("IdIndex1", "r.ID", "/region1 r, r.positions.values pos");
+ Region r2 = server.createRegion(RegionShortcut.PARTITION, "region2",
+ rf -> rf.setValueConstraint(NewPortfolio.class));
+ qs.createIndex("IdIndex2", "r.id", "/region2 r");
+
+ // create two local regions for 2nd select query to compare the result set
+ Region r3 = server.createRegion(RegionShortcut.LOCAL, "region3",
+ rf -> rf.setValueConstraint(Portfolio.class));
+ Region r4 = server.createRegion(RegionShortcut.LOCAL, "region4",
+ rf -> rf.setValueConstraint(NewPortfolio.class));
+
+ Portfolio[] portfolio = createPortfoliosAndPositions(count);
+ NewPortfolio[] newPortfolio = createNewPortfoliosAndPositions(count);
+
+ for (int i = 0; i < count; i++) {
+ r1.put(new Integer(i), portfolio[i]);
+ r2.put(new Integer(i), newPortfolio[i]);
+ r3.put(new Integer(i), portfolio[i]);
+ r4.put(new Integer(i), newPortfolio[i]);
+ }
+
+ ArrayList results[][] = new ArrayList[whereClauses.length][2];
+ for (int i = 0; i < whereClauses.length; i++) {
+ // issue the first select on region 1 and region 2
+ SelectResults selectResults = (SelectResults) qs.newQuery("<trace> Select "
+ + (whereClauses[i].contains("ORDER BY") ? "DISTINCT" : "")
+ + "* from /region1 r1, /region2 r2 where " + whereClauses[i])
+ .execute();
+ results[i][0] = (ArrayList) selectResults.asList();
+
+ // issue the second select on region 3 and region 4
+ SelectResults queryResult = (SelectResults) qs.newQuery("<trace> Select "
+ + (whereClauses[i].contains("ORDER BY") ? "DISTINCT" : "")
+ + "* from /region3 r1, /region4 r2 where " + whereClauses[i])
+ .execute();
+ results[i][1] = (ArrayList) queryResult.asList();
+ }
+
+ // compare the resultsets and expect them to be equal
+ StructSetOrResultsSet ssORrs = new StructSetOrResultsSet();
+ ssORrs.CompareQueryResultsAsListWithoutAndWithIndexes(results, whereClauses.length, false,
+ false,
+ whereClauses);
+ }
+
+ private static String[] whereClauses = new String[] {"r2.ID = r1.id",
+ "r1.ID = r2.id AND r1.ID > 5",
+ "r1.ID = r2.id AND r1.status = 'active'",
+ "r1.ID = r2.id ORDER BY r1.ID", "r1.ID = r2.id ORDER BY r2.id",
+ "r1.ID = r2.id ORDER BY r2.status", "r1.ID = r2.id AND r1.status != r2.status",
+ "r1.ID = r2.id AND r1.status = r2.status",
+ "r1.ID = r2.id AND r1.positions.size = r2.positions.size",
+ "r1.ID = r2.id AND r1.positions.size > r2.positions.size",
+ "r1.ID = r2.id AND r1.positions.size < r2.positions.size",
+ "r1.ID = r2.id AND r1.positions.size = r2.positions.size AND r2.positions.size > 0",
+ "r1.ID = r2.id AND (r1.positions.size > r2.positions.size OR r2.positions.size > 0)",
+ "r1.ID = r2.id AND (r1.positions.size < r2.positions.size OR r1.positions.size > 0)",
+ };
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexTrackingQueryObserver.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexTrackingQueryObserver.java
index 9e88df3..5c715ad 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexTrackingQueryObserver.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/IndexTrackingQueryObserver.java
@@ -183,6 +183,7 @@ public class IndexTrackingQueryObserver extends QueryObserverAdapter {
return results.keySet();
}
+ // initial result of index in the observer. 0 means it's not updated yet.
public void addRegionId(String regionId) {
this.results.put(regionId, 0);
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java
index ec7086d..ef23565 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/QueryUtils.java
@@ -482,7 +482,7 @@ public class QueryUtils {
private static void mergeAndExpandCutDownRelationshipIndexResults(Object[][] values,
SelectResults result, RuntimeIterator[][] indexFieldToItrsMapping,
ListIterator expansionListIterator, List finalItrs, ExecutionContext context,
- List[] checkList, CompiledValue iterOps, IndexCutDownExpansionHelper[] icdeh, int level)
+ CompiledValue iterOps, IndexCutDownExpansionHelper[] icdeh, int level)
throws FunctionDomainException, TypeMismatchException, NameResolutionException,
QueryInvocationTargetException {
@@ -506,7 +506,7 @@ public class QueryUtils {
}
} else {
mergeAndExpandCutDownRelationshipIndexResults(values, result, indexFieldToItrsMapping,
- expansionListIterator, finalItrs, context, checkList, iterOps, icdeh, level + 1);
+ expansionListIterator, finalItrs, context, iterOps, icdeh, level + 1);
if (icdeh[level + 1].cutDownNeeded) {
icdeh[level + 1].checkSet.clear();
}
@@ -715,7 +715,6 @@ public class QueryUtils {
// be a Compiled Region otherwise it will be a CompiledPath that
// we can extract the id from. In the end the result will be the alias which is used as a
// prefix
- CompiledValue collectionExpression = currentLevel.getCmpIteratorDefn().getCollectionExpr();
String key = null;
boolean useDerivedResults = true;
if (currentLevel.getCmpIteratorDefn().getCollectionExpr()
@@ -1250,7 +1249,7 @@ public class QueryUtils {
maxCartesianDepth);
} else {
mergeAndExpandCutDownRelationshipIndexResults(values, returnSet, mappings,
- expansionListIterator, finalList, context, totalCheckList, iterOperands, icdeh,
+ expansionListIterator, finalList, context, iterOperands, icdeh,
0);
}
if (icdeh[0].cutDownNeeded)
@@ -1472,7 +1471,6 @@ public class QueryUtils {
RuntimeIterator[][] mappings = new RuntimeIterator[2][];
mappings[0] = ich1.indexFieldToItrsMapping;
mappings[1] = ich2.indexFieldToItrsMapping;
- List[] totalCheckList = new List[] {ich1.checkList, ich2.checkList};
Iterator dataItr = data.iterator();
IndexCutDownExpansionHelper[] icdeh =
new IndexCutDownExpansionHelper[] {new IndexCutDownExpansionHelper(ich1.checkList, context),
@@ -1494,7 +1492,7 @@ public class QueryUtils {
// skip the similar row of a set , even when the row in its entirety is unique ( made by
// different data in the other set)
mergeAndExpandCutDownRelationshipIndexResults(values, returnSet, mappings,
- expansionListIterator, totalFinalList, context, totalCheckList, iterOperands, icdeh,
+ expansionListIterator, totalFinalList, context, iterOperands, icdeh,
0 /* Level */);
if (icdeh[0].cutDownNeeded)
icdeh[0].checkSet.clear();
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
index de9c709..e3c33e8 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/IndexManager.java
@@ -181,11 +181,15 @@ public class IndexManager {
* aggressively. But the large hiccup will eventually be rolled off as time is always increasing
* This is a fix for #47475
*
- * @param operationTime the last modified time from version tag
+ * @param lastModifiedTime the last modified time from version tag
*/
- public static boolean setIndexBufferTime(long operationTime, long currentCacheTime) {
- long timeDifference = currentCacheTime - operationTime;
- return setNewLargestValue(SAFE_QUERY_TIME, currentCacheTime + timeDifference);
+ public static void setIndexBufferTime(long lastModifiedTime, long currentCacheTime) {
+ long oldValue = SAFE_QUERY_TIME.get();
+ long newValue = currentCacheTime + currentCacheTime - lastModifiedTime;
+ if (oldValue < newValue) {
+ // use compare and set in case the value has been changed since we got the old value
+ SAFE_QUERY_TIME.compareAndSet(oldValue, newValue);
+ }
}
/**
@@ -213,21 +217,9 @@ public class IndexManager {
* Small amounts of false positives are ok as it will have a slight impact on performance
*/
public static boolean needsRecalculation(long queryStartTime, long lastModifiedTime) {
- return ENABLE_UPDATE_IN_PROGRESS_INDEX_CALCULATION
- && queryStartTime <= SAFE_QUERY_TIME.get() - queryStartTime + lastModifiedTime;
- }
-
- private static boolean setNewLargestValue(AtomicLong value, long newValue) {
- boolean done = false;
- while (!done) {
- long oldValue = value.get();
- if (oldValue < newValue) {
- return value.compareAndSet(oldValue, newValue);
- } else {
- done = true;
- }
- }
- return false;
+ boolean needsRecalculate =
+ (queryStartTime <= (lastModifiedTime + (SAFE_QUERY_TIME.get() - queryStartTime)));
+ return ENABLE_UPDATE_IN_PROGRESS_INDEX_CALCULATION && needsRecalculate;
}
/** Test Hook */
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/RangeIndex.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/RangeIndex.java
index 28d693d..f7e5b1c 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/RangeIndex.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/index/RangeIndex.java
@@ -584,12 +584,9 @@ public class RangeIndex extends AbstractIndex {
Object innerEntry = null;
Object outerKey = null;
Object innerKey = null;
- // boolean incrementOuter = true;
boolean incrementInner = true;
outer: while (outer.hasNext()) {
- // if (incrementOuter) {
outerEntry = (Map.Entry) outer.next();
- // }
outerKey = outerEntry.getKey();
while (!incrementInner || inner.hasNext()) {
if (incrementInner) {
@@ -611,7 +608,9 @@ public class RangeIndex extends AbstractIndex {
} else {
innerValue = ((Map.Entry) innerEntry).getValue();
}
- populateListForEquiJoin(data, outerEntry.getValue(), innerValue, context, null);
+ // GEODE-5440: need to pass in the key value to do EquiJoin
+ populateListForEquiJoin(data, outerEntry.getValue(), innerValue, context,
+ outerEntry.getKey());
incrementInner = true;
continue outer;
} else if (compare < 0) {
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java
index 234d90e..de71e7b 100644
--- a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/ClusterStartupRule.java
@@ -42,6 +42,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.security.templates.UserPasswordAuthInit;
+import org.apache.geode.test.dunit.DUnitEnv;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.RMIException;
import org.apache.geode.test.dunit.SerializableConsumerIF;
@@ -113,6 +114,13 @@ public class ClusterStartupRule extends ExternalResource implements Serializable
this.vmCount = vmCount;
}
+ /**
+ * Returns the port that the standard dunit locator is listening on.
+ */
+ public static int getDUnitLocatorPort() {
+ return DUnitEnv.get().getLocatorPort();
+ }
+
public static ClientCache getClientCache() {
if (clientCacheRule == null) {
return null;
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
index e1e7397..1b5d99f 100644
--- a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
@@ -22,8 +22,12 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.function.Consumer;
import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.internal.DistributionConfig;
@@ -167,6 +171,23 @@ public class ServerStarterRule extends MemberStarterRule<ServerStarterRule> impl
return this;
}
+ public Region createRegion(RegionShortcut type, String name,
+ Consumer<RegionFactory> regionFactoryConsumer) {
+ RegionFactory factory = getCache().createRegionFactory(type);
+ regionFactoryConsumer.accept(factory);
+ return factory.create(name);
+ }
+
+ public Region createPRRegion(String name, Consumer<RegionFactory> regionFactoryConsumer,
+ Consumer<PartitionAttributesFactory> prAttributesFactory) {
+ return createRegion(RegionShortcut.PARTITION, name, rf -> {
+ regionFactoryConsumer.accept(rf);
+ PartitionAttributesFactory factory = new PartitionAttributesFactory();
+ prAttributesFactory.accept(factory);
+ rf.setPartitionAttributes(factory.create());
+ });
+ }
+
public void startServer(Properties properties, int locatorPort) {
withProperties(properties).withConnectionToLocator(locatorPort).startServer();
}