Posted to commits@geode.apache.org by zh...@apache.org on 2018/03/16 21:57:13 UTC

[geode] branch feature/GEODE-4647 updated (a6ffa7e -> de760d2)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a change to branch feature/GEODE-4647
in repository https://gitbox.apache.org/repos/asf/geode.git.


    omit a6ffa7e  GEODE-4647: add a stats eventSecondaryQueueSizeId to track events in secondary gateway sender queue
     add b53714c  GEODE-4735: get rid of Cache.getInstance call from CliUtils and GfshC… (#1551)
     add 4143a73  GEODE-4685: Moving of static DefaultQuery.setPdxReadSerialized to the TypeRegistry. Cleaned up the overriding of readSerialized to reset to previous value. Added cache to AbstractJdbcCallback.java so that children classes can access it. Replaced AtomicLong with LongAdder.
     add 386b92b  GEODE-4801 Readme links to "how to contribute"
     add 0908acb  Bug/geode 4798: Add JDBC Connector classes to sanctioned serializables (#1592)
     add 3939595  GEODE-4772: Enhance new protocol with ability to clear region of all entries. (#1590)
     add 3079826  GEODE-4716: Added overflow attributes to gateway mbeans
     add 94586e1  GEODE-4807: Pulse UI tests using locator/cluster rule. (#1584)
     add 6214a43  GEODE-2999: Add PutIfAbsent to the Protobuf protocol. (#1578)
     add 8a12c6d  GEODE-4818: Remove beSick() from DirectChannel,TcpConduit. (#1591)
     add a2099c4  GEODE-4825: Lucene Index should reset pdx read serialized (#1598)
     add 5c85b4d  GEODE-4814: Categorized FunctionServiceTest (#1597)
     add 25aebbe  GEODE-4813: Categorize remaining register interest tests (#1586)
     add a07b67a  GEODE-4266: Make JdbcConnectionException safely serializable to clients (#1601)
     add 2f1fe57  GEODE-4768: Close the socket to the locator in the experimental driver
     add 9223b16  GEODE-4527: get rid of static getAnyInstance or getInstance call to get the cache. (#1596)
     add 9e0c8d5  [GEODE-4835] Add python3 to ci docker image.
     add 963ba03  Merge pull request #1610 from smgoller/GEODE-4835
     add 736742d  GEODE-4669: Fall back on file controller if mbean controller fails
     add 2f2a73c  GEODE-4384: gfsh command to destroy jndi binding (#1588)
     add 140118d  GEODE-4823: OQL index not updated for tombstone (#1603)
     add b8d16ee  GEODE-4829: Categorized AEQ tests (#1605)
     add 7c936ca  GEODE-4797: Categorized WanTest (#1604)
     add 29eff01  GEODE-3891: Cannot enable ciphers for REST interface (#1613)
     add 7a3da49  Revert "GEODE-2999: Add PutIfAbsent to the Protobuf protocol. (#1578)"
     add 39a81f4  GEODE-4785: Add test category annotation to Pulse tests (#1595)
     add d621b47  GEODE-4689: Colocation complete listeners added (#1565)
     add 662fb51  GEODE-4834: Remove the newly added isConcurrencyConflict from GatewaySenderEventImpl (#1609)
     add d7c46ff  GEODE-4831 Update docs for registerInterest API changes (#1607)
     add 6630f48  GEODE-4824: Categorized Session state tests (#1599)
     add f7c7451  GEODE-4693: fix JDBCLoader on non-object pdx fields (#1517)
     add 4a6ac46  GEODE-4803: Mark test as Flaky and adjust timeout
     add ef2ebd7  GEODE-4816: Add support for authentication to the experimental driver. (#1615)
     add 5dc08bf  GEODE-4844: JdbcWriter and JdbcAsyncWriter will write rows loaded by the JdbcLoader (#1618)
     add 9aa83fe  GEODE-4830: use POJOs to represent cache configuration allowed by cache-1.0.xsd
     add 7f8eedc  GEODE-4830: use CacheConfig to access the jndi list in ListJNDIBindingCommand
     add 44dc697  GEODE-4870: Categorize AlterAsyncEventQueueCommandDUnitTest as DistributedTest (#1626)
     add be9d99d  GEODE-4769: optional early serialization of EntryEvent key and new value
     add 076d9ab  GEODE-4769: fix tests using non-Serializable value objects
     add c5b4401  GEODE-4827: CQ not added to cq map on exception (#1602)
     add d48607d  GEODE-4868: depose primary should reduce brq's size in deposePrimaryForColocatedChildren (#1625)
     add 9655e40  GEODE-4622 Revert jna upgrade
     new de760d2  GEODE-4647: add a stats eventSecondaryQueueSizeId to track events in secondary gateway sender queue

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (a6ffa7e)
            \
             N -- N -- N   refs/heads/feature/GEODE-4647 (de760d2)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
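
The omit/discard/add/new labels described above can be illustrated with
a small sketch. This is a simplified model under stated assumptions, not
the logic of the script that generated this email: the function names,
the parent-map representation, and the commit names (B, O1..O3, N1..N3,
mirroring the diagram) are all hypothetical.

```python
def ancestors(tip, parents):
    """Return every commit reachable from tip, including tip itself."""
    seen = set()
    stack = [tip]
    while stack:
        commit = stack.pop()
        if commit in seen:
            continue
        seen.add(commit)
        stack.extend(parents.get(commit, ()))
    return seen


def classify_update(old_tip, new_tip, parents, other_tips=()):
    """Label the commits affected by moving a branch from old_tip to new_tip.

    Hypothetical helper: 'parents' maps each commit to its parent commits,
    and 'other_tips' lists the tips of all other references in the repository.
    """
    old = ancestors(old_tip, parents)
    new = ancestors(new_tip, parents)
    elsewhere = set()
    for tip in other_tips:
        elsewhere |= ancestors(tip, parents)

    labels = {}
    # Commits that left the branch: still referenced elsewhere, or gone.
    for commit in old - new:
        labels[commit] = "omit" if commit in elsewhere else "discard"
    # Commits that joined the branch: already in the repository, or brand new.
    for commit in new - old:
        labels[commit] = "add" if commit in elsewhere else "new"
    return labels


# The diagram above: B is the common base, O* the old line, N* the new line.
parents = {
    "B": [],
    "O1": ["B"], "O2": ["O1"], "O3": ["O2"],
    "N1": ["B"], "N2": ["N1"], "N3": ["N2"],
}

# Assume one other reference still points at O3 and another at N2.
labels = classify_update("O3", "N3", parents, other_tips=["O3", "N2"])
# labels == {"O1": "omit", "O2": "omit", "O3": "omit",
#            "N1": "add", "N2": "add", "N3": "new"}
```

With no other references (other_tips empty), the same call would label
the O commits "discard" and all three N commits "new". The common base B
is reachable from both tips, so it receives no label in either case.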


Summary of changes:
 README.md                                          |   23 +-
 ci/docker/Dockerfile                               |    2 +-
 ci/scripts/concourse_job_performance.sh            |    1 -
 geode-assembly/build.gradle                        |   14 +
 .../cli/commands/PutCommandWithJsonTest.java       |    6 +-
 .../web/controllers/RestAPIsWithSSLDUnitTest.java  |   19 +-
 .../geode/tools/pulse/PulseConnectivityTest.java   |    3 +-
 .../geode/tools/pulse/PulseDataExportTest.java     |    3 +-
 .../tools/pulse/PulseJmxPasswordFileTest.java      |    4 +-
 .../tools/pulse/PulseSecurityIntegrationTest.java  |    4 +-
 .../geode/tools/pulse/PulseSecurityTest.java       |    4 +-
 .../tools/pulse/PulseSecurityWithSSLTest.java      |    4 +-
 .../tools/pulse/ui/PulseAcceptanceAuthTest.java    |  102 +
 .../tools/pulse/ui/PulseAcceptanceNoAuthTest.java  |   94 +
 .../tools/pulse/ui/PulseAcceptanceTestBase.java    |  354 ++
 .../geode/connectors/jdbc/JdbcAsyncWriter.java     |   44 +-
 .../connectors/jdbc/JdbcConnectorException.java    |   80 +-
 .../apache/geode/connectors/jdbc/JdbcLoader.java   |    6 +-
 .../apache/geode/connectors/jdbc/JdbcWriter.java   |   25 +-
 .../jdbc/internal/AbstractJdbcCallback.java        |   20 +-
 .../geode/connectors/jdbc/internal/SqlHandler.java |   87 +-
 .../jdbc/internal/SqlToPdxInstanceCreator.java     |  205 ++
 .../jdbc/internal/TableKeyColumnManager.java       |   12 +-
 .../jdbc/internal/cli/AlterConnectionCommand.java  |    2 +-
 .../jdbc/internal/cli/AlterConnectionFunction.java |    2 +-
 .../jdbc/internal/cli/AlterMappingCommand.java     |    2 +-
 .../jdbc/internal/cli/AlterMappingFunction.java    |    2 +-
 .../jdbc/internal/cli/CreateConnectionCommand.java |    2 +-
 .../internal/cli/CreateConnectionFunction.java     |    2 +-
 .../jdbc/internal/cli/CreateMappingCommand.java    |    2 +-
 .../jdbc/internal/cli/CreateMappingFunction.java   |    2 +-
 .../internal/cli/DescribeConnectionCommand.java    |    2 +-
 .../internal/cli/DescribeConnectionFunction.java   |    2 +-
 .../jdbc/internal/cli/DescribeMappingCommand.java  |    2 +-
 .../jdbc/internal/cli/DescribeMappingFunction.java |    2 +-
 .../internal/cli/DestroyConnectionCommand.java     |    2 +-
 .../internal/cli/DestroyConnectionFunction.java    |    2 +-
 .../jdbc/internal/cli/DestroyMappingCommand.java   |    2 +-
 .../jdbc/internal/cli/DestroyMappingFunction.java  |    2 +-
 .../jdbc/internal/cli/ExceptionHandler.java        |    2 +-
 .../cli/FunctionContextArgumentProvider.java       |   10 +-
 .../jdbc/internal/cli/JdbcCliFunction.java         |   16 +-
 .../jdbc/internal/cli/ListConnectionCommand.java   |    2 +-
 .../jdbc/internal/cli/ListConnectionFunction.java  |    2 +-
 .../jdbc/internal/cli/ListMappingCommand.java      |    2 +-
 .../jdbc/internal/cli/ListMappingFunction.java     |    2 +-
 .../ConnectorsDistributedSystemService.java        |   43 +
 .../geode.apache.org/schema/jdbc/jdbc-1.0.xsd      |    0
 ...e.distributed.internal.DistributedSystemService |   16 +
 .../sanctioned-geode-connectors-serializables.txt  |   18 +
 .../AnalyzeConnectorsSerializablesJUnitTest.java   |   14 +-
 .../jdbc/ClassWithSupportedPdxFields.java          |  181 +
 .../jdbc/JdbcAsyncWriterIntegrationTest.java       |   26 +-
 .../geode/connectors/jdbc/JdbcAsyncWriterTest.java |   26 +-
 .../jdbc/JdbcConnectorExceptionTest.java           |   85 +
 .../geode/connectors/jdbc/JdbcDUnitTest.java       |  246 +-
 .../connectors/jdbc/JdbcLoaderIntegrationTest.java |  147 +-
 .../geode/connectors/jdbc/JdbcLoaderTest.java      |    7 +-
 .../connectors/jdbc/JdbcWriterIntegrationTest.java |   16 +-
 .../geode/connectors/jdbc/JdbcWriterTest.java      |   51 +-
 .../jdbc/internal/AbstractJdbcCallbackTest.java    |   33 +-
 .../connectors/jdbc/internal/SqlHandlerTest.java   |  190 +-
 .../jdbc/internal/SqlToPdxInstanceCreatorTest.java |  515 +++
 .../jdbc/internal/TableKeyColumnManagerTest.java   |    8 +-
 .../jdbc/internal/TestConfigService.java           |   22 +-
 .../cli/AlterConnectionCommandIntegrationTest.java |    5 +-
 .../cli/AlterMappingCommandIntegrationTest.java    |    7 +-
 .../CreateConnectionCommandIntegrationTest.java    |    1 +
 .../cli/CreateMappingCommandIntegrationTest.java   |    1 +
 .../DescribeConnectionCommandIntegrationTest.java  |    1 +
 .../cli/DescribeMappingCommandIntegrationTest.java |    1 +
 .../DestroyConnectionCommandIntegrationTest.java   |    1 +
 .../cli/DestroyMappingCommandIntegrationTest.java  |    1 +
 .../cli/JDBCConnectorFunctionsSecurityTest.java    |    2 +-
 .../jdbc/internal/cli/JdbcCliFunctionTest.java     |    4 +-
 .../cli/JdbcClusterConfigDistributedTest.java      |   12 +-
 .../cli/ListConnectionCommandIntegrationTest.java  |    1 +
 .../cli/ListMappingCommandIntegrationTest.java     |    1 +
 .../apache/geode/codeAnalysis/excludedClasses.txt  |    5 +
 .../codeAnalysis/sanctionedDataSerializables.txt   |    0
 .../java/org/apache/geode/cache/GemFireCache.java  |   17 +
 .../internal/AsyncEventQueueFactoryImpl.java       |    9 +-
 .../geode/cache/query/internal/DefaultQuery.java   |  146 +-
 .../cache/query/internal/index/IndexManager.java   |   30 +-
 .../internal/ClusterConfigurationService.java      |  105 +
 .../distributed/internal/direct/DirectChannel.java |   27 -
 .../membership/gms/mgr/GMSMembershipManager.java   |   66 +-
 .../internal/streaming/StreamingOperation.java     |   60 +-
 .../geode/internal/InternalDataSerializer.java     |    2 -
 .../geode/internal/cache/AbstractRegionMap.java    |    6 +
 .../apache/geode/internal/cache/BucketAdvisor.java |   11 +-
 ...rnalEntryEvent.java => ColocationListener.java} |   16 +-
 .../geode/internal/cache/EntryEventImpl.java       |    2 +-
 .../internal/cache/EntryEventSerialization.java    |   87 +
 .../apache/geode/internal/cache/FilterProfile.java |   25 +-
 .../geode/internal/cache/GemFireCacheImpl.java     |   39 +-
 .../apache/geode/internal/cache/InternalCache.java |    4 +
 .../geode/internal/cache/InternalEntryEvent.java   |    6 +
 .../internal/cache/PartitionRegionConfig.java      |   72 +-
 .../geode/internal/cache/PartitionedRegion.java    |   23 +-
 .../geode/internal/cache/TXCommitMessage.java      |    6 +-
 .../apache/geode/internal/cache/TXEntryState.java  |  144 +-
 .../org/apache/geode/internal/cache/TXState.java   |  136 +-
 .../org/apache/geode/internal/cache/Token.java     |   31 +-
 .../internal/cache/configuration/CacheConfig.java  | 3571 ++++++++++++++++++++
 .../internal/cache/configuration/CacheElement.java |   23 +
 .../cache/configuration/CacheLoaderType.java       |  121 +
 .../configuration/CacheTransactionManagerType.java |  321 ++
 .../cache/configuration/CacheWriterType.java       |  121 +
 .../configuration/ClassWithParametersType.java     |  117 +
 .../cache/configuration/DeclarableType.java        |  121 +
 .../internal/cache/configuration/DiskDirType.java  |  111 +
 .../internal/cache/configuration/DiskDirsType.java |   94 +
 .../cache/configuration/DiskStoreType.java         |  355 ++
 .../configuration/DynamicRegionFactoryType.java    |  179 +
 .../configuration/EnumActionDestroyOverflow.java   |   70 +
 .../cache/configuration/EnumReadableWritable.java  |   70 +
 .../configuration/ExpirationAttributesType.java    |  251 ++
 .../cache/configuration/FunctionServiceType.java   |  194 ++
 .../cache/configuration/InitializerType.java       |  123 +
 .../cache/configuration/JndiBindingsType.java      |  741 ++++
 .../cache/configuration/ParameterType.java         |  139 +
 .../internal/cache/configuration/PdxType.java      |  297 ++
 .../internal/cache/configuration/PoolType.java     |  872 +++++
 .../configuration/RegionAttributesDataPolicy.java  |   80 +
 .../RegionAttributesIndexUpdateType.java           |   71 +
 .../configuration/RegionAttributesMirrorType.java  |   72 +
 .../cache/configuration/RegionAttributesScope.java |   74 +
 .../cache/configuration/RegionAttributesType.java  | 3478 +++++++++++++++++++
 .../internal/cache/configuration/RegionConfig.java | 1067 ++++++
 .../cache/configuration/RegionElement.java         |   23 +
 .../cache/configuration/ResourceManagerType.java   |  163 +
 .../SerializationRegistrationType.java             |  288 ++
 .../internal/cache/configuration/ServerType.java   |  702 ++++
 .../internal/cache/configuration/StringType.java   |   81 +
 .../cache/entries/AbstractRegionEntry.java         |   51 +-
 .../internal/cache/partitioned/QueryMessage.java   |    5 +-
 .../internal/cache/wan/GatewaySenderEventImpl.java |   33 +-
 .../wan/parallel/ParallelGatewaySenderQueue.java   |   34 +
 .../cache/wan/serial/SerialGatewaySenderQueue.java |   35 +
 .../internal/cache/xmlcache/CacheCreation.java     |   66 +-
 .../geode/internal/lang/SystemPropertyHelper.java  |    7 +-
 .../process/MBeanOrFileProcessController.java      |   64 +
 .../internal/process/ProcessControllerFactory.java |   10 +-
 .../org/apache/geode/internal/tcp/TCPConduit.java  |   72 +-
 .../geode/management/AsyncEventQueueMXBean.java    |   14 +
 .../geode/management/GatewaySenderMXBean.java      |   17 +-
 .../geode/management/internal/JettyHelper.java     |    6 +-
 .../management/internal/ManagementStrings.java     |    3 +
 .../apache/geode/management/internal/SSLUtil.java  |   12 +-
 .../internal/beans/AsyncEventQueueMBean.java       |   15 +
 .../internal/beans/AsyncEventQueueMBeanBridge.java |   44 +-
 .../internal/beans/GatewaySenderMBean.java         |   15 +
 .../internal/beans/GatewaySenderMBeanBridge.java   |   41 +-
 .../beans/stats/GatewaySenderOverflowMonitor.java  |  157 +
 .../management/internal/beans/stats/StatsKey.java  |    3 +
 .../geode/management/internal/cli/CliUtil.java     |   16 -
 .../management/internal/cli/CommandManager.java    |   29 +-
 .../cli/commands/AlterAsyncEventQueueCommand.java  |    2 +-
 .../cli/commands/AlterOfflineDiskStoreCommand.java |    2 +-
 .../internal/cli/commands/AlterRegionCommand.java  |    2 +-
 .../cli/commands/AlterRuntimeConfigCommand.java    |    4 +-
 .../cli/commands/BackupDiskStoreCommand.java       |    2 +-
 .../cli/commands/ChangeLogLevelCommand.java        |  140 +-
 .../cli/commands/ClearDefinedIndexesCommand.java   |    2 +-
 .../cli/commands/CloseDurableCQsCommand.java       |    4 +-
 .../cli/commands/CloseDurableClientCommand.java    |    4 +-
 .../cli/commands/CompactDiskStoreCommand.java      |    7 +-
 .../commands/CompactOfflineDiskStoreCommand.java   |    5 +-
 .../internal/cli/commands/ConfigurePDXCommand.java |    5 +-
 .../internal/cli/commands/ConnectCommand.java      |   10 +-
 .../cli/commands/CountDurableCQEventsCommand.java  |    4 +-
 .../cli/commands/CreateAsyncEventQueueCommand.java |    2 +-
 .../cli/commands/CreateDefinedIndexesCommand.java  |    2 +-
 .../cli/commands/CreateDiskStoreCommand.java       |    2 +-
 .../cli/commands/CreateGatewayReceiverCommand.java |    2 +-
 .../cli/commands/CreateGatewaySenderCommand.java   |    2 +-
 .../internal/cli/commands/CreateIndexCommand.java  |    2 +-
 .../cli/commands/CreateJndiBindingCommand.java     |    4 +-
 .../internal/cli/commands/CreateRegionCommand.java |    8 +-
 .../internal/cli/commands/DebugCommand.java        |    2 +-
 .../internal/cli/commands/DefineIndexCommand.java  |    2 +-
 .../internal/cli/commands/DeployCommand.java       |    4 +-
 .../cli/commands/DescribeClientCommand.java        |   11 +-
 .../cli/commands/DescribeConfigCommand.java        |    2 +-
 .../cli/commands/DescribeConnectionCommand.java    |    2 +-
 .../cli/commands/DescribeDiskStoreCommand.java     |    2 +-
 .../cli/commands/DescribeMemberCommand.java        |    2 +-
 .../commands/DescribeOfflineDiskStoreCommand.java  |    2 +-
 .../cli/commands/DescribeRegionCommand.java        |    4 +-
 .../commands/DestroyAsyncEventQueueCommand.java    |    2 +-
 .../cli/commands/DestroyDiskStoreCommand.java      |    2 +-
 .../cli/commands/DestroyFunctionCommand.java       |    2 +-
 .../cli/commands/DestroyGatewaySenderCommand.java  |    2 +-
 .../internal/cli/commands/DestroyIndexCommand.java |    5 +-
 .../cli/commands/DestroyJndiBindingCommand.java    |   14 +-
 .../cli/commands/DestroyRegionCommand.java         |    4 +-
 .../internal/cli/commands/DisconnectCommand.java   |    5 +-
 .../internal/cli/commands/EchoCommand.java         |    2 +-
 .../cli/commands/ExecuteFunctionCommand.java       |    7 +-
 .../cli/commands/ExecuteScriptCommand.java         |    2 +-
 .../internal/cli/commands/ExitCommand.java         |    2 +-
 .../internal/cli/commands/ExportConfigCommand.java |    4 +-
 .../internal/cli/commands/ExportDataCommand.java   |    2 +-
 .../ExportImportClusterConfigurationCommands.java  |    8 +-
 .../internal/cli/commands/ExportLogsCommand.java   |    2 +-
 .../commands/ExportOfflineDiskStoreCommand.java    |    5 +-
 .../cli/commands/ExportStackTraceCommand.java      |    2 +-
 .../internal/cli/commands/GCCommand.java           |   11 +-
 .../internal/cli/commands/GetCommand.java          |    4 +-
 .../internal/cli/commands/GfshCommand.java         |   73 +-
 .../internal/cli/commands/GfshHelpCommand.java     |    2 +-
 .../internal/cli/commands/GfshHintCommand.java     |    2 +-
 .../internal/cli/commands/HistoryCommand.java      |    2 +-
 .../internal/cli/commands/ImportDataCommand.java   |    2 +-
 .../cli/commands/ListAsyncEventQueuesCommand.java  |    4 +-
 .../internal/cli/commands/ListClientCommand.java   |    6 +-
 .../internal/cli/commands/ListDeployedCommand.java |    4 +-
 .../cli/commands/ListDiskStoresCommand.java        |    2 +-
 .../cli/commands/ListDurableClientCQsCommand.java  |    4 +-
 .../internal/cli/commands/ListFunctionCommand.java |    4 +-
 .../internal/cli/commands/ListGatewayCommand.java  |    2 +-
 .../internal/cli/commands/ListIndexCommand.java    |    4 +-
 .../cli/commands/ListJndiBindingCommand.java       |   46 +-
 .../internal/cli/commands/ListMembersCommand.java  |    2 +-
 .../internal/cli/commands/ListRegionCommand.java   |    4 +-
 .../commands/LoadBalanceGatewaySenderCommand.java  |    5 +-
 .../internal/cli/commands/LocateEntryCommand.java  |    4 +-
 .../internal/cli/commands/NetstatCommand.java      |    6 +-
 .../internal/cli/commands/PDXRenameCommand.java    |    2 +-
 .../cli/commands/PauseGatewaySenderCommand.java    |    2 +-
 .../internal/cli/commands/PutCommand.java          |    4 +-
 .../internal/cli/commands/QueryCommand.java        |    5 +-
 .../internal/cli/commands/RebalanceCommand.java    |   56 +-
 .../internal/cli/commands/RemoveCommand.java       |    4 +-
 .../cli/commands/ResumeGatewaySenderCommand.java   |    5 +-
 .../commands/RevokeMissingDiskStoreCommand.java    |    2 +-
 .../internal/cli/commands/SetVariableCommand.java  |    2 +-
 .../internal/cli/commands/ShCommand.java           |    5 +-
 .../internal/cli/commands/ShowDeadlockCommand.java |    5 +-
 .../internal/cli/commands/ShowLogCommand.java      |    2 +-
 .../internal/cli/commands/ShowMetricsCommand.java  |    2 +-
 .../cli/commands/ShowMissingDiskStoreCommand.java  |    2 +-
 .../internal/cli/commands/ShutdownCommand.java     |    9 +-
 .../internal/cli/commands/SleepCommand.java        |    6 +-
 .../cli/commands/StartGatewayReceiverCommand.java  |    5 +-
 .../cli/commands/StartGatewaySenderCommand.java    |    2 +-
 .../internal/cli/commands/StartLocatorCommand.java |    2 +-
 .../internal/cli/commands/StartServerCommand.java  |    2 +-
 .../StatusClusterConfigServiceCommand.java         |    5 +-
 .../cli/commands/StatusGatewayReceiverCommand.java |    5 +-
 .../cli/commands/StatusGatewaySenderCommand.java   |    5 +-
 .../cli/commands/StopGatewayReceiverCommand.java   |    5 +-
 .../cli/commands/StopGatewaySenderCommand.java     |    5 +-
 .../internal/cli/commands/UndeployCommand.java     |    4 +-
 .../commands/UpgradeOfflineDiskStoreCommand.java   |    5 +-
 .../cli/commands/ValidateDiskStoreCommand.java     |    2 +-
 .../internal/cli/commands/VersionCommand.java      |    2 +-
 .../commands/lifecycle/StartJConsoleCommand.java   |    2 +-
 .../commands/lifecycle/StartJVisualVMCommand.java  |    2 +-
 .../cli/commands/lifecycle/StartPulseCommand.java  |    2 +-
 .../cli/commands/lifecycle/StartVsdCommand.java    |    2 +-
 .../commands/lifecycle/StatusLocatorCommand.java   |    2 +-
 .../commands/lifecycle/StatusServerCommand.java    |    2 +-
 .../cli/commands/lifecycle/StopLocatorCommand.java |    2 +-
 .../cli/commands/lifecycle/StopServerCommand.java  |    2 +-
 .../cli/functions/ListJndiBindingFunction.java     |    3 -
 .../cli/remote/OnlineCommandProcessor.java         |    6 +-
 .../pdx/internal/AutoSerializableManager.java      |    2 +-
 .../geode/pdx/internal/InternalPdxInstance.java}   |    7 +-
 .../apache/geode/pdx/internal/PdxInstanceImpl.java |   12 +-
 .../apache/geode/pdx/internal/TypeRegistry.java    |   14 +-
 .../geode/redis/internal/RegionProvider.java       |    1 +
 .../sanctioned-geode-core-serializables.txt        |    6 +
 .../test/java/org/apache/geode/DeltaTestImpl.java  |   52 +-
 ...ncEventQueueEvictionAndExpirationJUnitTest.java |   13 +-
 .../SerialAsyncEventQueueImplJUnitTest.java        |    4 +-
 .../BaseLineAndCompareQueryPerfJUnitTest.java      |    3 +-
 .../query/Bug32947ValueConstraintJUnitTest.java    |   41 +-
 .../geode/cache/query/PdxStringQueryJUnitTest.java |  211 +-
 .../org/apache/geode/cache/query/data/Address.java |   18 +-
 .../org/apache/geode/cache/query/data/Data.java    |   20 +-
 .../apache/geode/cache/query/data/Employee.java    |   22 +-
 .../apache/geode/cache/query/data/Keywords.java    |   11 +-
 .../org/apache/geode/cache/query/data/Manager.java |    7 -
 .../org/apache/geode/cache/query/data/PhoneNo.java |   13 +-
 .../apache/geode/cache/query/data/Portfolio.java   |   14 +-
 .../org/apache/geode/cache/query/data/Street.java  |   13 +-
 .../apache/geode/cache/query/data/TradingLine.java |    3 +-
 .../cache/query/dunit/PdxStringQueryDUnitTest.java |   48 +-
 .../query/functional/INOperatorJUnitTest.java      |   93 +-
 .../IUMRMultiIndexesMultiRegionJUnitTest.java      |  965 ++----
 .../functional/IUMRShuffleIteratorsJUnitTest.java  |  142 +-
 .../query/functional/LikePredicateJUnitTest.java   |  417 +--
 .../cache/query/functional/MiscJUnitTest.java      |   77 +-
 .../query/functional/MultipleRegionsJUnitTest.java |   29 +-
 .../functional/ReservedKeywordsJUnitTest.java      |   13 +-
 .../query/internal/CopyOnReadQueryJUnitTest.java   |   73 +-
 .../internal/QueryObserverCallbackJUnitTest.java   |  461 ++-
 .../cache/query/internal/QueryUtilsJUnitTest.java  |  283 +-
 .../index/HashIndexQueryIntegrationTest.java       |  409 +--
 .../internal/index/IndexMaintenanceJUnitTest.java  |  857 ++---
 .../query/internal/index/IndexUseJUnitTest.java    |  937 ++---
 .../index/MapRangeIndexMaintenanceJUnitTest.java   |  183 +-
 .../PRQueryRemoteNodeExceptionDUnitTest.java       |    2 +-
 .../apache/geode/cache30/CacheXml80DUnitTest.java  |   14 +-
 .../apache/geode/cache30/CacheXml81DUnitTest.java  |   13 +-
 .../cache30/PRBucketSynchronizationDUnitTest.java  |  356 +-
 .../geode/cache30/RRSynchronizationDUnitTest.java  |  310 +-
 .../AnalyzeSerializablesJUnitTest.java             |   45 +-
 .../internal/CacheConfigIntegrationTest.java       |   55 +
 .../InternalClusterConfigurationServiceTest.java   |  222 ++
 .../geode/internal/PdxDeleteFieldJUnitTest.java    |   12 +-
 ...okenSerializationConsistencyRegressionTest.java |  216 ++
 ...ntDeserializationCopyOnReadRegressionTest.java} |  165 +-
 .../cache/EntryEventSerializationTest.java         |  201 ++
 .../internal/cache/EvictionAttributesImplTest.java |   40 +-
 .../FilterProfileNullCqBaseRegionJUnitTest.java    |   74 +
 ...st.java => PartitionRegionConfigJUnitTest.java} |   30 +-
 .../internal/cache/PartitionRegionConfigTest.java  |   84 +
 .../cache/PartitionedRegionCreationJUnitTest.java  |    1 +
 ...itionedRegionQueryEvaluatorIntegrationTest.java |   42 +-
 .../cache/entries/AbstractRegionEntryTest.java     |   23 +-
 .../ClientFunctionTimeoutRegressionTest.java       |    3 +-
 .../ClientServerFunctionExecutionDUnitTest.java    |    3 +-
 ...istributedRegionFunctionExecutionDUnitTest.java |    3 +-
 .../ExecuteFunctionInstanceRegressionTest.java     |    3 +-
 ...ExceptionsIncludeLocalMemberRegressionTest.java |    3 +-
 .../FunctionExecutionOnLonerRegressionTest.java    |    3 +-
 .../FunctionExecution_ExceptionDUnitTest.java      |    3 +-
 .../FunctionServiceClientAccessorPRDUnitTest.java  |    3 +-
 ...ceClientAccessorPRMultipleMembersDUnitTest.java |    3 +-
 ...AccessorPRMultipleMembersMultihopDUnitTest.java |    3 +-
 ...tionServiceClientMultipleOnServerDUnitTest.java |    3 +-
 .../FunctionServiceClientOnServerDUnitTest.java    |    3 +-
 .../execute/FunctionServiceLocalPRDUnitTest.java   |    3 +-
 .../execute/FunctionServiceLocalRRDUnitTest.java   |    3 +-
 .../FunctionServiceMultipleOnMemberDUnitTest.java  |    3 +-
 .../FunctionServicePeerAccessorPRDUnitTest.java    |    3 +-
 ...vicePeerAccessorPRMultipleMembersDUnitTest.java |    3 +-
 .../FunctionServicePeerAccessorRRDUnitTest.java    |    3 +-
 .../FunctionServiceSingleOnMemberDUnitTest.java    |    3 +-
 .../execute/FunctionServiceStatsDUnitTest.java     |    3 +-
 .../cache/execute/LocalDataSetDUnitTest.java       |    3 +-
 .../execute/LocalDataSetIndexingDUnitTest.java     |    3 +-
 .../execute/LocalFunctionExecutionDUnitTest.java   |    3 +-
 .../execute/MemberFunctionExecutionDUnitTest.java  |    3 +-
 .../MultiRegionFunctionExecutionDUnitTest.java     |    3 +-
 .../OnGroupsFunctionExecutionDUnitTest.java        |    3 +-
 ...lientServerFunctionExecutionNoAckDUnitTest.java |    3 +-
 ...ientServerRegionFunctionExecutionDUnitTest.java |    3 +-
 ...erRegionFunctionExecutionFailoverDUnitTest.java |    3 +-
 ...egionFunctionExecutionNoSingleHopDUnitTest.java |    3 +-
 ...ctionExecutionSelectorNoSingleHopDUnitTest.java |    3 +-
 ...rRegionFunctionExecutionSingleHopDUnitTest.java |    3 +-
 .../execute/PRFunctionExecutionDUnitTest.java      |    3 +-
 ...FunctionExecutionWithResultSenderDUnitTest.java |    3 +-
 .../extension/mock/MockExtensionCommands.java      |   16 +-
 .../cache/tier/sockets/HAInterestTestCase.java     |    3 +-
 .../sockets/RegisterInterestIntegrationTest.java   |    3 +-
 ...isterInterestServerMetaDataDistributedTest.java |    3 +-
 ...yncEventQueueEntrySynchronizationDUnitTest.java |    3 +-
 .../cache/wan/AsyncEventQueueTestBase.java         |   87 +-
 ...ewayReceiverXmlParsingValidationsJUnitTest.java |    3 +-
 .../cache/wan/MyGatewaySenderEventListener.java    |    1 -
 .../asyncqueue/AsyncEventListenerDUnitTest.java    |   24 +-
 .../AsyncEventListenerOffHeapDUnitTest.java        |    8 +-
 .../asyncqueue/AsyncEventQueueStatsDUnitTest.java  |    7 +-
 .../AsyncEventQueueValidationsJUnitTest.java       |   11 +-
 .../ConcurrentAsyncEventQueueDUnitTest.java        |    7 +-
 .../ConcurrentAsyncEventQueueOffHeapDUnitTest.java |    8 +-
 .../CommonParallelAsyncEventQueueDUnitTest.java    |    7 +-
 ...monParallelAsyncEventQueueOffHeapDUnitTest.java |    8 +-
 ...elGatewaySenderFlushedCoordinatorJUnitTest.java |   11 +-
 .../ProcessControllerFactoryIntegrationTest.java   |    2 +-
 .../process/ProcessControllerFactoryTest.java      |    2 +-
 .../internal/cli/CommandManagerJUnitTest.java      |    1 -
 .../AlterAsyncEventQueueCommandDUnitTest.java      |   10 +-
 .../cli/commands/ConcurrentDeployDUnitTest.java    |    9 +-
 .../commands/DestroyJndiBindingCommandTest.java    |   32 +-
 .../cli/commands/DestroyRegionCommandTest.java     |    8 +-
 .../cli/commands/DiskStoreCommandsJUnitTest.java   |    4 +-
 .../cli/commands/ListIndexCommandJUnitTest.java    |    2 +-
 .../commands/ListJndiBindingCommandDUnitTest.java  |   29 +-
 ...dTest.java => QueryCommandIntegrationTest.java} |    8 +-
 .../DataCommandFunctionWithPDXJUnitTest.java       |    2 +-
 .../apache/geode/management/model/EmptyObject.java |    5 +-
 .../geode/pdx/AutoSerializableJUnitTest.java       |   10 +-
 .../apache/geode/pdx/PdxClientServerDUnitTest.java |   20 +-
 .../apache/geode/pdx/PdxSerializableJUnitTest.java |  135 +-
 .../rules/DistributedRestoreSystemProperties.java  |    8 +-
 .../java/org/apache/geode/test/fake/Fakes.java     |    3 +
 .../geode/test/junit/rules/ServerStarterRule.java  |   18 +-
 .../apache/geode/codeAnalysis/excludedClasses.txt  |    2 +-
 .../codeAnalysis/sanctionedDataSerializables.txt   |   12 +-
 .../cache/query/internal/cq/CqServiceImpl.java     |    5 +-
 .../cache/PRDeltaPropagationDUnitTest.java         |  796 ++---
 ...igure_client_server_event_messaging.html.md.erb |    6 +-
 .../geode/experimental/driver/DriverFactory.java   |   34 +-
 .../geode/experimental/driver/ProtobufChannel.java |   54 +-
 .../geode/experimental/driver/ProtobufDriver.java  |    7 +-
 .../geode/experimental/driver/ProtobufRegion.java  |    8 +
 .../apache/geode/experimental/driver/Region.java   |    6 +
 .../experimental/driver/AuthenticationTest.java    |   92 +
 .../experimental/driver/RegionIntegrationTest.java |   11 +
 .../geode/test/junit/categories/AEQTest.java       |    6 +-
 .../test/junit/categories/FunctionServiceTest.java |    6 +-
 .../geode/test/junit/categories/PulseTest.java     |    6 +-
 .../geode/test/junit/categories/SessionTest.java   |    6 +-
 .../lucene/internal/IndexRepositoryFactory.java    |   38 +-
 .../cache/lucene/internal/LuceneEventListener.java |   18 +-
 .../LuceneFileRegionColocationListener.java        |   47 +
 .../cache/lucene/internal/LuceneIndexImpl.java     |    2 +-
 .../cache/lucene/internal/LuceneServiceImpl.java   |    4 +-
 .../internal/PartitionedRepositoryManager.java     |    2 +-
 .../lucene/internal/RawIndexRepositoryFactory.java |    4 +-
 .../internal/RawLuceneRepositoryManager.java       |    2 +-
 .../lucene/internal/cli/LuceneIndexCommands.java   |   13 +-
 .../internal/LuceneEventListenerJUnitTest.java     |   77 +-
 .../PartitionedRepositoryManagerJUnitTest.java     |    2 +
 .../internal/cli/LuceneIndexCommandsJUnitTest.java |   20 +-
 .../cache/lucene/test/IndexRepositorySpy.java      |    8 +-
 .../src/main/proto/v1/clientProtocol.proto         |    3 +
 .../src/main/proto/v1/region_API.proto             |    7 +
 .../operations/ClearRequestOperationHandler.java   |   62 +
 .../registry/ProtobufOperationContextRegistry.java |    7 +
 ...rotobufConnectionAuthorizingStateProcessor.java |    4 +-
 .../protobuf/v1/AuthenticationIntegrationTest.java |    3 -
 .../internal/protocol/protobuf/v1/MessageUtil.java |    1 -
 .../protobuf/v1}/ProtobufRequestUtilities.java     |   14 +-
 .../v1/acceptance/CacheOperationsJUnitTest.java    |    2 +-
 .../LocatorConnectionAuthenticationDUnitTest.java  |    2 +-
 .../v1/acceptance/LocatorConnectionDUnitTest.java  |    2 +-
 ... => ClearRequestOperationHandlerJUnitTest.java} |   51 +-
 .../GetAllRequestOperationHandlerJUnitTest.java    |    2 +-
 .../GetAndPutJsonDocumentsDUnitTest.java           |    2 +-
 ...egionNamesRequestOperationHandlerJUnitTest.java |    2 +-
 .../GetRequestOperationHandlerJUnitTest.java       |    2 +-
 .../GetServerOperationHandlerJUnitTest.java        |    2 +-
 .../KeySetOperationHandlerJUnitTest.java           |    2 +-
 .../PutAllRequestOperationHandlerJUnitTest.java    |    2 +-
 .../PutRequestOperationHandlerJUnitTest.java       |    2 +-
 .../RemoveRequestOperationHandlerJUnitTest.java    |    2 +-
 .../controllers/PulseControllerJUnitTest.java      |   13 +-
 ...ueueOverflowMBeanAttributesDistributedTest.java |  231 ++
 ...nderOverflowMBeanAttributesDistributedTest.java |  220 ++
 .../cli/commands/QueryCommandOverHttpTest.java     |    4 +-
 gradle/dependency-versions.properties              |    2 +-
 gradle/test.gradle                                 |   34 +
 449 files changed, 23009 insertions(+), 6346 deletions(-)
 create mode 100644 geode-assembly/src/test/java/org/apache/geode/tools/pulse/ui/PulseAcceptanceAuthTest.java
 create mode 100644 geode-assembly/src/test/java/org/apache/geode/tools/pulse/ui/PulseAcceptanceNoAuthTest.java
 create mode 100644 geode-assembly/src/test/java/org/apache/geode/tools/pulse/ui/PulseAcceptanceTestBase.java
 create mode 100644 geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreator.java
 create mode 100644 geode-connectors/src/main/java/org/apache/geode/internal/ConnectorsDistributedSystemService.java
 rename geode-connectors/src/main/resources/META-INF/{services => }/schemas/geode.apache.org/schema/jdbc/jdbc-1.0.xsd (100%)
 create mode 100644 geode-connectors/src/main/resources/META-INF/services/org.apache.geode.distributed.internal.DistributedSystemService
 create mode 100755 geode-connectors/src/main/resources/org/apache/geode/internal/sanctioned-geode-connectors-serializables.txt
 copy geode-web/src/test/java/org/apache/geode/management/internal/cli/commands/QueryCommandOverHttpTest.java => geode-connectors/src/test/java/org/apache/geode/codeAnalysis/AnalyzeConnectorsSerializablesJUnitTest.java (70%)
 create mode 100644 geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/ClassWithSupportedPdxFields.java
 create mode 100644 geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcConnectorExceptionTest.java
 create mode 100644 geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlToPdxInstanceCreatorTest.java
 create mode 100644 geode-connectors/src/test/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
 copy {geode-web-api => geode-connectors}/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt (100%)
 copy geode-core/src/main/java/org/apache/geode/internal/cache/{InternalEntryEvent.java => ColocationListener.java} (71%)
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventSerialization.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/CacheConfig.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/CacheElement.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/CacheLoaderType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/CacheTransactionManagerType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/CacheWriterType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/ClassWithParametersType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/DeclarableType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/DiskDirType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/DiskDirsType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/DiskStoreType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/DynamicRegionFactoryType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/EnumActionDestroyOverflow.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/EnumReadableWritable.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/ExpirationAttributesType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/FunctionServiceType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/InitializerType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/JndiBindingsType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/ParameterType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/PdxType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/PoolType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/RegionAttributesDataPolicy.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/RegionAttributesIndexUpdateType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/RegionAttributesMirrorType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/RegionAttributesScope.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/RegionAttributesType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/RegionConfig.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/RegionElement.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/ResourceManagerType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/SerializationRegistrationType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/ServerType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/configuration/StringType.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/internal/process/MBeanOrFileProcessController.java
 create mode 100644 geode-core/src/main/java/org/apache/geode/management/internal/beans/stats/GatewaySenderOverflowMonitor.java
 copy geode-core/src/{test/java/org/apache/geode/management/model/EmptyObject.java => main/java/org/apache/geode/pdx/internal/InternalPdxInstance.java} (83%)
 create mode 100644 geode-core/src/test/java/org/apache/geode/distributed/internal/CacheConfigIntegrationTest.java
 create mode 100644 geode-core/src/test/java/org/apache/geode/distributed/internal/InternalClusterConfigurationServiceTest.java
 create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/BrokenSerializationConsistencyRegressionTest.java
 rename geode-core/src/test/java/org/apache/geode/{cache30/Bug38741DUnitTest.java => internal/cache/ClientDeserializationCopyOnReadRegressionTest.java} (80%)
 create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventSerializationTest.java
 copy geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/cli/JdbcCliFunctionTest.java => geode-core/src/test/java/org/apache/geode/internal/cache/EvictionAttributesImplTest.java (51%)
 create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/FilterProfileNullCqBaseRegionJUnitTest.java
 copy geode-core/src/test/java/org/apache/geode/internal/cache/{wan/asyncqueue/AsyncEventListenerOffHeapDUnitTest.java => PartitionRegionConfigJUnitTest.java} (56%)
 create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/PartitionRegionConfigTest.java
 rename geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/{QueryCommandTest.java => QueryCommandIntegrationTest.java} (97%)
 create mode 100644 geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/AuthenticationTest.java
 copy geode-core/src/test/java/org/apache/geode/management/model/EmptyObject.java => geode-junit/src/main/java/org/apache/geode/test/junit/categories/AEQTest.java (84%)
 copy geode-core/src/test/java/org/apache/geode/management/model/EmptyObject.java => geode-junit/src/main/java/org/apache/geode/test/junit/categories/FunctionServiceTest.java (85%)
 mode change 100644 => 100755
 copy geode-core/src/test/java/org/apache/geode/management/model/EmptyObject.java => geode-junit/src/main/java/org/apache/geode/test/junit/categories/PulseTest.java (85%)
 copy geode-core/src/test/java/org/apache/geode/management/model/EmptyObject.java => geode-junit/src/main/java/org/apache/geode/test/junit/categories/SessionTest.java (83%)
 create mode 100644 geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneFileRegionColocationListener.java
 create mode 100644 geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ClearRequestOperationHandler.java
 rename geode-protobuf/src/{main/java/org/apache/geode/internal/protocol/protobuf/v1/utilities => test/java/org/apache/geode/internal/protocol/protobuf/v1}/ProtobufRequestUtilities.java (91%)
 copy geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/{KeySetOperationHandlerJUnitTest.java => ClearRequestOperationHandlerJUnitTest.java} (64%)
 create mode 100644 geode-wan/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueOverflowMBeanAttributesDistributedTest.java
 create mode 100644 geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderOverflowMBeanAttributesDistributedTest.java


[geode] 01/01: GEODE-4647: add a stats eventSecondaryQueueSizeId to track events in secondary gateway sender queue

Posted by zh...@apache.org.

zhouxj pushed a commit to branch feature/GEODE-4647
in repository https://gitbox.apache.org/repos/asf/geode.git

commit de760d278d772acaaae7de64a8f7386fb5a4c125
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Wed Feb 21 21:31:07 2018 -0800

    GEODE-4647: add a stats eventSecondaryQueueSizeId to track events in secondary
                gateway sender queue
---
 .../asyncqueue/internal/AsyncEventQueueStats.java  |   3 +
 .../internal/cache/AbstractBucketRegionQueue.java  |   8 +
 .../geode/internal/cache/AbstractRegionMap.java    |   4 +
 .../apache/geode/internal/cache/BucketAdvisor.java |   2 +
 .../geode/internal/cache/BucketRegionQueue.java    |   2 +
 .../internal/cache/PartitionedRegionDataStore.java |   6 +-
 .../internal/cache/wan/AbstractGatewaySender.java  |   6 +
 .../wan/AbstractGatewaySenderEventProcessor.java   |  17 +++
 .../internal/cache/wan/GatewaySenderStats.java     |  61 ++++++++
 .../wan/parallel/ParallelGatewaySenderQueue.java   |   2 +-
 .../wan/parallel/ParallelQueueRemovalMessage.java  |   3 +
 .../SerialAsyncEventQueueImplJUnitTest.java        |   3 +
 .../cache/wan/AsyncEventQueueTestBase.java         |  16 +-
 .../asyncqueue/AsyncEventQueueStatsDUnitTest.java  |  45 ++++--
 .../ParallelQueueRemovalMessageJUnitTest.java      |  12 ++
 .../bean/stats/AsyncEventQueueStatsJUnitTest.java  |   2 -
 .../geode/internal/cache/wan/WANTestBase.java      | 105 +++++++++++--
 .../parallel/ParallelWANConflationDUnitTest.java   |  56 +++++--
 .../wan/parallel/ParallelWANStatsDUnitTest.java    | 167 +++++++++++++++++++++
 .../serial/SerialGatewaySenderQueueDUnitTest.java  |  13 +-
 .../wan/serial/SerialWANConflationDUnitTest.java   |  73 ++++++++-
 .../wan/serial/SerialWANPropagationDUnitTest.java  |   1 +
 22 files changed, 567 insertions(+), 40 deletions(-)
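
The bookkeeping this commit adds can be read as two gauges that trade counts when a bucket changes role. The following is a minimal sketch of that idea only, not Geode code: the class QueueGaugesSketch and its method names are hypothetical, and it assumes the behaviour visible in the hunks below (an event is counted on the secondary gauge when its bucket is not primary, a bucket's whole size moves between gauges when it becomes primary or secondary, and a removal on a non-primary bucket decrements the secondary gauge by one).

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the two queue-size gauges; not Geode's GatewaySenderStats. */
class QueueGaugesSketch {
  private final AtomicInteger queueSize = new AtomicInteger();
  private final AtomicInteger secondaryQueueSize = new AtomicInteger();

  /** An event lands in a bucket: count it on the gauge that matches the bucket's role. */
  void onEventAdded(boolean bucketIsPrimary) {
    if (bucketIsPrimary) {
      queueSize.incrementAndGet();
    } else {
      secondaryQueueSize.incrementAndGet();
    }
  }

  /** A bucket holding bucketSize events becomes primary: move its count to the primary gauge. */
  void onBecomePrimary(int bucketSize) {
    queueSize.addAndGet(bucketSize);
    secondaryQueueSize.addAndGet(-bucketSize);
  }

  /** A bucket holding bucketSize events loses primary status: move its count back. */
  void onBecomeSecondary(int bucketSize) {
    queueSize.addAndGet(-bucketSize);
    secondaryQueueSize.addAndGet(bucketSize);
  }

  /** A removal is processed for a key in a non-primary bucket. */
  void onSecondaryEventRemoved() {
    secondaryQueueSize.decrementAndGet();
  }

  int getQueueSize() {
    return queueSize.get();
  }

  int getSecondaryQueueSize() {
    return secondaryQueueSize.get();
  }

  public static void main(String[] args) {
    QueueGaugesSketch gauges = new QueueGaugesSketch();
    for (int i = 0; i < 3; i++) {
      gauges.onEventAdded(false); // three events arrive in a secondary bucket
    }
    gauges.onBecomePrimary(3); // the bucket is promoted to primary
    System.out.println(gauges.getQueueSize() + " " + gauges.getSecondaryQueueSize()); // prints "3 0"
  }
}
```

In this model the sum of the two gauges changes only when an event is added or removed, never on a role change, which is the property the parallel-queue tests in this commit appear to rely on.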

diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index dee2c92..77f6eff 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -42,6 +42,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
             f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing events.",
                 "nanoseconds"),
             f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false),
+            f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of the secondary event queue.",
+                "operations", false),
             f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events queue.",
                 "operations", false),
             f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
@@ -106,6 +108,7 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
     eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED);
     eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
     eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
+    eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
     eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
     eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
     eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index d6822da..267ec36 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -230,6 +230,10 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
     this.gatewaySenderStats.decQueueSize(size);
   }
 
+  public void decSecondaryQueueSize(int size) {
+    this.gatewaySenderStats.decSecondaryQueueSize(size);
+  }
+
   public void decQueueSize() {
     this.gatewaySenderStats.decQueueSize();
   }
@@ -238,6 +242,10 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
     this.gatewaySenderStats.incQueueSize(size);
   }
 
+  public void incSecondaryQueueSize(int size) {
+    this.gatewaySenderStats.incSecondaryQueueSize(size);
+  }
+
   public void incQueueSize() {
     this.gatewaySenderStats.incQueueSize();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 100264a..71294b0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -1044,6 +1044,10 @@ public abstract class AbstractRegionMap
           } finally {
             if (done && result) {
               initialImagePutEntry(newRe);
+              if (owner instanceof BucketRegionQueue) {
+                BucketRegionQueue brq = (BucketRegionQueue) owner;
+                brq.addToEventQueue(key, done, event);
+              }
             }
             if (!done) {
               removeEntry(key, newRe, false);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index 074a60d..33442a9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -315,6 +315,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
           if (b instanceof BucketRegionQueue) {
             BucketRegionQueue brq = (BucketRegionQueue) b;
             brq.decQueueSize(brq.size());
+            brq.incSecondaryQueueSize(brq.size());
           }
         }
       }
@@ -1192,6 +1193,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
           if (br instanceof BucketRegionQueue) { // Shouldn't it be AbstractBucketRegionQueue
             BucketRegionQueue brq = (BucketRegionQueue) br;
             brq.incQueueSize(brq.size());
+            brq.decSecondaryQueueSize(brq.size());
           }
           if (br != null && br instanceof BucketRegion) {
             ((BucketRegion) br).afterAcquiringPrimaryState();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 4e5451e..0baa204 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -449,6 +449,8 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
     }
     if (this.getBucketAdvisor().isPrimary()) {
       incQueueSize(1);
+    } else {
+      incSecondaryQueueSize(1);
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index ef8eb99..d468ef4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -2450,10 +2450,10 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
     return sizeOfLocalPrimaries;
   }
 
-  public int getSizeOfLocalBuckets(boolean includeSecondary) {
+  public int getSizeOfLocalBuckets() {
     int sizeOfLocal = 0;
-    Set<BucketRegion> primaryBuckets = getAllLocalBucketRegions();
-    for (BucketRegion br : primaryBuckets) {
+    Set<BucketRegion> allLocalBuckets = getAllLocalBucketRegions();
+    for (BucketRegion br : allLocalBuckets) {
       sizeOfLocal += br.size();
     }
     return sizeOfLocal;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index a134e1e..268bbb2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -1095,6 +1095,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
     }
 
     statistics.setQueueSize(0);
+    statistics.setSecondaryQueueSize(0);
     statistics.setTempQueueSize(0);
   }
 
@@ -1251,6 +1252,11 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
     return localProcessor == null ? 0 : localProcessor.eventQueueSize();
   }
 
+  public int getEventSecondaryQueueSize() {
+    AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor;
+    return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize();
+  }
+
   public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) {
     this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 9309e43..1a12abf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -270,6 +270,23 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
     return this.queue.size();
   }
 
+  public int eventSecondaryQueueSize() {
+    if (queue == null) {
+      return 0;
+    }
+
+    // if parallel, get both primary and secondary queues' size, then subtract primary queue's size
+    if (this.queue instanceof ParallelGatewaySenderQueue) {
+      return ((ParallelGatewaySenderQueue) queue).localSize(true)
+          - ((ParallelGatewaySenderQueue) queue).localSize(false);
+    }
+    if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
+      return ((ConcurrentParallelGatewaySenderQueue) queue).localSize(true)
+          - ((ConcurrentParallelGatewaySenderQueue) queue).localSize(false);
+    }
+    return this.queue.size();
+  }
+
   /**
    * @return the sender
    */
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index c7fd370..c2866d1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -46,6 +46,8 @@ public class GatewaySenderStats {
   protected static final String EVENT_QUEUE_TIME = "eventQueueTime";
   /** Name of the event queue size statistic */
   protected static final String EVENT_QUEUE_SIZE = "eventQueueSize";
+  /** Name of the event secondary queue size statistic */
+  protected static final String EVENT_SECONDARY_QUEUE_SIZE = "eventSecondaryQueueSize";
   /** Name of the event temporary queue size statistic */
   protected static final String TMP_EVENT_QUEUE_SIZE = "tempQueueSize";
   /** Name of the events distributed statistic */
@@ -102,6 +104,8 @@ public class GatewaySenderStats {
   protected static int eventQueueTimeId;
   /** Id of the event queue size statistic */
   protected static int eventQueueSizeId;
+  /** Id of the event in secondary queue size statistic */
+  protected static int eventSecondaryQueueSizeId;
   /** Id of the temp event queue size statistic */
   protected static int eventTmpQueueSizeId;
   /** Id of the events distributed statistic */
@@ -164,6 +168,8 @@ public class GatewaySenderStats {
             f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing events.",
                 "nanoseconds"),
             f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false),
+            f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of secondary event queue.",
+                "operations", false),
             f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events.", "operations",
                 false),
             f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
@@ -232,6 +238,7 @@ public class GatewaySenderStats {
     eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED);
     eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
     eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
+    eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
     eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
     eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
     eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
@@ -350,6 +357,15 @@ public class GatewaySenderStats {
   }
 
   /**
+   * Returns the current value of the "eventSecondaryQueueSize" stat.
+   *
+   * @return the current value of the "eventSecondaryQueueSize" stat
+   */
+  public int getEventSecondaryQueueSize() {
+    return this.stats.getInt(eventSecondaryQueueSizeId);
+  }
+
+  /**
    * Returns the current value of the "tempQueueSize" stat.
    *
    * @return the current value of the "tempQueueSize" stat.
@@ -454,6 +470,15 @@ public class GatewaySenderStats {
   }
 
   /**
+   * Sets the "eventSecondaryQueueSize" stat.
+   *
+   * @param size The size of the secondary queue
+   */
+  public void setSecondaryQueueSize(int size) {
+    this.stats.setInt(eventSecondaryQueueSizeId, size);
+  }
+
+  /**
    * Sets the "tempQueueSize" stat.
    *
    * @param size The size of the temp queue
@@ -471,6 +496,14 @@ public class GatewaySenderStats {
   }
 
   /**
+   * Increments the "eventSecondaryQueueSize" stat by 1.
+   */
+  public void incSecondaryQueueSize() {
+    this.stats.incInt(eventSecondaryQueueSizeId, 1);
+    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+  }
+
+  /**
    * Increments the "tempQueueSize" stat by 1.
    */
   public void incTempQueueSize() {
@@ -487,6 +520,16 @@ public class GatewaySenderStats {
   }
 
   /**
+   * Increments the "eventSecondaryQueueSize" stat by given delta.
+   *
+   * @param delta an integer by which secondary queue size to be increased
+   */
+  public void incSecondaryQueueSize(int delta) {
+    this.stats.incInt(eventSecondaryQueueSizeId, delta);
+    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+  }
+
+  /**
    * Increments the "tempQueueSize" stat by given delta.
    *
    * @param delta an integer by which temp queue size to be increased
@@ -503,6 +546,14 @@ public class GatewaySenderStats {
   }
 
   /**
+   * Decrements the "eventSecondaryQueueSize" stat by 1.
+   */
+  public void decSecondaryQueueSize() {
+    this.stats.incInt(eventSecondaryQueueSizeId, -1);
+    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+  }
+
+  /**
    * Decrements the "tempQueueSize" stat by 1.
    */
   public void decTempQueueSize() {
@@ -519,6 +570,16 @@ public class GatewaySenderStats {
   }
 
   /**
+   * Decrements the "eventSecondaryQueueSize" stat by given delta.
+   *
+   * @param delta an integer by which secondary queue size to be decreased
+   */
+  public void decSecondaryQueueSize(int delta) {
+    this.stats.incInt(eventSecondaryQueueSizeId, -delta);
+    assert this.stats.getInt(eventSecondaryQueueSizeId) >= 0;
+  }
+
+  /**
    * Decrements the "tempQueueSize" stat by given delta.
    *
    * @param delta an integer by which temp queue size to be increased
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 75ce63c..6181f39 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1410,7 +1410,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
     for (PartitionedRegion prQ : this.userRegionNameToshadowPRMap.values()) {
       if (prQ != null && prQ.getDataStore() != null) {
         if (includeSecondary) {
-          size += prQ.getDataStore().getSizeOfLocalBuckets(true);
+          size += prQ.getDataStore().getSizeOfLocalBuckets();
         } else {
           size += prQ.getDataStore().getSizeOfLocalPrimaryBuckets();
         }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index 39fedbf..df89e36 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -183,6 +183,9 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     try {
       brq.destroyKey(key);
+      if (!brq.getBucketAdvisor().isPrimary()) {
+        prQ.getParallelGatewaySender().getStatistics().decSecondaryQueueSize();
+      }
       if (isDebugEnabled) {
         logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", key, prQ.getName(),
             brq.getId());
diff --git a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
index eb8ad01..4c5caa2 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
@@ -50,14 +50,17 @@ public class SerialAsyncEventQueueImplJUnitTest {
     attrs.id = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id";
     SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache, attrs);
     queue.getStatistics().incQueueSize(5);
+    queue.getStatistics().incSecondaryQueueSize(6);
     queue.getStatistics().incTempQueueSize(10);
 
     assertEquals(5, queue.getStatistics().getEventQueueSize());
+    assertEquals(6, queue.getStatistics().getEventSecondaryQueueSize());
     assertEquals(10, queue.getStatistics().getTempEventQueueSize());
 
     queue.stop();
 
     assertEquals(0, queue.getStatistics().getEventQueueSize());
+    assertEquals(0, queue.getStatistics().getEventSecondaryQueueSize());
     assertEquals(0, queue.getStatistics().getTempEventQueueSize());
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
index 7a956c8..5ea5914 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -707,10 +707,13 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
   }
 
   public static void checkAsyncEventQueueStats(String queueId, final int queueSize,
-      final int eventsReceived, final int eventsQueued, final int eventsDistributed) {
+      int secondaryQueueSize, final int eventsReceived, final int eventsQueued,
+      final int eventsDistributed) {
     Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues();
     AsyncEventQueue queue = null;
+    boolean isParallel = false;
     for (AsyncEventQueue q : asyncQueues) {
+      isParallel = q.isParallel();
       if (q.getId().equals(queueId)) {
         queue = q;
         break;
@@ -721,6 +724,17 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
         .until(() -> assertEquals("Expected queue entries: " + queueSize + " but actual entries: "
             + statistics.getEventQueueSize(), queueSize, statistics.getEventQueueSize()));
     assertEquals(queueSize, statistics.getEventQueueSize());
+    if (isParallel) {
+      Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
+        assertEquals(
+            "Expected events in the secondary queue is " + secondaryQueueSize + ", but actual is "
+                + statistics.getEventSecondaryQueueSize(),
+            secondaryQueueSize, statistics.getEventSecondaryQueueSize());
+      });
+    } else {
+      // for serial queue, eventSecondaryQueueSize is not used
+      assertEquals(0, statistics.getEventSecondaryQueueSize());
+    }
     assertEquals(eventsReceived, statistics.getEventsReceived());
     assertEquals(eventsQueued, statistics.getEventsQueued());
     assert (statistics.getEventsDistributed() >= eventsDistributed);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
index 935a650..849b823 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
@@ -74,9 +74,10 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase {
                                                                                      // sender
     Wait.pause(2000);// give some time for system to become stable
 
-    vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1000, 1000, 1000));
+    vm1.invoke(
+        () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1000, 1000, 1000));
     vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln", 10));
-    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1000, 0, 0));
+    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1000, 0, 0));
     vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln", 0));
   }
 
@@ -119,19 +120,43 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase {
     vm4.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue(
         getTestMethodName() + "_RR", "ln1,ln2", isOffHeap()));
 
+    vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1"));
+    vm2.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1"));
+    vm3.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1"));
+    vm4.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1"));
+    vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2"));
+    vm2.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2"));
+    vm3.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2"));
+    vm4.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2"));
+
     vm1.invoke(() -> AsyncEventQueueTestBase.doPuts(getTestMethodName() + "_RR", 1000));
 
+    vm1.invoke(
+        () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 1000, 0, 1000, 1000, 0));
+    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 1000, 0, 1000, 0, 0));
+
+    vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1"));
+    vm2.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1"));
+    vm3.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1"));
+    vm4.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1"));
+    vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2"));
+    vm2.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2"));
+    vm3.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2"));
+    vm4.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2"));
+
     vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln1", 1000));
     vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln2", 1000));
     Wait.pause(2000);// give some time for system to become stable
 
-    vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 1000, 1000, 1000));
+    vm1.invoke(
+        () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 0, 1000, 1000, 1000));
     vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln1", 10));
-    vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 1000, 1000, 1000));
+    vm1.invoke(
+        () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 0, 1000, 1000, 1000));
     vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln2", 10));
-    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 1000, 0, 0));
+    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 0, 1000, 0, 0));
     vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln1", 0));
-    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 1000, 0, 0));
+    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 0, 1000, 0, 0));
     vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln2", 0));
   }
 
@@ -229,11 +254,12 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase {
     vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 1500));
 
     Wait.pause(2000);// give some time for system to become stable
-    vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1500, 1500, 1500));
+    vm1.invoke(
+        () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1500, 1500, 1500));
     vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueUnprocessedStats("ln", 0));
 
 
-    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1500, 0, 0));
+    vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1500, 0, 0));
     vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueUnprocessedStats("ln", 1500));
   }
 
@@ -301,7 +327,8 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase {
     vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 1000));
 
     Wait.pause(2000);// give some time for system to become stable
-    vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 2000, 2000, 1000));
+    vm1.invoke(
+        () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 2000, 2000, 1000));
     vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueConflatedStats("ln", 500));
   }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
index 5e0f704..d1ea59f 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
@@ -65,6 +65,8 @@ import org.apache.geode.internal.cache.eviction.EvictionController;
 import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
+import org.apache.geode.internal.statistics.DummyStatisticsFactory;
 import org.apache.geode.test.fake.Fakes;
 import org.apache.geode.test.junit.categories.UnitTest;
 
@@ -81,6 +83,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
   private PartitionedRegion rootRegion;
   private BucketRegionQueue bucketRegionQueue;
   private BucketRegionQueueHelper bucketRegionQueueHelper;
+  private GatewaySenderStats stats;
 
   @Before
   public void setUpGemFire() {
@@ -116,6 +119,8 @@ public class ParallelQueueRemovalMessageJUnitTest {
     when(this.queueRegion.getParallelGatewaySender()).thenReturn(this.sender);
     when(this.sender.getQueues()).thenReturn(null);
     when(this.sender.getDispatcherThreads()).thenReturn(1);
+    stats = new GatewaySenderStats(new DummyStatisticsFactory(), "ln");
+    when(this.sender.getStatistics()).thenReturn(stats);
   }
 
   private void createRootRegion() {
@@ -183,6 +188,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
     // Validate initial BucketRegionQueue state
     assertFalse(this.bucketRegionQueue.isInitialized());
     assertEquals(0, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size());
+    stats.setSecondaryQueueSize(1);
 
     // Create and process a ParallelQueueRemovalMessage (causes the failedBatchRemovalMessageKeys to
     // add a key)
@@ -190,6 +196,8 @@ public class ParallelQueueRemovalMessageJUnitTest {
 
     // Validate BucketRegionQueue after processing ParallelQueueRemovalMessage
     assertEquals(1, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size());
+    // failed BatchRemovalMessage will not modify stats
+    assertEquals(1, stats.getEventSecondaryQueueSize());
   }
 
   @Test
@@ -201,6 +209,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
     // Add an event to the BucketRegionQueue and verify BucketRegionQueue state
     this.bucketRegionQueueHelper.addEvent(KEY);
     assertEquals(1, this.bucketRegionQueue.size());
+    assertEquals(1, stats.getEventSecondaryQueueSize());
 
     // Create and process a ParallelQueueRemovalMessage (causes the value of the entry to be set to
     // DESTROYED)
@@ -210,6 +219,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
     // Clean up destroyed tokens and validate BucketRegionQueue
     this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
     assertEquals(0, this.bucketRegionQueue.size());
+    assertEquals(0, stats.getEventSecondaryQueueSize());
   }
 
   @Test
@@ -247,6 +257,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
     // Add an event to the BucketRegionQueue and verify BucketRegionQueue state
     GatewaySenderEventImpl event = this.bucketRegionQueueHelper.addEvent(KEY);
     assertEquals(1, this.bucketRegionQueue.size());
+    assertEquals(1, stats.getEventSecondaryQueueSize());
 
     // Add a mock GatewaySenderEventImpl to the temp queue
     BlockingQueue<GatewaySenderEventImpl> tempQueue = createTempQueueAndAddEvent(processor, event);
@@ -259,6 +270,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
 
     // Validate temp queue is empty after processing ParallelQueueRemovalMessage
     assertEquals(0, tempQueue.size());
+    assertEquals(0, stats.getEventSecondaryQueueSize());
 
     // Clean up destroyed tokens
     this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
diff --git a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
index ea246bc..e7d38b4 100644
--- a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
@@ -52,8 +52,6 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase {
     sample();
 
     assertEquals(0, getEventQueueSize());
-
-
   }
 
   private int getEventQueueSize() {
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index c29d66e..3333584 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -120,9 +120,11 @@ import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.AvailablePort;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.admin.remote.DistributionLocatorId;
+import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
 import org.apache.geode.internal.cache.BucketRegion;
 import org.apache.geode.internal.cache.CacheConfig;
 import org.apache.geode.internal.cache.CacheServerImpl;
@@ -139,6 +141,8 @@ import org.apache.geode.internal.cache.execute.data.Order;
 import org.apache.geode.internal.cache.execute.data.OrderId;
 import org.apache.geode.internal.cache.execute.data.Shipment;
 import org.apache.geode.internal.cache.execute.data.ShipmentId;
+import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage;
+import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse;
 import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
 import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
 import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
@@ -1175,12 +1179,57 @@ public class WANTestBase extends JUnit4DistributedTestCase {
     return connectionInfo;
   }
 
+  public static void moveAllPrimaryBuckets(String senderId, final DistributedMember destination,
+      final String regionName) {
+
+    AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
+    final RegionQueue regionQueue;
+    regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0];
+    if (sender.isParallel()) {
+      ConcurrentParallelGatewaySenderQueue parallelGatewaySenderQueue =
+          (ConcurrentParallelGatewaySenderQueue) regionQueue;
+      PartitionedRegion prQ =
+          parallelGatewaySenderQueue.getRegions().toArray(new PartitionedRegion[1])[0];
+
+      Set<Integer> primaryBucketIds = prQ.getDataStore().getAllLocalPrimaryBucketIds();
+      for (int bid : primaryBucketIds) {
+        movePrimary(destination, regionName, bid);
+      }
+
+      // double check after moving all primary buckets
+      primaryBucketIds = prQ.getDataStore().getAllLocalPrimaryBucketIds();
+      assertTrue(primaryBucketIds.isEmpty());
+    }
+  }
+
+  public static void movePrimary(final DistributedMember destination, final String regionName,
+      final int bucketId) {
+    PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
+
+    BecomePrimaryBucketResponse response = BecomePrimaryBucketMessage
+        .send((InternalDistributedMember) destination, region, bucketId, true);
+    assertNotNull(response);
+    assertTrue(response.waitForResponse());
+  }
+
+  public static int getSecondaryQueueSizeInStats(String senderId) {
+    AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
+    GatewaySenderStats statistics = sender.getStatistics();
+    return statistics.getEventSecondaryQueueSize();
+  }
+
   public static List<Integer> getSenderStats(String senderId, final int expectedQueueSize) {
     AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
     GatewaySenderStats statistics = sender.getStatistics();
     if (expectedQueueSize != -1) {
       final RegionQueue regionQueue;
       regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0];
+      if (sender.isParallel()) {
+        ConcurrentParallelGatewaySenderQueue parallelGatewaySenderQueue =
+            (ConcurrentParallelGatewaySenderQueue) regionQueue;
+        PartitionedRegion pr =
+            parallelGatewaySenderQueue.getRegions().toArray(new PartitionedRegion[1])[0];
+      }
       Awaitility.await().atMost(120, TimeUnit.SECONDS)
           .until(() -> assertEquals("Expected queue entries: " + expectedQueueSize
               + " but actual entries: " + regionQueue.size(), expectedQueueSize,
@@ -1197,9 +1246,28 @@ public class WANTestBase extends JUnit4DistributedTestCase {
     stats.add(statistics.getEventsNotQueuedConflated());
     stats.add(statistics.getEventsConflatedFromBatches());
     stats.add(statistics.getConflationIndexesMapSize());
+    stats.add(statistics.getEventSecondaryQueueSize());
     return stats;
   }
 
+  protected static int getTotalBucketQueueSize(PartitionedRegion prQ, boolean isPrimary) {
+    int size = 0;
+    if (prQ != null) {
+      Set<Map.Entry<Integer, BucketRegion>> allBuckets = prQ.getDataStore().getAllLocalBuckets();
+      List<Integer> thisProcessorBuckets = new ArrayList<Integer>();
+
+      for (Map.Entry<Integer, BucketRegion> bucketEntry : allBuckets) {
+        BucketRegion bucket = bucketEntry.getValue();
+        int bId = bucket.getId();
+        if ((isPrimary && bucket.getBucketAdvisor().isPrimary())
+            || (!isPrimary && !bucket.getBucketAdvisor().isPrimary())) {
+          size += bucket.size();
+        }
+      }
+    }
+    return size;
+  }
+
   public static void checkQueueStats(String senderId, final int queueSize, final int eventsReceived,
       final int eventsQueued, final int eventsDistributed) {
     GatewaySenderStats statistics = getGatewaySenderStats(senderId);
@@ -3102,6 +3170,19 @@ public class WANTestBase extends JUnit4DistributedTestCase {
     return getQueueContentSize(senderId, false);
   }
 
+  public static Integer getSecondaryQueueContentSize(final String senderId) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+    AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
+    return abstractSender.getEventSecondaryQueueSize();
+  }
+
   public static Integer getQueueContentSize(final String senderId, boolean includeSecondary) {
     Set<GatewaySender> senders = cache.getGatewaySenders();
     GatewaySender sender = null;
@@ -3113,9 +3194,7 @@ public class WANTestBase extends JUnit4DistributedTestCase {
     }
 
     if (!sender.isParallel()) {
-      if (includeSecondary) {
-        fail("Not implemented yet");
-      }
+      // if sender is serial, the queues will be all primary or all secondary at one member
       final Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
       int size = 0;
       for (RegionQueue q : queues) {
@@ -3130,11 +3209,7 @@ public class WANTestBase extends JUnit4DistributedTestCase {
       } else if (regionQueue instanceof ParallelGatewaySenderQueue) {
         return ((ParallelGatewaySenderQueue) regionQueue).localSize(includeSecondary);
       } else {
-        if (includeSecondary) {
-          fail("Not Implemented yet");
-        }
-        regionQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
-        return regionQueue.getRegion().size();
+        fail("Not implemented yet");
       }
     }
     fail("Not yet implemented?");
@@ -3180,14 +3255,26 @@ public class WANTestBase extends JUnit4DistributedTestCase {
           ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
       Set<BucketRegion> buckets = ((PartitionedRegion) regionQueue.getRegion()).getDataStore()
           .getAllLocalPrimaryBucketRegions();
+      final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
+
       for (final BucketRegion bucket : buckets) {
-        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+        Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
           assertEquals("Expected bucket entries for bucket: " + bucket.getId()
               + " is: 0 but actual entries: " + bucket.keySet().size() + " This bucket isPrimary: "
               + bucket.getBucketAdvisor().isPrimary() + " KEYSET: " + bucket.keySet(), 0,
               bucket.keySet().size());
         });
       } // for loop ends
+      assertEquals("Expected events in all primary queues after drain is 0", 0,
+          abstractSender.getEventQueueSize());
+      Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> {
+        assertEquals(
+            "Expected events in all secondary queues are drained but actual is "
+                + abstractSender.getEventSecondaryQueueSize(),
+            0, abstractSender.getEventSecondaryQueueSize());
+      });
+      assertEquals("Expected events in all secondary queues after drain is 0", 0,
+          abstractSender.getEventSecondaryQueueSize());
     } finally {
       exp.remove();
       exp1.remove();
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
index 1613501..c9b968f 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
@@ -67,6 +67,11 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
 
     vm4.invoke(() -> checkQueueSize("ln", (keyValues.size() + updateKeyValues.size())));
 
+    vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), updateKeyValues));
+
+    // Since no conflation, all updates are in queue
+    vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + 2 * updateKeyValues.size()));
+
     vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0));
 
     resumeSenders();
@@ -92,7 +97,7 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
     vm6.invoke(() -> createSender("ln", 2, true, 100, 50, false, false, null, true));
     vm7.invoke(() -> createSender("ln", 2, true, 100, 50, false, false, null, true));
 
-    createSenderPRs();
+    createSenderPRs(1);
 
     startSenderInVMs("ln", vm4, vm5, vm6, vm7);
 
@@ -109,24 +114,35 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
       vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), keyValues));
     }
 
+    // sender did not turn on conflation, so queue size will be 100 (otherwise it will be 20)
+    vm4.invoke(() -> checkQueueSize("ln", 100));
     vm4.invoke(() -> enableConflation("ln"));
     vm5.invoke(() -> enableConflation("ln"));
     vm6.invoke(() -> enableConflation("ln"));
     vm7.invoke(() -> enableConflation("ln"));
 
-    resumeSenders();
-
     ArrayList<Integer> v4List =
-        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 100));
     ArrayList<Integer> v5List =
-        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 100));
     ArrayList<Integer> v6List =
-        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 100));
     ArrayList<Integer> v7List =
-        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 100));
+    assertTrue("Event in secondary queue should be 100",
+        (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 100);
+
+    resumeSenders();
+
+    v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
 
     assertTrue("No events conflated in batch",
         (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0);
+    assertEquals("Event in secondary queue should be 0 after dispatch", 0,
+        (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)));
 
     vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10));
 
@@ -161,12 +177,14 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
     vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + updateKeyValues.size())); // creates
                                                                                        // aren't
                                                                                        // conflated
+    validateEventSecondaryQueueSize(keyValues.size() + updateKeyValues.size(), redundancy);
 
     vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), updateKeyValues));
 
-    vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + updateKeyValues.size())); // creates
-                                                                                       // aren't
-                                                                                       // conflated
+    int expectedEventNumAfterConflation = keyValues.size() + updateKeyValues.size();
+    vm4.invoke(() -> checkQueueSize("ln", expectedEventNumAfterConflation));
+
+    validateEventSecondaryQueueSize(expectedEventNumAfterConflation, redundancy);
 
     vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0));
 
@@ -174,6 +192,24 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
 
     keyValues.putAll(updateKeyValues);
     validateReceiverRegionSize(keyValues);
+
+    // after dispatch, both primary and secondary queues are empty
+    vm4.invoke(() -> checkQueueSize("ln", 0));
+    validateEventSecondaryQueueSize(0, redundancy);
+  }
+
+  private void validateEventSecondaryQueueSize(int expectedNum, int redundancy) {
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+    ArrayList<Integer> v6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+    ArrayList<Integer> v7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+    assertTrue("Event in secondary queue should be " + (expectedNum * redundancy),
+        (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == expectedNum
+            * redundancy);
   }
 
   @Test
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index a54a67d..07c0d86 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.internal.cache.wan.WANTestBase;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.VM;
@@ -53,6 +54,172 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
   }
 
   @Test
+  public void testQueueSizeInSecondaryBucketRegionQueuesWithMemberRestart() throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createSendersWithConflation(lnPort);
+
+    createSenderPRs(1);
+
+    startPausedSenders();
+
+    createReceiverPR(vm2, 1);
+    putKeyValues();
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+    ArrayList<Integer> v6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+    ArrayList<Integer> v7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+
+    assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue
+                                                                                           // size
+    assertEquals(NUM_PUTS * 2, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // eventsReceived
+    assertEquals(NUM_PUTS * 2, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); // events
+                                                                                               // queued
+    assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events
+                                                                                    // distributed
+    assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary
+                                                                                               // queue
+                                                                                               // size
+
+    // stop vm7 to trigger rebalance and move some primary buckets
+    System.out.println("Current secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10)
+        + ":" + v6List.get(10) + ":" + v7List.get(10));
+    vm7.invoke(() -> WANTestBase.closeCache());
+    Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> {
+      int v4secondarySize = vm4.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
+      int v5secondarySize = vm5.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
+      int v6secondarySize = vm6.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
+      assertEquals(NUM_PUTS, v4secondarySize + v5secondarySize + v6secondarySize); // secondary
+      // queue
+      // size
+    });
+    System.out.println("New secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10) + ":"
+        + v6List.get(10));
+
+    vm7.invoke(() -> WANTestBase.createCache(lnPort));
+    vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(testName, "ln", 1, 10, isOffHeap()));
+    startSenderInVMs("ln", vm7);
+    vm7.invoke(() -> pauseSender("ln"));
+
+    v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+    v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+    v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+    v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+    assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary
+    // queue
+    // size
+    System.out.println("After restart vm7, secondary queue sizes:" + v4List.get(10) + ":"
+        + v5List.get(10) + ":" + v6List.get(10) + ":" + v7List.get(10));
+
+    vm4.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm5.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm6.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm7.invoke(() -> WANTestBase.resumeSender("ln"));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
+    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, NUM_PUTS, NUM_PUTS));
+
+    vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
+
+    v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events
+                                                                                           // distributed
+    assertEquals(0, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary
+                                                                                        // queue
+                                                                                        // size
+  }
+
+  // TODO: add a test without redundancy for primary switch
+  @Test
+  public void testQueueSizeInSecondaryWithPrimarySwitch() throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+
+    createSendersWithConflation(lnPort);
+
+    createSenderPRs(1);
+
+    startPausedSenders();
+
+    createReceiverPR(vm2, 1);
+
+    putKeyValues();
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+    ArrayList<Integer> v6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+    ArrayList<Integer> v7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+
+    assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue
+                                                                                           // size
+    assertEquals(NUM_PUTS * 2, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // eventsReceived
+    assertEquals(NUM_PUTS * 2, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); // events
+                                                                                               // queued
+    assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events
+                                                                                    // distributed
+    assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary
+                                                                                               // queue
+                                                                                               // size
+    // int vm7secondarySizeBeforeMovePrimary = v7List.get(10);
+    // System.out.println("Current secondary queue
+    // sizes:"+v4List.get(10)+":"+v5List.get(10)+":"+v6List.get(10)+":"+v7List.get(10));
+    // System.out.println("Now move a primary bucket");
+    // final DistributedMember vm6member = vm6.invoke(() -> WANTestBase.getMember());
+    // vm7.invoke(() -> WANTestBase.moveAllPrimaryBuckets("ln", vm6member, testName));
+    //
+    // v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+    // v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+    // v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+    // v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+    // assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); //
+    // secondary
+    // // queue
+    // // size
+    // assertTrue(v7List.get(10) > vm7secondarySizeBeforeMovePrimary);
+
+    vm4.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm5.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm6.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm7.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
+    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, NUM_PUTS, NUM_PUTS));
+
+    vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
+
+    v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events
+                                                                                           // distributed
+    assertEquals(0, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary
+                                                                                        // queue
+                                                                                        // size
+  }
+
+  @Test
   public void testPartitionedRegionParallelPropagation_BeforeDispatch() throws Exception {
     Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
     Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
index ab673e6..dfd6a1d 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
@@ -18,6 +18,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.*;
 import static org.junit.Assert.*;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
@@ -46,6 +47,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
 import org.apache.geode.internal.cache.wan.WANTestBase;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.VM;
@@ -103,7 +105,13 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase {
     vm4.invoke(() -> WANTestBase.pauseSender("ln"));
 
     vm6.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
-    Wait.pause(5000);
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 1000));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 1000));
+    // secondary queue size stats in serial queue should be 0
+    assertEquals(0, v4List.get(10) + v5List.get(10));
+
     HashMap primarySenderUpdates = (HashMap) vm4.invoke(() -> WANTestBase.checkQueue());
     HashMap secondarySenderUpdates = (HashMap) vm5.invoke(() -> WANTestBase.checkQueue());
     assertEquals(primarySenderUpdates, secondarySenderUpdates);
@@ -138,6 +146,9 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase {
     // removing all the keys.
     secondarySenderUpdates = (HashMap) vm5.invoke(() -> WANTestBase.checkQueue());
     assertEquals(secondarySenderUpdates.get("Destroy"), receiverUpdates.get("Create"));
+
+    vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
   }
 
   protected void checkPrimarySenderUpdatesOnVM5(HashMap primarySenderUpdates) {
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
index 7297179..091befd 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
@@ -44,10 +44,10 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
 
     createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
 
-    vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap()));
-    vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap()));
-    vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap()));
-    vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap()));
+    vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+    vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+    vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+    vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
 
     vm4.invoke(() -> createSender("ln", 2, false, 100, 50, false, false, null, true));
     vm5.invoke(() -> createSender("ln", 2, false, 100, 50, false, false, null, true));
@@ -92,6 +92,71 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
 
     assertTrue("No events conflated in batch",
         (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0);
+  }
+
+  @Test
+  public void testSerialPropagationPartitionRegionConflationDuringEnqueue() throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+
+    vm2.invoke(() -> createPartitionedRegion(getTestMethodName(), null, 1, 8, isOffHeap()));
+    vm3.invoke(() -> createPartitionedRegion(getTestMethodName(), null, 1, 8, isOffHeap()));
+
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+    vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+    vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+    vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+
+    vm4.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true));
+    vm5.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true));
+    vm6.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true));
+    vm7.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> pauseSender("ln"));
+    vm5.invoke(() -> pauseSender("ln"));
+    vm6.invoke(() -> pauseSender("ln"));
+    vm7.invoke(() -> pauseSender("ln"));
+
+
+    final Map keyValues = new HashMap();
+
+    for (int i = 1; i <= 10; i++) {
+      for (int j = 1; j <= 10; j++) {
+        keyValues.put(j, i);
+      }
+      vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), keyValues));
+    }
+
+    ArrayList<Integer> v4List =
+        (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 20));
+    assertTrue("After conflation during enqueue, there should be only 20 events",
+        v4List.get(0) == 20);
+
+    vm4.invoke(() -> resumeSender("ln"));
+    vm5.invoke(() -> resumeSender("ln"));
+    vm6.invoke(() -> resumeSender("ln"));
+    vm7.invoke(() -> resumeSender("ln"));
+
+    v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    ArrayList<Integer> v5List =
+        (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    ArrayList<Integer> v6List =
+        (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+    ArrayList<Integer> v7List =
+        (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+    assertTrue("No events in secondary queue stats since it's serial sender",
+        (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 0);
+    assertTrue("Total queued events should be 100",
+        (v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)) == 100);
 
     vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10));
 
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java
index e84fd89..87c90e0 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java
@@ -163,6 +163,7 @@ public class SerialWANPropagationDUnitTest extends WANTestBase {
     IgnoredException.addIgnoredException(BatchException70.class.getName());
     IgnoredException.addIgnoredException(ServerOperationException.class.getName());
     IgnoredException.addIgnoredException(IOException.class.getName());
+    IgnoredException.addIgnoredException(java.net.SocketException.class.getName());
 
     vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 10000));
 

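
The tests in this diff repeatedly sum one position of the list returned by WANTestBase.getSenderStats across several VMs, with the meaning of each position given only in trailing comments (0 = queue size, 1 = eventsReceived, 2 = events queued, 3 = events distributed, 10 = secondary queue size). A minimal sketch of how that pattern could be named and factored is shown here. The class SenderStatSums, its constants, and sumStat are hypothetical and not part of this commit or of WANTestBase, and the index meanings are assumed from those comments alone.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration; not part of WANTestBase or this commit. */
public class SenderStatSums {
  // Index meanings are assumed from the trailing comments in the tests above.
  static final int QUEUE_SIZE = 0;
  static final int EVENTS_RECEIVED = 1;
  static final int EVENTS_QUEUED = 2;
  static final int EVENTS_DISTRIBUTED = 3;
  static final int SECONDARY_QUEUE_SIZE = 10;

  /** Sums the stat at the given index across every per-VM stats list. */
  @SafeVarargs
  static int sumStat(int index, List<Integer>... perVmStats) {
    int total = 0;
    for (List<Integer> stats : perVmStats) {
      total += stats.get(index);
    }
    return total;
  }

  public static void main(String[] args) {
    // Made-up lists standing in for the getSenderStats results of two VMs.
    List<Integer> vmA = Arrays.asList(3, 6, 6, 0, 0, 0, 0, 0, 0, 0, 2);
    List<Integer> vmB = Arrays.asList(2, 4, 4, 0, 0, 0, 0, 0, 0, 0, 3);
    System.out.println(sumStat(QUEUE_SIZE, vmA, vmB));
    System.out.println(sumStat(SECONDARY_QUEUE_SIZE, vmA, vmB));
  }
}
```

With such a helper, an assertion like the secondary-queue check could read assertEquals(NUM_PUTS, sumStat(SECONDARY_QUEUE_SIZE, v4List, v5List, v6List, v7List)), which keeps the index meaning next to the value being checked.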