Posted to commits@geode.apache.org by es...@apache.org on 2021/04/07 19:52:27 UTC

[geode] branch support/1.12 updated (553b267 -> c126748)

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a change to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git.


    from 553b267  GEODE-9126: Bump jetty from 9.4.38.v20210224 to 9.4.39.v20210325
     new fead6fb  GEODE-8247: modified FilterRoutingInfo checks (#5248)
     new c126748  GEODE-8926: Calculate filter information after tx is applied to cache. (#6232)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/geode/internal/cache/FilterProfile.java |  25 +-
 .../apache/geode/internal/cache/LocalRegion.java   |   3 +-
 .../cache/PartitionedRegionQueryEvaluator.java     | 101 ++++----
 .../apache/geode/internal/cache/TXEntryState.java  |  10 +
 .../org/apache/geode/internal/cache/TXState.java   |  28 ++-
 .../internal/cache/TxCallbackEventFactoryImpl.java |  44 +---
 .../internal/cache/tier/sockets/BaseCommand.java   |  10 +-
 .../cache/PartitionedRegionQueryEvaluatorTest.java | 179 +++++++++++++++
 .../apache/geode/internal/cache/TXStateTest.java   |  25 ++
 .../cache/tier/sockets/FilterProfileJUnitTest.java |   1 +
 .../geode/cache/query/cq/CQDistributedTest.java    |  80 ++++++-
 .../cq/dunit/PartitionedRegionTxDUnitTest.java     | 253 +++++++++++++++++++++
 12 files changed, 670 insertions(+), 89 deletions(-)
 create mode 100644 geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java

[geode] 01/02: GEODE-8247: modified FilterRoutingInfo checks (#5248)

Posted by es...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git

commit fead6fbce77f5bc492ad2eece9b68261b52d0b19
Author: Mario Ivanac <48...@users.noreply.github.com>
AuthorDate: Fri Jun 26 21:09:33 2020 +0200

    GEODE-8247: modified FilterRoutingInfo checks (#5248)
    
    * GEODE-8247: solution and new tests added
    
    * GEODE-8247: fix for failing test
    
    * GEODE-8247: update after comments
    
    (cherry picked from commit 6bd1d4be0ae1a2f3a141a36fe545ff18a5adf58c)
---
 .../apache/geode/internal/cache/FilterProfile.java | 23 +++++--
 .../internal/cache/TxCallbackEventFactoryImpl.java |  3 +-
 .../geode/cache/query/cq/CQDistributedTest.java    | 80 +++++++++++++++++++++-
 3 files changed, 96 insertions(+), 10 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
index 158f185..60db8a8 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
@@ -1083,21 +1083,32 @@ public class FilterProfile implements DataSerializableFixedID {
     }
 
     FilterRoutingInfo frInfo = null;
+    // bug #50809 - local routing for transactional ops must be done here
+    // because the event isn't available later and we lose the old value for the entry
+    boolean processLocalProfile = false;
 
     CqService cqService = getCqService(event.getRegion());
     if (cqService.isRunning()) {
-      frInfo = new FilterRoutingInfo();
-      // bug #50809 - local routing for transactional ops must be done here
-      // because the event isn't available later and we lose the old value for the entry
-      final boolean processLocalProfile =
+      processLocalProfile =
           event.getOperation().isEntry() && ((EntryEvent) event).getTransactionId() != null;
+      frInfo = new FilterRoutingInfo();
       fillInCQRoutingInfo(event, processLocalProfile, peerProfiles, frInfo);
     }
 
+    Profile[] tempProfiles = peerProfiles;
+
+    if (processLocalProfile) {
+      tempProfiles = new Profile[peerProfiles.length + 1];
+      for (int i = 0; i < peerProfiles.length; i++) {
+        tempProfiles[i] = peerProfiles[i];
+      }
+      tempProfiles[peerProfiles.length] = localProfile;
+    }
+
     // Process InterestList.
     // return fillInInterestRoutingInfo(event, peerProfiles, frInfo, cacheOpRecipients);
-    frInfo = fillInInterestRoutingInfo(event, peerProfiles, frInfo, cacheOpRecipients);
-    if (frInfo == null || !frInfo.hasMemberWithFilterInfo()) {
+    frInfo = fillInInterestRoutingInfo(event, tempProfiles, frInfo, cacheOpRecipients);
+    if (frInfo == null || (!frInfo.hasMemberWithFilterInfo() && !processLocalProfile)) {
       return null;
     } else {
       return frInfo;
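
The hunk above appends the local profile to the peer profiles only when the event is a
transactional entry operation, so local CQ/interest routing gets computed in the same pass.
A minimal standalone sketch of that append, assuming a stand-in Profile type (not the Geode
class) and using Arrays.copyOf in place of the hand-rolled loop:

    import java.util.Arrays;

    class ProfileAppendSketch {
      // Stand-in for the Geode Profile type used in the diff above.
      static class Profile {}

      // Returns peerProfiles unchanged unless the op is transactional, in which case the
      // local profile is appended so local routing is computed alongside peer routing.
      static Profile[] profilesToProcess(Profile[] peerProfiles, Profile localProfile,
          boolean processLocalProfile) {
        if (!processLocalProfile) {
          return peerProfiles;
        }
        Profile[] withLocal = Arrays.copyOf(peerProfiles, peerProfiles.length + 1);
        withLocal[peerProfiles.length] = localProfile;
        return withLocal;
      }

      public static void main(String[] args) {
        Profile[] peers = {new Profile(), new Profile()};
        System.out.println(profilesToProcess(peers, new Profile(), true).length);  // 3
        System.out.println(profilesToProcess(peers, new Profile(), false).length); // 2
      }
    }
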
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TxCallbackEventFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TxCallbackEventFactoryImpl.java
index 897e00c..0c2da66 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TxCallbackEventFactoryImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TxCallbackEventFactoryImpl.java
@@ -81,7 +81,8 @@ public class TxCallbackEventFactoryImpl implements TxCallbackEventFactory {
         localRouting = filterRoutingInfo.getLocalFilterInfo();
         if (localRouting != null) {
           // routing was computed in this VM but may need to perform local interest processing
-          computeFilterInfo = !filterRoutingInfo.hasLocalInterestBeenComputed();
+          computeFilterInfo = !filterRoutingInfo.hasLocalInterestBeenComputed()
+              && !localRouting.filterProcessedLocally;
         } else {
           // routing was computed elsewhere and is in the "remote" routing table
           localRouting = filterRoutingInfo.getFilterInfo(internalRegion.getMyId());
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
index 01c1954..5743d6a 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/CQDistributedTest.java
@@ -16,6 +16,7 @@ package org.apache.geode.cache.query.cq;
 
 import static junit.framework.TestCase.assertEquals;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.Serializable;
 import java.util.Properties;
@@ -25,6 +26,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.cache.CacheTransactionManager;
+import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.client.ClientCache;
@@ -52,6 +55,8 @@ public class CQDistributedTest implements Serializable {
   private CqAttributes cqa;
   private QueryService qs;
   private TestCqListener testListener;
+  private TestCqListener2 testListener2;
+
 
   @Rule
   public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
@@ -70,7 +75,10 @@ public class CQDistributedTest implements Serializable {
     qs = clientCache.getQueryService();
     CqAttributesFactory cqaf = new CqAttributesFactory();
     testListener = new TestCqListener();
+    testListener2 = new TestCqListener2();
     cqaf.addCqListener(testListener);
+    cqaf.addCqListener(testListener2);
+
     cqa = cqaf.create();
   }
 
@@ -174,6 +182,54 @@ public class CQDistributedTest implements Serializable {
     assertEquals(1, results.size());
   }
 
+  @Test
+  public void cqWithTransaction() throws Exception {
+    qs.newCq("Select * from /region r where r.ID = 1", cqa).execute();
+
+    server.invoke(() -> {
+      Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+      final CacheTransactionManager txMgr =
+          ClusterStartupRule.getCache().getCacheTransactionManager();
+
+      // CREATE new entry
+      txMgr.begin();
+      regionOnServer.put(0, new Portfolio(1));
+      txMgr.commit();
+
+      // UPDATE
+      txMgr.begin();
+      regionOnServer.put(0, new Portfolio(0));
+      txMgr.commit();
+
+      // CREATE
+      txMgr.begin();
+      regionOnServer.put(0, new Portfolio(1));
+      txMgr.commit();
+    });
+
+    await().untilAsserted(() -> assertThat(testListener2.onEventCreateCalls).isEqualTo(2));
+    await().untilAsserted(() -> assertThat(testListener2.onEventUpdateCalls).isEqualTo(0));
+  }
+
+  @Test
+  public void cqWithoutTransaction() throws Exception {
+    qs.newCq("Select * from /region r where r.ID = 1", cqa).execute();
+
+    server.invoke(() -> {
+      Region regionOnServer = ClusterStartupRule.getCache().getRegion("region");
+      // CREATE new entry
+      regionOnServer.put(0, new Portfolio(1));
+
+      // UPDATE
+      regionOnServer.put(0, new Portfolio(0));
+
+      // CREATE
+      regionOnServer.put(0, new Portfolio(1));
+    });
+
+    await().untilAsserted(() -> assertThat(testListener2.onEventCreateCalls).isEqualTo(2));
+    await().untilAsserted(() -> assertThat(testListener2.onEventUpdateCalls).isEqualTo(0));
+  }
 
   private class TestCqListener implements CqListener, Serializable {
     public int onEventCalls = 0;
@@ -184,16 +240,34 @@ public class CQDistributedTest implements Serializable {
     }
 
     @Override
-    public void onError(CqEvent aCqEvent) {
+    public void onError(CqEvent aCqEvent) {}
+
+    @Override
+    public void close() {}
+  }
 
+  private class TestCqListener2 implements CqListener, Serializable {
+    public int onEventCreateCalls = 0;
+    public int onEventUpdateCalls = 0;
+
+    @Override
+    public void onEvent(CqEvent aCqEvent) {
+      Operation queryOperation = aCqEvent.getQueryOperation();
+      if (queryOperation.isCreate()) {
+        onEventCreateCalls++;
+      } else if (queryOperation.isUpdate()) {
+        onEventUpdateCalls++;
+      }
     }
 
     @Override
-    public void close() {
+    public void onError(CqEvent aCqEvent) {}
 
-    }
+    @Override
+    public void close() {}
   }
 
+
   private void createServerRegion(MemberVM server, RegionShortcut regionShortcut) {
     server.invoke(() -> {
       clusterStartupRule.getCache().createRegionFactory(regionShortcut).create("region");
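
A note on the assertions in cqWithTransaction and cqWithoutTransaction above: under the
predicate "r.ID = 1", the second put (Portfolio(0)) removes the entry from the CQ result
set, so it arrives with a destroy query operation rather than an update; that is why both
tests expect two creates and zero updates. A small sketch of a listener tallying events by
query operation, essentially what TestCqListener2 does with destroys counted as well:

    import org.apache.geode.cache.Operation;
    import org.apache.geode.cache.query.CqEvent;

    class CqOperationTally {
      int creates;
      int updates;
      int destroys;

      // Classifies a CQ event by its effect on the query result set.
      void record(CqEvent event) {
        Operation op = event.getQueryOperation();
        if (op.isCreate()) {
          creates++;
        } else if (op.isUpdate()) {
          updates++;
        } else if (op.isDestroy()) {
          destroys++;
        }
      }
    }
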

[geode] 02/02: GEODE-8926: Calculate filter information after tx is applied to cache. (#6232)

Posted by es...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c12674883311b42955fbe9e8c5421db557efe08e
Author: Eric Shu <es...@pivotal.io>
AuthorDate: Thu Apr 1 15:22:13 2021 -0700

    GEODE-8926: Calculate filter information after tx is applied to cache. (#6232)
    
      * Calculate filter routing on the tx host node for partitioned regions after the tx is applied to the cache.
        This avoids concurrent register interest or cq registration missing the operations, either
        in snapshot/query results or through client cache updates.
      * Register interest snapshots and cq queries with initial results are now taken on primary buckets
        to make sure the results are correct.
      * Avoid calculating filter routing multiple times with TX.
    
      Co-authored-by: agingade <ag...@vmware.com>
    
    (cherry picked from commit dedcea3b745a35a56b28fae7cd50042822e1629b)
---
 .../apache/geode/internal/cache/FilterProfile.java |   8 +-
 .../apache/geode/internal/cache/LocalRegion.java   |   3 +-
 .../cache/PartitionedRegionQueryEvaluator.java     | 101 ++++----
 .../apache/geode/internal/cache/TXEntryState.java  |  10 +
 .../org/apache/geode/internal/cache/TXState.java   |  28 ++-
 .../internal/cache/TxCallbackEventFactoryImpl.java |  41 +---
 .../internal/cache/tier/sockets/BaseCommand.java   |  10 +-
 .../cache/PartitionedRegionQueryEvaluatorTest.java | 179 +++++++++++++++
 .../apache/geode/internal/cache/TXStateTest.java   |  25 ++
 .../cache/tier/sockets/FilterProfileJUnitTest.java |   1 +
 .../cq/dunit/PartitionedRegionTxDUnitTest.java     | 253 +++++++++++++++++++++
 11 files changed, 577 insertions(+), 82 deletions(-)
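
In code terms, the fix reorders the commit path so filter routing is computed only after
the transactional changes have been applied to the cache. A minimal sketch of that
ordering, using the hook names that appear in the TXState and TXStateTest diffs below
(applyChanges, attachFilterProfileInformation, buildMessage, firePendingCallbacks); this
is illustrative, not the actual Geode implementation:

    class CommitOrderingSketch {
      interface CommitMessage {
        void send();
      }

      interface TxHost {
        void applyChanges();                   // 1. apply the tx to the cache first
        void attachFilterProfileInformation(); // 2. then compute CQ/interest routing
        CommitMessage buildMessage();          // 3. then build the commit message
        void firePendingCallbacks();           // 5. finally notify listeners/clients
      }

      static void commit(TxHost tx) {
        tx.applyChanges();
        tx.attachFilterProfileInformation();
        tx.buildMessage().send();              // 4. distribute to other members
        tx.firePendingCallbacks();
      }
    }
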

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
index 60db8a8..dd88ead 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
@@ -1085,12 +1085,12 @@ public class FilterProfile implements DataSerializableFixedID {
     FilterRoutingInfo frInfo = null;
     // bug #50809 - local routing for transactional ops must be done here
     // because the event isn't available later and we lose the old value for the entry
-    boolean processLocalProfile = false;
+
+    boolean processLocalProfile =
+        event.getOperation().isEntry() && ((EntryEvent) event).getTransactionId() != null;
 
     CqService cqService = getCqService(event.getRegion());
     if (cqService.isRunning()) {
-      processLocalProfile =
-          event.getOperation().isEntry() && ((EntryEvent) event).getTransactionId() != null;
       frInfo = new FilterRoutingInfo();
       fillInCQRoutingInfo(event, processLocalProfile, peerProfiles, frInfo);
     }
@@ -1309,7 +1309,6 @@ public class FilterProfile implements DataSerializableFixedID {
    */
   public FilterRoutingInfo fillInInterestRoutingInfo(CacheEvent event, Profile[] profiles,
       FilterRoutingInfo filterRoutingInfo, Set cacheOpRecipients) {
-
     Set clientsInv = Collections.emptySet();
     Set clients = Collections.emptySet();
 
@@ -1902,6 +1901,7 @@ public class FilterProfile implements DataSerializableFixedID {
             if (logger.isDebugEnabled()) {
               logger.debug("Processing the filter profile request for : {}", this);
             }
+            fp.region = (LocalRegion) r;
             processRequest(fp);
           }
         }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 44c8774..b6fcbb3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -6088,7 +6088,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
       }
 
       InternalCacheEvent ice = (InternalCacheEvent) event;
-      if (!isUsedForPartitionedRegionBucket()) {
+      if (!(this instanceof PartitionedRegion || isUsedForPartitionedRegionBucket())) {
+        // Do not generate local filter routing if partitioned region.
         generateLocalFilterRouting(ice);
       }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
index 1c7d4bb..93aa110 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
@@ -854,51 +854,82 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
    */
   private Map<InternalDistributedMember, List<Integer>> buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new HashMap<>();
     if (bucketIdsToConsider.isEmpty()) {
       return ret;
     }
+    List<Integer> bucketIds;
+    if (query != null && query.isCqQuery()) {
+      bucketIds = findPrimaryBucketOwners(bucketIdsToConsider, ret);
+    } else {
+      bucketIds = findBucketOwners(bucketIdsToConsider, ret);
+    }
+    if (bucketIds.size() != bucketIdsToConsider.size()) {
+      bucketIdsToConsider.removeAll(bucketIds);
+      throw new QueryException("Data loss detected, unable to find the hosting "
+          + " node for some of the dataset. [dataset/bucket ids:" + bucketIdsToConsider + "]");
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Node to bucketId map: {}", ret);
+    }
+    return ret;
+  }
 
-    final List<Integer> bucketIds = new ArrayList<Integer>();
+  private List<Integer> findPrimaryBucketOwners(Set<Integer> bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList<>();
+    int retry = 3;
+    for (int i = 0; i < retry && (bucketIds.size() != bucketIdsToConsider.size()); i++) {
+      bucketIds.clear();
+      nodeToBucketMap.clear();
+      for (Integer bucketId : bucketIdsToConsider) {
+        InternalDistributedMember primary = getPrimaryBucketOwner(bucketId);
+        if (primary != null) {
+          bucketIds.add(bucketId);
+          if (nodeToBucketMap.get(primary) != null) {
+            nodeToBucketMap.get(primary).add(bucketId);
+          } else {
+            List<Integer> bids = new ArrayList<>();
+            bids.add(bucketId);
+            nodeToBucketMap.put(primary, bids);
+          }
+        }
+      }
+    }
+    return bucketIds;
+  }
+
+  private List<Integer> findBucketOwners(final Set<Integer> bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList<>();
+    // Get all local buckets
     PartitionedRegionDataStore dataStore = this.pr.getDataStore();
-    final int totalBucketsToQuery = bucketIdsToConsider.size();
     if (dataStore != null) {
       for (Integer bid : bucketIdsToConsider) {
         if (dataStore.isManagingBucket(bid)) {
-          bucketIds.add(Integer.valueOf(bid));
+          bucketIds.add(bid);
         }
       }
       if (bucketIds.size() > 0) {
-        ret.put(pr.getMyId(), new ArrayList(bucketIds));
+        List<Integer> localBucketIds = new ArrayList<>(bucketIds);
+        nodeToBucketMap.put(pr.getMyId(), localBucketIds);
         // All the buckets are hosted locally.
-        if (bucketIds.size() == totalBucketsToQuery) {
-          return ret;
+        if (localBucketIds.size() == bucketIdsToConsider.size()) {
+          return bucketIds;
         }
       }
     }
-
-    final List allNodes = getAllNodes(this.pr.getRegionAdvisor());
-    /*
-     * for(Map.Entry<InternalDistributedMember, Collection<Collection>> entry :
-     * resultsPerMember.entrySet()) { InternalDistributedMember member = entry.getKey();
-     * TaintableArrayList list = entry.getValue(); if(list.isTainted()) {
-     * taintedMembers.add(member); } }
-     */
-
+    final List<InternalDistributedMember> allNodes = getAllNodes(this.pr.getRegionAdvisor());
     // Put the failed members on the end of the list.
     if (failedMembers != null && !failedMembers.isEmpty()) {
       allNodes.removeAll(failedMembers);
       allNodes.addAll(failedMembers);
     }
-
-    for (Iterator dsItr = allNodes.iterator(); dsItr.hasNext()
+    int totalBucketsToQuery = bucketIdsToConsider.size();
+    for (Iterator<InternalDistributedMember> dsItr = allNodes.iterator(); dsItr.hasNext()
         && (bucketIds.size() < totalBucketsToQuery);) {
-      InternalDistributedMember nd = (InternalDistributedMember) dsItr.next();
-
-      final List<Integer> buckets = new ArrayList<Integer>();
+      InternalDistributedMember nd = dsItr.next();
+      final List<Integer> buckets = new ArrayList<>();
       for (Integer bid : bucketIdsToConsider) {
         if (!bucketIds.contains(bid)) {
           final Set owners = getBucketOwners(bid);
@@ -909,28 +940,22 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
         }
       }
       if (!buckets.isEmpty()) {
-        ret.put(nd, buckets);
+        nodeToBucketMap.put(nd, buckets);
       }
     }
+    return bucketIds;
+  }
 
-    if (bucketIds.size() != totalBucketsToQuery) {
-      bucketIdsToConsider.removeAll(bucketIds);
-      throw new QueryException("Data loss detected, unable to find the hosting "
-          + " node for some of the dataset. [dataset/bucket ids:" + bucketIdsToConsider + "]");
-    }
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("Node to bucketId map: {}", ret);
-    }
-    return ret;
+  InternalDistributedMember getPrimaryBucketOwner(Integer bid) {
+    return pr.getBucketPrimary(bid);
   }
 
   protected Set<InternalDistributedMember> getBucketOwners(Integer bid) {
-    return pr.getRegionAdvisor().getBucketOwners(bid.intValue());
+    return pr.getRegionAdvisor().getBucketOwners(bid);
   }
 
-  protected ArrayList getAllNodes(RegionAdvisor regionAdvisor) {
-    ArrayList nodes = new ArrayList(regionAdvisor.adviseDataStore());
+  protected ArrayList<InternalDistributedMember> getAllNodes(RegionAdvisor regionAdvisor) {
+    ArrayList<InternalDistributedMember> nodes = new ArrayList<>(regionAdvisor.adviseDataStore());
     Collections.shuffle(nodes);
     return nodes;
   }
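
For CQ queries the evaluator above now maps every bucket to its primary owner, retrying a
bounded number of times before reporting data loss. A compact sketch of that grouping,
assuming a generic resolver callback in place of getPrimaryBucketOwner (illustrative, not
the Geode source):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.Function;

    class PrimaryBucketGroupingSketch {
      // Groups bucket ids by their primary owner, retrying so buckets whose primary is
      // still being elected get another chance. Callers compare the resolved count with
      // the input size to detect data loss, as the diff above does with QueryException.
      static <M> Map<M, List<Integer>> groupByPrimary(Set<Integer> bucketIds,
          Function<Integer, M> primaryResolver, int maxAttempts) {
        Map<M, List<Integer>> nodeToBuckets = new HashMap<>();
        int resolved = 0;
        for (int attempt = 0; attempt < maxAttempts && resolved != bucketIds.size(); attempt++) {
          nodeToBuckets.clear();
          resolved = 0;
          for (Integer bucketId : bucketIds) {
            M primary = primaryResolver.apply(bucketId);
            if (primary != null) {
              nodeToBuckets.computeIfAbsent(primary, k -> new ArrayList<>()).add(bucketId);
              resolved++;
            }
          }
        }
        return nodeToBuckets;
      }
    }
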
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntryState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntryState.java
index 6332268..b76e100 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntryState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntryState.java
@@ -95,6 +95,8 @@ public class TXEntryState implements Releasable {
 
   private byte[] serializedPendingValue;
 
+  private EntryEventImpl pendingCallback;
+
   /**
    * Remember the callback argument for listener invocation
    */
@@ -1939,6 +1941,14 @@ public class TXEntryState implements Releasable {
     return this.modSerialNum;
   }
 
+  public EntryEventImpl getPendingCallback() {
+    return pendingCallback;
+  }
+
+  public void setPendingCallback(EntryEventImpl pendingCallback) {
+    this.pendingCallback = pendingCallback;
+  }
+
   /**
    * Just like an EntryEventImpl but also has access to TxEntryState to make it Comparable
    *
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index b006d71..3cf77c8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -100,7 +100,7 @@ public class TXState implements TXStateInterface {
    * this transaction.
    */
   private int modSerialNum;
-  private final List<EntryEventImpl> pendingCallbacks = new ArrayList<EntryEventImpl>();
+  private final List<EntryEventImpl> pendingCallbacks = new ArrayList<>();
   // Access this variable should be in synchronized block.
   private boolean beforeCompletionCalled;
 
@@ -473,7 +473,6 @@ public class TXState implements TXStateInterface {
       List/* <TXEntryStateWithRegionAndKey> */ entries = generateEventOffsets();
       TXCommitMessage msg = null;
       try {
-
         /*
          * In order to preserve data consistency, we need to: 1. Modify the cache first
          * (applyChanges) 2. Ask for advice on who to send to (buildMessage) 3. Send out to other
@@ -482,8 +481,6 @@ public class TXState implements TXStateInterface {
          * If this is done out of order, we will have problems with GII, split brain, and HA.
          */
 
-        attachFilterProfileInformation(entries);
-
         lockTXRegions(regions);
 
         try {
@@ -494,6 +491,8 @@ public class TXState implements TXStateInterface {
             this.internalAfterApplyChanges.run();
           }
 
+          attachFilterProfileInformation(entries);
+
           // build and send the message
           msg = buildMessage();
           this.commitMessage = msg;
@@ -563,6 +562,10 @@ public class TXState implements TXStateInterface {
             @Released
             EntryEventImpl ev =
                 (EntryEventImpl) o.es.getEvent(o.r, o.key, o.es.getTXRegionState().getTXState());
+            if (ev.getOperation() == null) {
+              // A read op with detect read conflicts does not need filter routing.
+              continue;
+            }
             try {
               /*
                * The routing information is derived from the PR advisor, not the bucket advisor.
@@ -572,6 +575,18 @@ public class TXState implements TXStateInterface {
               o.es.setFilterRoutingInfo(fri);
               Set set = bucket.getAdjunctReceivers(ev, Collections.EMPTY_SET, new HashSet(), fri);
               o.es.setAdjunctRecipients(set);
+
+              if (o.es.getPendingCallback() != null) {
+                if (fri != null) {
+                  // For tx host, local filter info was also calculated.
+                  // Set this local filter info in corresponding pending callback so that
+                  // notifyBridgeClient has correct routing info.
+                  FilterRoutingInfo.FilterInfo localRouting = fri.getLocalFilterInfo();
+                  o.es.getPendingCallback().setLocalFilterInfo(localRouting);
+                }
+                // Do not hold pending callback reference in TXEntryState as it is no longer used.
+                o.es.setPendingCallback(null);
+              }
             } finally {
               ev.release();
             }
@@ -821,6 +836,7 @@ public class TXState implements TXStateInterface {
     }
 
     // applyChanges for each entry
+    int size = pendingCallbacks.size();
     for (Object entry : entries) {
       TXEntryStateWithRegionAndKey o = (TXEntryStateWithRegionAndKey) entry;
       if (this.internalDuringApplyChanges != null) {
@@ -828,6 +844,10 @@ public class TXState implements TXStateInterface {
       }
       try {
         o.es.applyChanges(o.r, o.key, this);
+        if (pendingCallbacks.size() > size) {
+          o.es.setPendingCallback(pendingCallbacks.get(size));
+          size = pendingCallbacks.size();
+        }
       } catch (RegionDestroyedException ex) {
         // region was destroyed out from under us; after conflict checking
         // passed. So act as if the region destroy happened right after the
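
The applyChanges loop above ties each newly queued callback event to the entry that
produced it by watching the size of the pending-callback list, so the filter routing
computed afterwards can be copied onto the right event. A standalone sketch of that
bookkeeping with plain stand-ins for TXEntryState and the callback list (illustrative
only):

    import java.util.List;
    import java.util.function.BiConsumer;

    class PendingCallbackTrackingSketch {
      static class Entry {
        Object pendingCallback; // stand-in for TXEntryState's pending callback field
      }

      // Applies each entry; if applying it queued exactly one more callback event,
      // remember that event on the entry for later filter-routing attachment.
      static void applyAll(List<Entry> entries, List<Object> pendingCallbacks,
          BiConsumer<Entry, List<Object>> applyChange) {
        int size = pendingCallbacks.size();
        for (Entry entry : entries) {
          applyChange.accept(entry, pendingCallbacks);
          if (pendingCallbacks.size() > size) {
            entry.pendingCallback = pendingCallbacks.get(size);
            size = pendingCallbacks.size();
          }
        }
      }
    }
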
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TxCallbackEventFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TxCallbackEventFactoryImpl.java
index 0c2da66..bc0ec5e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TxCallbackEventFactoryImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TxCallbackEventFactoryImpl.java
@@ -42,7 +42,7 @@ public class TxCallbackEventFactoryImpl implements TxCallbackEventFactory {
       ClientProxyMembershipID bridgeContext,
       TXEntryState txEntryState, VersionTag versionTag,
       long tailKey) {
-    DistributedMember originator = null;
+    DistributedMember originator;
     // txId should not be null even on localOrigin
     Assert.assertTrue(txId != null);
     originator = txId.getMemberId();
@@ -57,8 +57,6 @@ public class TxCallbackEventFactoryImpl implements TxCallbackEventFactory {
         aCallbackArgument, txEntryState == null, originator);
     boolean returnedRetVal = false;
     try {
-
-
       if (bridgeContext != null) {
         retVal.setContext(bridgeContext);
       }
@@ -75,9 +73,7 @@ public class TxCallbackEventFactoryImpl implements TxCallbackEventFactory {
 
       FilterRoutingInfo.FilterInfo localRouting = null;
       boolean computeFilterInfo = false;
-      if (filterRoutingInfo == null) {
-        computeFilterInfo = true;
-      } else {
+      if (filterRoutingInfo != null) {
         localRouting = filterRoutingInfo.getLocalFilterInfo();
         if (localRouting != null) {
           // routing was computed in this VM but may need to perform local interest processing
@@ -91,8 +87,6 @@ public class TxCallbackEventFactoryImpl implements TxCallbackEventFactory {
           if (!computeFilterInfo) {
             retVal.setLocalFilterInfo(localRouting);
           }
-        } else {
-          computeFilterInfo = true;
         }
       }
       if (TxCallbackEventFactoryImpl.logger.isTraceEnabled()) {
@@ -109,32 +103,13 @@ public class TxCallbackEventFactoryImpl implements TxCallbackEventFactory {
         } else {
           retVal.setInvokePRCallbacks(false);
         }
-
-        if (computeFilterInfo) {
-          if (bucket.getBucketAdvisor().isPrimary()) {
-            if (TxCallbackEventFactoryImpl.logger.isTraceEnabled()) {
-              TxCallbackEventFactoryImpl.logger
-                  .trace("createCBEvent computing routing for primary bucket");
-            }
-            FilterProfile fp =
-                ((BucketRegion) internalRegion).getPartitionedRegion().getFilterProfile();
-            if (fp != null) {
-              FilterRoutingInfo fri = fp.getFilterRoutingInfoPart2(filterRoutingInfo, retVal);
-              if (fri != null) {
-                retVal.setLocalFilterInfo(fri.getLocalFilterInfo());
-              }
-            }
-          }
-        }
-      } else if (computeFilterInfo) { // not a bucket
-        if (TxCallbackEventFactoryImpl.logger.isTraceEnabled()) {
-          TxCallbackEventFactoryImpl.logger.trace("createCBEvent computing routing for non-bucket");
-        }
-        FilterProfile fp = internalRegion.getFilterProfile();
-        if (fp != null) {
-          retVal.setLocalFilterInfo(fp.getLocalFilterRouting(retVal));
-        }
       }
+      // No need to computeFilterInfo for primary bucket, as it is done
+      // during attach filter info after applying to cache.
+      // For secondary buckets, filter routing is calculated in the "remote" routing table.
+      // For replicate region, filter routing should be computed after entry commit
+      // is applied to cache, as concurrent register interest could occur.
+      // That computation occurs in notifyBridgeClient when no local routing is set.
       retVal.setTransactionId(txId);
       returnedRetVal = true;
       return retVal;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
index c933d6a..8c8be6f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
@@ -1086,7 +1086,12 @@ public abstract class BaseCommand implements Command {
    */
   private static void handleSingleton(LocalRegion region, Object entryKey,
       InterestResultPolicy policy, ServerConnection servConn) throws IOException {
-    List keyList = new ArrayList(1);
+    List<Object> keyList = new ArrayList<>(1);
+    if (region instanceof PartitionedRegion) {
+      keyList.add(entryKey);
+      handleListPR((PartitionedRegion) region, keyList, policy, servConn);
+      return;
+    }
     if (region != null) {
       if (region.containsKey(entryKey)
           || sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey)) {
@@ -1365,7 +1370,8 @@ public abstract class BaseCommand implements Command {
    */
   private static void handleListPR(final PartitionedRegion region, final List keyList,
       final InterestResultPolicy policy, final ServerConnection servConn) throws IOException {
-    final List newKeyList = new ArrayList(MAXIMUM_CHUNK_SIZE);
+    int chunkSize = keyList.size() < MAXIMUM_CHUNK_SIZE ? keyList.size() : MAXIMUM_CHUNK_SIZE;
+    final List<Object> newKeyList = new ArrayList<>(chunkSize);
     region.getKeysWithList(keyList, sendTombstonesInRIResults(servConn, policy),
         new PartitionedRegion.SetCollector() {
           @Override
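
The BaseCommand change above routes a single-key interest registration on a partitioned
region through the same list-based path (handleListPR) used for bulk registrations, so the
keys/values snapshot now comes from primary buckets. A minimal client-side call that
exercises this path might look like the following; the locator address and key are
placeholders:

    import org.apache.geode.cache.InterestResultPolicy;
    import org.apache.geode.cache.Region;
    import org.apache.geode.cache.client.ClientCache;
    import org.apache.geode.cache.client.ClientCacheFactory;
    import org.apache.geode.cache.client.ClientRegionShortcut;

    class SingleKeyInterestSketch {
      public static void main(String[] args) {
        // Subscription must be enabled on the pool for register-interest to work.
        ClientCache cache = new ClientCacheFactory()
            .addPoolLocator("localhost", 10334)   // placeholder locator address
            .setPoolSubscriptionEnabled(true)
            .create();
        Region<String, String> region = cache
            .<String, String>createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
            .create("region");                     // matches the test region name
        // On the server side this now goes through handleListPR for partitioned regions.
        region.registerInterest("Key-1", InterestResultPolicy.KEYS_VALUES);
      }
    }
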
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
index cc49d56..475aa19 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
@@ -16,8 +16,13 @@ package org.apache.geode.internal.cache;
 
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
@@ -35,6 +40,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.query.QueryException;
 import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.internal.CompiledSelect;
 import org.apache.geode.cache.query.internal.CompiledValue;
@@ -239,6 +245,179 @@ public class PartitionedRegionQueryEvaluatorTest {
         .until(() -> !(bucketList.equals(prqe.getAllNodes(regionAdvisor))));
   }
 
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQuery() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set<Integer> bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    for (Integer bid : bucketList) {
+      if (bid % 2 == 0) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeB);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(2);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(5);
+    assertThat(bnMap.get(remoteNodeB).size()).isEqualTo(5);
+  }
+
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQueryWithRetry() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    int bucket1 = 1;
+    Set<Integer> bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        spy(new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet));
+
+    for (Integer bid : bucketList) {
+      if (bid == bucket1) {
+        doReturn(null).doReturn(remoteNodeA).when(prqe).getPrimaryBucketOwner(bid);
+      } else {
+        doReturn(remoteNodeA).when(prqe).getPrimaryBucketOwner(bid);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(1);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(10);
+    verify(prqe, times(2)).getPrimaryBucketOwner(bucket1);
+  }
+
+  @Test
+  public void exceptionIsThrownWhenPrimaryBucketNodeIsNotFoundForCqQuery() {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    int bucket1 = 1;
+    Set<Integer> bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        spy(new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet));
+
+    for (Integer bid : bucketList) {
+      if (bid == bucket1) {
+        doReturn(null).when(prqe).getPrimaryBucketOwner(bid);
+      } else {
+        doReturn(remoteNodeA).when(prqe).getPrimaryBucketOwner(bid);
+      }
+    }
+
+    assertThatThrownBy(prqe::buildNodeToBucketMap).isInstanceOf(QueryException.class)
+        .hasMessageContaining(
+            "Data loss detected, unable to find the hosting  node for some of the dataset.");
+
+    verify(prqe, times(3)).getPrimaryBucketOwner(bucket1);
+  }
+
+  @Test
+  public void verifyLocalBucketNodesAreRetrievedForQuery() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class);
+    Set<Integer> bucketSet = new HashSet<>(bucketList);
+    for (Integer bid : bucketList) {
+      when(dataStore.isManagingBucket(bid)).thenReturn(true);
+    }
+    when(pr.getDataStore()).thenReturn(dataStore);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(1);
+    assertThat(bnMap.get(localNode).size()).isEqualTo(10);
+  }
+
+  @Test
+  public void verifyAllBucketsAreRetrievedFromSingleRemoteNode() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set<Integer> bucketSet = new HashSet<>(bucketList);
+    when(pr.getDataStore()).thenReturn(null);
+    RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+    Set<InternalDistributedMember> nodes = new HashSet<>(allNodes);
+    when(regionAdvisor.adviseDataStore()).thenReturn(nodes);
+    for (Integer bid : bucketList) {
+      when(regionAdvisor.getBucketOwners(bid)).thenReturn(nodes);
+    }
+    when(pr.getRegionAdvisor()).thenReturn(regionAdvisor);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(1);
+    bnMap.keySet().forEach(x -> assertThat(bnMap.get(x).size()).isEqualTo(10));
+  }
+
+  @Test
+  public void verifyAllBucketsAreRetrievedFromMultipleRemoteNodes() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set<Integer> bucketSet = new HashSet<>(bucketList);
+    when(pr.getDataStore()).thenReturn(null);
+    RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+    Set<InternalDistributedMember> nodes = new HashSet<>(allNodes);
+    when(regionAdvisor.adviseDataStore()).thenReturn(nodes);
+    Set<InternalDistributedMember> nodesA = new HashSet<>();
+    nodesA.add(remoteNodeA);
+    Set<InternalDistributedMember> nodesB = new HashSet<>();
+    nodesB.add(remoteNodeB);
+    for (Integer bid : bucketList) {
+      if (bid % 2 == 0) {
+        when(regionAdvisor.getBucketOwners(bid)).thenReturn(nodesA);
+      } else {
+        when(regionAdvisor.getBucketOwners(bid)).thenReturn(nodesB);
+      }
+    }
+    when(pr.getRegionAdvisor()).thenReturn(regionAdvisor);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(2);
+    bnMap.keySet().forEach(x -> {
+      assertThat(x).satisfiesAnyOf(
+          member -> assertThat(member).isEqualTo(remoteNodeA),
+          member -> assertThat(member).isEqualTo(remoteNodeB));
+      assertThat(bnMap.get(x).size()).isEqualTo(5);
+    });
+  }
+
+  @Test
+  public void exceptionIsThrownWhenNodesAreNotFoundForQueryBuckets() {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set<Integer> bucketSet = new HashSet<>(bucketList);
+    when(pr.getDataStore()).thenReturn(null);
+    RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+    Set<InternalDistributedMember> nodes = new HashSet<>(allNodes);
+    when(regionAdvisor.adviseDataStore()).thenReturn(nodes);
+    for (Integer bid : bucketList) {
+      if (bid != 1) {
+        when(regionAdvisor.getBucketOwners(bid)).thenReturn(nodes);
+      }
+    }
+    when(pr.getRegionAdvisor()).thenReturn(regionAdvisor);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    assertThatThrownBy(prqe::buildNodeToBucketMap).isInstanceOf(QueryException.class)
+        .hasMessageContaining(
+            "Data loss detected, unable to find the hosting  node for some of the dataset.");
+  }
+
   private Map<InternalDistributedMember, List<Integer>> createFakeBucketMap() {
     Map<InternalDistributedMember, List<Integer>> bucketToNodeMap = new HashMap<>();
     bucketToNodeMap.put(localNode, createBucketList(1, 2, 3));
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
index 362ee5f..f44fb19 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -19,9 +19,12 @@ import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabl
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -29,10 +32,13 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+
 import javax.transaction.Status;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.InOrder;
 
 import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.cache.EntryNotFoundException;
@@ -84,6 +90,25 @@ public class TXStateTest {
   }
 
   @Test
+  public void attachFilterProfileAfterApplyingChanges() {
+    TXState txState = spy(new TXState(txStateProxy, false, disabledClock()));
+    doReturn(new ArrayList()).when(txState).generateEventOffsets();
+    doNothing().when(txState).attachFilterProfileInformation(any());
+    doNothing().when(txState).applyChanges(any());
+    TXCommitMessage txCommitMessage = mock(TXCommitMessage.class);
+    doReturn(txCommitMessage).when(txState).buildMessage();
+
+    txState.commit();
+
+    InOrder inOrder = inOrder(txState, txCommitMessage);
+    inOrder.verify(txState).applyChanges(any());
+    inOrder.verify(txState).attachFilterProfileInformation(any());
+    inOrder.verify(txState).buildMessage();
+    inOrder.verify(txCommitMessage).send(any());
+    inOrder.verify(txState).firePendingCallbacks();
+  }
+
+  @Test
   public void doAfterCompletionCanCommitJTA() {
     TXState txState = spy(new TXState(txStateProxy, false, disabledClock()));
     txState.reserveAndCheck();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/FilterProfileJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/FilterProfileJUnitTest.java
index a711a7c..77d6712 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/FilterProfileJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/FilterProfileJUnitTest.java
@@ -49,6 +49,7 @@ public class FilterProfileJUnitTest {
     when(mockCache.getCacheServers()).thenReturn(Collections.emptyList());
     when(mockRegion.getGemFireCache()).thenReturn(mockCache);
     fprofile = new FilterProfile(mockRegion);
+    when(mockRegion.getFilterProfile()).thenReturn(fprofile);
   }
 
   @Test
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
new file mode 100644
index 0000000..a7155b2
--- /dev/null
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateInterface;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.DUnitBlackboard;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category(ClientSubscriptionTest.class)
+public class PartitionedRegionTxDUnitTest implements Serializable {
+  private static volatile DUnitBlackboard blackboard;
+  private static final long TIMEOUT_MILLIS = getTimeout().getValueInMS();
+  private final String REGION_NAME = "region";
+  private MemberVM server1;
+  private MemberVM server2;
+  private ClientVM client;
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Before
+  public void setUp() {
+    MemberVM locator = clusterStartupRule.startLocatorVM(0);
+    server1 = clusterStartupRule.startServerVM(1, locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, locator.getPort());
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      Region<Object, Object> region =
+          cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+              new PartitionAttributesFactory<>().setRedundantCopies(1).setTotalNumBuckets(1)
+                  .create())
+              .create(REGION_NAME);
+
+      PartitionRegionHelper.assignBucketsToPartitions(region);
+    });
+
+    server2.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+          new PartitionAttributesFactory<>().setRedundantCopies(1).setTotalNumBuckets(1).create())
+          .create(REGION_NAME);
+    });
+  }
+
+  @Test
+  public void verifyCqRegistrationWorksDuringTxCommit() throws Exception {
+    getBlackboard().setMailbox("CqQueryResultCount", 0);
+    getBlackboard().setMailbox("CqEvents", 0);
+
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> cacheRule.withServerConnection(server2.getPort()).withPoolSubscription(true));
+
+    AsyncInvocation<?> serverAsync = server1.invokeAsync(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+      txManager.begin();
+
+      TXStateInterface txState =
+          ((TXStateProxyImpl) txManager.getTXState()).getRealDeal(null, null);
+
+      ((TXState) txState).setDuringApplyChanges(() -> {
+        try {
+          getBlackboard().signalGate("StartCQ");
+          getBlackboard().waitForGate("RegistrationFinished", TIMEOUT_MILLIS, MILLISECONDS);
+        } catch (TimeoutException | InterruptedException e) {
+          // Do nothing
+        }
+      });
+
+      cache.getRegion(REGION_NAME).put("Key-1", "value-1");
+      txManager.commit();
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(REGION_NAME);
+
+      QueryService queryService = clientCache.getQueryService();
+      CqAttributesFactory cqaf = new CqAttributesFactory();
+      TestCqListener testListener = new TestCqListener();
+      cqaf.addCqListener(testListener);
+      CqAttributes cqAttributes = cqaf.create();
+
+      getBlackboard().waitForGate("StartCQ", TIMEOUT_MILLIS, MILLISECONDS);
+      SelectResults<Object> cqResults = queryService
+          .newCq("Select * from " + SEPARATOR + REGION_NAME, cqAttributes)
+          .executeWithInitialResults();
+      getBlackboard().signalGate("RegistrationFinished");
+      getBlackboard().setMailbox("CqQueryResultCount", cqResults.asList().size());
+    });
+
+    GeodeAwaitility.await().untilAsserted(() -> {
+      Integer CqQueryResultCount = getBlackboard().getMailbox("CqQueryResultCount");
+      Integer CqEvents = getBlackboard().getMailbox("CqEvents");
+      assertThat(CqQueryResultCount + CqEvents).isEqualTo(1);
+    });
+
+    serverAsync.await();
+  }
+
+  @Test
+  public void verifyCqEventInvocationForDestroyOpIfTxCommitFromClient() throws Exception {
+    getBlackboard().setMailbox("CqEvents", 0);
+
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> cacheRule.withServerConnection(server1.getPort()).withPoolSubscription(true));
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      cache.getRegion(REGION_NAME).put("Key-1", "value-1");
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(REGION_NAME);
+
+      QueryService queryService = clientCache.getQueryService();
+      CqAttributesFactory cqaf = new CqAttributesFactory();
+      TestCqListener testListener = new TestCqListener();
+      cqaf.addCqListener(testListener);
+      CqAttributes cqAttributes = cqaf.create();
+
+      queryService.newCq("Select * from " + SEPARATOR + REGION_NAME, cqAttributes)
+          .executeWithInitialResults();
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      TXManagerImpl txManager = (TXManagerImpl) clientCache.getCacheTransactionManager();
+      txManager.begin();
+
+      clientCache.getRegion(REGION_NAME).destroy("Key-1");
+      txManager.commit();
+    });
+
+    GeodeAwaitility.await().untilAsserted(() -> {
+      Integer CqEvents = getBlackboard().getMailbox("CqEvents");
+      assertThat(CqEvents).isEqualTo(1);
+    });
+  }
+
+  @Test
+  public void verifyInterestRegistrationWorksDuringTxCommit() throws Exception {
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> cacheRule.withServerConnection(server2.getPort()).withPoolSubscription(true));
+
+    AsyncInvocation serverAsync = server1.invokeAsync(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+      txManager.begin();
+
+      TXStateInterface txState =
+          ((TXStateProxyImpl) txManager.getTXState()).getRealDeal(null, null);
+
+      ((TXState) txState).setDuringApplyChanges(() -> {
+        try {
+          getBlackboard().signalGate("StartReg");
+          getBlackboard().waitForGate("RegistrationFinished", TIMEOUT_MILLIS, MILLISECONDS);
+        } catch (TimeoutException | InterruptedException e) {
+          // Do nothing
+        }
+      });
+
+      cache.getRegion(REGION_NAME).put("Key-5", "value-1");
+      txManager.commit();
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      Region<Object, Object> region =
+          clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+              .create(REGION_NAME);
+      getBlackboard().waitForGate("StartReg", TIMEOUT_MILLIS, MILLISECONDS);
+      region.registerInterest("Key-5", InterestResultPolicy.KEYS_VALUES);
+      region.registerInterest("Key-6", InterestResultPolicy.KEYS_VALUES);
+      getBlackboard().signalGate("RegistrationFinished");
+
+      GeodeAwaitility.await().untilAsserted(() -> assertThat(region.size()).isEqualTo(1));
+    });
+
+    serverAsync.await();
+  }
+
+  private class TestCqListener implements CqListener, Serializable {
+
+    int numEvents = 0;
+
+    @Override
+    public void onEvent(CqEvent aCqEvent) {
+      numEvents++;
+      getBlackboard().setMailbox("CqEvents", numEvents);
+    }
+
+    @Override
+    public void onError(CqEvent aCqEvent) {}
+  }
+
+  private static DUnitBlackboard getBlackboard() {
+    if (blackboard == null) {
+      blackboard = new DUnitBlackboard();
+    }
+    return blackboard;
+  }
+}