You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/04/07 17:27:03 UTC

[geode] branch support/1.14 updated: GEODE-8926: Calculate filter information after tx is applied to cache. (#6232)

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.14 by this push:
     new ba47d66  GEODE-8926: Calculate filter information after tx is applied to cache. (#6232)
ba47d66 is described below

commit ba47d66a0658d85c1d059e584896570197225d6a
Author: Eric Shu <es...@pivotal.io>
AuthorDate: Thu Apr 1 15:22:13 2021 -0700

    GEODE-8926: Calculate filter information after tx is applied to cache. (#6232)
    
      * Calculate filter routing on the tx host node for partitioned regions after the tx is applied to the cache.
        This is to avoid a concurrent register interest or cq registration missing the operations either
        from the snapshot/query results or through the client cache update.
      * Register interest snapshots and cq queries with initial results are now taken on primary buckets
        to make sure the results are correct.
      * Avoid calculating filter routing multiple times with TX.
    
      Co-authored-by: agingade <ag...@vmware.com>
    
    (cherry picked from commit dedcea3b745a35a56b28fae7cd50042822e1629b)
---
 .../apache/geode/internal/cache/FilterProfile.java |   8 +-
 .../apache/geode/internal/cache/LocalRegion.java   |   3 +-
 .../cache/PartitionedRegionQueryEvaluator.java     | 101 +++++----
 .../apache/geode/internal/cache/TXEntryState.java  |  10 +
 .../org/apache/geode/internal/cache/TXState.java   |  28 ++-
 .../internal/cache/TxCallbackEventFactoryImpl.java |  41 +---
 .../internal/cache/tier/sockets/BaseCommand.java   |   8 +-
 .../cache/PartitionedRegionQueryEvaluatorTest.java | 179 +++++++++++++++
 .../apache/geode/internal/cache/TXStateTest.java   |  25 +++
 .../cache/tier/sockets/FilterProfileJUnitTest.java |   1 +
 .../cq/dunit/PartitionedRegionTxDUnitTest.java     | 246 +++++++++++++++++++++
 11 files changed, 569 insertions(+), 81 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
index b2a64c0..352b58a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
@@ -1085,12 +1085,12 @@ public class FilterProfile implements DataSerializableFixedID {
     FilterRoutingInfo frInfo = null;
     // bug #50809 - local routing for transactional ops must be done here
     // because the event isn't available later and we lose the old value for the entry
-    boolean processLocalProfile = false;
+
+    boolean processLocalProfile =
+        event.getOperation().isEntry() && ((EntryEvent) event).getTransactionId() != null;
 
     CqService cqService = getCqService(event.getRegion());
     if (cqService.isRunning()) {
-      processLocalProfile =
-          event.getOperation().isEntry() && ((EntryEvent) event).getTransactionId() != null;
       frInfo = new FilterRoutingInfo();
       fillInCQRoutingInfo(event, processLocalProfile, peerProfiles, frInfo);
     }
@@ -1297,7 +1297,6 @@ public class FilterProfile implements DataSerializableFixedID {
    */
   public FilterRoutingInfo fillInInterestRoutingInfo(CacheEvent event, Profile[] profiles,
       FilterRoutingInfo filterRoutingInfo, Set cacheOpRecipients) {
-
     Set clientsInv = Collections.emptySet();
     Set clients = Collections.emptySet();
 
@@ -1884,6 +1883,7 @@ public class FilterProfile implements DataSerializableFixedID {
             if (logger.isDebugEnabled()) {
               logger.debug("Processing the filter profile request for : {}", this);
             }
+            fp.region = (LocalRegion) r;
             processRequest(fp);
           }
         }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 57b613e..35ea56e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -6083,7 +6083,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
       }
 
       InternalCacheEvent ice = (InternalCacheEvent) event;
-      if (!isUsedForPartitionedRegionBucket()) {
+      if (!(this instanceof PartitionedRegion || isUsedForPartitionedRegionBucket())) {
+        // Do not generate local filter routing if partitioned region.
         generateLocalFilterRouting(ice);
       }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
index c0df11d..b8ddef0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluator.java
@@ -854,51 +854,82 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
    */
   private Map<InternalDistributedMember, List<Integer>> buildNodeToBucketMapForBuckets(
       final Set<Integer> bucketIdsToConsider) throws QueryException {
-
-    final HashMap<InternalDistributedMember, List<Integer>> ret =
-        new HashMap<InternalDistributedMember, List<Integer>>();
-
+    final HashMap<InternalDistributedMember, List<Integer>> ret = new HashMap<>();
     if (bucketIdsToConsider.isEmpty()) {
       return ret;
     }
+    List<Integer> bucketIds;
+    if (query != null && query.isCqQuery()) {
+      bucketIds = findPrimaryBucketOwners(bucketIdsToConsider, ret);
+    } else {
+      bucketIds = findBucketOwners(bucketIdsToConsider, ret);
+    }
+    if (bucketIds.size() != bucketIdsToConsider.size()) {
+      bucketIdsToConsider.removeAll(bucketIds);
+      throw new QueryException("Data loss detected, unable to find the hosting "
+          + " node for some of the dataset. [dataset/bucket ids:" + bucketIdsToConsider + "]");
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Node to bucketId map: {}", ret);
+    }
+    return ret;
+  }
 
-    final List<Integer> bucketIds = new ArrayList<Integer>();
+  private List<Integer> findPrimaryBucketOwners(Set<Integer> bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList<>();
+    int retry = 3;
+    for (int i = 0; i < retry && (bucketIds.size() != bucketIdsToConsider.size()); i++) {
+      bucketIds.clear();
+      nodeToBucketMap.clear();
+      for (Integer bucketId : bucketIdsToConsider) {
+        InternalDistributedMember primary = getPrimaryBucketOwner(bucketId);
+        if (primary != null) {
+          bucketIds.add(bucketId);
+          if (nodeToBucketMap.get(primary) != null) {
+            nodeToBucketMap.get(primary).add(bucketId);
+          } else {
+            List<Integer> bids = new ArrayList<>();
+            bids.add(bucketId);
+            nodeToBucketMap.put(primary, bids);
+          }
+        }
+      }
+    }
+    return bucketIds;
+  }
+
+  private List<Integer> findBucketOwners(final Set<Integer> bucketIdsToConsider,
+      HashMap<InternalDistributedMember, List<Integer>> nodeToBucketMap) {
+    final List<Integer> bucketIds = new ArrayList<>();
+    // Get all local buckets
     PartitionedRegionDataStore dataStore = this.pr.getDataStore();
-    final int totalBucketsToQuery = bucketIdsToConsider.size();
     if (dataStore != null) {
       for (Integer bid : bucketIdsToConsider) {
         if (dataStore.isManagingBucket(bid)) {
-          bucketIds.add(Integer.valueOf(bid));
+          bucketIds.add(bid);
         }
       }
       if (bucketIds.size() > 0) {
-        ret.put(pr.getMyId(), new ArrayList(bucketIds));
+        List<Integer> localBucketIds = new ArrayList<>(bucketIds);
+        nodeToBucketMap.put(pr.getMyId(), localBucketIds);
         // All the buckets are hosted locally.
-        if (bucketIds.size() == totalBucketsToQuery) {
-          return ret;
+        if (localBucketIds.size() == bucketIdsToConsider.size()) {
+          return bucketIds;
         }
       }
     }
-
-    final List allNodes = getAllNodes(this.pr.getRegionAdvisor());
-    /*
-     * for(Map.Entry<InternalDistributedMember, Collection<Collection>> entry :
-     * resultsPerMember.entrySet()) { InternalDistributedMember member = entry.getKey();
-     * TaintableArrayList list = entry.getValue(); if(list.isTainted()) {
-     * taintedMembers.add(member); } }
-     */
-
+    final List<InternalDistributedMember> allNodes = getAllNodes(this.pr.getRegionAdvisor());
     // Put the failed members on the end of the list.
     if (failedMembers != null && !failedMembers.isEmpty()) {
       allNodes.removeAll(failedMembers);
       allNodes.addAll(failedMembers);
     }
-
-    for (Iterator dsItr = allNodes.iterator(); dsItr.hasNext()
+    int totalBucketsToQuery = bucketIdsToConsider.size();
+    for (Iterator<InternalDistributedMember> dsItr = allNodes.iterator(); dsItr.hasNext()
         && (bucketIds.size() < totalBucketsToQuery);) {
-      InternalDistributedMember nd = (InternalDistributedMember) dsItr.next();
-
-      final List<Integer> buckets = new ArrayList<Integer>();
+      InternalDistributedMember nd = dsItr.next();
+      final List<Integer> buckets = new ArrayList<>();
       for (Integer bid : bucketIdsToConsider) {
         if (!bucketIds.contains(bid)) {
           final Set owners = getBucketOwners(bid);
@@ -909,28 +940,22 @@ public class PartitionedRegionQueryEvaluator extends StreamingPartitionOperation
         }
       }
       if (!buckets.isEmpty()) {
-        ret.put(nd, buckets);
+        nodeToBucketMap.put(nd, buckets);
       }
     }
+    return bucketIds;
+  }
 
-    if (bucketIds.size() != totalBucketsToQuery) {
-      bucketIdsToConsider.removeAll(bucketIds);
-      throw new QueryException("Data loss detected, unable to find the hosting "
-          + " node for some of the dataset. [dataset/bucket ids:" + bucketIdsToConsider + "]");
-    }
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("Node to bucketId map: {}", ret);
-    }
-    return ret;
+  InternalDistributedMember getPrimaryBucketOwner(Integer bid) {
+    return pr.getBucketPrimary(bid);
   }
 
   protected Set<InternalDistributedMember> getBucketOwners(Integer bid) {
-    return pr.getRegionAdvisor().getBucketOwners(bid.intValue());
+    return pr.getRegionAdvisor().getBucketOwners(bid);
   }
 
-  protected ArrayList getAllNodes(RegionAdvisor regionAdvisor) {
-    ArrayList nodes = new ArrayList(regionAdvisor.adviseDataStore());
+  protected ArrayList<InternalDistributedMember> getAllNodes(RegionAdvisor regionAdvisor) {
+    ArrayList<InternalDistributedMember> nodes = new ArrayList<>(regionAdvisor.adviseDataStore());
     Collections.shuffle(nodes);
     return nodes;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntryState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntryState.java
index ba795ca..ee8b570 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntryState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXEntryState.java
@@ -95,6 +95,8 @@ public class TXEntryState implements Releasable {
 
   private byte[] serializedPendingValue;
 
+  private EntryEventImpl pendingCallback;
+
   /**
    * Remember the callback argument for listener invocation
    */
@@ -1939,6 +1941,14 @@ public class TXEntryState implements Releasable {
     return this.modSerialNum;
   }
 
+  public EntryEventImpl getPendingCallback() {
+    return pendingCallback;
+  }
+
+  public void setPendingCallback(EntryEventImpl pendingCallback) {
+    this.pendingCallback = pendingCallback;
+  }
+
   /**
    * Just like an EntryEventImpl but also has access to TxEntryState to make it Comparable
    *
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
index ea2cd01..801b5d4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXState.java
@@ -101,7 +101,7 @@ public class TXState implements TXStateInterface {
    * this transaction.
    */
   private int modSerialNum;
-  private final List<EntryEventImpl> pendingCallbacks = new ArrayList<EntryEventImpl>();
+  private final List<EntryEventImpl> pendingCallbacks = new ArrayList<>();
   // Access this variable should be in synchronized block.
   private boolean beforeCompletionCalled;
 
@@ -506,7 +506,6 @@ public class TXState implements TXStateInterface {
       List/* <TXEntryStateWithRegionAndKey> */ entries = generateEventOffsets();
       TXCommitMessage msg = null;
       try {
-
         /*
          * In order to preserve data consistency, we need to: 1. Modify the cache first
          * (applyChanges) 2. Ask for advice on who to send to (buildMessage) 3. Send out to other
@@ -515,8 +514,6 @@ public class TXState implements TXStateInterface {
          * If this is done out of order, we will have problems with GII, split brain, and HA.
          */
 
-        attachFilterProfileInformation(entries);
-
         lockTXRegions(regions);
 
         try {
@@ -527,6 +524,8 @@ public class TXState implements TXStateInterface {
             this.internalAfterApplyChanges.run();
           }
 
+          attachFilterProfileInformation(entries);
+
           // build and send the message
           msg = buildMessage();
           this.commitMessage = msg;
@@ -596,6 +595,10 @@ public class TXState implements TXStateInterface {
             @Released
             EntryEventImpl ev =
                 (EntryEventImpl) o.es.getEvent(o.r, o.key, o.es.getTXRegionState().getTXState());
+            if (ev.getOperation() == null) {
+              // A read op with detect read conflicts does not need filter routing.
+              continue;
+            }
             try {
               /*
                * The routing information is derived from the PR advisor, not the bucket advisor.
@@ -605,6 +608,18 @@ public class TXState implements TXStateInterface {
               o.es.setFilterRoutingInfo(fri);
               Set set = bucket.getAdjunctReceivers(ev, Collections.EMPTY_SET, new HashSet(), fri);
               o.es.setAdjunctRecipients(set);
+
+              if (o.es.getPendingCallback() != null) {
+                if (fri != null) {
+                  // For tx host, local filter info was also calculated.
+                  // Set this local filter info in corresponding pending callback so that
+                  // notifyBridgeClient has correct routing info.
+                  FilterRoutingInfo.FilterInfo localRouting = fri.getLocalFilterInfo();
+                  o.es.getPendingCallback().setLocalFilterInfo(localRouting);
+                }
+                // Do not hold pending callback reference in TXEntryState as it is no longer used.
+                o.es.setPendingCallback(null);
+              }
             } finally {
               ev.release();
             }
@@ -854,6 +869,7 @@ public class TXState implements TXStateInterface {
     }
 
     // applyChanges for each entry
+    int size = pendingCallbacks.size();
     for (Object entry : entries) {
       TXEntryStateWithRegionAndKey o = (TXEntryStateWithRegionAndKey) entry;
       if (this.internalDuringApplyChanges != null) {
@@ -861,6 +877,10 @@ public class TXState implements TXStateInterface {
       }
       try {
         o.es.applyChanges(o.r, o.key, this);
+        if (pendingCallbacks.size() > size) {
+          o.es.setPendingCallback(pendingCallbacks.get(size));
+          size = pendingCallbacks.size();
+        }
       } catch (RegionDestroyedException ex) {
         // region was destroyed out from under us; after conflict checking
         // passed. So act as if the region destroy happened right after the
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TxCallbackEventFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TxCallbackEventFactoryImpl.java
index 0c2da66..bc0ec5e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TxCallbackEventFactoryImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TxCallbackEventFactoryImpl.java
@@ -42,7 +42,7 @@ public class TxCallbackEventFactoryImpl implements TxCallbackEventFactory {
       ClientProxyMembershipID bridgeContext,
       TXEntryState txEntryState, VersionTag versionTag,
       long tailKey) {
-    DistributedMember originator = null;
+    DistributedMember originator;
     // txId should not be null even on localOrigin
     Assert.assertTrue(txId != null);
     originator = txId.getMemberId();
@@ -57,8 +57,6 @@ public class TxCallbackEventFactoryImpl implements TxCallbackEventFactory {
         aCallbackArgument, txEntryState == null, originator);
     boolean returnedRetVal = false;
     try {
-
-
       if (bridgeContext != null) {
         retVal.setContext(bridgeContext);
       }
@@ -75,9 +73,7 @@ public class TxCallbackEventFactoryImpl implements TxCallbackEventFactory {
 
       FilterRoutingInfo.FilterInfo localRouting = null;
       boolean computeFilterInfo = false;
-      if (filterRoutingInfo == null) {
-        computeFilterInfo = true;
-      } else {
+      if (filterRoutingInfo != null) {
         localRouting = filterRoutingInfo.getLocalFilterInfo();
         if (localRouting != null) {
           // routing was computed in this VM but may need to perform local interest processing
@@ -91,8 +87,6 @@ public class TxCallbackEventFactoryImpl implements TxCallbackEventFactory {
           if (!computeFilterInfo) {
             retVal.setLocalFilterInfo(localRouting);
           }
-        } else {
-          computeFilterInfo = true;
         }
       }
       if (TxCallbackEventFactoryImpl.logger.isTraceEnabled()) {
@@ -109,32 +103,13 @@ public class TxCallbackEventFactoryImpl implements TxCallbackEventFactory {
         } else {
           retVal.setInvokePRCallbacks(false);
         }
-
-        if (computeFilterInfo) {
-          if (bucket.getBucketAdvisor().isPrimary()) {
-            if (TxCallbackEventFactoryImpl.logger.isTraceEnabled()) {
-              TxCallbackEventFactoryImpl.logger
-                  .trace("createCBEvent computing routing for primary bucket");
-            }
-            FilterProfile fp =
-                ((BucketRegion) internalRegion).getPartitionedRegion().getFilterProfile();
-            if (fp != null) {
-              FilterRoutingInfo fri = fp.getFilterRoutingInfoPart2(filterRoutingInfo, retVal);
-              if (fri != null) {
-                retVal.setLocalFilterInfo(fri.getLocalFilterInfo());
-              }
-            }
-          }
-        }
-      } else if (computeFilterInfo) { // not a bucket
-        if (TxCallbackEventFactoryImpl.logger.isTraceEnabled()) {
-          TxCallbackEventFactoryImpl.logger.trace("createCBEvent computing routing for non-bucket");
-        }
-        FilterProfile fp = internalRegion.getFilterProfile();
-        if (fp != null) {
-          retVal.setLocalFilterInfo(fp.getLocalFilterRouting(retVal));
-        }
       }
+      // No need to computeFilterInfo for primary bucket, as it is done
+      // during attach filter info after applying to cache.
+      // For secondary buckets, filter routing is calculated in the "remote" routing table.
+      // For replicate region, filter routing should be computed after entry commit
+      // is applied to cache, as concurrent register interest could occur.
+      // That computation occurs in notifyBridgeClient when no local routing is set.
       retVal.setTransactionId(txId);
       returnedRetVal = true;
       return retVal;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
index 8000d1e..d446539 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
@@ -1043,6 +1043,11 @@ public abstract class BaseCommand implements Command {
   private static void handleSingleton(LocalRegion region, Object entryKey,
       InterestResultPolicy policy, ServerConnection servConn) throws IOException {
     List<Object> keyList = new ArrayList<>(1);
+    if (region instanceof PartitionedRegion) {
+      keyList.add(entryKey);
+      handleListPR((PartitionedRegion) region, keyList, policy, servConn);
+      return;
+    }
     if (region != null) {
       if (region.containsKey(entryKey)
           || sendTombstonesInRIResults(servConn, policy) && region.containsTombstone(entryKey)) {
@@ -1318,7 +1323,8 @@ public abstract class BaseCommand implements Command {
    */
   private static void handleListPR(final PartitionedRegion region, final List<?> keyList,
       final InterestResultPolicy policy, final ServerConnection servConn) throws IOException {
-    final List<Object> newKeyList = new ArrayList<>(MAXIMUM_CHUNK_SIZE);
+    int chunkSize = keyList.size() < MAXIMUM_CHUNK_SIZE ? keyList.size() : MAXIMUM_CHUNK_SIZE;
+    final List<Object> newKeyList = new ArrayList<>(chunkSize);
     region.getKeysWithList(keyList, sendTombstonesInRIResults(servConn, policy),
         theSet -> appendInterestResponseKeys(region, keyList, uncheckedCast(theSet), newKeyList,
             servConn));
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
index cc49d56..475aa19 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionQueryEvaluatorTest.java
@@ -16,8 +16,13 @@ package org.apache.geode.internal.cache;
 
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
@@ -35,6 +40,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.query.QueryException;
 import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.internal.CompiledSelect;
 import org.apache.geode.cache.query.internal.CompiledValue;
@@ -239,6 +245,179 @@ public class PartitionedRegionQueryEvaluatorTest {
         .until(() -> !(bucketList.equals(prqe.getAllNodes(regionAdvisor))));
   }
 
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQuery() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set<Integer> bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    for (Integer bid : bucketList) {
+      if (bid % 2 == 0) {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeA);
+      } else {
+        when(prqe.getPrimaryBucketOwner(bid)).thenReturn(remoteNodeB);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(2);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(5);
+    assertThat(bnMap.get(remoteNodeB).size()).isEqualTo(5);
+  }
+
+  @Test
+  public void verifyPrimaryBucketNodesAreRetrievedForCqQueryWithRetry() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    int bucket1 = 1;
+    Set<Integer> bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        spy(new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet));
+
+    for (Integer bid : bucketList) {
+      if (bid == bucket1) {
+        doReturn(null).doReturn(remoteNodeA).when(prqe).getPrimaryBucketOwner(bid);
+      } else {
+        doReturn(remoteNodeA).when(prqe).getPrimaryBucketOwner(bid);
+      }
+    }
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(1);
+    assertThat(bnMap.get(remoteNodeA).size()).isEqualTo(10);
+    verify(prqe, times(2)).getPrimaryBucketOwner(bucket1);
+  }
+
+  @Test
+  public void exceptionIsThrownWhenPrimaryBucketNodeIsNotFoundForCqQuery() {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    int bucket1 = 1;
+    Set<Integer> bucketSet = new HashSet<>(bucketList);
+    when(query.isCqQuery()).thenReturn(true);
+    PartitionedRegionQueryEvaluator prqe =
+        spy(new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet));
+
+    for (Integer bid : bucketList) {
+      if (bid == bucket1) {
+        doReturn(null).when(prqe).getPrimaryBucketOwner(bid);
+      } else {
+        doReturn(remoteNodeA).when(prqe).getPrimaryBucketOwner(bid);
+      }
+    }
+
+    assertThatThrownBy(prqe::buildNodeToBucketMap).isInstanceOf(QueryException.class)
+        .hasMessageContaining(
+            "Data loss detected, unable to find the hosting  node for some of the dataset.");
+
+    verify(prqe, times(3)).getPrimaryBucketOwner(bucket1);
+  }
+
+  @Test
+  public void verifyLocalBucketNodesAreRetrievedForQuery() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    PartitionedRegionDataStore dataStore = mock(PartitionedRegionDataStore.class);
+    Set<Integer> bucketSet = new HashSet<>(bucketList);
+    for (Integer bid : bucketList) {
+      when(dataStore.isManagingBucket(bid)).thenReturn(true);
+    }
+    when(pr.getDataStore()).thenReturn(dataStore);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(1);
+    assertThat(bnMap.get(localNode).size()).isEqualTo(10);
+  }
+
+  @Test
+  public void verifyAllBucketsAreRetrievedFromSingleRemoteNode() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set<Integer> bucketSet = new HashSet<>(bucketList);
+    when(pr.getDataStore()).thenReturn(null);
+    RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+    Set<InternalDistributedMember> nodes = new HashSet<>(allNodes);
+    when(regionAdvisor.adviseDataStore()).thenReturn(nodes);
+    for (Integer bid : bucketList) {
+      when(regionAdvisor.getBucketOwners(bid)).thenReturn(nodes);
+    }
+    when(pr.getRegionAdvisor()).thenReturn(regionAdvisor);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(1);
+    bnMap.keySet().forEach(x -> assertThat(bnMap.get(x).size()).isEqualTo(10));
+  }
+
+  @Test
+  public void verifyAllBucketsAreRetrievedFromMultipleRemoteNodes() throws Exception {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set<Integer> bucketSet = new HashSet<>(bucketList);
+    when(pr.getDataStore()).thenReturn(null);
+    RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+    Set<InternalDistributedMember> nodes = new HashSet<>(allNodes);
+    when(regionAdvisor.adviseDataStore()).thenReturn(nodes);
+    Set<InternalDistributedMember> nodesA = new HashSet<>();
+    nodesA.add(remoteNodeA);
+    Set<InternalDistributedMember> nodesB = new HashSet<>();
+    nodesB.add(remoteNodeB);
+    for (Integer bid : bucketList) {
+      if (bid % 2 == 0) {
+        when(regionAdvisor.getBucketOwners(bid)).thenReturn(nodesA);
+      } else {
+        when(regionAdvisor.getBucketOwners(bid)).thenReturn(nodesB);
+      }
+    }
+    when(pr.getRegionAdvisor()).thenReturn(regionAdvisor);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    Map<InternalDistributedMember, List<Integer>> bnMap = prqe.buildNodeToBucketMap();
+
+    assertThat(bnMap.size()).isEqualTo(2);
+    bnMap.keySet().forEach(x -> {
+      assertThat(x).satisfiesAnyOf(
+          member -> assertThat(member).isEqualTo(remoteNodeA),
+          member -> assertThat(member).isEqualTo(remoteNodeB));
+      assertThat(bnMap.get(x).size()).isEqualTo(5);
+    });
+  }
+
+  @Test
+  public void exceptionIsThrownWhenNodesAreNotFoundForQueryBuckets() {
+    List<Integer> bucketList = createBucketList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Set<Integer> bucketSet = new HashSet<>(bucketList);
+    when(pr.getDataStore()).thenReturn(null);
+    RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+    Set<InternalDistributedMember> nodes = new HashSet<>(allNodes);
+    when(regionAdvisor.adviseDataStore()).thenReturn(nodes);
+    for (Integer bid : bucketList) {
+      if (bid != 1) {
+        when(regionAdvisor.getBucketOwners(bid)).thenReturn(nodes);
+      }
+    }
+    when(pr.getRegionAdvisor()).thenReturn(regionAdvisor);
+    PartitionedRegionQueryEvaluator prqe =
+        new PartitionedRegionQueryEvaluator(system, pr, query, mock(ExecutionContext.class),
+            null, new LinkedResultSet(), bucketSet);
+
+    assertThatThrownBy(prqe::buildNodeToBucketMap).isInstanceOf(QueryException.class)
+        .hasMessageContaining(
+            "Data loss detected, unable to find the hosting  node for some of the dataset.");
+  }
+
   private Map<InternalDistributedMember, List<Integer>> createFakeBucketMap() {
     Map<InternalDistributedMember, List<Integer>> bucketToNodeMap = new HashMap<>();
     bucketToNodeMap.put(localNode, createBucketList(1, 2, 3));
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
index 9055bb0..9e70564 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TXStateTest.java
@@ -19,9 +19,12 @@ import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabl
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -29,12 +32,15 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+
 import javax.transaction.Status;
 
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.mockito.InOrder;
 
 import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.cache.EntryNotFoundException;
@@ -93,6 +99,25 @@ public class TXStateTest {
   }
 
   @Test
+  public void attachFilterProfileAfterApplyingChanges() {
+    TXState txState = spy(new TXState(txStateProxy, false, disabledClock()));
+    doReturn(new ArrayList<>()).when(txState).generateEventOffsets(); // diamond: avoid raw ArrayList
+    doNothing().when(txState).attachFilterProfileInformation(any());
+    doNothing().when(txState).applyChanges(any());
+    TXCommitMessage txCommitMessage = mock(TXCommitMessage.class);
+    doReturn(txCommitMessage).when(txState).buildMessage();
+
+    txState.commit();
+
+    InOrder inOrder = inOrder(txState, txCommitMessage); // GEODE-8926: routing is computed only after applyChanges
+    inOrder.verify(txState).applyChanges(any());
+    inOrder.verify(txState).attachFilterProfileInformation(any());
+    inOrder.verify(txState).buildMessage();
+    inOrder.verify(txCommitMessage).send(any());
+    inOrder.verify(txState).firePendingCallbacks();
+  }
+
+  @Test
   public void doAfterCompletionCanCommitJTA() {
     TXState txState = spy(new TXState(txStateProxy, false, disabledClock()));
     txState.reserveAndCheck();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/FilterProfileJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/FilterProfileJUnitTest.java
index a711a7c..77d6712 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/FilterProfileJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/FilterProfileJUnitTest.java
@@ -49,6 +49,7 @@ public class FilterProfileJUnitTest {
     when(mockCache.getCacheServers()).thenReturn(Collections.emptyList());
     when(mockRegion.getGemFireCache()).thenReturn(mockCache);
     fprofile = new FilterProfile(mockRegion);
+    when(mockRegion.getFilterProfile()).thenReturn(fprofile);
   }
 
   @Test
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
new file mode 100644
index 0000000..184d37f
--- /dev/null
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/PartitionedRegionTxDUnitTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.cq.dunit;
+
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.query.CqAttributes;
+import org.apache.geode.cache.query.CqAttributesFactory;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.TXManagerImpl;
+import org.apache.geode.internal.cache.TXState;
+import org.apache.geode.internal.cache.TXStateInterface;
+import org.apache.geode.internal.cache.TXStateProxyImpl;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.DistributedBlackboard;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category(ClientSubscriptionTest.class)
+public class PartitionedRegionTxDUnitTest implements Serializable { // GEODE-8926: races between tx commit and CQ/interest registration
+  private static final String REGION_NAME = "region"; // constant, so static final
+  private MemberVM server1;
+  private MemberVM server2;
+  private ClientVM client;
+
+  @Rule
+  public DistributedBlackboard blackboard = new DistributedBlackboard();
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  @Before
+  public void setUp() { // two servers, one bucket, redundancy 1: both servers host the bucket
+    MemberVM locator = clusterStartupRule.startLocatorVM(0);
+    server1 = clusterStartupRule.startServerVM(1, locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, locator.getPort());
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      Region<Object, Object> region =
+          cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+              new PartitionAttributesFactory<>().setRedundantCopies(1).setTotalNumBuckets(1)
+                  .create())
+              .create(REGION_NAME);
+
+      PartitionRegionHelper.assignBucketsToPartitions(region); // create the bucket eagerly
+    });
+
+    server2.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      cache.createRegionFactory(RegionShortcut.PARTITION).setPartitionAttributes(
+          new PartitionAttributesFactory<>().setRedundantCopies(1).setTotalNumBuckets(1).create())
+          .create(REGION_NAME);
+    });
+  }
+
+  @Test
+  public void verifyCqRegistrationWorksDuringTxCommit() throws Exception { // op must show up in CQ results or as a CQ event, never neither
+    blackboard.setMailbox("CqQueryResultCount", 0);
+    blackboard.setMailbox("CqEvents", 0);
+
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> cacheRule.withServerConnection(server2.getPort()).withPoolSubscription(true));
+
+    AsyncInvocation<?> serverAsync = server1.invokeAsync(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+      txManager.begin();
+
+      TXStateInterface txState =
+          ((TXStateProxyImpl) txManager.getTXState()).getRealDeal(null, null);
+
+      ((TXState) txState).setDuringApplyChanges(() -> { // pause mid-commit so the CQ registers concurrently
+        try {
+          blackboard.signalGate("StartCQ");
+          blackboard.waitForGate("RegistrationFinished");
+        } catch (TimeoutException | InterruptedException e) {
+          if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } // keep interrupt status
+        }
+      });
+
+      cache.getRegion(REGION_NAME).put("Key-1", "value-1");
+      txManager.commit();
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(REGION_NAME);
+
+      QueryService queryService = clientCache.getQueryService();
+      CqAttributesFactory cqaf = new CqAttributesFactory();
+      TestCqListener testListener = new TestCqListener();
+      cqaf.addCqListener(testListener);
+      CqAttributes cqAttributes = cqaf.create();
+
+      blackboard.waitForGate("StartCQ"); // register while the server tx is mid-applyChanges
+      SelectResults<Object> cqResults = queryService
+          .newCq("Select * from " + SEPARATOR + REGION_NAME, cqAttributes)
+          .executeWithInitialResults();
+      blackboard.signalGate("RegistrationFinished");
+      blackboard.setMailbox("CqQueryResultCount", cqResults.asList().size());
+    });
+
+    GeodeAwaitility.await().untilAsserted(() -> {
+      Integer CqQueryResultCount = blackboard.getMailbox("CqQueryResultCount");
+      Integer CqEvents = blackboard.getMailbox("CqEvents");
+      assertThat(CqQueryResultCount + CqEvents).isEqualTo(1); // exactly once: snapshot OR event
+    });
+
+    serverAsync.await();
+  }
+
+  @Test
+  public void verifyCqEventInvocationForDestroyOpIfTxCommitFromClient() throws Exception {
+    blackboard.setMailbox("CqEvents", 0);
+
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> cacheRule.withServerConnection(server1.getPort()).withPoolSubscription(true));
+
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      cache.getRegion(REGION_NAME).put("Key-1", "value-1");
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(REGION_NAME);
+
+      QueryService queryService = clientCache.getQueryService();
+      CqAttributesFactory cqaf = new CqAttributesFactory();
+      TestCqListener testListener = new TestCqListener();
+      cqaf.addCqListener(testListener);
+      CqAttributes cqAttributes = cqaf.create();
+
+      queryService.newCq("Select * from " + SEPARATOR + REGION_NAME, cqAttributes)
+          .executeWithInitialResults();
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      TXManagerImpl txManager = (TXManagerImpl) clientCache.getCacheTransactionManager();
+      txManager.begin();
+
+      clientCache.getRegion(REGION_NAME).destroy("Key-1"); // transactional destroy must reach the CQ
+      txManager.commit();
+    });
+
+    GeodeAwaitility.await().untilAsserted(() -> {
+      Integer CqEvents = blackboard.getMailbox("CqEvents");
+      assertThat(CqEvents).isEqualTo(1);
+    });
+  }
+
+  @Test
+  public void verifyInterestRegistrationWorksDuringTxCommit() throws Exception { // tx op must arrive via snapshot or client update, never be lost
+    client = clusterStartupRule.startClientVM(3,
+        cacheRule -> cacheRule.withServerConnection(server2.getPort()).withPoolSubscription(true));
+
+    AsyncInvocation<?> serverAsync = server1.invokeAsync(() -> { // parameterize: consistent with the CQ test above
+      InternalCache cache = ClusterStartupRule.getCache();
+      TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+      txManager.begin();
+
+      TXStateInterface txState =
+          ((TXStateProxyImpl) txManager.getTXState()).getRealDeal(null, null);
+
+      ((TXState) txState).setDuringApplyChanges(() -> { // pause mid-commit so registerInterest runs concurrently
+        try {
+          blackboard.signalGate("StartReg");
+          blackboard.waitForGate("RegistrationFinished");
+        } catch (TimeoutException | InterruptedException e) {
+          if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } // keep interrupt status
+        }
+      });
+
+      cache.getRegion(REGION_NAME).put("Key-5", "value-1");
+      txManager.commit();
+    });
+
+    client.invoke(() -> {
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+      Region<Object, Object> region =
+          clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+              .create(REGION_NAME);
+      blackboard.waitForGate("StartReg");
+      region.registerInterest("Key-5", InterestResultPolicy.KEYS_VALUES); // key written by the in-flight tx
+      region.registerInterest("Key-6", InterestResultPolicy.KEYS_VALUES); // key never written
+      blackboard.signalGate("RegistrationFinished");
+
+      GeodeAwaitility.await().untilAsserted(() -> assertThat(region.size()).isEqualTo(1));
+    });
+
+    serverAsync.await();
+  }
+
+  private class TestCqListener implements CqListener, Serializable { // inner (non-static): needs the outer blackboard
+
+    int numEvents = 0;
+
+    @Override
+    public void onEvent(CqEvent aCqEvent) {
+      numEvents++;
+      blackboard.setMailbox("CqEvents", numEvents); // publish count so any VM can assert on it
+    }
+
+    @Override
+    public void onError(CqEvent aCqEvent) {}
+  }
+
+}