You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/03/05 07:50:29 UTC
[geode] branch feature/GEODE-4647 updated (c880713 -> 140c83e)
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a change to branch feature/GEODE-4647
in repository https://gitbox.apache.org/repos/asf/geode.git.
discard c880713 GEODE-4647: add a stats eventSecondaryQueueSizeId to track events in secondary gateway sender queue
add fa11e2a GEODE-3928: createIndex on existing region creates lucene indexes for existing data
add 7255db4 GEODE-4989: Temporary fix for Lucene Stats
add 2a72fb2 GEODE-4717: Refactor computeRepository
add 073180c GEODE-4718: PdxReadSerialized is reset
add 33b6e1c GEODE-4719: Added comments to explain getRepository
add d7102e8 GEODE-4989: Reverted a fix for Lucene stats
add 25978ef GEODE-3928: Unused imports were removed.
add 37ff715 GEODE-4414: Support explicit nulls in geode's protobuf protocol (#1437)
add dc2b33d GEODE-4656: Decribe region shows entry-idle-time-custom-expiry and entry-time-to-live-custom-expiry (#1455)
add 059acf2 GEODE-4713: remove getInstance calls from from MultiVMRegionTestCase (#1481)
add 0b15a3b GEODE-4464: remove singleton calls from all tests in org.apache.geode.cache30 (#1484)
add 00fa527 GEODE-4493: remove InternalDistributedSystem.getAnyInstance call (#1485)
add e9ada48 GEODE-3523: Update locatorDiscoveryCallback after updating state
add 9a26976 GEODE-4570: Remove getInstance() singleton call from SecurityServiceF… (#1482)
add ce0d6a0 GEODE-4690: ClusterStartupRule causes System properties to 'disappear' (#1470)
add 804ecfa GEODE-4625: name collision check when create region through gfsh (#1483)
add ccfe15e GEODE-3876: Document gfsh command for custom expiry
add bf5b802 Merge branch 'feature/GEODE-3876' into develop
add a454e97 GEODE-4625: Add missing @Category to test. (#1494)
add 3b596ff GEODE-2667: Improved javaDoc for GatewayReceiver.destroy() (#1490)
add bbbc78f GEODE-4656: Decribe region shows entry-idle-time-custom-expiry and entry-time-to-live-custom-expiry
add 7d7361c GEODE-4691: Gfsh command Start Locator shows inconsistent behavior wi… (#1478)
add 8354b61 GEODE-3875: gfsh command to create jndi binding (#1475)
add 1f2193b GEODE-4402: Add logging where exceptions would be thrown or caught. (#1452)
add 1d1bc3b GEODE-4703: Prevent sending RemoveCacheServerProfileMessage to older members (#1468)
add df093b5 GEODE-4722: Refactor CliUtil (#1487)
add c16d1a3 GEODE-4327 Document new OQL operators (#1491)
add 94db6a7 GEODE-3813: Improved and fixed formatting in javaDoc for deprecated behavior (#1500)
add 3b6a9aa GEODE-4559: pass the Cache to a Declarable (#1422)
add 2dfc8ae GEODE-4407 (#1499): Refactoring incremental backup logic
add d752dce GEODE-4541: remove singleton calls (#1463)
add 147e6bc GEODE-4704: Modified ConflationKey to use shadowKey when comparing events
add de2cb20 GEODE-4638: Standardize hanlding of region-not-found errors. (#1492)
add 101fd19 GEODE-2673: overhaul PartitionedRegion dunit tests (#1486)
add 889da89 GEODE-4675: remove checkConnected calls while notifying listeners (#1495)
add 3dad0a3 GEODE-4738: move eventSeqNum and versionVector setting in constructors. (#1504)
add 3eaa095 GEODE-4737 Document JSON spec for gfsh command options (#1509)
add cd5edfd GEODE-3915 Document use of JSON spec for gfsh create and alter region (#1502)
add c00ce53 GEODE-2667: Reword javaDoc for destroy gateway receiver (#1501)
add be8140d GEODE-4736: Updating statistics in ProtobufOpsProcessor
add 40fb4bd GEODE-3875: gfsh command to create jndi binding (#1507)
add fa1f2ac GEODE-4406: Improve authorization granularity for protobuf (#1514)
add 42485b6 GEODE-4625: rework name collision check logic and add more tests (#1505)
add f4433fc GEODE-4404: Move BackupWriter creation (#1521)
add 7980b1c GEODE-4743: Removed five dead classes.
add 967b9db GEODE-4744: Allow java.util.Map#get in OQL when security is enabled
add 92278d4 GEODE-4753: fix permission for GetServer operation (#1524)
add 84d31b8 GEODE-4725: PdxReadSerialized reset
add 0e36a99 GEODE-4745: Stat check asserts modified
add 4ad6337 GEODE-4733: Remove unused and inline trivial ObjectUtils methods.
add 0afb75b GEODE-4362: view preparation throws uncaught RuntimeException
add b730a43 GEODE-3126: Adding a query message to the protobuf protocol
add 0a6eebd GEODE-3948 Improve CQ performance under flaky network conditions
add 4fd969e GEODE-4695 PluckStacks utility is confused by Attach Listener thread dump
add 8ce5ebf Refactoring request/response into a common class in protobuf driver
add c373f37 GEODE-3126: Adding a query command to the experimental driver
add efc413a GEODE-4746: Handle exceptions and return failures to protobuf clients
add 1c901de GEODE-4740: Removed the stat checks
add 291c4e5 GEODE-4101: User Guide - document the --redirect-output flag in GFSH commands
add 25a0d6e GEODE-4101: User Guide - incorporate review comments
add ce0251d GEODE-4734: cleanup tests for use as examples in Geode wiki (#1518)
add 7b5fe8f GEODE-4757: Prevent NPE when non-incremental backup conducted (#1530)
add c7096b9 GEODE-4756: Restore the correct help string for the --group option (#1531)
add e29f817 GEODE-4182: Add JdbcConnectorException (#1529)
add 851a29c [GEODE-4760] CI maintenance fixes.
add 0de52f3 Merge pull request #1534 from smgoller/GEODE-4760
add e9874b7 GEODE-4761 test is serializing the test class and putting it into the cache
add 8ce536e GEODE-3948 Document subscription timeout multiplier
add 7ec3956 GEODE-4517: Remove getAnyInstancce call from CliUtil.
add db63cf3 GEODE-4762: Remove dead code.
add 808d273 GEODE-4661: Implement KeySet protobuf message and handler (#1538)
add c35aba2 GEODE-4394 gfsh put command: change option --skip-if-exists to --if-not-exists (update user guide)
add 94ded7f GEODE-4672 Geode fails to start with JDK 9 if validate-serializable-objects is set
add b9d9b38 GEODE-4401: Add disconnect client message to server and driver. (#1525)
add 5380ed5 GEODE-3465: Enhance error message and add tests for when group is specified. (#1527)
add 81ae44d GEODE-4721: Fix a case that client proxy region returns empty set if … (#1532)
add 07564e8 rev the build number to 1.6.
add a636cb0 GEODE-4672 Geode fails to start with JDK 9 if validate-serializable-objects is set
new 140c83e GEODE-4647: add a stats eventSecondaryQueueSizeId to track events in secondary gateway sender queue
This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:
* -- * -- B -- O -- O -- O (c880713)
\
N -- N -- N refs/heads/feature/GEODE-4647 (140c83e)
You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.
The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
ci/docker/initdocker | 2 +-
ci/pipelines/develop.yml | 2 +
.../cli/commands/StartLocatorAcceptanceTest.java | 9 +
.../source/subnavs/geode-subnav.erb | 3 +
.../geode/connectors/jdbc/JdbcAsyncWriter.java | 3 +-
.../connectors/jdbc/JdbcConnectorException.java | 22 +-
.../apache/geode/connectors/jdbc/JdbcLoader.java | 8 +-
.../apache/geode/connectors/jdbc/JdbcWriter.java | 24 +-
.../jdbc/internal/HikariJdbcDataSource.java | 4 +-
.../jdbc/internal/HikariJdbcDataSourceFactory.java | 2 +-
.../geode/connectors/jdbc/internal/SqlHandler.java | 106 +-
.../jdbc/internal/TableKeyColumnManager.java | 6 +-
.../jdbc/internal/cli/ExceptionHandler.java | 3 +-
.../geode/connectors/jdbc/JdbcAsyncWriterTest.java | 4 +-
.../geode/connectors/jdbc/JdbcLoaderTest.java | 2 +-
.../connectors/jdbc/JdbcWriterIntegrationTest.java | 1 -
.../geode/connectors/jdbc/JdbcWriterTest.java | 6 +-
.../connectors/jdbc/internal/SqlHandlerTest.java | 16 +-
.../jdbc/internal/TableKeyColumnManagerTest.java | 3 +-
.../org/apache/geode/GemFireCheckedException.java | 4 -
.../java/org/apache/geode/admin/BackupStatus.java | 20 +-
.../apache/geode/admin/GemFireMemberStatus.java | 20 +-
.../geode/admin/SystemMembershipListener.java | 10 -
.../admin/internal/AdminDistributedSystemImpl.java | 106 +-
.../internal/DistributedSystemConfigImpl.java | 7 -
.../internal/DistributedSystemHealthMonitor.java | 1 -
.../internal/EnabledManagedEntityController.java | 21 +-
.../geode/admin/internal/GemFireHealthImpl.java | 21 +-
.../geode/admin/internal/InetAddressUtil.java | 1 -
.../admin/internal/ManagedEntityConfigXml.java | 13 +-
.../internal/ManagedEntityConfigXmlParser.java | 21 +-
.../admin/internal/MemberHealthEvaluator.java | 1 -
.../admin/internal/StatisticResourceImpl.java | 2 +-
.../admin/internal/SystemMemberCacheImpl.java | 31 +-
.../geode/admin/internal/SystemMemberImpl.java | 27 +-
.../admin/internal/SystemMembershipEventImpl.java | 15 +-
.../internal/AdminDistributedSystemJmxImpl.java | 129 +-
.../geode/admin/jmx/internal/AgentConfigImpl.java | 53 +-
.../apache/geode/admin/jmx/internal/AgentImpl.java | 30 -
.../admin/jmx/internal/CacheServerJmxImpl.java | 35 -
.../admin/jmx/internal/GemFireHealthJmxImpl.java | 3 +-
.../admin/jmx/internal/GenerateMBeanHTML.java | 18 -
.../apache/geode/admin/jmx/internal/MBeanUtil.java | 8 -
.../jmx/internal/MemberInfoWithStatsMBean.java | 59 +-
.../admin/jmx/internal/StatisticAttributeInfo.java | 1 -
.../geode/admin/jmx/internal/SystemMemberJmx.java | 1 -
.../admin/jmx/internal/SystemMemberJmxImpl.java | 36 -
.../org/apache/geode/cache/AttributesFactory.java | 35 +-
.../java/org/apache/geode/cache/Declarable.java | 19 +
.../org/apache/geode/cache/DiskStoreFactory.java | 15 +-
.../apache/geode/cache/DynamicRegionFactory.java | 4 +-
.../apache/geode/cache/PartitionAttributes.java | 14 -
.../geode/cache/PartitionAttributesFactory.java | 68 +-
.../main/java/org/apache/geode/cache/Region.java | 63 +-
.../java/org/apache/geode/cache/RegionFactory.java | 4 +-
.../geode/cache/RegionReinitializedException.java | 1 -
.../java/org/apache/geode/cache/RegionService.java | 19 -
.../org/apache/geode/cache/RegionShortcut.java | 26 +-
.../java/org/apache/geode/cache/RequiredRoles.java | 5 +-
.../geode/cache/client/internal/AbstractOp.java | 3 -
.../cache/client/internal/AuthenticateUserOp.java | 3 +-
.../client/internal/AutoConnectionSourceImpl.java | 28 +-
.../client/internal/ClientMetadataService.java | 9 -
.../client/internal/ClientRegionFactoryImpl.java | 1 -
.../cache/client/internal/CloseConnectionOp.java | 2 +-
.../client/internal/ConnectionFactoryImpl.java | 1 -
.../cache/client/internal/ConnectionImpl.java | 4 -
.../cache/client/internal/ConnectionStats.java | 57 -
.../geode/cache/client/internal/DestroyOp.java | 3 -
.../cache/client/internal/EndpointManagerImpl.java | 9 -
.../cache/client/internal/ExecuteFunctionOp.java | 2 -
.../internal/ExecuteRegionFunctionNoAckOp.java | 6 -
.../client/internal/ExecuteRegionFunctionOp.java | 6 -
.../internal/ExecuteRegionFunctionSingleHopOp.java | 10 -
.../cache/client/internal/GetEventValueOp.java | 1 -
.../cache/client/internal/GetPDXIdForEnumOp.java | 1 -
.../cache/client/internal/GetPDXIdForTypeOp.java | 1 -
.../geode/cache/client/internal/KeySetOp.java | 2 -
.../cache/client/internal/LiveServerPinger.java | 4 -
.../cache/client/internal/OpExecutorImpl.java | 35 +-
.../cache/client/internal/ProxyCacheCloseOp.java | 1 -
.../geode/cache/client/internal/PutAllOp.java | 1 -
.../apache/geode/cache/client/internal/PutOp.java | 4 -
.../cache/client/internal/QueueManagerImpl.java | 11 -
.../cache/client/internal/QueueStateImpl.java | 35 +-
.../cache/client/internal/RegisterInterestOp.java | 7 -
.../client/internal/RegisterInterestTracker.java | 9 -
.../geode/cache/client/internal/RemoveAllOp.java | 1 -
.../client/internal/ServerRegionDataAccess.java | 4 -
.../internal/locator/LocatorStatusResponse.java | 30 +-
.../internal/pooling/ConnectionManagerImpl.java | 102 +-
.../client/internal/pooling/PooledConnection.java | 5 -
.../geode/cache/operations/InterestType.java | 30 -
.../cache/operations/KeyValueOperationContext.java | 4 -
.../RestrictedMethodInvocationAuthorizer.java | 11 +-
.../cache/query/internal/parse/ASTParameter.java | 3 +-
.../apache/geode/cache/wan/GatewayReceiver.java | 44 +-
.../apache/geode/distributed/AbstractLauncher.java | 9 +-
.../apache/geode/distributed/LocatorLauncher.java | 10 +-
.../apache/geode/distributed/ServerLauncher.java | 10 +-
.../distributed/internal/DistributionAdvisor.java | 19 +-
.../internal/DistributionConfigImpl.java | 18 +-
.../internal/InternalDistributedSystem.java | 6 -
.../distributed/internal/InternalLocator.java | 5 +-
.../internal/deadlock/DLockDependencyMonitor.java | 1 -
.../distributed/internal/direct/DirectChannel.java | 1 -
.../membership/InternalDistributedMember.java | 42 +-
.../internal/membership/gms/GMSMember.java | 2 +-
.../membership/gms/interfaces/Manager.java | 8 -
.../membership/gms/locator/GMSLocator.java | 4 +-
.../membership/gms/membership/GMSJoinLeave.java | 30 +-
.../membership/gms/messenger/GMSEncrypt.java | 81 +-
.../membership/gms/messenger/GMSQuorumChecker.java | 4 -
.../membership/gms/messenger/JGroupsMessenger.java | 22 -
.../membership/gms/messenger/StatRecorder.java | 8 -
.../membership/gms/messenger/Transport.java | 8 -
.../org/apache/geode/internal/DeployedJar.java | 12 +-
.../geode/internal/GfeConsoleReaderFactory.java | 7 +-
.../apache/geode/internal/InputStreamFilter.java | 3 +
.../geode/internal/InternalDataSerializer.java | 66 +-
.../internal/ObjectInputStreamFilterWrapper.java | 195 +-
.../org/apache/geode/internal/SystemAdmin.java | 5 +-
.../geode/internal/admin/ClientStatsManager.java | 5 +-
.../geode/internal/cache/AbstractRegionMap.java | 20 +-
.../internal/cache/AbstractUpdateOperation.java | 2 +-
.../apache/geode/internal/cache/BucketAdvisor.java | 2 +
.../apache/geode/internal/cache/BucketRegion.java | 49 +-
.../geode/internal/cache/CacheServerLauncher.java | 2 +-
.../internal/cache/CreateRegionProcessor.java | 3 +-
.../geode/internal/cache/DestroyOperation.java | 2 +-
.../apache/geode/internal/cache/DiskStoreImpl.java | 1 +
.../apache/geode/internal/cache/DistTXState.java | 8 +-
.../internal/cache/DistTXStateOnCoordinator.java | 8 +-
.../cache/DistTXStateProxyImplOnCoordinator.java | 2 +-
.../internal/cache/DistributedPutAllOperation.java | 26 +-
.../geode/internal/cache/DistributedRegion.java | 7 +-
.../geode/internal/cache/EntryEventImpl.java | 178 +-
.../geode/internal/cache/EventStateHelper.java | 9 +-
.../internal/cache/FutureCachedDeserializable.java | 35 +
.../geode/internal/cache/GemFireCacheImpl.java | 15 +-
.../internal/cache/InitialImageOperation.java | 4 +-
.../geode/internal/cache/InvalidateOperation.java | 2 +-
.../apache/geode/internal/cache/LocalRegion.java | 33 +-
.../geode/internal/cache/LocalRegionDataView.java | 8 +-
.../org/apache/geode/internal/cache/Oplog.java | 49 +-
.../geode/internal/cache/PRQueryProcessor.java | 2 +-
.../geode/internal/cache/PartitionedRegion.java | 70 +-
.../internal/cache/PartitionedRegionDataStore.java | 29 +-
.../internal/cache/PartitionedRegionDataView.java | 12 +-
.../internal/cache/PartitionedRegionHelper.java | 9 +-
.../cache/SearchLoadAndWriteProcessor.java | 2 +-
.../geode/internal/cache/TXStateProxyImpl.java | 29 +-
.../cache/UpdateEntryVersionOperation.java | 2 +-
...nation.java => AbstractBackupWriterConfig.java} | 41 +-
.../cache/backup/BackupDataStoreHelper.java | 6 +-
.../internal/cache/backup/BackupDefinition.java | 20 +-
.../internal/cache/backup/BackupFileCopier.java | 4 +-
.../backup/BackupFilter.java} | 11 +-
.../geode/internal/cache/backup/BackupService.java | 29 +-
.../geode/internal/cache/backup/BackupTask.java | 247 +-
.../geode/internal/cache/backup/BackupUtil.java | 38 +-
.../{BackupDestination.java => BackupWriter.java} | 4 +-
.../internal/cache/backup/BackupWriterFactory.java | 60 +
.../cache/{ => backup}/DiskStoreBackup.java | 13 +-
.../cache/backup/FileSystemBackupDestination.java | 145 -
.../cache/backup/FileSystemBackupWriter.java | 237 ++
...tion.java => FileSystemBackupWriterConfig.java} | 30 +-
.../FileSystemIncrementalBackupLocation.java | 132 +
...stination.java => IncrementalBackupFilter.java} | 25 +-
.../backup/IncrementalBackupLocation.java} | 12 +-
.../geode/internal/cache/backup/PrepareBackup.java | 12 +-
.../cache/backup/PrepareBackupFactory.java | 20 +-
.../cache/backup/PrepareBackupOperation.java | 14 +-
.../cache/backup/PrepareBackupRequest.java | 19 +-
.../geode/internal/cache/backup/RestoreScript.java | 4 +
.../cache/entries/AbstractRegionEntry.java | 2 +-
.../geode/internal/cache/map/RegionMapDestroy.java | 4 +-
.../cache/partitioned/PutAllPRMessage.java | 16 +-
.../geode/internal/cache/tx/DistTxEntryEvent.java | 11 +-
.../internal/cache/tx/DistributedTXRegionStub.java | 4 +-
.../internal/cache/tx/PartitionedTXRegionStub.java | 6 +-
.../wan/AbstractGatewaySenderEventProcessor.java | 15 +-
.../internal/cache/wan/GatewaySenderEventImpl.java | 2 +-
.../wan/parallel/ParallelGatewaySenderQueue.java | 1 -
.../wan/parallel/ParallelQueueRemovalMessage.java | 3 +
.../internal/cache/xmlcache/CacheCreation.java | 68 +-
.../geode/internal/cache/xmlcache/CacheXml.java | 1 +
.../internal/cache/xmlcache/CacheXmlGenerator.java | 22 +-
.../internal/cache/xmlcache/CacheXmlParser.java | 19 +-
.../cache/xmlcache/ClientCacheCreation.java | 9 +-
.../cache/xmlcache/FunctionServiceCreation.java | 23 +-
.../internal/datasource/AbstractPoolCache.java | 11 +-
.../datasource/ConfiguredDataSourceProperties.java | 11 +-
.../internal/datasource/ManagedPoolCacheImpl.java | 2 +-
.../internal/datasource/TranxPoolCacheImpl.java | 2 +-
.../apache/geode/internal/jndi/JNDIInvoker.java | 9 +-
.../apache/geode/internal/lang/ObjectUtils.java | 124 -
.../apache/geode/internal/lang/StringUtils.java | 13 +
.../internal/security/SecurityServiceFactory.java | 10 -
.../geode/internal/util/DebuggerSupport.java | 62 -
.../org/apache/geode/internal/util/IOUtils.java | 4 +-
.../apache/geode/internal/util/PluckStacks.java | 99 +-
.../geode/internal/util/SingletonCallable.java | 82 -
.../apache/geode/internal/util/SingletonValue.java | 318 --
.../geode/internal/util/StackTraceCollector.java | 170 -
.../geode/management/AsyncEventQueueMXBean.java | 6 -
.../org/apache/geode/management/BackupStatus.java | 21 +-
.../geode/management/DistributedRegionMXBean.java | 1 +
.../geode/management/internal/JettyHelper.java | 30 +-
.../geode/management/internal/ManagementAgent.java | 19 +-
.../geode/management/internal/RestAgent.java | 11 +-
.../internal/beans/AsyncEventQueueMBean.java | 5 -
.../internal/beans/AsyncEventQueueMBeanBridge.java | 4 -
.../internal/beans/CacheServerBridge.java | 16 +-
.../internal/beans/DistributedRegionBridge.java | 2 +-
.../internal/beans/DistributedSystemBridge.java | 81 +-
.../internal/beans/DistributedSystemMBean.java | 3 +-
.../internal/beans/GatewaySenderMBeanBridge.java | 4 -
.../internal/beans/MemberMBeanBridge.java | 65 +-
.../beans/stats/RegionClusterStatsMonitor.java | 2 +
.../management/internal/beans/stats/StatsKey.java | 4 -
.../geode/management/internal/cli/CliUtil.java | 677 ++--
.../management/internal/cli/CommandManager.java | 12 +-
.../internal/cli/CommandResponseBuilder.java | 4 +-
.../geode/management/internal/cli/Launcher.java | 10 +-
.../geode/management/internal/cli/LogWrapper.java | 7 +-
.../cli/commands/AlterRuntimeConfigCommand.java | 2 +-
.../cli/commands/BackupDiskStoreCommand.java | 5 +-
.../cli/commands/ChangeLogLevelCommand.java | 3 +-
.../cli/commands/CloseDurableCQsCommand.java | 2 +-
.../cli/commands/CloseDurableClientCommand.java | 2 +-
.../cli/commands/CommandAvailabilityIndicator.java | 3 +-
.../cli/commands/CompactDiskStoreCommand.java | 7 +-
.../commands/CompactOfflineDiskStoreCommand.java | 5 +-
.../internal/cli/commands/ConfigurePDXCommand.java | 2 +-
.../internal/cli/commands/ConnectCommand.java | 8 +-
.../cli/commands/CountDurableCQEventsCommand.java | 2 +-
.../cli/commands/CreateJndiBindingCommand.java | 259 ++
.../internal/cli/commands/CreateRegionCommand.java | 79 +-
.../internal/cli/commands/DeployCommand.java | 2 +-
.../cli/commands/DescribeClientCommand.java | 14 +-
.../cli/commands/DescribeMemberCommand.java | 5 +-
.../cli/commands/DescribeRegionCommand.java | 5 +-
.../internal/cli/commands/DestroyIndexCommand.java | 2 +-
.../internal/cli/commands/DisconnectCommand.java | 5 +-
.../internal/cli/commands/ExportConfigCommand.java | 2 +-
.../commands/ExportOfflineDiskStoreCommand.java | 3 +-
.../cli/commands/ExportStackTraceCommand.java | 6 +-
.../internal/cli/commands/GCCommand.java | 11 +-
.../internal/cli/commands/GfshCommand.java | 6 +-
.../cli/commands/ListAsyncEventQueuesCommand.java | 2 +-
.../internal/cli/commands/ListClientCommand.java | 6 +-
.../internal/cli/commands/ListDeployedCommand.java | 2 +-
.../cli/commands/ListDurableClientCQsCommand.java | 2 +-
.../internal/cli/commands/ListFunctionCommand.java | 2 +-
.../internal/cli/commands/ListMembersCommand.java | 2 +-
.../internal/cli/commands/ListRegionCommand.java | 4 +-
.../internal/cli/commands/NetstatCommand.java | 4 +-
.../internal/cli/commands/RebalanceCommand.java | 33 +-
.../cli/commands/ResumeGatewaySenderCommand.java | 2 +-
.../internal/cli/commands/ShCommand.java | 3 +-
.../internal/cli/commands/SleepCommand.java | 4 +-
.../cli/commands/StartGatewayReceiverCommand.java | 2 +-
.../internal/cli/commands/StartLocatorCommand.java | 18 +-
.../cli/commands/StatusGatewayReceiverCommand.java | 2 +-
.../cli/commands/StatusGatewaySenderCommand.java | 2 +-
.../cli/commands/StopGatewayReceiverCommand.java | 2 +-
.../cli/commands/StopGatewaySenderCommand.java | 2 +-
.../internal/cli/commands/UndeployCommand.java | 2 +-
.../commands/UpgradeOfflineDiskStoreCommand.java | 8 +-
.../cli/converters/ConfigPropertyConverter.java | 58 +
.../management/internal/cli/domain/ClassName.java | 7 +-
.../internal/cli/domain/RegionAttributesInfo.java | 34 +-
.../cli/exceptions/EntityExistsException.java | 25 +
.../cli/functions/AlterRuntimeConfigFunction.java | 4 +-
.../functions/CreateAsyncEventQueueFunction.java | 3 +-
.../cli/functions/CreateJndiBindingFunction.java | 38 +
.../cli/functions/GarbageCollectionFunction.java | 5 +-
.../functions/GetMemberInformationFunction.java | 10 +-
.../cli/functions/JndiBindingConfiguration.java | 217 +
.../cli/functions/RegionAlterFunction.java | 13 +-
.../cli/functions/RegionCreateFunction.java | 16 +-
.../management/internal/cli/i18n/CliStrings.java | 11 +-
.../management/internal/cli/json/GfJsonArray.java | 23 +-
.../internal/cli/remote/MemberCommandService.java | 2 +-
.../cli/remote/OnlineCommandProcessor.java | 18 +-
.../internal/cli/result/CommandResult.java | 37 +-
.../geode/management/internal/cli/shell/Gfsh.java | 21 +-
.../internal/cli/shell/GfshExecutionStrategy.java | 4 +-
.../internal/cli/shell/JmxOperationInvoker.java | 20 +-
.../internal/cli/util/DiskStoreCompacter.java | 13 +-
.../internal/cli/util/DiskStoreUpgrader.java | 13 +-
.../management/internal/cli/util/MergeLogs.java | 46 +-
.../internal/cli/util/RegionAttributesNames.java | 5 +-
.../configuration/domain/Configuration.java | 13 +-
.../web/controllers/ShellCommandsController.java | 3 +-
.../support/LoginHandlerInterceptor.java | 28 +-
.../SerializableObjectHttpMessageConverter.java | 4 +-
.../geode/pdx/ReflectionBasedAutoSerializer.java | 25 +-
.../geode/redis/internal/RegionProvider.java | 8 +-
.../geode.apache.org/schema/cache/cache-1.0.xsd | 1 +
.../sanctioned-geode-core-serializables.txt | 7 +-
.../test/java/org/apache/geode/DeltaTestImpl.java | 16 +-
.../CreateAndLocalDestroyInTXRegressionTest.java | 154 +
.../PRCacheListenerDistributedTest.java} | 18 +-
...tenerWithInterestPolicyAllDistributedTest.java} | 24 +-
.../PRCreationTotalNumBucketsDistributedTest.java | 137 +
.../cache/RegionExpirationIntegrationTest.java | 2 +-
.../org/apache/geode/cache/RegionShortcutTest.java | 41 +
.../ReplicateCacheListenerDistributedTest.java} | 87 +-
.../SerialAsyncEventQueueImplJUnitTest.java | 4 +-
.../org/apache/geode/cache/query/BugJUnitTest.java | 7 -
.../geode/cache/query/QueryServiceJUnitTest.java | 28 -
.../geode/cache/query/TypedIteratorJUnitTest.java | 3 -
.../functional/IndexPrimaryKeyUsageJUnitTest.java | 1 -
.../IndexUsageInNestedQueryJUnitTest.java | 1 -
.../cache/query/functional/MiscJUnitTest.java | 14 -
.../query/functional/NestedQueryJUnitTest.java | 2 -
.../RestrictedMethodInvocationAuthorizerTest.java | 10 +-
.../internal/index/IndexMaintainceJUnitTest.java | 32 +-
.../partitioned/PRBasicIndexCreationDUnitTest.java | 947 +++--
.../PRBasicIndexCreationDeadlockDUnitTest.java | 240 --
.../PRBasicMultiIndexCreationDUnitTest.java | 370 +-
.../query/partitioned/PRBasicQueryDUnitTest.java | 466 +--
.../partitioned/PRBasicRemoveIndexDUnitTest.java | 132 +-
.../partitioned/PRColocatedEquiJoinDUnitTest.java | 1644 +++-----
.../query/partitioned/PRInvalidQueryDUnitTest.java | 137 +-
.../partitioned/PRQueryCacheCloseDUnitTest.java | 319 --
.../cache/query/partitioned/PRQueryDUnitTest.java | 1266 ++----
.../partitioned/PRQueryPortfolioDUnitTest.java | 467 +++
.../partitioned/PRQueryRegionCloseDUnitTest.java | 211 +-
.../PRQueryRegionDestroyedDUnitTest.java | 208 +-
.../PRQueryRemoteNodeExceptionDUnitTest.java | 792 ++--
.../geode/cache30/CacheListenerTestCase.java | 3 -
.../geode/cache30/CacheSerializableRunnable.java | 3 -
.../apache/geode/cache30/CacheXml66DUnitTest.java | 6 +-
.../geode/cache30/CachedAllEventsDUnitTest.java | 17 -
.../apache/geode/cache30/CallbackArgDUnitTest.java | 20 -
.../geode/cache30/ClientMembershipDUnitTest.java | 51 +-
.../geode/cache30/ClientServerCCEDUnitTest.java | 4 +-
.../apache/geode/cache30/ClientServerTestCase.java | 22 -
.../CreateAndLocalDestroyInTXRegressionTest.java | 167 -
.../apache/geode/cache30/DiskRegionTestImpl.java | 14 -
.../DistributedNoAckRegionCCEDUnitTest.java | 14 -
.../EntriesDoNotExpireDuringGIIRegressionTest.java | 210 -
.../geode/cache30/MultiVMRegionTestCase.java | 406 +-
.../org/apache/geode/cache30/ProxyDUnitTest.java | 11 +-
.../apache/geode/cache30/ReconnectDUnitTest.java | 31 +-
.../cache30/RegionMembershipListenerDUnitTest.java | 14 +-
.../geode/cache30/RegionReliabilityTestCase.java | 7 -
.../org/apache/geode/cache30/RegionTestCase.java | 206 -
.../org/apache/geode/cache30/TXOrderDUnitTest.java | 18 -
.../AnalyzeSerializablesJUnitTest.java | 9 +-
.../apache/geode/distributed/LocatorDUnitTest.java | 5 -
.../gms/membership/GMSJoinLeaveJUnitTest.java | 67 +-
.../gms/membership/GMSJoinLeaveTestHelper.java | 15 +-
.../gms/messenger/GMSEncryptJUnitTest.java | 1 -
...alDataSerializerSerializationWhitelistTest.java | 9 +-
.../internal/cache/BucketRegionJUnitTest.java | 1 +
.../internal/cache/DeltaPropagationDUnitTest.java | 16 +-
.../EnforceUniqueHostForLonerIntegrationTest.java} | 53 +-
.../EntriesDoNotExpireDuringGiiRegressionTest.java | 182 +
.../geode/internal/cache/EntryEventImplTest.java | 2 +-
.../geode/internal/cache/OffHeapTestUtil.java | 11 +-
...a => PREntryIdleExpirationDistributedTest.java} | 3 +-
...onedRegionAPIConserveSocketsFalseDUnitTest.java | 9 +-
.../cache/PartitionedRegionAPIDUnitTest.java | 1811 +++------
.../PartitionedRegionAsSubRegionDUnitTest.java | 314 --
.../PartitionedRegionAsSubregionDUnitTest.java | 234 ++
...dRegionBucketCreationDistributionDUnitTest.java | 1851 +++------
.../PartitionedRegionCacheCloseDUnitTest.java | 276 +-
.../PartitionedRegionCacheXMLExampleDUnitTest.java | 176 +-
... => PartitionedRegionCloseDistributedTest.java} | 21 +-
.../cache/PartitionedRegionCreationDUnitTest.java | 1025 ++---
.../cache/PartitionedRegionDUnitTestCase.java | 507 ---
.../cache/PartitionedRegionDestroyDUnitTest.java | 361 +-
.../cache/PartitionedRegionGetSomeKeys.java | 80 +
.../cache/PartitionedRegionHADUnitTest.java | 654 ++-
...itionedRegionHAFailureAndRecoveryDUnitTest.java | 689 ++--
.../PartitionedRegionLocalMaxMemoryDUnitTest.java | 274 +-
...tionedRegionLocalMaxMemoryOffHeapDUnitTest.java | 45 +-
.../cache/PartitionedRegionMultipleDUnitTest.java | 576 +--
.../cache/PartitionedRegionPRIDDUnitTest.java | 287 +-
...itionedRegionSingleNodeOperationsJUnitTest.java | 2 +-
.../cache/PartitionedRegionSizeDUnitTest.java | 625 +--
.../cache/PartitionedRegionStatsDUnitTest.java | 823 ++--
.../cache/PartitionedRegionTestUtilsDUnitTest.java | 817 ++--
.../PartitionedRegionWithSameNameDUnitTest.java | 920 +----
...plicateEntryIdleExpirationDistributedTest.java} | 2 +-
.../internal/cache/TestObjectWithIdentifier.java | 85 +
.../backup/AbstractBackupWriterConfigTest.java | 86 +
.../cache/backup/BackupDefinitionTest.java | 21 +-
.../cache/backup/BackupDistributedTest.java | 14 +-
.../backup/BackupFileCopierIntegrationTest.java | 5 +-
.../cache/backup/BackupIntegrationTest.java | 23 +-
.../backup/BackupPrepareAndFinishMsgDUnitTest.java | 7 +-
.../internal/cache/backup/BackupServiceTest.java | 4 +-
.../cache/backup/BackupWriterFactoryTest.java | 63 +
.../backup/FileSystemBackupWriterConfigTest.java | 74 +
...onTest.java => FileSystemBackupWriterTest.java} | 69 +-
.../FileSystemIncrementalBackupLocationTest.java | 189 +
.../backup/IncrementalBackupDistributedTest.java | 34 +-
.../cache/backup/PrepareBackupFactoryTest.java | 10 +-
.../cache/backup/PrepareBackupOperationTest.java | 14 +-
.../cache/backup/PrepareBackupRequestTest.java | 17 +-
.../cache/control/TestMemoryThresholdListener.java | 7 +-
.../internal/cache/execute/BooleanFunction.java} | 33 +-
.../ClientFunctionTimeoutRegressionTest.java | 201 +-
.../ExecuteFunctionInstanceRegressionTest.java | 188 +
...ExceptionsIncludeLocalMemberRegressionTest.java | 136 +
.../FunctionExecutionOnLonerRegressionTest.java | 142 +
.../FunctionExecution_ExceptionDUnitTest.java | 799 ++--
.../cache/execute/MonthBasedPartitionResolver.java | 73 +
.../execute/PRCustomPartitioningDUnitTest.java | 598 +--
.../execute/PRFunctionExecutionDUnitTest.java | 4181 ++++++++------------
...FunctionExecutionWithResultSenderDUnitTest.java | 904 ++---
.../cache/execute/PRPerformanceTestDUnitTest.java | 417 --
.../cache/execute/PRSetOperationJTADUnitTest.java | 180 +
.../internal/cache/execute/SerializableMonth.java | 44 +-
.../geode/internal/cache/execute/TimeKeeper.java | 30 +-
.../internal/cache/functions/TestFunction.java | 2 +-
.../BucketRebalanceStatRegressionTest.java | 178 +-
.../PersistentPartitionedRegionTestBase.java | 4 +-
...va => ClientProxyWithDeltaDistributedTest.java} | 26 +-
.../tier/sockets/ClientServerMiscDUnitTest.java | 5 -
...ClientWithInterestFailoverDistributedTest.java} | 9 +-
...sterInterestServerMetaDataDistributedTest.java} | 4 +-
.../cache/tx/SetOperationJTADistributedTest.java | 290 ++
.../RegionVersionVectorIntegrationTest.java | 172 +
.../cache/wan/AsyncEventQueueTestBase.java | 2 +-
...rallelGatewaySenderEventProcessorJUnitTest.java | 141 +
.../wan/parallel/ParallelGatewaySenderHelper.java | 79 +
.../ParallelQueueRemovalMessageJUnitTest.java | 46 +-
.../cache/xmlcache/CacheCreationJUnitTest.java | 98 +
.../internal/jta/ClientServerJTADUnitTest.java | 290 +-
.../internal/jta/SetOperationJTAJUnitTest.java | 154 +
.../geode/internal/lang/ObjectUtilsJUnitTest.java | 88 -
.../geode/internal/lang/StringUtilsJUnitTest.java | 11 +
...chiveWithMissingResourceTypeRegressionTest.java | 22 +-
.../geode/internal/util/PluckStacksDUnitTest.java | 131 +
.../geode/internal/util/PluckStacksUnitTest.java | 65 +
.../bean/stats/AsyncEventQueueStatsJUnitTest.java | 17 -
.../bean/stats/GatewayMBeanBridgeJUnitTest.java | 6 -
...ServerBridgeClientMembershipRegressionTest.java | 24 +-
.../beans/DistributedSystemBridgeJUnitTest.java | 2 +-
.../management/internal/cli/CliUtilDUnitTest.java | 35 +-
.../geode/management/internal/cli/CliUtilTest.java | 11 -
.../CreateAsyncEventQueueCommandDUnitTest.java | 3 +-
.../CreateDefinedIndexesCommandDUnitTest.java | 5 +-
.../CreateJndiBindingCommandDUnitTest.java | 106 +
.../cli/commands/CreateJndiBindingCommandTest.java | 412 ++
.../cli/commands/CreateRegionCommandDUnitTest.java | 174 +-
.../CreateRegionCommandIntegrationTest.java | 4 +-
.../cli/commands/CreateRegionCommandTest.java | 21 +
.../commands/CreateRegionSecurityDUnitTest.java | 86 +
.../cli/commands/DescribeRegionDUnitTest.java | 28 +
.../commands/DestroyIndexCommandsDUnitTest.java | 10 +-
.../commands/DestroyRegionCommandDUnitTest.java | 19 +-
.../cli/commands/DiskStoreCommandsDUnitTest.java | 2 +-
.../IndexCommandsShareConfigurationDUnitTest.java | 9 +-
.../cli/commands/LocateEntryDUnitTest.java | 12 +-
.../cli/commands/RemoveCommandJsonDUnitTest.java | 10 +-
.../cli/commands/ShowDeadlockDUnitTest.java | 10 +-
.../cli/commands/ShowLogCommandDUnitTest.java | 6 +-
.../cli/commands/TestCustomIdleExpiry.java | 16 +-
.../internal/cli/commands/TestCustomTTLExpiry.java | 16 +-
.../converters/ConfigPropertyConverterTest.java | 75 +
.../internal/cli/domain/ClassNameTest.java | 6 +-
.../cli/remote/OnlineCommandProcessorTest.java | 3 +-
.../cli/shell/GfshExecutionStrategyTest.java | 14 +-
.../internal/cli/shell/GfshInitFileJUnitTest.java | 7 +-
.../geode/pdx/AutoSerializableJUnitTest.java | 20 +-
...aAuthorizationUsingLegacySecurityDUnitTest.java | 19 +
...onUsingLegacySecurityWithFailoverDUnitTest.java | 21 +-
.../ClientExecuteFunctionAuthDUnitTest.java | 3 +
.../DeltaClientAuthorizationDUnitTest.java | 11 +-
.../DeltaClientPostAuthorizationDUnitTest.java | 10 +-
.../QuerySecurityAllowedQueriesDUnitTest.java | 32 +
.../geode/security/query/data/QueryTestObject.java | 3 +
.../org/apache/geode/test/dunit/DebuggerUtils.java | 52 -
.../apache/geode/test/dunit/IgnoredException.java | 11 +-
.../java/org/apache/geode/test/dunit/LocalVM.java | 90 +
.../test/java/org/apache/geode/test/dunit/VM.java | 114 +-
.../apache/geode/test/dunit/rules/CacheRule.java | 50 +-
.../test/junit/assertions/CommandResultAssert.java | 16 +-
.../java/parReg/query/unittest/NewPortfolio.java | 2 +-
.../test/java/parReg/query/unittest/Position.java | 4 +-
.../apache/geode/codeAnalysis/excludedClasses.txt | 3 +-
.../codeAnalysis/sanctionedDataSerializables.txt | 16 +-
.../util/PluckStacksJstackGeneratedDump.txt | 2520 ++++++++++++
.../tier/sockets/ClientToServerDeltaDUnitTest.java | 32 +-
.../cli/commands/ClientCommandsTestUtils.java | 5 +-
.../commands/DescribeClientCommandDUnitTest.java | 23 +-
.../cli/commands/ListClientCommandDUnitTest.java | 12 +-
.../query_additional/operators.html.md.erb | 36 +-
.../query_select/the_where_clause.html.md.erb | 13 +-
.../query_grammar_and_reserved_words.html.md.erb | 58 +-
.../querying_basics/reserved_words.html.md.erb | 2 +-
.../reference/topics/client-cache.html.md.erb | 10 +
.../gfsh/chapter_overview.html.md.erb | 4 +
.../gfsh/command-pages/alter.html.md.erb | 32 +-
.../gfsh/command-pages/create.html.md.erb | 26 +-
.../gfsh/command-pages/put.html.md.erb | 4 +-
.../gfsh/command-pages/start.html.md.erb | 57 +-
.../gfsh/configuring_gfsh.html.md.erb | 13 +
.../tools_modules/gfsh/json_in_gfsh.html.md.erb | 80 +
.../how_the_pool_manages_connections.html.md.erb | 21 +-
.../apache/geode/experimental/driver/Driver.java | 2 +
.../{ProtobufDriver.java => ProtobufChannel.java} | 115 +-
.../geode/experimental/driver/ProtobufDriver.java | 135 +-
.../experimental/driver/ProtobufQueryService.java | 104 +
.../geode/experimental/driver/ProtobufRegion.java | 99 +-
.../apache/geode/experimental/driver/Query.java | 10 +-
.../geode/experimental/driver/QueryService.java | 7 +-
.../apache/geode/experimental/driver/Region.java | 9 +
.../experimental/driver/DriverConnectionTest.java | 6 -
...onnectionTest.java => IntegrationTestBase.java} | 70 +-
.../driver/QueryServiceIntegrationTest.java | 41 +-
.../experimental/driver/RegionIntegrationTest.java | 60 +-
.../lucene/internal/IndexRepositoryFactory.java | 77 +-
.../cache/lucene/internal/LuceneEventListener.java | 3 +-
.../cache/lucene/internal/LuceneServiceImpl.java | 60 +-
.../lucene/internal/filesystem/FileSystem.java | 5 +-
.../apache/geode/cache/lucene/LuceneDUnitTest.java | 10 +-
.../lucene/LuceneIndexCreationIntegrationTest.java | 24 +-
.../LuceneIndexMaintenanceIntegrationTest.java | 19 +-
.../cache/lucene/LuceneQueriesAccessorBase.java | 11 +-
.../geode/cache/lucene/LuceneQueriesDUnitTest.java | 109 +-
.../cache/lucene/LuceneQueriesIntegrationTest.java | 67 +-
.../LuceneQueriesReindexClientDUnitTest.java | 72 +
.../lucene/LuceneQueriesReindexDUnitTest.java | 128 +
...hRegionCreatedBeforeReindexClientDUnitTest.java | 66 +
...iesWithRegionCreatedBeforeReindexDUnitTest.java | 76 +
...hRegionCreatedBeforeReindexIntegrationTest.java | 61 +
...eriesWithReindexFlagEnabledClientDUnitTest.java | 41 +-
...ceneQueriesWithReindexFlagEnabledDUnitTest.java | 51 +
...eriesWithReindexFlagEnabledIntegrationTest.java | 28 +-
.../lucene/RebalanceWithRedundancyDUnitTest.java | 14 +-
.../internal/LuceneEventListenerJUnitTest.java | 15 +
.../PartitionedRepositoryManagerJUnitTest.java | 31 +-
.../internal/cli/LuceneIndexCommandsDUnitTest.java | 23 +-
.../management/LuceneManagementDUnitTest.java | 8 +-
.../LuceneIndexXmlParserIntegrationJUnitTest.java | 11 +-
.../cache/lucene/test/LuceneTestSerializer.java | 5 +
.../src/main/proto/v1/basicTypes.proto | 16 +
.../src/main/proto/v1/clientProtocol.proto | 9 +
.../src/main/proto/v1/connection_API.proto | 8 +
.../src/main/proto/v1/region_API.proto | 24 +
.../internal/protocol/protobuf/v1/Failure.java | 28 +-
.../v1/LocatorMessageExecutionContext.java | 2 -
.../protobuf/v1/ProtobufOperationContext.java | 40 +-
.../protocol/protobuf/v1/ProtobufOpsProcessor.java | 35 +-
.../protobuf/v1/ProtobufSerializationService.java | 11 +-
.../protobuf/v1/ProtobufStreamProcessor.java | 1 +
.../protobuf/v1/ServerMessageExecutionContext.java | 2 -
.../AbstractFunctionRequestOperationHandler.java | 35 +-
.../DisconnectClientRequestOperationHandler.java | 43 +
...cuteFunctionOnGroupRequestOperationHandler.java | 6 +-
...uteFunctionOnMemberRequestOperationHandler.java | 6 +-
...uteFunctionOnRegionRequestOperationHandler.java | 14 +-
.../operations/GetAllRequestOperationHandler.java | 56 +-
.../v1/operations/GetRequestOperationHandler.java | 18 +-
.../v1/operations/GetServerOperationHandler.java | 11 +-
...ionHandler.java => KeySetOperationHandler.java} | 42 +-
.../OqlQueryRequestOperationHandler.java | 126 +
.../v1/operations/ProtocolVersionHandler.java | 4 -
.../operations/PutAllRequestOperationHandler.java | 53 +-
.../v1/operations/PutRequestOperationHandler.java | 35 +-
.../operations/RemoveRequestOperationHandler.java | 28 +-
.../AuthenticationRequestOperationHandler.java | 9 +
.../registry/ProtobufOperationContextRegistry.java | 67 +-
.../serialization/exception/DecodingException.java | 2 +-
.../serialization/exception/EncodingException.java | 2 +-
...cySecurityProtobufConnectionStateProcessor.java | 6 +-
...NoSecurityProtobufConnectionStateProcessor.java | 6 +-
...obufConnectionAuthenticatingStateProcessor.java | 6 +-
...rotobufConnectionAuthorizingStateProcessor.java | 20 +-
.../ProtobufConnectionHandshakeStateProcessor.java | 6 +-
.../v1/state/ProtobufConnectionStateProcessor.java | 7 +-
...rotobufConnectionTerminatingStateProcessor.java | 6 +-
.../state/exception/ConnectionStateException.java | 3 +-
...dException.java => ExceptionWithErrorCode.java} | 7 +-
.../exception/OperationNotAuthorizedException.java | 13 +-
.../v1/utilities/ProtobufRequestUtilities.java | 17 +
.../protobuf/v1/AuthenticationIntegrationTest.java | 8 +
.../protobuf/v1/AuthorizationIntegrationTest.java | 229 +-
.../v1/DisconnectClientIntegrationTest.java | 137 +
.../v1/acceptance/CacheOperationsJUnitTest.java | 24 +
.../LocatorConnectionAuthenticationDUnitTest.java | 3 +
.../v1/acceptance/LocatorConnectionDUnitTest.java | 3 +
.../GetServerOperationHandlerJUnitTest.java | 71 +-
.../KeySetOperationHandlerJUnitTest.java | 81 +
...ueryRequestOperationHandlerIntegrationTest.java | 164 +
.../OqlQueryRequestOperationHandlerJUnitTest.java | 138 +
.../PutRequestOperationHandlerJUnitTest.java | 14 -
.../v1/utilities/ProtobufUtilitiesJUnitTest.java | 5 +
.../geode/tools/pulse/tests/rules/ServerRule.java | 2 +-
.../org/apache/geode/cache/util/AutoBalancer.java | 50 +-
.../util/AutoBalancerIntegrationJUnitTest.java | 21 +-
.../geode/cache/util/AutoBalancerJUnitTest.java | 14 +-
.../cache/wan/WANRollingUpgradeDUnitTest.java | 42 +
.../GatewayLegacyAuthenticationRegressionTest.java | 179 +-
.../geode/internal/cache/wan/WANTestBase.java | 109 +-
.../ConcurrentWANPropagation_2_DUnitTest.java | 4 +-
.../parallel/ParallelWANConflationDUnitTest.java | 56 +-
.../wan/parallel/ParallelWANStatsDUnitTest.java | 132 +-
.../serial/SerialGatewaySenderQueueDUnitTest.java | 13 +-
.../wan/serial/SerialWANConflationDUnitTest.java | 73 +-
.../wan/serial/SerialWANPropagationDUnitTest.java | 5 +-
.../cache/wan/serial/SerialWANStatsDUnitTest.java | 26 +-
.../support/LoginHandlerInterceptorJUnitTest.java | 16 +-
gradle.properties | 2 +-
612 files changed, 23767 insertions(+), 24543 deletions(-)
copy geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/serialization/exception/EncodingException.java => geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcConnectorException.java (62%)
create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/FutureCachedDeserializable.java
copy geode-core/src/main/java/org/apache/geode/internal/cache/backup/{BackupDestination.java => AbstractBackupWriterConfig.java} (51%)
copy geode-core/src/main/java/org/apache/geode/internal/{InputStreamFilter.java => cache/backup/BackupFilter.java} (76%)
copy geode-core/src/main/java/org/apache/geode/internal/cache/backup/{BackupDestination.java => BackupWriter.java} (92%)
create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/backup/BackupWriterFactory.java
rename geode-core/src/main/java/org/apache/geode/internal/cache/{ => backup}/DiskStoreBackup.java (91%)
delete mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemBackupDestination.java
create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemBackupWriter.java
copy geode-core/src/main/java/org/apache/geode/internal/cache/backup/{BackupDestination.java => FileSystemBackupWriterConfig.java} (55%)
create mode 100644 geode-core/src/main/java/org/apache/geode/internal/cache/backup/FileSystemIncrementalBackupLocation.java
rename geode-core/src/main/java/org/apache/geode/internal/cache/backup/{BackupDestination.java => IncrementalBackupFilter.java} (56%)
copy geode-core/src/main/java/org/apache/geode/internal/{InputStreamFilter.java => cache/backup/IncrementalBackupLocation.java} (73%)
delete mode 100644 geode-core/src/main/java/org/apache/geode/internal/util/DebuggerSupport.java
delete mode 100644 geode-core/src/main/java/org/apache/geode/internal/util/SingletonCallable.java
delete mode 100644 geode-core/src/main/java/org/apache/geode/internal/util/SingletonValue.java
delete mode 100644 geode-core/src/main/java/org/apache/geode/internal/util/StackTraceCollector.java
create mode 100644 geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateJndiBindingCommand.java
create mode 100644 geode-core/src/main/java/org/apache/geode/management/internal/cli/converters/ConfigPropertyConverter.java
create mode 100644 geode-core/src/main/java/org/apache/geode/management/internal/cli/exceptions/EntityExistsException.java
create mode 100644 geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CreateJndiBindingFunction.java
create mode 100644 geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/JndiBindingConfiguration.java
create mode 100644 geode-core/src/test/java/org/apache/geode/cache/CreateAndLocalDestroyInTXRegressionTest.java
rename geode-core/src/test/java/org/apache/geode/{internal/cache/PRCacheListenerInvocationTest.java => cache/PRCacheListenerDistributedTest.java} (77%)
rename geode-core/src/test/java/org/apache/geode/{internal/cache/PRCacheListenerWithInterestPolicyAllInvocationTest.java => cache/PRCacheListenerWithInterestPolicyAllDistributedTest.java} (70%)
create mode 100644 geode-core/src/test/java/org/apache/geode/cache/PRCreationTotalNumBucketsDistributedTest.java
create mode 100644 geode-core/src/test/java/org/apache/geode/cache/RegionShortcutTest.java
rename geode-core/src/test/java/org/apache/geode/{internal/cache/ReplicateCacheListenerInvocationTest.java => cache/ReplicateCacheListenerDistributedTest.java} (76%)
delete mode 100644 geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRBasicIndexCreationDeadlockDUnitTest.java
delete mode 100755 geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryCacheCloseDUnitTest.java
create mode 100644 geode-core/src/test/java/org/apache/geode/cache/query/partitioned/PRQueryPortfolioDUnitTest.java
delete mode 100644 geode-core/src/test/java/org/apache/geode/cache30/CreateAndLocalDestroyInTXRegressionTest.java
delete mode 100644 geode-core/src/test/java/org/apache/geode/cache30/EntriesDoNotExpireDuringGIIRegressionTest.java
copy geode-core/src/test/java/org/apache/geode/{cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java => internal/cache/EnforceUniqueHostForLonerIntegrationTest.java} (50%)
create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/EntriesDoNotExpireDuringGiiRegressionTest.java
rename geode-core/src/test/java/org/apache/geode/internal/cache/{PREntryIdleExpirationTest.java => PREntryIdleExpirationDistributedTest.java} (94%)
delete mode 100755 geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAsSubRegionDUnitTest.java
create mode 100755 geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAsSubregionDUnitTest.java
rename geode-core/src/test/java/org/apache/geode/internal/cache/{PartitionedRegionCloseDUnitTest.java => PartitionedRegionCloseDistributedTest.java} (92%)
delete mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionDUnitTestCase.java
create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionGetSomeKeys.java
rename geode-core/src/test/java/org/apache/geode/internal/cache/{ReplicateEntryIdleExpirationTest.java => ReplicateEntryIdleExpirationDistributedTest.java} (98%)
create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/TestObjectWithIdentifier.java
create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/backup/AbstractBackupWriterConfigTest.java
create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/backup/BackupWriterFactoryTest.java
create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/backup/FileSystemBackupWriterConfigTest.java
rename geode-core/src/test/java/org/apache/geode/internal/cache/backup/{FileSystemBackupDestinationTest.java => FileSystemBackupWriterTest.java} (73%)
create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/backup/FileSystemIncrementalBackupLocationTest.java
copy geode-core/src/{main/java/org/apache/geode/cache/query/internal/parse/ASTParameter.java => test/java/org/apache/geode/internal/cache/execute/BooleanFunction.java} (57%)
create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/execute/ExecuteFunctionInstanceRegressionTest.java
create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/execute/FunctionExceptionsIncludeLocalMemberRegressionTest.java
create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/execute/FunctionExecutionOnLonerRegressionTest.java
create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/execute/MonthBasedPartitionResolver.java
delete mode 100755 geode-core/src/test/java/org/apache/geode/internal/cache/execute/PRPerformanceTestDUnitTest.java
copy geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/HikariJdbcDataSource.java => geode-core/src/test/java/org/apache/geode/internal/cache/execute/SerializableMonth.java (52%)
copy geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/serialization/exception/EncodingException.java => geode-core/src/test/java/org/apache/geode/internal/cache/execute/TimeKeeper.java (61%)
rename geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/{ClientProxyWithDeltaTest.java => ClientProxyWithDeltaDistributedTest.java} (94%)
rename geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/{ClientWithInterestFailoverTest.java => ClientWithInterestFailoverDistributedTest.java} (99%)
rename geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/{RegisterInterestServerMetaDataTest.java => RegisterInterestServerMetaDataDistributedTest.java} (99%)
create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/tx/SetOperationJTADistributedTest.java
create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/versions/RegionVersionVectorIntegrationTest.java
create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessorJUnitTest.java
create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderHelper.java
create mode 100644 geode-core/src/test/java/org/apache/geode/internal/util/PluckStacksDUnitTest.java
create mode 100644 geode-core/src/test/java/org/apache/geode/internal/util/PluckStacksUnitTest.java
create mode 100644 geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateJndiBindingCommandDUnitTest.java
create mode 100644 geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateJndiBindingCommandTest.java
create mode 100644 geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateRegionSecurityDUnitTest.java
copy geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/exception/OperationNotAuthorizedException.java => geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/TestCustomIdleExpiry.java (67%)
copy geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/exception/OperationNotAuthorizedException.java => geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/TestCustomTTLExpiry.java (67%)
create mode 100644 geode-core/src/test/java/org/apache/geode/management/internal/cli/converters/ConfigPropertyConverterTest.java
delete mode 100755 geode-core/src/test/java/org/apache/geode/test/dunit/DebuggerUtils.java
create mode 100644 geode-core/src/test/java/org/apache/geode/test/dunit/LocalVM.java
create mode 100644 geode-core/src/test/resources/org/apache/geode/internal/util/PluckStacksJstackGeneratedDump.txt
create mode 100644 geode-docs/tools_modules/gfsh/json_in_gfsh.html.md.erb
copy geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/{ProtobufDriver.java => ProtobufChannel.java} (68%)
create mode 100644 geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufQueryService.java
copy geode-core/src/main/java/org/apache/geode/internal/InputStreamFilter.java => geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java (80%)
copy geode-core/src/main/java/org/apache/geode/internal/InputStreamFilter.java => geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/QueryService.java (84%)
copy geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/{DriverConnectionTest.java => IntegrationTestBase.java} (59%)
copy geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAPIConserveSocketsFalseDUnitTest.java => geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/QueryServiceIntegrationTest.java (52%)
create mode 100644 geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesReindexClientDUnitTest.java
create mode 100644 geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesReindexDUnitTest.java
create mode 100755 geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesWithRegionCreatedBeforeReindexClientDUnitTest.java
create mode 100644 geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesWithRegionCreatedBeforeReindexDUnitTest.java
create mode 100644 geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesWithRegionCreatedBeforeReindexIntegrationTest.java
copy geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionAPIConserveSocketsFalseDUnitTest.java => geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesWithReindexFlagEnabledClientDUnitTest.java (50%)
mode change 100644 => 100755
create mode 100644 geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesWithReindexFlagEnabledDUnitTest.java
copy geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/serialization/exception/DecodingException.java => geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesWithReindexFlagEnabledIntegrationTest.java (55%)
create mode 100644 geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/DisconnectClientRequestOperationHandler.java
copy geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/{GetRequestOperationHandler.java => KeySetOperationHandler.java} (63%)
create mode 100644 geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/OqlQueryRequestOperationHandler.java
copy geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/exception/{OperationNotAuthorizedException.java => ExceptionWithErrorCode.java} (81%)
create mode 100644 geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/DisconnectClientIntegrationTest.java
create mode 100644 geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/KeySetOperationHandlerJUnitTest.java
create mode 100644 geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/OqlQueryRequestOperationHandlerIntegrationTest.java
create mode 100644 geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/OqlQueryRequestOperationHandlerJUnitTest.java
rename geode-wan/src/test/java/org/apache/geode/internal/cache/wan/{misc => }/GatewayLegacyAuthenticationRegressionTest.java (71%)
--
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.
[geode] 01/01: GEODE-4647: add a stats eventSecondaryQueueSizeId to
track events in secondary gateway sender queue
Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch feature/GEODE-4647
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 140c83e006ba8b604a2c2df0f6e7502260e03ab9
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Wed Feb 21 21:31:07 2018 -0800
GEODE-4647: add a stats eventSecondaryQueueSizeId to track events in secondary
gateway sender queue
---
.../asyncqueue/internal/AsyncEventQueueStats.java | 3 +
.../internal/cache/AbstractBucketRegionQueue.java | 8 +
.../geode/internal/cache/AbstractRegionMap.java | 4 +
.../apache/geode/internal/cache/BucketAdvisor.java | 4 +
.../geode/internal/cache/BucketRegionQueue.java | 2 +
.../internal/cache/wan/AbstractGatewaySender.java | 6 +
.../wan/AbstractGatewaySenderEventProcessor.java | 17 +++
.../internal/cache/wan/GatewaySenderStats.java | 57 +++++++
.../wan/parallel/ParallelQueueRemovalMessage.java | 3 +
.../SerialAsyncEventQueueImplJUnitTest.java | 3 +
.../cache/wan/AsyncEventQueueTestBase.java | 16 +-
.../asyncqueue/AsyncEventQueueStatsDUnitTest.java | 45 ++++--
.../ParallelQueueRemovalMessageJUnitTest.java | 20 +++
.../bean/stats/AsyncEventQueueStatsJUnitTest.java | 2 -
.../geode/internal/cache/wan/WANTestBase.java | 97 +++++++++++-
.../parallel/ParallelWANConflationDUnitTest.java | 56 +++++--
.../wan/parallel/ParallelWANStatsDUnitTest.java | 167 +++++++++++++++++++++
.../serial/SerialGatewaySenderQueueDUnitTest.java | 13 +-
.../wan/serial/SerialWANConflationDUnitTest.java | 73 ++++++++-
.../wan/serial/SerialWANPropagationDUnitTest.java | 1 +
20 files changed, 564 insertions(+), 33 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
index dee2c92..faa5db4 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueStats.java
@@ -42,6 +42,8 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing events.",
"nanoseconds"),
f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false),
+ f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of the event secondary queue.",
+ "operations", false),
f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events queue.",
"operations", false),
f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
@@ -106,6 +108,7 @@ public class AsyncEventQueueStats extends GatewaySenderStats {
eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED);
eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
+ eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index d6822da..267ec36 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -230,6 +230,10 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
this.gatewaySenderStats.decQueueSize(size);
}
+ public void decSecondaryQueueSize(int size) {
+ this.gatewaySenderStats.decSecondaryQueueSize(size);
+ }
+
public void decQueueSize() {
this.gatewaySenderStats.decQueueSize();
}
@@ -238,6 +242,10 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
this.gatewaySenderStats.incQueueSize(size);
}
+ public void incSecondaryQueueSize(int size) {
+ this.gatewaySenderStats.incSecondaryQueueSize(size);
+ }
+
public void incQueueSize() {
this.gatewaySenderStats.incQueueSize();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 68d7134..6c47b71 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -1042,6 +1042,10 @@ public abstract class AbstractRegionMap
} finally {
if (done && result) {
initialImagePutEntry(newRe);
+ if (owner instanceof BucketRegionQueue) {
+ BucketRegionQueue brq = (BucketRegionQueue) owner;
+ brq.addToEventQueue(key, done, event);
+ }
}
if (!done) {
removeEntry(key, newRe, false);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
index fc14773..8fd0e87 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java
@@ -282,8 +282,11 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
if (this.getBucket() instanceof BucketRegionQueue) {
BucketRegionQueue brq = (BucketRegionQueue) this.getBucket();
brq.decQueueSize(brq.size());
+ brq.incSecondaryQueueSize(brq.size());
}
sendProfileUpdate();
+ } else {
+ BucketRegionQueue brq = (BucketRegionQueue) this.getBucket();
}
}
} else {
@@ -1192,6 +1195,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
if (br instanceof BucketRegionQueue) { // Shouldn't it be AbstractBucketRegionQueue
BucketRegionQueue brq = (BucketRegionQueue) br;
brq.incQueueSize(brq.size());
+ brq.decSecondaryQueueSize(brq.size());
}
if (br != null && br instanceof BucketRegion) {
((BucketRegion) br).afterAcquiringPrimaryState();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 4e5451e..0baa204 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -449,6 +449,8 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
}
if (this.getBucketAdvisor().isPrimary()) {
incQueueSize(1);
+ } else {
+ incSecondaryQueueSize(1);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index a134e1e..268bbb2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -1095,6 +1095,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
}
statistics.setQueueSize(0);
+ statistics.setSecondaryQueueSize(0);
statistics.setTempQueueSize(0);
}
@@ -1251,6 +1252,11 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
return localProcessor == null ? 0 : localProcessor.eventQueueSize();
}
+ public int getEventSecondaryQueueSize() {
+ AbstractGatewaySenderEventProcessor localProcessor = this.eventProcessor;
+ return localProcessor == null ? 0 : localProcessor.eventSecondaryQueueSize();
+ }
+
public void setEnqueuedAllTempQueueEvents(boolean enqueuedAllTempQueueEvents) {
this.enqueuedAllTempQueueEvents = enqueuedAllTempQueueEvents;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 9309e43..1a12abf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -270,6 +270,23 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
return this.queue.size();
}
+ public int eventSecondaryQueueSize() {
+ if (queue == null) {
+ return 0;
+ }
+
+ // if parallel, get both primary and secondary queues' size, then substract primary queue's size
+ if (this.queue instanceof ParallelGatewaySenderQueue) {
+ return ((ParallelGatewaySenderQueue) queue).localSize(true)
+ - ((ParallelGatewaySenderQueue) queue).localSize(false);
+ }
+ if (this.queue instanceof ConcurrentParallelGatewaySenderQueue) {
+ return ((ConcurrentParallelGatewaySenderQueue) queue).localSize(true)
+ - ((ConcurrentParallelGatewaySenderQueue) queue).localSize(false);
+ }
+ return this.queue.size();
+ }
+
/**
* @return the sender
*/
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
index c7fd370..32129be 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderStats.java
@@ -46,6 +46,8 @@ public class GatewaySenderStats {
protected static final String EVENT_QUEUE_TIME = "eventQueueTime";
/** Name of the event queue size statistic */
protected static final String EVENT_QUEUE_SIZE = "eventQueueSize";
+ /** Name of the event secondary queue size statistic */
+ protected static final String EVENT_SECONDARY_QUEUE_SIZE = "eventSecondaryQueueSize";
/** Name of the event temporary queue size statistic */
protected static final String TMP_EVENT_QUEUE_SIZE = "tempQueueSize";
/** Name of the events distributed statistic */
@@ -102,6 +104,8 @@ public class GatewaySenderStats {
protected static int eventQueueTimeId;
/** Id of the event queue size statistic */
protected static int eventQueueSizeId;
+ /** Id of the event in secondary queue size statistic */
+ protected static int eventSecondaryQueueSizeId;
/** Id of the temp event queue size statistic */
protected static int eventTmpQueueSizeId;
/** Id of the events distributed statistic */
@@ -164,6 +168,8 @@ public class GatewaySenderStats {
f.createLongCounter(EVENT_QUEUE_TIME, "Total time spent queueing events.",
"nanoseconds"),
f.createIntGauge(EVENT_QUEUE_SIZE, "Size of the event queue.", "operations", false),
+ f.createIntGauge(EVENT_SECONDARY_QUEUE_SIZE, "Size of the event in secondary queue.",
+ "operations", false),
f.createIntGauge(TMP_EVENT_QUEUE_SIZE, "Size of the temporary events.", "operations",
false),
f.createIntCounter(EVENTS_NOT_QUEUED_CONFLATED,
@@ -232,6 +238,7 @@ public class GatewaySenderStats {
eventsNotQueuedConflatedId = type.nameToId(EVENTS_NOT_QUEUED_CONFLATED);
eventQueueTimeId = type.nameToId(EVENT_QUEUE_TIME);
eventQueueSizeId = type.nameToId(EVENT_QUEUE_SIZE);
+ eventSecondaryQueueSizeId = type.nameToId(EVENT_SECONDARY_QUEUE_SIZE);
eventTmpQueueSizeId = type.nameToId(TMP_EVENT_QUEUE_SIZE);
eventsDistributedId = type.nameToId(EVENTS_DISTRIBUTED);
eventsExceedingAlertThresholdId = type.nameToId(EVENTS_EXCEEDING_ALERT_THRESHOLD);
@@ -350,6 +357,15 @@ public class GatewaySenderStats {
}
/**
+ * Returns the current value of the "eventSecondaryQueueSize" stat.
+ *
+ * @return the current value of the "eventSecondaryQueueSize" stat
+ */
+ public int getEventSecondaryQueueSize() {
+ return this.stats.getInt(eventSecondaryQueueSizeId);
+ }
+
+ /**
* Returns the current value of the "tempQueueSize" stat.
*
* @return the current value of the "tempQueueSize" stat.
@@ -454,6 +470,15 @@ public class GatewaySenderStats {
}
/**
+ * Sets the "eventSecondaryQueueSize" stat.
+ *
+ * @param size The size of the secondary queue
+ */
+ public void setSecondaryQueueSize(int size) {
+ this.stats.setInt(eventSecondaryQueueSizeId, size);
+ }
+
+ /**
* Sets the "tempQueueSize" stat.
*
* @param size The size of the temp queue
@@ -471,6 +496,13 @@ public class GatewaySenderStats {
}
/**
+ * Increments the "eventSecondaryQueueSize" stat by 1.
+ */
+ public void incSecondaryQueueSize() {
+ this.stats.incInt(eventSecondaryQueueSizeId, 1);
+ }
+
+ /**
* Increments the "tempQueueSize" stat by 1.
*/
public void incTempQueueSize() {
@@ -487,6 +519,15 @@ public class GatewaySenderStats {
}
/**
+ * Increments the "eventSecondaryQueueSize" stat by given delta.
+ *
+ * @param delta an integer by which secondary queue size to be increased
+ */
+ public void incSecondaryQueueSize(int delta) {
+ this.stats.incInt(eventSecondaryQueueSizeId, delta);
+ }
+
+ /**
* Increments the "tempQueueSize" stat by given delta.
*
* @param delta an integer by which temp queue size to be increased
@@ -503,6 +544,13 @@ public class GatewaySenderStats {
}
/**
+ * Decrements the "eventSecondaryQueueSize" stat by 1.
+ */
+ public void decSecondaryQueueSize() {
+ this.stats.incInt(eventSecondaryQueueSizeId, -1);
+ }
+
+ /**
* Decrements the "tempQueueSize" stat by 1.
*/
public void decTempQueueSize() {
@@ -519,6 +567,15 @@ public class GatewaySenderStats {
}
/**
+ * Decrements the "eventSecondaryQueueSize" stat by given delta.
+ *
+ * @param delta an integer by which secondary queue size to be increased
+ */
+ public void decSecondaryQueueSize(int delta) {
+ this.stats.incInt(eventSecondaryQueueSizeId, -delta);
+ }
+
+ /**
* Decrements the "tempQueueSize" stat by given delta.
*
* @param delta an integer by which temp queue size to be increased
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index 39fedbf..df89e36 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -183,6 +183,9 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
brq.destroyKey(key);
+ if (!brq.getBucketAdvisor().isPrimary()) {
+ prQ.getParallelGatewaySender().getStatistics().decSecondaryQueueSize();
+ }
if (isDebugEnabled) {
logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", key, prQ.getName(),
brq.getId());
diff --git a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
index cdef8bd..aea1fb4 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImplJUnitTest.java
@@ -50,14 +50,17 @@ public class SerialAsyncEventQueueImplJUnitTest {
attrs.id = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id";
SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache, attrs);
queue.getStatistics().incQueueSize(5);
+ queue.getStatistics().incSecondaryQueueSize(6);
queue.getStatistics().incTempQueueSize(10);
assertEquals(5, queue.getStatistics().getEventQueueSize());
+ assertEquals(6, queue.getStatistics().getEventSecondaryQueueSize());
assertEquals(10, queue.getStatistics().getTempEventQueueSize());
queue.stop();
assertEquals(0, queue.getStatistics().getEventQueueSize());
+ assertEquals(0, queue.getStatistics().getEventSecondaryQueueSize());
assertEquals(0, queue.getStatistics().getTempEventQueueSize());
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
index de03f2b..0e81e49 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -44,8 +44,10 @@ import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.awaitility.Awaitility;
import org.junit.experimental.categories.Category;
import org.apache.geode.DataSerializable;
@@ -737,17 +739,29 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
}
public static void checkAsyncEventQueueStats(String queueId, final int queueSize,
- final int eventsReceived, final int eventsQueued, final int eventsDistributed) {
+ int secondaryQueueSize, final int eventsReceived, final int eventsQueued,
+ final int eventsDistributed) {
Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues();
AsyncEventQueue queue = null;
+ boolean isParallel = false;
for (AsyncEventQueue q : asyncQueues) {
+ isParallel = q.isParallel();
if (q.getId().equals(queueId)) {
queue = q;
break;
}
}
final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl) queue).getStatistics();
+ Awaitility.await().atMost(30000, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS)
+ .until(() -> assertEquals("Expected queue size: " + queueSize + " but actual queue size: "
+ + statistics.getEventQueueSize(), queueSize, statistics.getEventQueueSize()));
assertEquals(queueSize, statistics.getEventQueueSize());
+ if (isParallel) {
+ assertEquals(secondaryQueueSize, statistics.getEventSecondaryQueueSize());
+ } else {
+ // for serial queue, evenvSecondaryQueueSize is not used
+ assertEquals(0, statistics.getEventSecondaryQueueSize());
+ }
assertEquals(eventsReceived, statistics.getEventsReceived());
assertEquals(eventsQueued, statistics.getEventsQueued());
assert (statistics.getEventsDistributed() >= eventsDistributed);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
index b131b46..17288e0 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
@@ -75,9 +75,10 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase {
// sender
Wait.pause(2000);// give some time for system to become stable
- vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1000, 1000, 1000));
+ vm1.invoke(
+ () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1000, 1000, 1000));
vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln", 10));
- vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1000, 0, 0));
+ vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1000, 0, 0));
vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln", 0));
}
@@ -120,19 +121,43 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase {
vm4.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue(
getTestMethodName() + "_RR", "ln1,ln2", isOffHeap()));
+ vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1"));
+ vm2.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1"));
+ vm3.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1"));
+ vm4.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln1"));
+ vm1.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2"));
+ vm2.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2"));
+ vm3.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2"));
+ vm4.invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue("ln2"));
+
vm1.invoke(() -> AsyncEventQueueTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+ vm1.invoke(
+ () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 1000, 0, 1000, 1000, 0));
+ vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 1000, 0, 1000, 0, 0));
+
+ vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1"));
+ vm2.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1"));
+ vm3.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1"));
+ vm4.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln1"));
+ vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2"));
+ vm2.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2"));
+ vm3.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2"));
+ vm4.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln2"));
+
vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln1", 1000));
vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln2", 1000));
Wait.pause(2000);// give some time for system to become stable
- vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 1000, 1000, 1000));
+ vm1.invoke(
+ () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 0, 1000, 1000, 1000));
vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln1", 10));
- vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 1000, 1000, 1000));
+ vm1.invoke(
+ () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 0, 1000, 1000, 1000));
vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln2", 10));
- vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 1000, 0, 0));
+ vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln1", 0, 0, 1000, 0, 0));
vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln1", 0));
- vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 1000, 0, 0));
+ vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln2", 0, 0, 1000, 0, 0));
vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueBatchStats("ln2", 0));
}
@@ -230,11 +255,12 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase {
vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 1500));
Wait.pause(2000);// give some time for system to become stable
- vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1500, 1500, 1500));
+ vm1.invoke(
+ () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1500, 1500, 1500));
vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueUnprocessedStats("ln", 0));
- vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 1500, 0, 0));
+ vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 1500, 0, 0));
vm2.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueUnprocessedStats("ln", 1500));
}
@@ -302,7 +328,8 @@ public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase {
vm1.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 1000));
Wait.pause(2000);// give some time for system to become stable
- vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 2000, 2000, 1000));
+ vm1.invoke(
+ () -> AsyncEventQueueTestBase.checkAsyncEventQueueStats("ln", 0, 0, 2000, 2000, 1000));
vm1.invoke(() -> AsyncEventQueueTestBase.checkAsyncEventQueueConflatedStats("ln", 500));
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
index 5e0f704..44afc0b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
@@ -65,6 +65,8 @@ import org.apache.geode.internal.cache.eviction.EvictionController;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
+import org.apache.geode.internal.statistics.DummyStatisticsFactory;
import org.apache.geode.test.fake.Fakes;
import org.apache.geode.test.junit.categories.UnitTest;
@@ -177,12 +179,21 @@ public class ParallelQueueRemovalMessageJUnitTest {
new BucketRegionQueueHelper(this.cache, this.queueRegion, this.bucketRegionQueue);
}
+ private GatewaySenderStats setSecondaryQueueSize(int size) {
+ GatewaySenderStats stats = new GatewaySenderStats(new DummyStatisticsFactory(), "ln");
+ stats.setSecondaryQueueSize(1);
+ when(this.queueRegion.getParallelGatewaySender().getStatistics()).thenReturn(stats);
+ assertEquals(1, stats.getEventSecondaryQueueSize());
+ return stats;
+ }
+
@Test
public void validateFailedBatchRemovalMessageKeysInUninitializedBucketRegionQueue()
throws Exception {
// Validate initial BucketRegionQueue state
assertFalse(this.bucketRegionQueue.isInitialized());
assertEquals(0, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size());
+ GatewaySenderStats stats = setSecondaryQueueSize(1);
// Create and process a ParallelQueueRemovalMessage (causes the failedBatchRemovalMessageKeys to
// add a key)
@@ -190,6 +201,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
// Validate BucketRegionQueue after processing ParallelQueueRemovalMessage
assertEquals(1, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size());
+ assertEquals(1, stats.getEventSecondaryQueueSize());
}
@Test
@@ -201,6 +213,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
// Add an event to the BucketRegionQueue and verify BucketRegionQueue state
this.bucketRegionQueueHelper.addEvent(KEY);
assertEquals(1, this.bucketRegionQueue.size());
+ GatewaySenderStats stats = setSecondaryQueueSize(1);
// Create and process a ParallelQueueRemovalMessage (causes the value of the entry to be set to
// DESTROYED)
@@ -210,6 +223,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
// Clean up destroyed tokens and validate BucketRegionQueue
this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
assertEquals(0, this.bucketRegionQueue.size());
+ assertEquals(0, stats.getEventSecondaryQueueSize());
}
@Test
@@ -225,6 +239,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
BlockingQueue<GatewaySenderEventImpl> tempQueue =
createTempQueueAndAddEvent(processor, mock(GatewaySenderEventImpl.class));
assertEquals(1, tempQueue.size());
+ GatewaySenderStats stats = setSecondaryQueueSize(1);
// Create and process a ParallelQueueRemovalMessage (causes the failedBatchRemovalMessageKeys to
// add a key)
@@ -232,6 +247,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
// Validate temp queue is empty after processing ParallelQueueRemovalMessage
assertEquals(0, tempQueue.size());
+ assertEquals(1, stats.getEventSecondaryQueueSize());
}
@Test
@@ -251,6 +267,9 @@ public class ParallelQueueRemovalMessageJUnitTest {
// Add a mock GatewaySenderEventImpl to the temp queue
BlockingQueue<GatewaySenderEventImpl> tempQueue = createTempQueueAndAddEvent(processor, event);
assertEquals(1, tempQueue.size());
+ GatewaySenderStats stats = mock(GatewaySenderStats.class);
+ stats.setSecondaryQueueSize(1);
+ when(this.queueRegion.getParallelGatewaySender().getStatistics()).thenReturn(stats);
// Create and process a ParallelQueueRemovalMessage (causes the value of the entry to be set to
// DESTROYED)
@@ -259,6 +278,7 @@ public class ParallelQueueRemovalMessageJUnitTest {
// Validate temp queue is empty after processing ParallelQueueRemovalMessage
assertEquals(0, tempQueue.size());
+ assertEquals(0, stats.getEventSecondaryQueueSize());
// Clean up destroyed tokens
this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
diff --git a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
index ea246bc..e7d38b4 100644
--- a/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/bean/stats/AsyncEventQueueStatsJUnitTest.java
@@ -52,8 +52,6 @@ public class AsyncEventQueueStatsJUnitTest extends MBeanStatsTestCase {
sample();
assertEquals(0, getEventQueueSize());
-
-
}
private int getEventQueueSize() {
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index c29d66e..a792815 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -120,9 +120,11 @@ import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.admin.remote.DistributionLocatorId;
+import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.CacheConfig;
import org.apache.geode.internal.cache.CacheServerImpl;
@@ -139,6 +141,8 @@ import org.apache.geode.internal.cache.execute.data.Order;
import org.apache.geode.internal.cache.execute.data.OrderId;
import org.apache.geode.internal.cache.execute.data.Shipment;
import org.apache.geode.internal.cache.execute.data.ShipmentId;
+import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage;
+import org.apache.geode.internal.cache.partitioned.BecomePrimaryBucketMessage.BecomePrimaryBucketResponse;
import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
@@ -1175,12 +1179,57 @@ public class WANTestBase extends JUnit4DistributedTestCase {
return connectionInfo;
}
+ public static void moveAllPrimaryBuckets(String senderId, final DistributedMember destination,
+ final String regionName) {
+
+ AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
+ final RegionQueue regionQueue;
+ regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0];
+ if (sender.isParallel()) {
+ ConcurrentParallelGatewaySenderQueue parallelGatewaySenderQueue =
+ (ConcurrentParallelGatewaySenderQueue) regionQueue;
+ PartitionedRegion prQ =
+ parallelGatewaySenderQueue.getRegions().toArray(new PartitionedRegion[1])[0];
+
+ Set<Integer> primaryBucketIds = prQ.getDataStore().getAllLocalPrimaryBucketIds();
+ for (int bid : primaryBucketIds) {
+ movePrimary(destination, regionName, bid);
+ }
+
+ // double check after moved all primary buckets
+ primaryBucketIds = prQ.getDataStore().getAllLocalPrimaryBucketIds();
+ assertTrue(primaryBucketIds.isEmpty());
+ }
+ }
+
+ public static void movePrimary(final DistributedMember destination, final String regionName,
+ final int bucketId) {
+ PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
+
+ BecomePrimaryBucketResponse response = BecomePrimaryBucketMessage
+ .send((InternalDistributedMember) destination, region, bucketId, true);
+ assertNotNull(response);
+ assertTrue(response.waitForResponse());
+ }
+
+ public static int getSecondaryQueueSizeInStats(String senderId) {
+ AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
+ GatewaySenderStats statistics = sender.getStatistics();
+ return statistics.getEventSecondaryQueueSize();
+ }
+
public static List<Integer> getSenderStats(String senderId, final int expectedQueueSize) {
AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
GatewaySenderStats statistics = sender.getStatistics();
if (expectedQueueSize != -1) {
final RegionQueue regionQueue;
regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0];
+ if (sender.isParallel()) {
+ ConcurrentParallelGatewaySenderQueue parallelGatewaySenderQueue =
+ (ConcurrentParallelGatewaySenderQueue) regionQueue;
+ PartitionedRegion pr =
+ parallelGatewaySenderQueue.getRegions().toArray(new PartitionedRegion[1])[0];
+ }
Awaitility.await().atMost(120, TimeUnit.SECONDS)
.until(() -> assertEquals("Expected queue entries: " + expectedQueueSize
+ " but actual entries: " + regionQueue.size(), expectedQueueSize,
@@ -1197,9 +1246,28 @@ public class WANTestBase extends JUnit4DistributedTestCase {
stats.add(statistics.getEventsNotQueuedConflated());
stats.add(statistics.getEventsConflatedFromBatches());
stats.add(statistics.getConflationIndexesMapSize());
+ stats.add(statistics.getEventSecondaryQueueSize());
return stats;
}
+ protected static int getTotalBucketQueueSize(PartitionedRegion prQ, boolean isPrimary) {
+ int size = 0;
+ if (prQ != null) {
+ Set<Map.Entry<Integer, BucketRegion>> allBuckets = prQ.getDataStore().getAllLocalBuckets();
+ List<Integer> thisProcessorBuckets = new ArrayList<Integer>();
+
+ for (Map.Entry<Integer, BucketRegion> bucketEntry : allBuckets) {
+ BucketRegion bucket = bucketEntry.getValue();
+ int bId = bucket.getId();
+ if ((isPrimary && bucket.getBucketAdvisor().isPrimary())
+ || (!isPrimary && !bucket.getBucketAdvisor().isPrimary())) {
+ size += bucket.size();
+ }
+ }
+ }
+ return size;
+ }
+
public static void checkQueueStats(String senderId, final int queueSize, final int eventsReceived,
final int eventsQueued, final int eventsDistributed) {
GatewaySenderStats statistics = getGatewaySenderStats(senderId);
@@ -3102,6 +3170,19 @@ public class WANTestBase extends JUnit4DistributedTestCase {
return getQueueContentSize(senderId, false);
}
+ public static Integer getSecondaryQueueContentSize(final String senderId) {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = s;
+ break;
+ }
+ }
+ AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
+ return abstractSender.getEventSecondaryQueueSize();
+ }
+
public static Integer getQueueContentSize(final String senderId, boolean includeSecondary) {
Set<GatewaySender> senders = cache.getGatewaySenders();
GatewaySender sender = null;
@@ -3113,9 +3194,7 @@ public class WANTestBase extends JUnit4DistributedTestCase {
}
if (!sender.isParallel()) {
- if (includeSecondary) {
- fail("Not implemented yet");
- }
+ // if sender is serial, the queues will be all primary or all secondary at one member
final Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
int size = 0;
for (RegionQueue q : queues) {
@@ -3131,10 +3210,10 @@ public class WANTestBase extends JUnit4DistributedTestCase {
return ((ParallelGatewaySenderQueue) regionQueue).localSize(includeSecondary);
} else {
if (includeSecondary) {
- fail("Not Implemented yet");
+ fail("Not implemented yet");
+ regionQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
+ return regionQueue.getRegion().size();
}
- regionQueue = ((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
- return regionQueue.getRegion().size();
}
}
fail("Not yet implemented?");
@@ -3180,6 +3259,8 @@ public class WANTestBase extends JUnit4DistributedTestCase {
((AbstractGatewaySender) sender).getQueues().toArray(new RegionQueue[1])[0];
Set<BucketRegion> buckets = ((PartitionedRegion) regionQueue.getRegion()).getDataStore()
.getAllLocalPrimaryBucketRegions();
+ final AbstractGatewaySender abstractSender = (AbstractGatewaySender) sender;
+
for (final BucketRegion bucket : buckets) {
Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
assertEquals("Expected bucket entries for bucket: " + bucket.getId()
@@ -3188,6 +3269,10 @@ public class WANTestBase extends JUnit4DistributedTestCase {
bucket.keySet().size());
});
} // for loop ends
+ assertEquals("Except events in all primary queues after drain is 0", 0,
+ abstractSender.getEventQueueSize());
+ assertEquals("Except events in all secondary queues after drain is 0", 0,
+ abstractSender.getEventSecondaryQueueSize());
} finally {
exp.remove();
exp1.remove();
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
index a792133..6cdfb74 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
@@ -66,6 +66,11 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
vm4.invoke(() -> checkQueueSize("ln", (keyValues.size() + updateKeyValues.size())));
+ vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), updateKeyValues));
+
+ // Since no conflation, all updates are in queue
+ vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + 2 * updateKeyValues.size()));
+
vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0));
resumeSenders();
@@ -91,7 +96,7 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
vm6.invoke(() -> createSender("ln", 2, true, 100, 50, false, false, null, true));
vm7.invoke(() -> createSender("ln", 2, true, 100, 50, false, false, null, true));
- createSenderPRs();
+ createSenderPRs(1);
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
@@ -108,24 +113,35 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), keyValues));
}
+ // sender did not turn on conflation, so queue size will be 100 (otherwise it will be 20)
+ vm4.invoke(() -> checkQueueSize("ln", 100));
vm4.invoke(() -> enableConflation("ln"));
vm5.invoke(() -> enableConflation("ln"));
vm6.invoke(() -> enableConflation("ln"));
vm7.invoke(() -> enableConflation("ln"));
- resumeSenders();
-
ArrayList<Integer> v4List =
- (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 100));
ArrayList<Integer> v5List =
- (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 100));
ArrayList<Integer> v6List =
- (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 100));
ArrayList<Integer> v7List =
- (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 100));
+ assertTrue("Event in secondary queue should be 100",
+ (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 100);
+
+ resumeSenders();
+
+ v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
assertTrue("No events conflated in batch",
(v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0);
+ assertEquals("Event in secondary queue should be 0 after dispatched", 0,
+ (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)));
vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10));
@@ -160,12 +176,14 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + updateKeyValues.size())); // creates
// aren't
// conflated
+ validateEventSecondaryQueueSize(keyValues.size() + updateKeyValues.size(), redundancy);
vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), updateKeyValues));
- vm4.invoke(() -> checkQueueSize("ln", keyValues.size() + updateKeyValues.size())); // creates
- // aren't
- // conflated
+ int expectedEventNumAfterConflation = keyValues.size() + updateKeyValues.size();
+ vm4.invoke(() -> checkQueueSize("ln", expectedEventNumAfterConflation));
+
+ validateEventSecondaryQueueSize(expectedEventNumAfterConflation, redundancy);
vm2.invoke(() -> validateRegionSize(getTestMethodName(), 0));
@@ -173,6 +191,24 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
keyValues.putAll(updateKeyValues);
validateReceiverRegionSize(keyValues);
+
+ // after dispatch, both primary and secondary queues are empty
+ vm4.invoke(() -> checkQueueSize("ln", 0));
+ validateEventSecondaryQueueSize(0, redundancy);
+ }
+
+ private void validateEventSecondaryQueueSize(int expectedNum, int redundancy) {
+ ArrayList<Integer> v4List =
+ (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+ ArrayList<Integer> v5List =
+ (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+ ArrayList<Integer> v6List =
+ (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+ ArrayList<Integer> v7List =
+ (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", expectedNum));
+ assertTrue("Event in secondary queue should be 100",
+ (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == expectedNum
+ * redundancy);
}
@Test
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index 3b6e860..61e75f6 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Region;
+import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.wan.WANTestBase;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.VM;
@@ -52,6 +53,172 @@ public class ParallelWANStatsDUnitTest extends WANTestBase {
}
@Test
+ public void testQueueSizeInSecondaryBucketRegionQueuesWithMemberRestart() throws Exception {
+ Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
+
+ createSendersWithConflation(lnPort);
+
+ createSenderPRs(1);
+
+ startPausedSenders();
+
+ createReceiverPR(vm2, 1);
+ putKeyValues();
+
+ ArrayList<Integer> v4List =
+ (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ ArrayList<Integer> v5List =
+ (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ ArrayList<Integer> v6List =
+ (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ ArrayList<Integer> v7List =
+ (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+
+ assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue
+ // size
+ assertEquals(NUM_PUTS * 2, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // eventsReceived
+ assertEquals(NUM_PUTS * 2, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); // events
+ // queued
+ assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events
+ // distributed
+ assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary
+ // queue
+ // size
+
+ // stop vm7 to trigger rebalance and move some primary buckets
+ System.out.println("Current secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10)
+ + ":" + v6List.get(10) + ":" + v7List.get(10));
+ vm7.invoke(() -> WANTestBase.closeCache());
+ Awaitility.await().atMost(120, TimeUnit.SECONDS).until(() -> {
+ int v4secondarySize = vm4.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
+ int v5secondarySize = vm5.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
+ int v6secondarySize = vm6.invoke(() -> WANTestBase.getSecondaryQueueSizeInStats("ln"));
+ assertEquals(NUM_PUTS, v4secondarySize + v5secondarySize + v6secondarySize); // secondary
+ // queue
+ // size
+ });
+ System.out.println("New secondary queue sizes:" + v4List.get(10) + ":" + v5List.get(10) + ":"
+ + v6List.get(10));
+
+ vm7.invoke(() -> WANTestBase.createCache(lnPort));
+ vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, true, false, null, true));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(testName, "ln", 1, 10, isOffHeap()));
+ startSenderInVMs("ln", vm7);
+ vm7.invoke(() -> pauseSender("ln"));
+
+ v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary
+ // queue
+ // size
+ System.out.println("After restart vm7, secondary queue sizes:" + v4List.get(10) + ":"
+ + v5List.get(10) + ":" + v6List.get(10) + ":" + v7List.get(10));
+
+ vm4.invoke(() -> WANTestBase.resumeSender("ln"));
+ vm5.invoke(() -> WANTestBase.resumeSender("ln"));
+ vm6.invoke(() -> WANTestBase.resumeSender("ln"));
+ vm7.invoke(() -> WANTestBase.resumeSender("ln"));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
+ vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, NUM_PUTS, NUM_PUTS));
+
+ vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
+
+ v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events
+ // distributed
+ assertEquals(0, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary
+ // queue
+ // size
+ }
+
+ // TODO: add a test without redudency for primary switch
+ @Test
+ public void testQueueSizeInSecondaryWithPrimarySwitch() throws Exception {
+ Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+ Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
+
+ createSendersWithConflation(lnPort);
+
+ createSenderPRs(1);
+
+ startPausedSenders();
+
+ createReceiverPR(vm2, 1);
+
+ putKeyValues();
+
+ ArrayList<Integer> v4List =
+ (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ ArrayList<Integer> v5List =
+ (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ ArrayList<Integer> v6List =
+ (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ ArrayList<Integer> v7List =
+ (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+
+ assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0)); // queue
+ // size
+ assertEquals(NUM_PUTS * 2, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); // eventsReceived
+ assertEquals(NUM_PUTS * 2, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); // events
+ // queued
+ assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events
+ // distributed
+ assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary
+ // queue
+ // size
+ // int vm7secondarySizeBeforeMovePrimary = v7List.get(10);
+ // System.out.println("Current secondary queue
+ // sizes:"+v4List.get(10)+":"+v5List.get(10)+":"+v6List.get(10)+":"+v7List.get(10));
+ // System.out.println("Now move a primary bucket");
+ // final DistributedMember vm6member = vm6.invoke(() -> WANTestBase.getMember());
+ // vm7.invoke(() -> WANTestBase.moveAllPrimaryBuckets("ln", vm6member, testName));
+ //
+ // v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ // v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ // v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ // v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", NUM_PUTS));
+ // assertEquals(NUM_PUTS, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); //
+ // secondary
+ // // queue
+ // // size
+ // assertTrue(v7List.get(10) > vm7secondarySizeBeforeMovePrimary);
+
+ vm4.invoke(() -> WANTestBase.resumeSender("ln"));
+ vm5.invoke(() -> WANTestBase.resumeSender("ln"));
+ vm6.invoke(() -> WANTestBase.resumeSender("ln"));
+ vm7.invoke(() -> WANTestBase.resumeSender("ln"));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
+ vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, NUM_PUTS, NUM_PUTS));
+
+ vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));
+
+ v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v5List = (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v6List = (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ v7List = (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); // events
+ // distributed
+ assertEquals(0, v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)); // secondary
+ // queue
+ // size
+ }
+
+ @Test
public void testPartitionedRegionParallelPropagation_BeforeDispatch() throws Exception {
Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
index 504a2f8..71b8c6d 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
@@ -18,6 +18,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.*;
import static org.junit.Assert.*;
import java.io.File;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
@@ -46,6 +47,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewaySenderStats;
import org.apache.geode.internal.cache.wan.WANTestBase;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.VM;
@@ -102,7 +104,13 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase {
vm4.invoke(() -> WANTestBase.pauseSender("ln"));
vm6.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
- Wait.pause(5000);
+ ArrayList<Integer> v4List =
+ (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 1000));
+ ArrayList<Integer> v5List =
+ (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 1000));
+ // secondary queue size stats in serial queue should be 0
+ assertEquals(0, v4List.get(10) + v5List.get(10));
+
HashMap primarySenderUpdates = (HashMap) vm4.invoke(() -> WANTestBase.checkQueue());
HashMap secondarySenderUpdates = (HashMap) vm5.invoke(() -> WANTestBase.checkQueue());
assertEquals(primarySenderUpdates, secondarySenderUpdates);
@@ -137,6 +145,9 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase {
// removing all the keys.
secondarySenderUpdates = (HashMap) vm5.invoke(() -> WANTestBase.checkQueue());
assertEquals(secondarySenderUpdates.get("Destroy"), receiverUpdates.get("Create"));
+
+ vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
}
protected void checkPrimarySenderUpdatesOnVM5(HashMap primarySenderUpdates) {
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
index 1e78539..4d9b5c9 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANConflationDUnitTest.java
@@ -42,10 +42,10 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
- vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap()));
- vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap()));
- vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap()));
- vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 0, 8, isOffHeap()));
+ vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+ vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+ vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+ vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
vm4.invoke(() -> createSender("ln", 2, false, 100, 50, false, false, null, true));
vm5.invoke(() -> createSender("ln", 2, false, 100, 50, false, false, null, true));
@@ -90,6 +90,71 @@ public class SerialWANConflationDUnitTest extends WANTestBase {
assertTrue("No events conflated in batch",
(v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0);
+ }
+
+ @Test
+ public void testSerialPropagationPartitionRegionConflationDuringEnqueue() throws Exception {
+ Integer lnPort = (Integer) vm0.invoke(() -> createFirstLocatorWithDSId(1));
+ Integer nyPort = (Integer) vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+
+ vm2.invoke(() -> createPartitionedRegion(getTestMethodName(), null, 1, 8, isOffHeap()));
+ vm3.invoke(() -> createPartitionedRegion(getTestMethodName(), null, 1, 8, isOffHeap()));
+
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+ vm5.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+ vm6.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+ vm7.invoke(() -> createPartitionedRegion(getTestMethodName(), "ln", 1, 8, isOffHeap()));
+
+ vm4.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true));
+ vm5.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true));
+ vm6.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true));
+ vm7.invoke(() -> createSender("ln", 2, false, 100, 50, true, false, null, true));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> pauseSender("ln"));
+ vm5.invoke(() -> pauseSender("ln"));
+ vm6.invoke(() -> pauseSender("ln"));
+ vm7.invoke(() -> pauseSender("ln"));
+
+
+ final Map keyValues = new HashMap();
+
+ for (int i = 1; i <= 10; i++) {
+ for (int j = 1; j <= 10; j++) {
+ keyValues.put(j, i);
+ }
+ vm4.invoke(() -> putGivenKeyValue(getTestMethodName(), keyValues));
+ }
+
+ ArrayList<Integer> v4List =
+ (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 20));
+ assertTrue("After conflation during enqueue, there should be only 20 events",
+ v4List.get(0) == 20);
+
+ vm4.invoke(() -> resumeSender("ln"));
+ vm5.invoke(() -> resumeSender("ln"));
+ vm6.invoke(() -> resumeSender("ln"));
+ vm7.invoke(() -> resumeSender("ln"));
+
+ v4List = (ArrayList<Integer>) vm4.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ ArrayList<Integer> v5List =
+ (ArrayList<Integer>) vm5.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ ArrayList<Integer> v6List =
+ (ArrayList<Integer>) vm6.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+ ArrayList<Integer> v7List =
+ (ArrayList<Integer>) vm7.invoke(() -> WANTestBase.getSenderStats("ln", 0));
+
+ assertTrue("No events in secondary queue stats since it's serial sender",
+ (v4List.get(10) + v5List.get(10) + v6List.get(10) + v7List.get(10)) == 0);
+ assertTrue("Total queued events should be 100",
+ (v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)) == 100);
vm2.invoke(() -> validateRegionSize(getTestMethodName(), 10));
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java
index db861e4..3819151 100644
--- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java
@@ -162,6 +162,7 @@ public class SerialWANPropagationDUnitTest extends WANTestBase {
IgnoredException.addIgnoredException(BatchException70.class.getName());
IgnoredException.addIgnoredException(ServerOperationException.class.getName());
IgnoredException.addIgnoredException(IOException.class.getName());
+ IgnoredException.addIgnoredException(java.net.SocketException.class.getName());
vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 10000));
--
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.