You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2021/03/31 00:31:14 UTC

[GitHub] [geode] DonalEvans commented on a change in pull request #6232: GEODE-8926: Calculate filter information after tx is applied to cache.

DonalEvans commented on a change in pull request #6232:
URL: https://github.com/apache/geode/pull/6232#discussion_r604450874



##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -854,50 +854,80 @@ private SelectResults buildSortedResult(CompiledSelect cs, int limit) throws Que
    */
   private Map<InternalDistributedMember, List<Integer>> buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new HashMap();

Review comment:
       The compiler warning here can be fixed by using `new HashMap<>();`

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -854,50 +854,80 @@ private SelectResults buildSortedResult(CompiledSelect cs, int limit) throws Que
    */
   private Map<InternalDistributedMember, List<Integer>> buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new HashMap();
     if (bucketIdsToConsider.isEmpty()) {
       return ret;
     }
+    List<Integer> bucketIds;
+    if (query != null && query.isCqQuery()) {
+      bucketIds = findPrimaryBucketOwners(bucketIdsToConsider, ret);
+    } else {
+      bucketIds = findBucketOwners(bucketIdsToConsider, ret);
+    }
+    if (bucketIds.size() != bucketIdsToConsider.size()) {
+      bucketIdsToConsider.removeAll(bucketIds);
+      throw new QueryException("Data loss detected, unable to find the hosting "
+          + " node for some of the dataset. [dataset/bucket ids:" + bucketIdsToConsider + "]");
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Node to bucketId map: {}", ret);
+    }
+    return ret;
+  }
 
-    final List<Integer> bucketIds = new ArrayList<Integer>();
+  private List<Integer> findPrimaryBucketOwners(Set<Integer> bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    int retry = 3;
+    for (int i = 0; i < retry && (bucketIds.size() != bucketIdsToConsider.size()); i++) {
+      bucketIds.clear();
+      for (Integer bucketId : bucketIdsToConsider) {
+        InternalDistributedMember primary = getPrimaryBucketOwner(bucketId);
+        if (primary != null) {
+          bucketIds.add(bucketId);
+          if (nodeToBucketMap.get(primary) != null) {
+            nodeToBucketMap.get(primary).add(bucketId);
+          } else {
+            List<Integer> bids = new ArrayList<>();
+            bids.add(bucketId);
+            nodeToBucketMap.put(primary, bids);
+          }
+        }
+      }
+    }
+    return bucketIds;
+  }
+
+  private List<Integer> findBucketOwners(final Set<Integer> bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();

Review comment:
       Compiler warning here can be fixed by using `new ArrayList<>();`

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -854,50 +854,80 @@ private SelectResults buildSortedResult(CompiledSelect cs, int limit) throws Que
    */
   private Map<InternalDistributedMember, List<Integer>> buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new HashMap();
     if (bucketIdsToConsider.isEmpty()) {
       return ret;
     }
+    List<Integer> bucketIds;
+    if (query != null && query.isCqQuery()) {
+      bucketIds = findPrimaryBucketOwners(bucketIdsToConsider, ret);
+    } else {
+      bucketIds = findBucketOwners(bucketIdsToConsider, ret);
+    }
+    if (bucketIds.size() != bucketIdsToConsider.size()) {
+      bucketIdsToConsider.removeAll(bucketIds);
+      throw new QueryException("Data loss detected, unable to find the hosting "
+          + " node for some of the dataset. [dataset/bucket ids:" + bucketIdsToConsider + "]");
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Node to bucketId map: {}", ret);
+    }
+    return ret;
+  }
 
-    final List<Integer> bucketIds = new ArrayList<Integer>();
+  private List<Integer> findPrimaryBucketOwners(Set<Integer> bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    int retry = 3;
+    for (int i = 0; i < retry && (bucketIds.size() != bucketIdsToConsider.size()); i++) {
+      bucketIds.clear();
+      for (Integer bucketId : bucketIdsToConsider) {
+        InternalDistributedMember primary = getPrimaryBucketOwner(bucketId);
+        if (primary != null) {
+          bucketIds.add(bucketId);
+          if (nodeToBucketMap.get(primary) != null) {
+            nodeToBucketMap.get(primary).add(bucketId);
+          } else {
+            List<Integer> bids = new ArrayList<>();
+            bids.add(bucketId);
+            nodeToBucketMap.put(primary, bids);
+          }
+        }
+      }
+    }
+    return bucketIds;
+  }
+
+  private List<Integer> findBucketOwners(final Set<Integer> bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    // Get all local buckets
     PartitionedRegionDataStore dataStore = this.pr.getDataStore();
-    final int totalBucketsToQuery = bucketIdsToConsider.size();
     if (dataStore != null) {
       for (Integer bid : bucketIdsToConsider) {
         if (dataStore.isManagingBucket(bid)) {
           bucketIds.add(Integer.valueOf(bid));
         }
       }
       if (bucketIds.size() > 0) {
-        ret.put(pr.getMyId(), new ArrayList(bucketIds));
+        List<Integer> localBucketIds = new ArrayList(bucketIds);
+        nodeToBucketMap.put(pr.getMyId(), localBucketIds);
         // All the buckets are hosted locally.
-        if (bucketIds.size() == totalBucketsToQuery) {
-          return ret;
+        if (localBucketIds.size() == bucketIdsToConsider.size()) {
+          return bucketIds;
         }
       }
     }
-
-    final List allNodes = getAllNodes(this.pr.getRegionAdvisor());
-    /*
-     * for(Map.Entry<InternalDistributedMember, Collection<Collection>> entry :
-     * resultsPerMember.entrySet()) { InternalDistributedMember member = entry.getKey();
-     * TaintableArrayList list = entry.getValue(); if(list.isTainted()) {
-     * taintedMembers.add(member); } }
-     */
-
+    final List<InternalDistributedMember> allNodes = getAllNodes(this.pr.getRegionAdvisor());
     // Put the failed members on the end of the list.
     if (failedMembers != null && !failedMembers.isEmpty()) {
       allNodes.removeAll(failedMembers);
       allNodes.addAll(failedMembers);
     }
-
+    int totalBucketsToQuery = bucketIdsToConsider.size();
     for (Iterator dsItr = allNodes.iterator(); dsItr.hasNext()
         && (bucketIds.size() < totalBucketsToQuery);) {
       InternalDistributedMember nd = (InternalDistributedMember) dsItr.next();
-
       final List<Integer> buckets = new ArrayList<Integer>();

Review comment:
       This can be just `new ArrayList<>();`

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -854,50 +854,80 @@ private SelectResults buildSortedResult(CompiledSelect cs, int limit) throws Que
    */
   private Map<InternalDistributedMember, List<Integer>> buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new HashMap();
     if (bucketIdsToConsider.isEmpty()) {
       return ret;
     }
+    List<Integer> bucketIds;
+    if (query != null && query.isCqQuery()) {
+      bucketIds = findPrimaryBucketOwners(bucketIdsToConsider, ret);
+    } else {
+      bucketIds = findBucketOwners(bucketIdsToConsider, ret);
+    }
+    if (bucketIds.size() != bucketIdsToConsider.size()) {
+      bucketIdsToConsider.removeAll(bucketIds);
+      throw new QueryException("Data loss detected, unable to find the hosting "
+          + " node for some of the dataset. [dataset/bucket ids:" + bucketIdsToConsider + "]");
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Node to bucketId map: {}", ret);
+    }
+    return ret;
+  }
 
-    final List<Integer> bucketIds = new ArrayList<Integer>();
+  private List<Integer> findPrimaryBucketOwners(Set<Integer> bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    int retry = 3;
+    for (int i = 0; i < retry && (bucketIds.size() != bucketIdsToConsider.size()); i++) {
+      bucketIds.clear();
+      for (Integer bucketId : bucketIdsToConsider) {
+        InternalDistributedMember primary = getPrimaryBucketOwner(bucketId);
+        if (primary != null) {
+          bucketIds.add(bucketId);
+          if (nodeToBucketMap.get(primary) != null) {
+            nodeToBucketMap.get(primary).add(bucketId);
+          } else {
+            List<Integer> bids = new ArrayList<>();
+            bids.add(bucketId);
+            nodeToBucketMap.put(primary, bids);
+          }
+        }
+      }
+    }
+    return bucketIds;
+  }
+
+  private List<Integer> findBucketOwners(final Set<Integer> bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    // Get all local buckets
     PartitionedRegionDataStore dataStore = this.pr.getDataStore();
-    final int totalBucketsToQuery = bucketIdsToConsider.size();
     if (dataStore != null) {
       for (Integer bid : bucketIdsToConsider) {
         if (dataStore.isManagingBucket(bid)) {
           bucketIds.add(Integer.valueOf(bid));

Review comment:
       This can be just `bucketIds.add(bid);`

##########
File path: geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateInterface;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category(ClientSubscriptionTest.class)
+public class PartitionedRegionTxDUnitTest implements Serializable {
+  private final String REGION_NAME = "region";
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private ClientVM client;
+
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Test
+  public void verifyCqRegistrationWorksDuringTxCommit() throws Exception {
+    blackboard.setMailbox("CqQueryResultCount", 0);
+    blackboard.setMailbox("CqEvents", 0);
+
+    Properties properties = new Properties();
+    locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    server1 = clusterStartupRule.startServerVM(1, properties, locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, properties, locator.getPort());
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> cacheRule.withServerConnection(server2.getPort()).withPoolSubscription(true));
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      Region<Object, Object> region =
+          cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+              new PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+              .create(REGION_NAME);
+
+      // Force primary bucket to get created.
+      region.put("Key-1", "value-1");
+      region.destroy("Key-1");
+    });
+
+    server2.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+          new PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+          .create(REGION_NAME);
+    });
+
+    AsyncInvocation<?> serverAsync = server1.invokeAsync(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+      txManager.begin();
+
+      TXStateInterface txState =
+          ((TXStateProxyImpl) txManager.getTXState()).getRealDeal(null, null);
+
+      ((TXState) txState).setDuringApplyChanges(() -> {
+        try {
+          blackboard.signalGate("StartCQ");
+          blackboard.waitForGate("RegistrationFinished");
+        } catch (TimeoutException | InterruptedException e) {
+          // Do nothing
+        }
+      });
+
+      cache.getRegion(REGION_NAME).put("Key-1", "value-1");
+      txManager.commit();
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(REGION_NAME);
+
+      QueryService queryService = clientCache.getQueryService();
+      CqAttributesFactory cqaf = new CqAttributesFactory();
+      TestCqListener testListener = new TestCqListener();
+      cqaf.addCqListener(testListener);
+      CqAttributes cqAttributes = cqaf.create();
+
+      blackboard.waitForGate("StartCQ");
+      SelectResults cqResults = queryService
+          .newCq("Select * from " + SEPARATOR + REGION_NAME, cqAttributes)
+          .executeWithInitialResults();
+      blackboard.signalGate("RegistrationFinished");
+      blackboard.setMailbox("CqQueryResultCount", cqResults.asList().size());
+    });
+
+    GeodeAwaitility.await().untilAsserted(() -> {
+      Integer CqQueryResultCount = blackboard.getMailbox("CqQueryResultCount");
+      Integer CqEvents = blackboard.getMailbox("CqEvents");
+      assertThat(CqQueryResultCount + CqEvents).isEqualTo(1);
+    });
+
+    serverAsync.await();
+  }
+
+  @Test
+  public void verifyCqEventInvocationIfTxCommitFromClient() throws Exception {
+    blackboard.setMailbox("CqEvents", 0);
+
+    Properties properties = new Properties();
+    locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    server1 = clusterStartupRule.startServerVM(1, properties, locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, properties, locator.getPort());
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> cacheRule.withServerConnection(server1.getPort()).withPoolSubscription(true));
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      Region<Object, Object> region =
+          cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+              new PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+              .create(REGION_NAME);
+
+      // Force primary bucket to get created.
+      region.put("Key-1", "value-1");
+    });
+
+    server2.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+          new PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+          .create(REGION_NAME);
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(REGION_NAME);
+
+      QueryService queryService = clientCache.getQueryService();
+      CqAttributesFactory cqaf = new CqAttributesFactory();
+      TestCqListener testListener = new TestCqListener();
+      cqaf.addCqListener(testListener);
+      CqAttributes cqAttributes = cqaf.create();
+
+      queryService.newCq("Select * from " + SEPARATOR + REGION_NAME, cqAttributes)
+          .executeWithInitialResults();
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      TXManagerImpl txManager = (TXManagerImpl) clientCache.getCacheTransactionManager();
+      txManager.begin();
+
+      clientCache.getRegion(REGION_NAME).destroy("Key-1");
+      txManager.commit();
+    });
+
+    GeodeAwaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {

Review comment:
       It's recommended to use the default timeout for `await()` unless a specific timeout is being tested, as using shorter values can make flaky test failures more likely.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -854,50 +854,80 @@ private SelectResults buildSortedResult(CompiledSelect cs, int limit) throws Que
    */
   private Map<InternalDistributedMember, List<Integer>> buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new HashMap();
     if (bucketIdsToConsider.isEmpty()) {
       return ret;
     }
+    List<Integer> bucketIds;
+    if (query != null && query.isCqQuery()) {
+      bucketIds = findPrimaryBucketOwners(bucketIdsToConsider, ret);
+    } else {
+      bucketIds = findBucketOwners(bucketIdsToConsider, ret);
+    }
+    if (bucketIds.size() != bucketIdsToConsider.size()) {
+      bucketIdsToConsider.removeAll(bucketIds);
+      throw new QueryException("Data loss detected, unable to find the hosting "
+          + " node for some of the dataset. [dataset/bucket ids:" + bucketIdsToConsider + "]");
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Node to bucketId map: {}", ret);
+    }
+    return ret;
+  }
 
-    final List<Integer> bucketIds = new ArrayList<Integer>();
+  private List<Integer> findPrimaryBucketOwners(Set<Integer> bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    int retry = 3;
+    for (int i = 0; i < retry && (bucketIds.size() != bucketIdsToConsider.size()); i++) {
+      bucketIds.clear();
+      for (Integer bucketId : bucketIdsToConsider) {
+        InternalDistributedMember primary = getPrimaryBucketOwner(bucketId);
+        if (primary != null) {
+          bucketIds.add(bucketId);
+          if (nodeToBucketMap.get(primary) != null) {
+            nodeToBucketMap.get(primary).add(bucketId);
+          } else {
+            List<Integer> bids = new ArrayList<>();
+            bids.add(bucketId);
+            nodeToBucketMap.put(primary, bids);
+          }
+        }
+      }
+    }
+    return bucketIds;
+  }
+
+  private List<Integer> findBucketOwners(final Set<Integer> bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    // Get all local buckets
     PartitionedRegionDataStore dataStore = this.pr.getDataStore();
-    final int totalBucketsToQuery = bucketIdsToConsider.size();
     if (dataStore != null) {
       for (Integer bid : bucketIdsToConsider) {
         if (dataStore.isManagingBucket(bid)) {
           bucketIds.add(Integer.valueOf(bid));
         }
       }
       if (bucketIds.size() > 0) {
-        ret.put(pr.getMyId(), new ArrayList(bucketIds));
+        List<Integer> localBucketIds = new ArrayList(bucketIds);
+        nodeToBucketMap.put(pr.getMyId(), localBucketIds);
         // All the buckets are hosted locally.
-        if (bucketIds.size() == totalBucketsToQuery) {
-          return ret;
+        if (localBucketIds.size() == bucketIdsToConsider.size()) {
+          return bucketIds;
         }
       }
     }
-
-    final List allNodes = getAllNodes(this.pr.getRegionAdvisor());
-    /*
-     * for(Map.Entry<InternalDistributedMember, Collection<Collection>> entry :
-     * resultsPerMember.entrySet()) { InternalDistributedMember member = entry.getKey();
-     * TaintableArrayList list = entry.getValue(); if(list.isTainted()) {
-     * taintedMembers.add(member); } }
-     */
-
+    final List<InternalDistributedMember> allNodes = getAllNodes(this.pr.getRegionAdvisor());
     // Put the failed members on the end of the list.
     if (failedMembers != null && !failedMembers.isEmpty()) {
       allNodes.removeAll(failedMembers);
       allNodes.addAll(failedMembers);
     }
-
+    int totalBucketsToQuery = bucketIdsToConsider.size();
     for (Iterator dsItr = allNodes.iterator(); dsItr.hasNext()

Review comment:
       Compiler warning here can be fixed by using `Iterator<InternalDistributedMember>`

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -854,50 +854,80 @@ private SelectResults buildSortedResult(CompiledSelect cs, int limit) throws Que
    */
   private Map<InternalDistributedMember, List<Integer>> buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new HashMap();
     if (bucketIdsToConsider.isEmpty()) {
       return ret;
     }
+    List<Integer> bucketIds;
+    if (query != null && query.isCqQuery()) {
+      bucketIds = findPrimaryBucketOwners(bucketIdsToConsider, ret);
+    } else {
+      bucketIds = findBucketOwners(bucketIdsToConsider, ret);
+    }
+    if (bucketIds.size() != bucketIdsToConsider.size()) {
+      bucketIdsToConsider.removeAll(bucketIds);
+      throw new QueryException("Data loss detected, unable to find the hosting "
+          + " node for some of the dataset. [dataset/bucket ids:" + bucketIdsToConsider + "]");
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Node to bucketId map: {}", ret);
+    }
+    return ret;
+  }
 
-    final List<Integer> bucketIds = new ArrayList<Integer>();
+  private List<Integer> findPrimaryBucketOwners(Set<Integer> bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    int retry = 3;
+    for (int i = 0; i < retry && (bucketIds.size() != bucketIdsToConsider.size()); i++) {
+      bucketIds.clear();
+      for (Integer bucketId : bucketIdsToConsider) {
+        InternalDistributedMember primary = getPrimaryBucketOwner(bucketId);
+        if (primary != null) {
+          bucketIds.add(bucketId);
+          if (nodeToBucketMap.get(primary) != null) {
+            nodeToBucketMap.get(primary).add(bucketId);
+          } else {
+            List<Integer> bids = new ArrayList<>();
+            bids.add(bucketId);
+            nodeToBucketMap.put(primary, bids);
+          }
+        }
+      }
+    }
+    return bucketIds;
+  }
+
+  private List<Integer> findBucketOwners(final Set<Integer> bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    // Get all local buckets
     PartitionedRegionDataStore dataStore = this.pr.getDataStore();
-    final int totalBucketsToQuery = bucketIdsToConsider.size();
     if (dataStore != null) {
       for (Integer bid : bucketIdsToConsider) {
         if (dataStore.isManagingBucket(bid)) {
           bucketIds.add(Integer.valueOf(bid));
         }
       }
       if (bucketIds.size() > 0) {
-        ret.put(pr.getMyId(), new ArrayList(bucketIds));
+        List<Integer> localBucketIds = new ArrayList(bucketIds);

Review comment:
       Compiler warning here can be fixed by using `new ArrayList<>(bucketIds);`

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -854,50 +854,80 @@ private SelectResults buildSortedResult(CompiledSelect cs, int limit) throws Que
    */
   private Map<InternalDistributedMember, List<Integer>> buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new HashMap();
     if (bucketIdsToConsider.isEmpty()) {
       return ret;
     }
+    List<Integer> bucketIds;
+    if (query != null && query.isCqQuery()) {
+      bucketIds = findPrimaryBucketOwners(bucketIdsToConsider, ret);
+    } else {
+      bucketIds = findBucketOwners(bucketIdsToConsider, ret);
+    }
+    if (bucketIds.size() != bucketIdsToConsider.size()) {
+      bucketIdsToConsider.removeAll(bucketIds);
+      throw new QueryException("Data loss detected, unable to find the hosting "
+          + " node for some of the dataset. [dataset/bucket ids:" + bucketIdsToConsider + "]");
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Node to bucketId map: {}", ret);
+    }
+    return ret;
+  }
 
-    final List<Integer> bucketIds = new ArrayList<Integer>();
+  private List<Integer> findPrimaryBucketOwners(Set<Integer> bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();

Review comment:
       Compiler warning here can be fixed by using `new ArrayList<>();`

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -909,28 +939,22 @@ private SelectResults buildSortedResult(CompiledSelect cs, int limit) throws Que
         }
       }
       if (!buckets.isEmpty()) {
-        ret.put(nd, buckets);
+        nodeToBucketMap.put(nd, buckets);
       }
     }
+    return bucketIds;
+  }
 
-    if (bucketIds.size() != totalBucketsToQuery) {
-      bucketIdsToConsider.removeAll(bucketIds);
-      throw new QueryException("Data loss detected, unable to find the hosting "
-          + " node for some of the dataset. [dataset/bucket ids:" + bucketIdsToConsider + "]");
-    }
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("Node to bucketId map: {}", ret);
-    }
-    return ret;
+  private InternalDistributedMember getPrimaryBucketOwner(Integer bid) {
+    return pr.getBucketPrimary(bid.intValue());
   }
 
   protected Set<InternalDistributedMember> getBucketOwners(Integer bid) {
     return pr.getRegionAdvisor().getBucketOwners(bid.intValue());
   }

Review comment:
       These methods are both one-liners that are only used once in this class and nowhere else. Can their bodies just be inlined at the call sites? Also, it's not necessary to call `.intValue()` on `bid` here.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -854,50 +854,80 @@ private SelectResults buildSortedResult(CompiledSelect cs, int limit) throws Que
    */
   private Map<InternalDistributedMember, List<Integer>> buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new HashMap();
     if (bucketIdsToConsider.isEmpty()) {
       return ret;
     }
+    List<Integer> bucketIds;
+    if (query != null && query.isCqQuery()) {
+      bucketIds = findPrimaryBucketOwners(bucketIdsToConsider, ret);
+    } else {
+      bucketIds = findBucketOwners(bucketIdsToConsider, ret);
+    }
+    if (bucketIds.size() != bucketIdsToConsider.size()) {
+      bucketIdsToConsider.removeAll(bucketIds);
+      throw new QueryException("Data loss detected, unable to find the hosting "
+          + " node for some of the dataset. [dataset/bucket ids:" + bucketIdsToConsider + "]");
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Node to bucketId map: {}", ret);
+    }
+    return ret;
+  }
 
-    final List<Integer> bucketIds = new ArrayList<Integer>();
+  private List<Integer> findPrimaryBucketOwners(Set<Integer> bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    int retry = 3;
+    for (int i = 0; i < retry && (bucketIds.size() != bucketIdsToConsider.size()); i++) {
+      bucketIds.clear();
+      for (Integer bucketId : bucketIdsToConsider) {
+        InternalDistributedMember primary = getPrimaryBucketOwner(bucketId);
+        if (primary != null) {
+          bucketIds.add(bucketId);
+          if (nodeToBucketMap.get(primary) != null) {
+            nodeToBucketMap.get(primary).add(bucketId);
+          } else {
+            List<Integer> bids = new ArrayList<>();
+            bids.add(bucketId);
+            nodeToBucketMap.put(primary, bids);
+          }
+        }
+      }
+    }
+    return bucketIds;
+  }
+
+  private List<Integer> findBucketOwners(final Set<Integer> bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList();
+    // Get all local buckets
     PartitionedRegionDataStore dataStore = this.pr.getDataStore();
-    final int totalBucketsToQuery = bucketIdsToConsider.size();
     if (dataStore != null) {
       for (Integer bid : bucketIdsToConsider) {
         if (dataStore.isManagingBucket(bid)) {
           bucketIds.add(Integer.valueOf(bid));
         }
       }
       if (bucketIds.size() > 0) {
-        ret.put(pr.getMyId(), new ArrayList(bucketIds));
+        List<Integer> localBucketIds = new ArrayList(bucketIds);
+        nodeToBucketMap.put(pr.getMyId(), localBucketIds);
         // All the buckets are hosted locally.
-        if (bucketIds.size() == totalBucketsToQuery) {
-          return ret;
+        if (localBucketIds.size() == bucketIdsToConsider.size()) {
+          return bucketIds;
         }
       }
     }
-
-    final List allNodes = getAllNodes(this.pr.getRegionAdvisor());
-    /*
-     * for(Map.Entry<InternalDistributedMember, Collection<Collection>> entry :
-     * resultsPerMember.entrySet()) { InternalDistributedMember member = entry.getKey();
-     * TaintableArrayList list = entry.getValue(); if(list.isTainted()) {
-     * taintedMembers.add(member); } }
-     */
-
+    final List<InternalDistributedMember> allNodes = getAllNodes(this.pr.getRegionAdvisor());
     // Put the failed members on the end of the list.
     if (failedMembers != null && !failedMembers.isEmpty()) {
       allNodes.removeAll(failedMembers);
       allNodes.addAll(failedMembers);
     }
-
+    int totalBucketsToQuery = bucketIdsToConsider.size();
     for (Iterator dsItr = allNodes.iterator(); dsItr.hasNext()
         && (bucketIds.size() < totalBucketsToQuery);) {
       InternalDistributedMember nd = (InternalDistributedMember) dsItr.next();

Review comment:
       If the change on line 928 is applied, this cast is redundant.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
##########
@@ -92,6 +98,26 @@ public void doAfterCompletionThrowsIfCommitFails() {
         .isSameAs(transactionDataNodeHasDepartedException);
   }
 
+  @Test
+  public void attacheFilterProfileAfterApplyingChagnes() {
+    TXState txState = spy(new TXState(txStateProxy, false, disabledClock()));
+    ArrayList entries = mock(ArrayList.class);
+    doReturn(entries).when(txState).generateEventOffsets();
+    doNothing().when(txState).attachFilterProfileInformation(entries);
+    doNothing().when(txState).applyChanges(entries);

Review comment:
       It's generally best not to mock collections, especially when we don't care about the contents, so this could be changed to:
   ```suggestion
       doReturn(new ArrayList<>()).when(txState).generateEventOffsets();
       doNothing().when(txState).attachFilterProfileInformation(any());
       doNothing().when(txState).applyChanges(any());
   ```

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
##########
@@ -605,6 +604,18 @@ protected void attachFilterProfileInformation(List entries) {
               o.es.setFilterRoutingInfo(fri);
               Set set = bucket.getAdjunctReceivers(ev, Collections.EMPTY_SET, new HashSet(), fri);
               o.es.setAdjunctRecipients(set);
+
+              if (o.es.getPendingCallback() != null) {
+                if (fri != null) {
+                  // For tx host, local filter info was also calculated.
+                  // Set this local filter info in corresponding pending callback so that
+                  // notifyBridgeClient has correct routing info.
+                  FilterRoutingInfo.FilterInfo localRouting = fri.getLocalFilterInfo();
+                  o.es.getPendingCallback().setLocalFilterInfo(localRouting);
+                }
+                // do not hold pending callback reference after setting local routing.
+                o.es.setPendingCallback(null);

Review comment:
       It's possible at this point that `fri` was null and so no local routing was set, but the pending callback reference is still set to null. Is this correct? If so, the comment could be changed a little to make this clear.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
##########
@@ -239,6 +244,80 @@ public void testGetAllNodesShouldBeRandomized() {
         .until(() -> !(bucketList.equals(prqe.getAllNodes(regionAdvisor))));
   }
 
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQuery() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);

Review comment:
       Compiler warnings can be fixed here and in the other added test cases by using `Set<Integer> bucketSet`

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
##########
@@ -239,6 +244,80 @@ public void testGetAllNodesShouldBeRandomized() {
         .until(() -> !(bucketList.equals(prqe.getAllNodes(regionAdvisor))));
   }
 
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQuery() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    for (Integer bid : bucketList) {
+      if (bid % 2 == 0) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeB);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(2);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(5);
+    assertThat(bnMap.get(remoteNodeB).size()).isEqualTo(5);
+  }
+
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQueryWithRetry() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        spy(new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet));
+
+    for (Integer bid : bucketList) {
+      if (bid == 1) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(null).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(1);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(10);
+    // Called 3 times : 2 times in retry and once for setting the return value
+    verify(prqe, times(3)).getPrimaryBucketOwner(1);

Review comment:
       See the other comment on this test about using `doReturn()` instead of `when()` with a spy.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
##########
@@ -239,6 +244,80 @@ public void testGetAllNodesShouldBeRandomized() {
         .until(() -> !(bucketList.equals(prqe.getAllNodes(regionAdvisor))));
   }
 
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQuery() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    for (Integer bid : bucketList) {
+      if (bid % 2 == 0) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeB);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(2);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(5);
+    assertThat(bnMap.get(remoteNodeB).size()).isEqualTo(5);
+  }
+
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQueryWithRetry() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        spy(new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet));
+
+    for (Integer bid : bucketList) {
+      if (bid == 1) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(null).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);

Review comment:
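       When stubbing a spy, `when(spy.foo()).thenReturn(y)` actually invokes `foo()` on the spy while the stub is being set up, which inflates the invocation count. Using `doReturn(y).when(spy).foo()` instead avoids that extra call and gives correct invocation counts: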
   ```suggestion
           doReturn(null).doReturn(remoteNodeA).when(prqe).getPrimaryBucketOwner(bid);
         } else {
           doReturn(remoteNodeA).when(prqe).getPrimaryBucketOwner(bid);
   ```

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
##########
@@ -239,6 +244,80 @@ public void testGetAllNodesShouldBeRandomized() {
         .until(() -> !(bucketList.equals(prqe.getAllNodes(regionAdvisor))));
   }
 
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQuery() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    for (Integer bid : bucketList) {
+      if (bid % 2 == 0) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeB);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(2);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(5);
+    assertThat(bnMap.get(remoteNodeB).size()).isEqualTo(5);
+  }
+
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQueryWithRetry() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        spy(new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet));
+
+    for (Integer bid : bucketList) {
+      if (bid == 1) {

Review comment:
       Instead of hard-coding this bucketId, it might be good to extract it to a variable, since it's used in the verify step too.

##########
File path: geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateInterface;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category(ClientSubscriptionTest.class)
+public class PartitionedRegionTxDUnitTest implements Serializable {
+  private final String REGION_NAME = "region";
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private ClientVM client;
+
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Test
+  public void verifyCqRegistrationWorksDuringTxCommit() throws Exception {
+    blackboard.setMailbox("CqQueryResultCount", 0);
+    blackboard.setMailbox("CqEvents", 0);
+
+    Properties properties = new Properties();
+    locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    server1 = clusterStartupRule.startServerVM(1, properties, locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, properties, locator.getPort());
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> cacheRule.withServerConnection(server2.getPort()).withPoolSubscription(true));
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      Region<Object, Object> region =
+          cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+              new PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())

Review comment:
       Compiler warning here and elsewhere in the class can be fixed by using `new PartitionAttributesFactory<>()`

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
##########
@@ -909,28 +939,22 @@ private SelectResults buildSortedResult(CompiledSelect cs, int limit) throws Que
         }
       }
       if (!buckets.isEmpty()) {
-        ret.put(nd, buckets);
+        nodeToBucketMap.put(nd, buckets);
       }
     }
+    return bucketIds;
+  }
 
-    if (bucketIds.size() != totalBucketsToQuery) {
-      bucketIdsToConsider.removeAll(bucketIds);
-      throw new QueryException("Data loss detected, unable to find the hosting "
-          + " node for some of the dataset. [dataset/bucket ids:" + bucketIdsToConsider + "]");
-    }
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("Node to bucketId map: {}", ret);
-    }
-    return ret;
+  private InternalDistributedMember getPrimaryBucketOwner(Integer bid) {
+    return pr.getBucketPrimary(bid.intValue());
   }
 
   protected Set<InternalDistributedMember> getBucketOwners(Integer bid) {
     return pr.getRegionAdvisor().getBucketOwners(bid.intValue());
   }
 
-  protected ArrayList getAllNodes(RegionAdvisor regionAdvisor) {
-    ArrayList nodes = new ArrayList(regionAdvisor.adviseDataStore());
+  protected ArrayList<InternalDistributedMember> getAllNodes(RegionAdvisor regionAdvisor) {
+    ArrayList<InternalDistributedMember> nodes = new ArrayList(regionAdvisor.adviseDataStore());

Review comment:
       Compiler warning here can be fixed by using `new ArrayList<>(regionAdvisor.adviseDataStore());`

##########
File path: geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateInterface;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category(ClientSubscriptionTest.class)
+public class PartitionedRegionTxDUnitTest implements Serializable {
+  private final String REGION_NAME = "region";
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private ClientVM client;
+
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Test
+  public void verifyCqRegistrationWorksDuringTxCommit() throws Exception {
+    blackboard.setMailbox("CqQueryResultCount", 0);
+    blackboard.setMailbox("CqEvents", 0);
+
+    Properties properties = new Properties();
+    locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    server1 = clusterStartupRule.startServerVM(1, properties, locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, properties, locator.getPort());

Review comment:
       It is not necessary to pass a `Properties` as an argument to `startLocatorVM()` or `startServerVM()`, so the `properties` variable can be removed here.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
##########
@@ -239,6 +244,80 @@ public void testGetAllNodesShouldBeRandomized() {
         .until(() -> !(bucketList.equals(prqe.getAllNodes(regionAdvisor))));
   }
 
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQuery() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    for (Integer bid : bucketList) {
+      if (bid % 2 == 0) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeB);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(2);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(5);
+    assertThat(bnMap.get(remoteNodeB).size()).isEqualTo(5);
+  }
+
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQueryWithRetry() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        spy(new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet));
+
+    for (Integer bid : bucketList) {
+      if (bid == 1) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(null).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(1);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(10);
+    // Called 3 times : 2 times in retry and once for setting the return value
+    verify(prqe, times(3)).getPrimaryBucketOwner(1);
+  }
+
+  @Test
+  public void exceptionIsThrownWhenPrimaryBucketNodeIsNotFoundForCqQuery() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        spy(new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet));
+
+    for (Integer bid : bucketList) {
+      if (bid == 1) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(null);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      }
+    }
+
+    assertThatThrownBy(() -> prqe.buildNodeToBucketMap()).isInstanceOf(QueryException.class)
+        .hasMessageContaining(
+            "Data loss detected, unable to find the hosting  node for some of the dataset.");
+
+    // Called 3 times : 2 times in retry and once for setting the return value
+    verify(prqe, times(4)).getPrimaryBucketOwner(1);

Review comment:
       See above comments about using `doReturn()` with a spy and extracting the bucketId to a variable.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
##########
@@ -92,6 +98,26 @@ public void doAfterCompletionThrowsIfCommitFails() {
         .isSameAs(transactionDataNodeHasDepartedException);
   }
 
+  @Test
+  public void attacheFilterProfileAfterApplyingChagnes() {

Review comment:
       Typo here, should be "attachFilterProfileAfterApplyingChanges"

##########
File path: geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateInterface;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category(ClientSubscriptionTest.class)
+public class PartitionedRegionTxDUnitTest implements Serializable {
+  private final String REGION_NAME = "region";
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private ClientVM client;
+
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Test
+  public void verifyCqRegistrationWorksDuringTxCommit() throws Exception {
+    blackboard.setMailbox("CqQueryResultCount", 0);
+    blackboard.setMailbox("CqEvents", 0);
+
+    Properties properties = new Properties();
+    locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    server1 = clusterStartupRule.startServerVM(1, properties, locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, properties, locator.getPort());
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> cacheRule.withServerConnection(server2.getPort()).withPoolSubscription(true));
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      Region<Object, Object> region =
+          cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+              new PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+              .create(REGION_NAME);
+
+      // Force primary bucket to get created.
+      region.put("Key-1", "value-1");
+      region.destroy("Key-1");

Review comment:
       This can be replaced with `PartitionRegionHelper.assignBucketsToPartitions(region);`, which creates buckets without needing to put data into the region.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
##########
@@ -239,6 +244,80 @@ public void testGetAllNodesShouldBeRandomized() {
         .until(() -> !(bucketList.equals(prqe.getAllNodes(regionAdvisor))));
   }
 
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQuery() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    for (Integer bid : bucketList) {
+      if (bid % 2 == 0) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeB);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(2);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(5);
+    assertThat(bnMap.get(remoteNodeB).size()).isEqualTo(5);
+  }
+
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQueryWithRetry() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        spy(new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet));
+
+    for (Integer bid : bucketList) {
+      if (bid == 1) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(null).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(1);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(10);
+    // Called 3 times : 2 times in retry and once for setting the return value
+    verify(prqe, times(3)).getPrimaryBucketOwner(1);
+  }
+
+  @Test
+  public void exceptionIsThrownWhenPrimaryBucketNodeIsNotFoundForCqQuery() throws Exception {

Review comment:
       `Exception` is never thrown from this method, so the `throws Exception` clause can be removed.

##########
File path: geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateInterface;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category(ClientSubscriptionTest.class)
+public class PartitionedRegionTxDUnitTest implements Serializable {
+  private final String REGION_NAME = "region";
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private ClientVM client;
+
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Test
+  public void verifyCqRegistrationWorksDuringTxCommit() throws Exception {

Review comment:
       There is quite a lot of duplicate code in the tests in this class. Would it be possible to move the cluster startup and region creation to a `@Before` method?

##########
File path: geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateInterface;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category(ClientSubscriptionTest.class)
+public class PartitionedRegionTxDUnitTest implements Serializable {
+  private final String REGION_NAME = "region";
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private ClientVM client;
+
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Test
+  public void verifyCqRegistrationWorksDuringTxCommit() throws Exception {
+    blackboard.setMailbox("CqQueryResultCount", 0);
+    blackboard.setMailbox("CqEvents", 0);
+
+    Properties properties = new Properties();
+    locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    server1 = clusterStartupRule.startServerVM(1, properties, locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, properties, locator.getPort());
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> cacheRule.withServerConnection(server2.getPort()).withPoolSubscription(true));
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      Region<Object, Object> region =
+          cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+              new PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+              .create(REGION_NAME);
+
+      // Force primary bucket to get created.
+      region.put("Key-1", "value-1");
+      region.destroy("Key-1");
+    });
+
+    server2.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+          new PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+          .create(REGION_NAME);
+    });
+
+    AsyncInvocation<?> serverAsync = server1.invokeAsync(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+      txManager.begin();
+
+      TXStateInterface txState =
+          ((TXStateProxyImpl) txManager.getTXState()).getRealDeal(null, null);
+
+      ((TXState) txState).setDuringApplyChanges(() -> {
+        try {
+          blackboard.signalGate("StartCQ");
+          blackboard.waitForGate("RegistrationFinished");
+        } catch (TimeoutException | InterruptedException e) {
+          // Do nothing
+        }
+      });
+
+      cache.getRegion(REGION_NAME).put("Key-1", "value-1");
+      txManager.commit();
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(REGION_NAME);
+
+      QueryService queryService = clientCache.getQueryService();
+      CqAttributesFactory cqaf = new CqAttributesFactory();
+      TestCqListener testListener = new TestCqListener();
+      cqaf.addCqListener(testListener);
+      CqAttributes cqAttributes = cqaf.create();
+
+      blackboard.waitForGate("StartCQ");
+      SelectResults cqResults = queryService

Review comment:
       The compiler warning here can be fixed by using `SelectResults<Object>`.

##########
File path: geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateInterface;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category(ClientSubscriptionTest.class)
+public class PartitionedRegionTxDUnitTest implements Serializable {
+  private final String REGION_NAME = "region";
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private ClientVM client;
+
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Test
+  public void verifyCqRegistrationWorksDuringTxCommit() throws Exception {
+    blackboard.setMailbox("CqQueryResultCount", 0);
+    blackboard.setMailbox("CqEvents", 0);
+
+    Properties properties = new Properties();
+    locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    server1 = clusterStartupRule.startServerVM(1, properties, locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, properties, locator.getPort());
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> cacheRule.withServerConnection(server2.getPort()).withPoolSubscription(true));
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      Region<Object, Object> region =
+          cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+              new PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+              .create(REGION_NAME);
+
+      // Force primary bucket to get created.
+      region.put("Key-1", "value-1");
+      region.destroy("Key-1");
+    });
+
+    server2.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+          new PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+          .create(REGION_NAME);
+    });
+
+    AsyncInvocation<?> serverAsync = server1.invokeAsync(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+      txManager.begin();
+
+      TXStateInterface txState =
+          ((TXStateProxyImpl) txManager.getTXState()).getRealDeal(null, null);
+
+      ((TXState) txState).setDuringApplyChanges(() -> {
+        try {
+          blackboard.signalGate("StartCQ");
+          blackboard.waitForGate("RegistrationFinished");
+        } catch (TimeoutException | InterruptedException e) {
+          // Do nothing
+        }
+      });
+
+      cache.getRegion(REGION_NAME).put("Key-1", "value-1");
+      txManager.commit();
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(REGION_NAME);
+
+      QueryService queryService = clientCache.getQueryService();
+      CqAttributesFactory cqaf = new CqAttributesFactory();
+      TestCqListener testListener = new TestCqListener();
+      cqaf.addCqListener(testListener);
+      CqAttributes cqAttributes = cqaf.create();
+
+      blackboard.waitForGate("StartCQ");
+      SelectResults cqResults = queryService
+          .newCq("Select * from " + SEPARATOR + REGION_NAME, cqAttributes)
+          .executeWithInitialResults();
+      blackboard.signalGate("RegistrationFinished");
+      blackboard.setMailbox("CqQueryResultCount", cqResults.asList().size());
+    });
+
+    GeodeAwaitility.await().untilAsserted(() -> {
+      Integer CqQueryResultCount = blackboard.getMailbox("CqQueryResultCount");
+      Integer CqEvents = blackboard.getMailbox("CqEvents");
+      assertThat(CqQueryResultCount + CqEvents).isEqualTo(1);
+    });
+
+    serverAsync.await();
+  }
+
+  @Test
+  public void verifyCqEventInvocationIfTxCommitFromClient() throws Exception {
+    blackboard.setMailbox("CqEvents", 0);
+
+    Properties properties = new Properties();
+    locator = clusterStartupRule.startLocatorVM(0, new Properties());
+    server1 = clusterStartupRule.startServerVM(1, properties, locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, properties, locator.getPort());
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> cacheRule.withServerConnection(server1.getPort()).withPoolSubscription(true));
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      Region<Object, Object> region =
+          cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+              new PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+              .create(REGION_NAME);
+
+      // Force primary bucket to get created.
+      region.put("Key-1", "value-1");
+    });
+
+    server2.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+          new PartitionAttributesFactory().setRedundantCopies(1).setTotalNumBuckets(1).create())
+          .create(REGION_NAME);
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(REGION_NAME);
+
+      QueryService queryService = clientCache.getQueryService();
+      CqAttributesFactory cqaf = new CqAttributesFactory();
+      TestCqListener testListener = new TestCqListener();
+      cqaf.addCqListener(testListener);
+      CqAttributes cqAttributes = cqaf.create();
+
+      queryService.newCq("Select * from " + SEPARATOR + REGION_NAME, cqAttributes)
+          .executeWithInitialResults();
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      TXManagerImpl txManager = (TXManagerImpl) clientCache.getCacheTransactionManager();
+      txManager.begin();
+
+      clientCache.getRegion(REGION_NAME).destroy("Key-1");

Review comment:
       If the suggestions to move region creation to a `@Before` method and to use `PartitionRegionHelper.assignBucketsToPartitions(region);` are followed, this `destroy` call can be changed to a `put` so that the test still passes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org