You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2018/04/24 22:34:44 UTC
[geode] branch develop updated: Revert "GEODE-925: Replaced all
Wait.pause with Awaitility. Replaced all ThreadUtils.join with an
Awaitility. General code and lambda clean up"
This is an automated email from the ASF dual-hosted git repository.
jasonhuynh pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 7c4c4e5 Revert "GEODE-925: Replaced all Wait.pause with Awaitility. Replaced all ThreadUtils.join with an Awaitility. General code and lambda clean up"
7c4c4e5 is described below
commit 7c4c4e57cefcf49e2ea2a69250176c63ffcffc44
Author: Jason Huynh <hu...@gmail.com>
AuthorDate: Tue Apr 24 15:33:47 2018 -0700
Revert "GEODE-925: Replaced all Wait.pause with Awaitility. Replaced all ThreadUtils.join with an Awaitility. General code and lambda clean up"
This reverts commit 9145fa352e84724dd21d598a855210c71340acfd.
Reverting due to a Spotless issue.
---
.../dunit/QueryDataInconsistencyDUnitTest.java | 745 ++++++++++++---------
1 file changed, 421 insertions(+), 324 deletions(-)
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java
index 3e6c192..20d3a78 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/QueryDataInconsistencyDUnitTest.java
@@ -14,23 +14,10 @@
*/
package org.apache.geode.cache.query.dunit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.assertj.core.api.Assertions.fail;
+import static org.junit.Assert.*;
-import java.io.Serializable;
import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.logging.log4j.Logger;
-import org.assertj.core.api.Assertions;
-import org.awaitility.Awaitility;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
+
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -52,20 +39,18 @@ import org.apache.geode.cache.query.data.Position;
import org.apache.geode.cache.query.internal.QueryObserverHolder;
import org.apache.geode.cache.query.internal.index.IndexManager;
import org.apache.geode.cache30.CacheSerializableRunnable;
-import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.cache.execute.PRClientServerTestBase;
-import org.apache.geode.internal.logging.LogService;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
+import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
-import org.apache.geode.test.dunit.rules.CacheRule;
-import org.apache.geode.test.dunit.rules.CleanupDUnitVMsRule;
-import org.apache.geode.test.dunit.rules.DistributedTestRule;
import org.apache.geode.test.junit.categories.DistributedTest;
import org.apache.geode.test.junit.categories.FlakyTest;
import org.apache.geode.test.junit.categories.OQLIndexTest;
@@ -74,418 +59,530 @@ import org.apache.geode.test.junit.categories.OQLIndexTest;
* This tests the data inconsistency during update on an index and querying the same UNLOCKED index.
*/
@Category({DistributedTest.class, OQLIndexTest.class})
-public class QueryDataInconsistencyDUnitTest implements Serializable {
+public class QueryDataInconsistencyDUnitTest extends JUnit4CacheTestCase {
private static final int cnt = 0;
+
private static final int cntDest = 10;
- private static VM server = null;
- private static String PartitionedRegionName1 = "TestPartitionedRegion1"; // default name
- private static String repRegionName = "TestRepRegion"; // default name
- private static volatile boolean hooked = false;
- private Logger logger = LogService.getLogger(QueryDataInconsistencyDUnitTest.class.getName());
+ static VM server = null;
+
+ static VM client = null;
+
+ static Cache cache = null;
+
+ static String PartitionedRegionName1 = "TestPartitionedRegion1"; // default
+ // name
+ static String repRegionName = "TestRepRegion"; // default name
+
+ static Integer serverPort1 = null;
+
+ public static int numOfBuckets = 20;
+
+ public static String[] queries =
+ new String[] {"select * from /" + PartitionedRegionName1 + " where ID=1",};
- @ClassRule
- public static DistributedTestRule distributedTestRule = new DistributedTestRule(1);
+ public static String[] queriesForRR =
+ new String[] {"<trace> select * from /" + repRegionName + " where ID=1"};
- @Rule
- public CacheRule cacheRule = CacheRule.builder().createCacheInAll().disconnectAfter().build();
+ public static volatile boolean hooked = false;
- @Before
- public void initialize()
- {
- server = Host.getHost(0).getVM(0);
+ public QueryDataInconsistencyDUnitTest() {
+ super();
}
- @After
- public final void postTearDownCacheTestCase() {
+ @Override
+ public final void postTearDownCacheTestCase() throws Exception {
+ Invoke.invokeInEveryVM(JUnit4DistributedTestCase::disconnectFromDS);
Invoke.invokeInEveryVM(QueryObserverHolder::reset);
}
- @BeforeClass
- public static final void postSetUp() {
- Awaitility.setDefaultPollInterval(200, TimeUnit.MILLISECONDS);
- Awaitility.waitAtMost(30, TimeUnit.SECONDS);
+ @Override
+ public final void postSetUp() throws Exception {
+ Host host = Host.getHost(0);
+ server = host.getVM(0);
}
@Test
public void testCompactRangeIndex() {
// Create caches
- server.invoke("create indexes", () -> {
- Cache cache = cacheRule.getCache();
- Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
+ Properties props = new Properties();
+ server.invoke(() -> PRClientServerTestBase.createCacheInVm(props));
- // Create common Portflios and NewPortfolios
- for (int j = cnt; j < cntDest; j++) {
- region.put(new Integer(j), new Portfolio(j));
- }
+ server.invoke(new CacheSerializableRunnable("create indexes") {
+
+ @Override
+ public void run2() throws CacheException {
+ cache = CacheFactory.getAnyInstance();
+ Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
+
+ // Create common Portflios and NewPortfolios
+ for (int j = cnt; j < cntDest; j++) {
+ region.put(new Integer(j), new Portfolio(j));
+ }
- QueryService queryService = cache.getQueryService();
- try {
- Index index = queryService.createIndex("idIndex", "ID", "/" + repRegionName);
- assertEquals(10, index.getStatistics().getNumberOfKeys());
- } catch (Exception e) {
- logger.error(e);
- fail("Index creation failed");
+ QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+ try {
+ Index index = qs.createIndex("idIndex", "ID", "/" + repRegionName);
+ assertEquals(10, index.getStatistics().getNumberOfKeys());
+ } catch (Exception e) {
+ fail("Index creation failed");
+ }
}
});
// Invoke update from client and stop in updateIndex
- // firesultSett before updating the RegionEntry and second after updating
+ // first before updating the RegionEntry and second after updating
// the RegionEntry.
- AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
- Region repRegion = cacheRule.getCache().getRegion(repRegionName);
- IndexManager.testHook = new IndexManagerTestHook();
- repRegion.put(new Integer("1"), new Portfolio(cntDest + 1));
- // above call must be hooked in BEFORE_UPDATE_OP call.
- });
- server.invoke("query on server", () -> {
- QueryService queryService = cacheRule.getCache().getQueryService();
- Awaitility.await().until(() -> hooked);
- Object resultSet = null;
- try {
- resultSet =
- queryService.newQuery("<trace> select * from /" + repRegionName + " where ID = 1")
- .execute();
- } catch (Exception e) {
- logger.error(e);
- fail("Query execution failed on server.");
- IndexManager.testHook = null;
- }
- assertTrue(resultSet instanceof SelectResults);
- assertEquals(1, ((SelectResults) resultSet).size());
- Portfolio p1 = (Portfolio) ((SelectResults) resultSet).asList().get(0);
- if (p1.getID() != 1) {
- fail("Query thread did not verify index results even when RE is under update");
- IndexManager.testHook = null;
+ AsyncInvocation putThread =
+ server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
+
+ @Override
+ public void run2() throws CacheException {
+ Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
+ IndexManager.testHook = new IndexManagerTestHook();
+ repRegion.put(new Integer("1"), new Portfolio(cntDest + 1));
+ // above call must be hooked in BEFORE_UPDATE_OP call.
+ }
+ });
+ server.invoke(new CacheSerializableRunnable("query on server") {
+
+ @Override
+ public void run2() throws CacheException {
+ QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+ while (!hooked) {
+ Wait.pause(100);
+ }
+ Object rs = null;
+ try {
+ rs = qs.newQuery("<trace> select * from /" + repRegionName + " where ID = 1").execute();
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Query execution failed on server.");
+ IndexManager.testHook = null;
+ }
+ assertTrue(rs instanceof SelectResults);
+ assertEquals(1, ((SelectResults) rs).size());
+ Portfolio p1 = (Portfolio) ((SelectResults) rs).asList().get(0);
+ if (p1.getID() != 1) {
+ fail("Query thread did not verify index results even when RE is under update");
+ IndexManager.testHook = null;
+ }
+ hooked = false;// Let client put go further.
}
- hooked = false;// Let client put go further.
});
// Client put is again hooked in AFTER_UPDATE_OP call in updateIndex.
- server.invoke("query on server", () -> {
- QueryService queryService = cacheRule.getCache().getQueryService();
- Awaitility.await().until(() -> hooked);
- Object resultSet = null;
- try {
- resultSet =
- queryService.newQuery("<trace> select * from /" + repRegionName + " where ID = 1")
- .execute();
- } catch (Exception e) {
- logger.error(e);
- fail("Query execution failed on server." + e.getMessage());
- } finally {
- IndexManager.testHook = null;
- }
- assertTrue(resultSet instanceof SelectResults);
- if (((SelectResults) resultSet).size() > 0) {
- Portfolio p1 = (Portfolio) ((SelectResults) resultSet).iterator().next();
- if (p1.getID() != 1) {
- fail("Query thread did not verify index results even when RE is under update and "
- + "RegionEntry value has been modified before releasing the lock");
+ server.invoke(new CacheSerializableRunnable("query on server") {
+
+ @Override
+ public void run2() throws CacheException {
+ QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+ while (!hooked) {
+ Wait.pause(100);
+ }
+ Object rs = null;
+ try {
+ rs = qs.newQuery("<trace> select * from /" + repRegionName + " where ID = 1").execute();
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Query execution failed on server." + e.getMessage());
+ } finally {
IndexManager.testHook = null;
}
+ assertTrue(rs instanceof SelectResults);
+ if (((SelectResults) rs).size() > 0) {
+ Portfolio p1 = (Portfolio) ((SelectResults) rs).iterator().next();
+ if (p1.getID() != 1) {
+ fail("Query thread did not verify index results even when RE is under update and "
+ + "RegionEntry value has been modified before releasing the lock");
+ IndexManager.testHook = null;
+ }
+ }
+ hooked = false;// Let client put go further.
}
- hooked = false;// Let client put go further.
});
- Awaitility.await().until(joinThread(putThread));
- // ThreadUtils.join(putThread, 200);
+ ThreadUtils.join(putThread, 200);
}
@Test
public void testRangeIndex() {
// Create caches
- server.invoke("create indexes", () -> {
- Cache cache = cacheRule.getCache();
- Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
- IndexManager.testHook = null;
- // Create common Portfolios and NewPortfolios
- Position.cnt = 0;
- for (int j = cnt; j < cntDest; j++) {
- Portfolio p = new Portfolio(j);
- cache.getLogger().fine("Shobhit: portfolio " + j + " : " + p);
- region.put(j, p);
- }
+ Properties props = new Properties();
+ server.invoke(() -> PRClientServerTestBase.createCacheInVm(props));
+
+ server.invoke(new CacheSerializableRunnable("create indexes") {
- QueryService queryService = cache.getQueryService();
- try {
- Index index = queryService.createIndex("posIndex", "pos.secId",
- "/" + repRegionName + " p, p.positions.values pos");
- assertEquals(12, index.getStatistics().getNumberOfKeys());
- } catch (Exception e) {
- logger.error(e);
- fail("Index creation failed");
+ @Override
+ public void run2() throws CacheException {
+ cache = CacheFactory.getAnyInstance();
+ Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
+ IndexManager.testHook = null;
+ // Create common Portfolios and NewPortfolios
+ Position.cnt = 0;
+ for (int j = cnt; j < cntDest; j++) {
+ Portfolio p = new Portfolio(j);
+ CacheFactory.getAnyInstance().getLogger().fine("Shobhit: portfolio " + j + " : " + p);
+ region.put(new Integer(j), p);
+ }
+
+ QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+ try {
+ Index index = qs.createIndex("posIndex", "pos.secId",
+ "/" + repRegionName + " p, p.positions.values pos");
+ assertEquals(12, index.getStatistics().getNumberOfKeys());
+ } catch (Exception e) {
+ fail("Index creation failed");
+ }
}
});
// Invoke update from client and stop in updateIndex
- // firesultSett before updating the RegionEntry and second after updating
+ // first before updating the RegionEntry and second after updating
// the RegionEntry.
- AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
- Cache cache = cacheRule.getCache();
- Region repRegion = cache.getRegion(repRegionName);
- IndexManager.testHook = new IndexManagerTestHook();
- Portfolio newPort = new Portfolio(cntDest + 1);
- cache.getLogger().fine("Shobhit: New Portfolio" + newPort);
- repRegion.put(new Integer("1"), newPort);
- // above call must be hooked in BEFORE_UPDATE_OP call.
- });
+ AsyncInvocation putThread =
+ server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
- server.invoke("query on server", () -> {
- Cache cache = cacheRule.getCache();
- QueryService queryService = cache.getQueryService();
- Position pos1 = null;
- Awaitility.await().until(() -> hooked);
- try {
- Object resultSet = queryService.newQuery("<trace> select pos from /" + repRegionName
- + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
- cache.getLogger().fine("Shobhit: " + resultSet);
- assertTrue(resultSet instanceof SelectResults);
- pos1 = (Position) ((SelectResults) resultSet).iterator().next();
- if (!pos1.secId.equals("APPL")) {
- fail("Query thread did not verify index results even when RE is under update");
+ @Override
+ public void run2() throws CacheException {
+ Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
+ IndexManager.testHook = new IndexManagerTestHook();
+ Portfolio newPort = new Portfolio(cntDest + 1);
+ CacheFactory.getAnyInstance().getLogger().fine("Shobhit: New Portfolio" + newPort);
+ repRegion.put(new Integer("1"), newPort);
+ // above call must be hooked in BEFORE_UPDATE_OP call.
+ }
+ });
+
+ server.invoke(new CacheSerializableRunnable("query on server") {
+
+ @Override
+ public void run2() throws CacheException {
+ QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+ Position pos1 = null;
+ while (!hooked) {
+ Wait.pause(100);
+ }
+ try {
+ Object rs = qs.newQuery("<trace> select pos from /" + repRegionName
+ + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
+ CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
+ assertTrue(rs instanceof SelectResults);
+ pos1 = (Position) ((SelectResults) rs).iterator().next();
+ if (!pos1.secId.equals("APPL")) {
+ fail("Query thread did not verify index results even when RE is under update");
+ IndexManager.testHook = null;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Query execution failed on server.", e);
IndexManager.testHook = null;
+ } finally {
+ hooked = false;// Let client put go further.
}
- } catch (Exception e) {
- logger.error(e);
- fail("Query execution failed on server.", e);
- IndexManager.testHook = null;
- } finally {
- hooked = false;// Let client put go further.
- }
- Awaitility.await().until(() -> hooked);
- try {
- Object resultSet = queryService.newQuery("<trace> select pos from /" + repRegionName
- + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
- cache.getLogger().fine("Shobhit: " + resultSet);
- assertTrue(resultSet instanceof SelectResults);
- if (((SelectResults) resultSet).size() > 0) {
- Position pos2 = (Position) ((SelectResults) resultSet).iterator().next();
- if (pos2.equals(pos1)) {
- fail("Query thread did not verify index results even when RE is under update and "
- + "RegionEntry value has been modified before releasing the lock");
+ while (!hooked) {
+ Wait.pause(100);
+ }
+ try {
+ Object rs = qs.newQuery("<trace> select pos from /" + repRegionName
+ + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
+ CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
+ assertTrue(rs instanceof SelectResults);
+ if (((SelectResults) rs).size() > 0) {
+ Position pos2 = (Position) ((SelectResults) rs).iterator().next();
+ if (pos2.equals(pos1)) {
+ fail("Query thread did not verify index results even when RE is under update and "
+ + "RegionEntry value has been modified before releasing the lock");
+ }
}
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Query execution failed on server.");
+ } finally {
+ hooked = false;// Let client put go further.
+ IndexManager.testHook = null;
}
- } catch (Exception e) {
- logger.error(e);
- fail("Query execution failed on server.");
- } finally {
- hooked = false;// Let client put go further.
- IndexManager.testHook = null;
}
});
- Awaitility.await().until(joinThread(putThread));
+ ThreadUtils.join(putThread, 200);
}
@Category(FlakyTest.class) // GEODE-925: time sensitive, async actions, short timeouts
@Test
- public void testRangeIndexWithIndexAndQueryFromClauseMisMatch() {
+ public void testRangeIndexWithIndexAndQueryFromCluaseMisMatch() { // TODO: fix misspelling
// Create caches
- server.invoke("create indexes", () -> {
- Cache cache = cacheRule.getCache();
- Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
- IndexManager.testHook = null;
- // Create common Portfolios and NewPortfolios
- Position.cnt = 0;
- for (int j = cnt; j < cntDest; j++) {
- region.put(j, new Portfolio(j));
- }
+ Properties props = new Properties();
+ server.invoke(() -> PRClientServerTestBase.createCacheInVm(props));
+
+ server.invoke(new CacheSerializableRunnable("create indexes") {
+
+ @Override
+ public void run2() throws CacheException {
+ cache = CacheFactory.getAnyInstance();
+ Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
+ IndexManager.testHook = null;
+ // Create common Portfolios and NewPortfolios
+ Position.cnt = 0;
+ for (int j = cnt; j < cntDest; j++) {
+ region.put(new Integer(j), new Portfolio(j));
+ }
- QueryService queryService = cache.getQueryService();
- try {
- Index index = queryService.createIndex("posIndex", "pos.secId",
- "/" + repRegionName + " p, p.collectionHolderMap.values coll, p.positions.values pos");
- assertEquals(12, index.getStatistics().getNumberOfKeys());
- } catch (Exception e) {
- logger.error(e);
- fail("Index creation failed");
+ QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+ try {
+ Index index = qs.createIndex("posIndex", "pos.secId", "/" + repRegionName
+ + " p, p.collectionHolderMap.values coll, p.positions.values pos");
+ assertEquals(12, index.getStatistics().getNumberOfKeys());
+ } catch (Exception e) {
+ fail("Index creation failed");
+ }
}
});
// Invoke update from client and stop in updateIndex
- // firesultSett before updating the RegionEntry and second after updating
+ // first before updating the RegionEntry and second after updating
// the RegionEntry.
- AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
- Region repRegion = cacheRule.getCache().getRegion(repRegionName);
- IndexManager.testHook = new IndexManagerTestHook();
- // This portfolio with same ID must have different positions.
- repRegion.put(new Integer("1"), new Portfolio(1));
- // above call must be hooked in BEFORE_UPDATE_OP call.
- });
+ AsyncInvocation putThread =
+ server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
- server.invoke("query on server", () -> {
- Cache cache = cacheRule.getCache();
- QueryService queryService = cache.getQueryService();
- Position pos1 = null;
- Awaitility.await().until(() -> hooked);
- try {
- Object resultSet = queryService.newQuery("<trace> select pos from /" + repRegionName
- + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
- cache.getLogger().fine("Shobhit: " + resultSet);
- assertTrue(resultSet instanceof SelectResults);
- pos1 = (Position) ((SelectResults) resultSet).iterator().next();
- if (!pos1.secId.equals("APPL")) {
- fail("Query thread did not verify index results even when RE is under update");
+ @Override
+ public void run2() throws CacheException {
+ Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
+ IndexManager.testHook = new IndexManagerTestHook();
+ // This portfolio with same ID must have different positions.
+ repRegion.put(new Integer("1"), new Portfolio(1));
+ // above call must be hooked in BEFORE_UPDATE_OP call.
+ }
+ });
+
+ server.invoke(new CacheSerializableRunnable("query on server") {
+
+ @Override
+ public void run2() throws CacheException {
+ QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+ Position pos1 = null;
+ while (!hooked) {
+ Wait.pause(100);
+ }
+ try {
+ Object rs = qs.newQuery("<trace> select pos from /" + repRegionName
+ + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
+ CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
+ assertTrue(rs instanceof SelectResults);
+ pos1 = (Position) ((SelectResults) rs).iterator().next();
+ if (!pos1.secId.equals("APPL")) {
+ fail("Query thread did not verify index results even when RE is under update");
+ IndexManager.testHook = null;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Query execution failed on server.", e);
IndexManager.testHook = null;
+ } finally {
+ hooked = false;// Let client put go further.
}
- } catch (Exception e) {
- logger.error(e);
- fail("Query execution failed on server.");
- IndexManager.testHook = null;
- } finally {
- hooked = false;// Let client put go further.
- }
- Awaitility.await().until(() -> hooked);
- try {
- Object resultSet = queryService.newQuery("select pos from /" + repRegionName
- + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
- assertTrue(resultSet instanceof SelectResults);
- if (((SelectResults) resultSet).size() > 0) {
- Position pos2 = (Position) ((SelectResults) resultSet).iterator().next();
- if (pos2.equals(pos1)) {
- fail("Query thread did not verify index results even when RE is under update and "
- + "RegionEntry value has been modified before releasing the lock");
+ while (!hooked) {
+ Wait.pause(100);
+ }
+ try {
+ Object rs = qs.newQuery("select pos from /" + repRegionName
+ + " p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
+ assertTrue(rs instanceof SelectResults);
+ if (((SelectResults) rs).size() > 0) {
+ Position pos2 = (Position) ((SelectResults) rs).iterator().next();
+ if (pos2.equals(pos1)) {
+ fail("Query thread did not verify index results even when RE is under update and "
+ + "RegionEntry value has been modified before releasing the lock");
+ }
}
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Query execution failed on server.");
+ } finally {
+ hooked = false;// Let client put go further.
+ IndexManager.testHook = null;
}
- } catch (Exception e) {
- logger.error(e);
- fail("Query execution failed on server.");
- } finally {
- hooked = false;// Let client put go further.
- IndexManager.testHook = null;
}
});
- Awaitility.await().until(joinThread(putThread));
- // ThreadUtils.join(putThread, 200); // GEODE-925 occuresultSet here and this is very short join 200
- // millis
+ ThreadUtils.join(putThread, 200); // GEODE-925 occurs here and this is very short join 200
+ // millis
}
@Test
- public void testRangeIndexWithIndexAndQueryFromClauseMisMatch2() {
+ public void testRangeIndexWithIndexAndQueryFromCluaseMisMatch2() {
// Create caches
- server.invoke("create indexes", () -> {
- Cache cache = cacheRule.getCache();
- Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
- IndexManager.testHook = null;
- // Create common Portfolios and NewPortfolios
- Position.cnt = 0;
- for (int j = cnt; j < cntDest; j++) {
- region.put(new Integer(j), new Portfolio(j));
- }
+ Properties props = new Properties();
+ server.invoke(() -> PRClientServerTestBase.createCacheInVm(props));
- QueryService queryService = cache.getQueryService();
- try {
- Index index = queryService.createIndex("posIndex", "pos.secId",
- "/" + repRegionName + " p, p.positions.values pos");
- assertEquals(12, index.getStatistics().getNumberOfKeys());
- } catch (Exception e) {
- logger.error(e);
- fail("Index creation failed");
+ server.invoke(new CacheSerializableRunnable("create indexes") {
+
+ @Override
+ public void run2() throws CacheException {
+ cache = CacheFactory.getAnyInstance();
+ Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
+ IndexManager.testHook = null;
+ // Create common Portfolios and NewPortfolios
+ Position.cnt = 0;
+ for (int j = cnt; j < cntDest; j++) {
+ region.put(new Integer(j), new Portfolio(j));
+ }
+
+ QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+ try {
+ Index index = qs.createIndex("posIndex", "pos.secId",
+ "/" + repRegionName + " p, p.positions.values pos");
+ assertEquals(12, index.getStatistics().getNumberOfKeys());
+ } catch (Exception e) {
+ fail("Index creation failed");
+ }
}
});
// Invoke update from client and stop in updateIndex
- // firesultSett before updating the RegionEntry and second after updating
+ // first before updating the RegionEntry and second after updating
// the RegionEntry.
- AsyncInvocation putThread = server.invokeAsync("update a Region Entry", () -> {
- Cache cache = cacheRule.getCache();
- Region repRegion = cache.getRegion(repRegionName);
- IndexManager.testHook = new IndexManagerTestHook();
- // This portfolio with same ID must have different positions.
- repRegion.put(new Integer("1"), new Portfolio(1));
- // above call must be hooked in BEFORE_UPDATE_OP call.
- });
+ AsyncInvocation putThread =
+ server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
- server.invoke("query on server", () -> {
- Cache cache = cacheRule.getCache();
- QueryService queryService = cache.getQueryService();
- Position pos1 = null;
- Awaitility.await().until(() -> hooked);
- try {
- Object resultSet = queryService
- .newQuery("<trace> select pos from /" + repRegionName
- + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1")
- .execute();
- cache.getLogger().fine("Shobhit: " + resultSet);
- assertTrue(resultSet instanceof SelectResults);
- pos1 = (Position) ((SelectResults) resultSet).iterator().next();
- if (!pos1.secId.equals("APPL")) {
- fail("Query thread did not verify index results even when RE is under update");
+ @Override
+ public void run2() throws CacheException {
+ Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
+ IndexManager.testHook = new IndexManagerTestHook();
+ // This portfolio with same ID must have different positions.
+ repRegion.put(new Integer("1"), new Portfolio(1));
+ // above call must be hooked in BEFORE_UPDATE_OP call.
+ }
+ });
+
+ server.invoke(new CacheSerializableRunnable("query on server") {
+
+ @Override
+ public void run2() throws CacheException {
+ QueryService qs = CacheFactory.getAnyInstance().getQueryService();
+ Position pos1 = null;
+ while (!hooked) {
+ Wait.pause(100);
+ }
+ try {
+ Object rs = qs
+ .newQuery("<trace> select pos from /" + repRegionName
+ + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1")
+ .execute();
+ CacheFactory.getAnyInstance().getLogger().fine("Shobhit: " + rs);
+ assertTrue(rs instanceof SelectResults);
+ pos1 = (Position) ((SelectResults) rs).iterator().next();
+ if (!pos1.secId.equals("APPL")) {
+ fail("Query thread did not verify index results even when RE is under update");
+ IndexManager.testHook = null;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Query execution failed on server.", e);
IndexManager.testHook = null;
+ } finally {
+ hooked = false;// Let client put go further.
}
- } catch (Exception e) {
- logger.error(e);
- Assertions.fail("Query execution failed on server.", e);
- IndexManager.testHook = null;
- } finally {
- hooked = false;// Let client put go further.
- }
- Awaitility.await().until(() -> hooked);
-
- try {
- Object resultSet = queryService
- .newQuery("select pos from /" + repRegionName
- + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1")
- .execute();
- assertTrue(resultSet instanceof SelectResults);
- if (((SelectResults) resultSet).size() > 0) {
- Position pos2 = (Position) ((SelectResults) resultSet).iterator().next();
- if (pos2.equals(pos1)) {
- fail("Query thread did not verify index results even when RE is under update and "
- + "RegionEntry value has been modified before releasing the lock");
+ while (!hooked) {
+ Wait.pause(100);
+ }
+ try {
+ Object rs = qs
+ .newQuery("select pos from /" + repRegionName
+ + " p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1")
+ .execute();
+ assertTrue(rs instanceof SelectResults);
+ if (((SelectResults) rs).size() > 0) {
+ Position pos2 = (Position) ((SelectResults) rs).iterator().next();
+ if (pos2.equals(pos1)) {
+ fail("Query thread did not verify index results even when RE is under update and "
+ + "RegionEntry value has been modified before releasing the lock");
+ }
}
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Query execution failed on server.");
+ } finally {
+ IndexManager.testHook = null;
+ hooked = false;// Let client put go further.
}
- } catch (Exception e) {
- logger.error(e);
- fail("Query execution failed on server.");
- } finally {
- IndexManager.testHook = null;
- hooked = false;// Let client put go further.
}
});
- Awaitility.await().until(joinThread(putThread));
+ ThreadUtils.join(putThread, 200);
}
- private Callable<Boolean> joinThread(AsyncInvocation thread) {
- return () -> {
- try {
- thread.join(100L);
- } catch (InterruptedException e) {
- return false;
- }
- if (thread.isAlive()) {
- return false;
- }
- return true;
- };
+ public static void createProxyRegions() {
+ new QueryDataInconsistencyDUnitTest().createProxyRegs();
}
private void createProxyRegs() {
- ClientCache cache = (ClientCache) cacheRule.getCache();
+ ClientCache cache = (ClientCache) CacheFactory.getAnyInstance();
cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(repRegionName);
+
+ /*
+ * cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create( PartitionedRegionName1);
+ */
+ }
+
+ public static void createNewPR() {
+ new QueryDataInconsistencyDUnitTest().createPR();
}
public void createPR() {
PartitionResolver testKeyBasedResolver = new QueryAPITestPartitionResolver();
- Cache cache = cacheRule.getCache();
- int numOfBuckets = 20;
+ cache = CacheFactory.getAnyInstance();
cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT)
.setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(numOfBuckets)
.setPartitionResolver(testKeyBasedResolver).create())
.create(PartitionedRegionName1);
}
+ public static void createCacheClientWithoutRegion(String host, Integer port1) {
+ new QueryDataInconsistencyDUnitTest().createCacheClientWithoutReg(host, port1);
+ }
+
+ private void createCacheClientWithoutReg(String host, Integer port1) {
+ disconnectFromDS();
+ new ClientCacheFactory().addPoolServer(host, port1).create();
+ }
+
+ /**
+ * This function puts portfolio objects into the created Region (PR or Local) *
+ *
+ * @return cacheSerializable object
+ */
+ public CacheSerializableRunnable getCacheSerializableRunnableForPRPuts(final String regionName,
+ final Object[] portfolio, final int from, final int to) {
+ SerializableRunnable puts = new CacheSerializableRunnable("Region Puts") {
+ @Override
+ public void run2() throws CacheException {
+ Cache cache = CacheFactory.getAnyInstance();
+ Region region = cache.getRegion(repRegionName);
+ for (int j = from; j < to; j++)
+ region.put(new Integer(j), portfolio[j]);
+ LogWriterUtils.getLogWriter().info(
+ "PRQueryDUnitHelper#getCacheSerializableRunnableForPRPuts: Inserted Portfolio data on Region "
+ + regionName);
+ }
+ };
+ return (CacheSerializableRunnable) puts;
+ }
+
public class IndexManagerTestHook
implements org.apache.geode.cache.query.internal.index.IndexManager.TestHook {
public void hook(final int spot) throws RuntimeException {
switch (spot) {
case 9: // Before Index update and after region entry lock.
hooked = true;
- logger
+ LogWriterUtils.getLogWriter()
.info("QueryDataInconsistency.IndexManagerTestHook is hooked in Update Index Entry.");
- Awaitility.await().until(() -> !hooked);
+ while (hooked) {
+ Wait.pause(100);
+ }
+ assertEquals(hooked, false);
break;
case 10: // Before Region update and after Index Remove call.
hooked = true;
- logger
+ LogWriterUtils.getLogWriter()
.info("QueryDataInconsistency.IndexManagerTestHook is hooked in Remove Index Entry.");
- Awaitility.await().until(() -> !hooked);
+ while (hooked) {
+ Wait.pause(100);
+ }
+ assertEquals(hooked, false);
break;
default:
break;
--
To stop receiving notification emails like this one, please contact
jasonhuynh@apache.org.