You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2020/03/16 21:37:38 UTC

[geode] branch feature/GEODE-7665 updated (43bd364 -> 7b62741)

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a change to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git.


    omit 43bd364  GEODE-7682: add PR.clear  API (#4755)
    omit ccb543f  GEODE-7683: introduce BR.cmnClearRegion
     add a9a7d15  GEODE-7853: Remove unused field (#4777)
     add f0982cd  Revert "GEODE-7828: Convert backing store for Redis Hashes and Sets to single regions (#4745)" (#4780)
     add d5adfbe  GEODE-7849: change release urls from www.apache.org/dist/ to downloads.apache.org (#4770)
     add 69ade72  GEODE-7851: configure pulse security using java code instead of using xml (#4772)
     add ec0c1a2  GEODE-7859 Increase timeout for channel subscription (#4782)
     add 9e093c1  GEODE-7854 Update Geode Redis benchmark scripts (#4779)
     add c841359  GEODE-7727: modify sender thread to detect relese of connection (#4751)
     add 0c06008  GEODE-7828: Convert backing store for Redis Hashes and Sets to single regions (#4781)
     add 7c10396  GEODE-7642: Removes warnings about missing annotations. (#4776)
     add 791017c  Geode prefers not to use merge-commits, so do not test against them. (#4790)
     add 86defd4  Feature/geode 6536 1: modify borrowConnection singleHop solution (#4753)
     add 1259b18  GEODE-7808: standardize on use of HostAndPort for connection formation  (#4778)
     add 0713e73  GEODE-7866: Clean up geode-tcp-server module and add missing javadocs (#4791)
     add b4c3e94  GEODE-7763: Do not exportValue if no clients are waiting for result (#4771)
     add 8c320ce  GEODE-7815: add Pulse custom security profile test (#4794)
     add 5234ed1  Geode 7850: Implement Redis Rename Command (#4774)
     add 356ef6d  GEODE-7830: fix race in rebalance start (#4769)
     add 0297d2d  GEODE-7829: fixing typos (#4797)
     add f1b4230  GEODE-7862 Add tests for Redis DEL command (#4786)
     add 9a3c62d  GEODE-7804: move methods up from RebalanceResult to OperationResult (#4801)
     add eb03056  GEODE-4194: Re-ignoring a failing test that was previously ignored (#4795)
     add b75ac43  GEODE-7874 write a serialization backward-compatibility test for geode-membership (#4804)
     add a3fb386  GEODE-7879: Remove locking on HGetallExecutor (#4806)
     add 1165869  GEODE-7867 Add more unit tests to geode-tcp-server (#4796)
     add d013311  GEODE-7880: revert changes related to GEODE-6536 (#4810)
     add 0e2d4a9  GEODE-7869: New template to make warnings errors.
     add 563d485  GEODE-7869: Cleanup warnings in geode-log4j
     add bde8136  GEODE-7869: Cleanup warnings in geode-logging
     add 0138ab5  GEODE-7869: Cleanup warnings in geode-gfsh
     add 9e3c473  GEODE-7863: Reduce ServerCQImpl Contention (#4798)
     add 57cc931  GEODE-7710: Add getRedundancyZone to DistributionManager (#4809)
     add 48ea49c  GEODE-7869: Cleanup warnings in geode-http-service
     add 5abf502  GEODE-7869: Cleanup warnings in geode-common
     add f098829  GEODE-7869: Cleanup warnings in geode-concurrency-test
     add 4fb85db  GEODE-7869: Cleanup warnings in geode-connectors
     add fa9bf79  GEODE-7869: Cleanup warnings in geode-management
     add ebdae67  GEODE-7869: Cleanup warnings in geode-rebalancer
     add 0f6f28b  GEODE-7869: Cleanup warnings in geode-serialization
     add b04a072  GEODE-7869: Cleanup warnings in geode-unsafe
     add 9b269de  GEODE-7869: Cleanup warnings in geode-web
     add 9d1e682  GEODE-7869: Cleanup warnings in geode-web-api
     add 95e88d9  GEODE-7869: Cleanup warnings in geode-web-management
     add ba258cc  GEODE-7869: Cleanup warnings in static-analysis
     add 8e4601e  GEODE-7869: Cleanup warnings in geode-experimental-driver
     add feab561  GEODE-7869: Cleanup warnings in geode-assembly
     add d7a2bae  GEODE-7869: Cleanup warnings in geode-pulse
     add 3bd72e2  GEODE-7869: Cleanup warnings in geode-protobuf
     add 644c52f  GEODE-7869: Cleanup warnings in geode-protobuf-messages
     add 23701b2  GEODE-7869: Cleanup warnings in geode-redis
     add 028777b  GEODE-7869: Cleanup warnings in geode-tcp-server
     add ef533fa  GEODE-7825: improve rebalance result (#4803)
     add 4dd6a00  GEODE-7864: Resolved all javadoc errors and warning in geode-core main (#4814)
     new 7d557d5  GEODE-7683: introduce BR.cmnClearRegion
     new 7b62741  GEODE-7682: add PR.clear  API (#4755)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force), producing a
repository history like this:

 * -- * -- B -- O -- O -- O   (43bd364)
            \
             N -- N -- N   refs/heads/feature/GEODE-7665 (7b62741)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/test/resources/expected-pom.xml            |  12 +
 .../gradle/plugins/DependencyConstraints.groovy    |   2 +
 ci/pipelines/pull-request/jinja.template.yml       |   4 +
 dev-tools/release/promote_rc.sh                    |   9 +-
 docker/Dockerfile                                  |   3 +-
 geode-assembly/build.gradle                        |   6 +
 .../cli/commands/ImportClusterConfigTest.java      |   2 +-
 .../apache/geode/metrics/CacheCommonTagsTest.java  |   2 +-
 .../cli/commands/RunOutOfMemoryFunction.java       |   6 +-
 ...StatusClusterConfigServiceCommandDUnitTest.java |   7 +-
 .../cli/commands/StopLocatorCommandDUnitTest.java  |  38 +-
 ...ClusterManagementLocatorReconnectDunitTest.java |   9 +-
 .../rest/DeploymentManagementRedployDUnitTest.java |  23 +-
 .../internal/rest/RegionManagementDunitTest.java   |  37 +-
 .../internal/rest/ServerRestartTest.java           |   7 +-
 .../rest/internal/web/RestFunctionTemplate.java    |   4 +-
 .../controllers/RestAPIsAndInterOpsDUnitTest.java  |  11 +-
 .../web/controllers/RestAPIsWithSSLDUnitTest.java  |  34 +-
 .../internal/web/RestInterfaceIntegrationTest.java |  32 +-
 .../rest/internal/web/RestRegionAPIDUnitTest.java  |  22 +-
 .../internal/web/RestSecurityIntegrationTest.java  |  54 +-
 .../web/RestSecurityPostProcessorTest.java         |  20 +-
 .../geode/tools/pulse/PulseConnectivityTest.java   |  14 -
 .../PulseSecurityConfigDefaultProfileTest.java     |  81 +++
 .../PulseSecurityConfigGemfireProfileTest.java     | 101 ++++
 .../tools/pulse/PulseSecurityWithSSLTest.java      |   4 +-
 .../integrationTest/resources/expected_jars.txt    |   1 +
 geode-common/build.gradle                          |   1 +
 .../apache/geode/internal/inet/LocalHostUtil.java  |  13 +
 geode-concurrency-test/build.gradle                |   1 +
 geode-connectors/build.gradle                      |   1 +
 .../jdbc/JdbcAsyncWriterIntegrationTest.java       |  12 +-
 .../connectors/jdbc/JdbcWriterIntegrationTest.java |  16 +-
 .../jdbc/internal/TestConfigService.java           |   5 +-
 .../cli/CreateDataSourceCommandDUnitTest.java      |  21 +-
 ...reateMappingCommandForProxyRegionDUnitTest.java |  23 +-
 .../cli/DeregisterDriverCommandDUnitTest.java      |  14 +-
 .../cli/DescribeDataSourceCommandDUnitTest.java    |  15 +-
 .../cli/DescribeMappingCommandDUnitTest.java       |  40 +-
 .../cli/DestroyDataSourceCommandDUnitTest.java     |  10 +-
 .../cli/ListDataSourceCommandDUnitTest.java        |  17 +-
 .../internal/cli/ListDriversCommandDUnitTest.java  |   9 +-
 .../internal/cli/ListMappingCommandDUnitTest.java  |  13 +-
 .../cli/RegisterDriverCommandDUnitTest.java        |  13 +-
 .../cli/JDBCConnectorFunctionsSecurityTest.java    |  20 +-
 .../geode/connectors/jdbc/JdbcAsyncWriter.java     |  37 +-
 .../apache/geode/connectors/jdbc/JdbcLoader.java   |   1 +
 .../jdbc/internal/cli/CreateMappingCommand.java    |   9 +-
 .../jdbc/internal/cli/CreateMappingFunction.java   |  11 +-
 .../internal/cli/DeregisterDriverFunction.java     |   2 +-
 .../jdbc/internal/cli/ListDriversCommand.java      |   5 +-
 .../jdbc/internal/cli/RegisterDriverFunction.java  |   2 +-
 .../geode/connectors/jdbc/JdbcLoaderTest.java      |   9 +-
 .../geode/connectors/jdbc/JdbcWriterTest.java      |  12 +-
 .../jdbc/internal/JdbcConnectorServiceTest.java    |   9 +-
 .../internal/cli/CreateDataSourceCommandTest.java  |   7 +-
 .../internal/cli/CreateMappingFunctionTest.java    |  27 +-
 ...CreateMappingPreconditionCheckFunctionTest.java |  93 ++--
 .../internal/cli/DeregisterDriverCommandTest.java  |  15 +-
 .../internal/cli/DeregisterDriverFunctionTest.java |   3 +-
 .../internal/cli/DestroyDataSourceCommandTest.java |  10 +-
 .../cli/DestroyMappingCommandFunctionTest.java     |  24 +-
 .../cli/FunctionContextArgumentProviderTest.java   |  18 +-
 .../jdbc/internal/cli/ListDriversCommandTest.java  |   1 +
 .../jdbc/internal/cli/ListDriversFunctionTest.java |   1 +
 .../jdbc/internal/cli/ListMappingCommandTest.java  |  22 +-
 .../internal/cli/RegisterDriverCommandTest.java    |   6 +-
 .../internal/cli/RegisterDriverFunctionTest.java   |   7 +-
 .../jdbc/internal/xml/ElementTypeTest.java         |   8 +-
 .../xml/JdbcConnectorServiceXmlParserTest.java     |   4 +-
 geode-core/build.gradle                            |   4 +-
 .../DestroyRegionDuringGIIDistributedTest.java     |   2 +
 .../geode/internal/tcp/CloseConnectionTest.java    |  76 +++
 .../geode/internal/tcp/TCPConduitDUnitTest.java    |   5 +-
 ...erStartupWhenAsyncDistributionTimeoutIsSet.java |  71 +++
 ...butedSystemMXBeanWithAlertsDistributedTest.java |   1 +
 .../internal/BindDistributedSystemJUnitTest.java   |   3 -
 .../query/partitioned/PRColocatedEquiJoinTest.java |  42 +-
 ...java => AnalyzeCoreSerializablesJUnitTest.java} |   2 +-
 .../RestrictUseOfInetAddressJUnitTest.java         | 272 ++++++++++
 .../geode/internal/AvailablePortJUnitTest.java     |  12 +-
 .../internal/jta/TransactionImplJUnitTest.java     |   6 +-
 .../SimpleStatSamplerIntegrationTest.java          |   2 +-
 .../main/java/org/apache/geode/SystemFailure.java  |  12 +-
 .../geode/admin/DistributedSystemConfig.java       |   2 +-
 .../admin/internal/AdminDistributedSystemImpl.java |   5 +-
 .../admin/internal/ConfigurationParameterImpl.java |   4 +-
 .../internal/DistributedSystemConfigImpl.java      |   4 +-
 .../internal/DistributionLocatorConfigImpl.java    |   2 +-
 .../internal/EnabledManagedEntityController.java   |   4 +-
 .../net => admin/internal}/InetAddressUtils.java   |  90 +--
 .../internal}/InetAddressUtilsWithLogging.java     |   4 +-
 .../admin/internal/ManagedEntityConfigImpl.java    |   4 +-
 .../geode/admin/internal/SystemMemberImpl.java     |   4 +-
 .../geode/admin/jmx/internal/AgentConfigImpl.java  |   4 +-
 .../apache/geode/admin/jmx/internal/AgentImpl.java |   2 +-
 .../jmx/internal/MX4JServerSocketFactory.java      |   2 +-
 .../java/org/apache/geode/cache/CacheWriter.java   |   2 +-
 .../java/org/apache/geode/cache/Operation.java     |   2 +-
 .../apache/geode/cache/RegionAccessException.java  |   2 +-
 .../geode/cache/RegionDistributionException.java   |   2 +-
 .../client/internal/AutoConnectionSourceImpl.java  |  12 +-
 .../geode/cache/client/internal/Connection.java    |   1 +
 .../org/apache/geode/cache/execute/Function.java   |   4 +-
 .../org/apache/geode/cache/query/IndexType.java    |   1 +
 .../internal/AbstractGroupOrRangeJunction.java     |   2 +-
 .../cache/query/internal/CompiledComparison.java   |   4 +-
 .../cache/query/internal/CqQueryVsdStats.java      |   2 +-
 .../cache/query/internal/ObjectIntHashMap.java     |   2 +-
 .../geode/cache/query/internal/PlanInfo.java       |   7 +-
 .../cache/query/internal/SortedStructSet.java      |   4 +-
 .../cache/query/internal/cq/CqServiceProvider.java |   2 +-
 .../geode/cache/query/internal/cq/ServerCQ.java    |   6 +
 .../cache/query/internal/index/HashIndexSet.java   |   2 +-
 .../query/internal/index/PartitionedIndex.java     |   2 +-
 .../apache/geode/cache/util/CqListenerAdapter.java |   2 +-
 .../apache/geode/cache/wan/GatewayReceiver.java    |   2 +-
 .../java/org/apache/geode/distributed/Locator.java |   2 +-
 .../apache/geode/distributed/LocatorLauncher.java  |   6 +-
 .../apache/geode/distributed/ServerLauncher.java   |   4 +-
 .../internal/ClusterDistributionManager.java       |   1 +
 .../distributed/internal/DistributionConfig.java   |   4 +-
 .../internal/DistributionConfigImpl.java           |   7 +-
 .../distributed/internal/DistributionImpl.java     |   4 +
 .../distributed/internal/DistributionManager.java  |   6 +-
 .../internal/FunctionExecutionPooledExecutor.java  |   4 +-
 .../distributed/internal/HealthMonitorImpl.java    |   8 +-
 .../internal/InternalDistributedSystem.java        |   3 -
 .../distributed/internal/InternalLocator.java      |  16 +-
 .../internal/LonerDistributionManager.java         |  21 +-
 .../internal/PooledExecutorWithDMStats.java        |   2 +-
 .../geode/distributed/internal/ServerLocation.java |   2 +-
 .../geode/distributed/internal/ServerLocator.java  |   4 +-
 .../geode/distributed/internal/StartupMessage.java |   4 +-
 .../internal/StartupResponseMessage.java           |   2 +-
 .../distributed/internal/direct/DirectChannel.java |   2 +-
 .../distributed/internal/locks/DLockToken.java     |   1 -
 .../membership/InternalDistributedMember.java      |   3 +-
 .../geode/examples/SimpleSecurityManager.java      |   4 +-
 .../apache/geode/internal/DistributionLocator.java |  19 -
 .../geode/internal/InternalDataSerializer.java     |   5 +-
 .../org/apache/geode/internal/SystemAdmin.java     |   4 +-
 .../org/apache/geode/internal/SystemTimer.java     |   3 -
 .../apache/geode/internal/VersionDescription.java  |   2 +-
 .../org/apache/geode/internal/admin/GemFireVM.java |  15 -
 .../internal/cache/AbstractBucketRegionQueue.java  |   6 +-
 .../apache/geode/internal/cache/AcceptHelper.java  |   3 +-
 .../apache/geode/internal/cache/BucketRegion.java  |   1 -
 .../geode/internal/cache/DistTXCommitMessage.java  |   4 +-
 .../internal/cache/DistTXPrecommitMessage.java     |   4 +-
 .../internal/cache/DistTXRollbackMessage.java      |   4 +-
 .../internal/cache/DistributedPutAllOperation.java |   2 +-
 .../cache/DistributedRemoveAllOperation.java       |   2 +-
 .../apache/geode/internal/cache/EntriesMap.java    |  10 +-
 .../geode/internal/cache/EntryExpiryTask.java      |   2 +-
 .../apache/geode/internal/cache/ExpiryTask.java    |   5 +-
 .../cache/ForceableLinkedBlockingQueue.java        |  32 +-
 .../geode/internal/cache/GemFireCacheImpl.java     |   4 +-
 .../internal/cache/InitialImageOperation.java      |   7 +-
 .../geode/internal/cache/InternalDataView.java     |   3 -
 .../geode/internal/cache/LoaderHelperFactory.java  |   8 +-
 .../internal/cache/PartitionedRegionDataView.java  |   3 -
 .../geode/internal/cache/RegionExpiryTask.java     |   2 +-
 .../geode/internal/cache/TXBucketRegionState.java  |   3 -
 .../apache/geode/internal/cache/TXManagerImpl.java |   2 +-
 .../geode/internal/cache/TXStateInterface.java     |   3 -
 .../apache/geode/internal/cache/TXStateProxy.java  |   3 -
 .../geode/internal/cache/ValueByteWrapper.java     |   6 -
 .../cache/WrappedRegionMembershipListener.java     |   5 +-
 .../geode/internal/cache/entries/DiskEntry.java    |   2 +-
 .../cache/execute/FunctionExecutionNodePruner.java |   2 +-
 .../execute/metrics/FunctionServiceStats.java      |   2 +-
 .../internal/cache/ha/HARegionQueueStats.java      |   2 +-
 .../locks/TXLockUpdateParticipantsMessage.java     |   2 -
 .../cache/partitioned/IndexCreationMsg.java        |   2 +-
 .../PRFunctionStreamingResultCollector.java        |   2 +-
 .../cache/partitioned/RemoveIndexesMessage.java    |  17 -
 .../region/entry/RegionEntryFactoryBuilder.java    |   2 +-
 .../internal/cache/tier/sockets/AcceptorImpl.java  |  19 +-
 .../cache/tier/sockets/CacheClientNotifier.java    |  12 +-
 .../cache/tier/sockets/CacheClientProxy.java       |  16 +-
 .../cache/tier/sockets/CacheClientProxyStats.java  |   2 +-
 .../tier/sockets/ClientDataSerializerMessage.java  |   5 +-
 .../tier/sockets/ClientInstantiatorMessage.java    |   5 +-
 .../cache/tier/sockets/MessageDispatcher.java      |   5 -
 .../cache/tier/sockets/ServerConnection.java       |   6 +-
 .../internal/cache/wan/GatewaySenderEventImpl.java |   2 +-
 .../internal/cache/wan/GatewaySenderStats.java     |   2 +-
 ...currentParallelGatewaySenderEventProcessor.java |   4 +-
 ...oncurrentSerialGatewaySenderEventProcessor.java |   2 +-
 .../geode/internal/cache/xmlcache/CacheXml.java    |   6 +-
 .../internal/cache/xmlcache/CacheXmlParser.java    |   2 +-
 .../cache/xmlcache/CacheXmlPropertyResolver.java   |   2 +-
 .../xmlcache/CacheXmlPropertyResolverHelper.java   |   6 +-
 .../xmlcache/DiskStoreAttributesCreation.java      |   2 +-
 .../cache/xmlcache/ResourceManagerCreation.java    |   3 -
 .../concurrent/CompactConcurrentHashSet2.java      |   5 -
 .../datasource/ConfiguredDataSourceProperties.java |   2 +-
 .../datasource/ConnectionPoolCacheImpl.java        |   2 +-
 .../internal/datasource/DataSourceFactory.java     |   2 +-
 .../i18n/AbstractStringIdResourceBundle.java       |   2 +-
 .../geode/internal/io/CompositeOutputStream.java   |   5 +-
 .../apache/geode/internal/jndi/JNDIInvoker.java    |   2 +-
 .../geode/internal/jta/GlobalTransaction.java      |  29 +-
 .../apache/geode/internal/jta/TransactionImpl.java |  13 +-
 .../geode/internal/jta/TransactionManagerImpl.java |  20 +-
 .../geode/internal/jta/UserTransactionImpl.java    |  35 +-
 .../org/apache/geode/internal/jta/XidImpl.java     |   7 +-
 ...ketCreator.java => SCClusterSocketCreator.java} |   6 +-
 .../apache/geode/internal/net/SocketCreator.java   |  13 +-
 .../internal/offheap/MemoryAllocatorImpl.java      |   2 +-
 .../geode/internal/offheap/RefCountChangeInfo.java |   2 +-
 .../geode/internal/statistics/HostStatSampler.java |   2 +-
 .../internal/statistics/OsStatisticsProvider.java  |   7 +-
 .../internal/statistics/StatArchiveWriter.java     |   2 +-
 .../org/apache/geode/internal/tcp/Connection.java  |  38 +-
 .../apache/geode/internal/tcp/ConnectionTable.java |  14 +-
 .../org/apache/geode/internal/tcp/TCPConduit.java  |   2 +-
 .../org/apache/geode/internal/util/IOUtils.java    |   2 +-
 .../management/internal/BaseManagementService.java |   9 +
 .../geode/management/internal/ManagementAgent.java |   2 +-
 .../geode/management/internal/MemberMessenger.java |   8 +-
 .../api/LocatorClusterManagementService.java       |   9 +-
 .../management/internal/beans/MBeanAggregator.java |   2 +-
 .../management/internal/beans/MemberMBean.java     |   4 +-
 .../internal/beans/MemberMBeanBridge.java          |  84 ++-
 .../realizers/ConfigurationRealizer.java           |   1 +
 .../internal/operation/OperationManager.java       |   6 +-
 .../internal/operation/OperationState.java         |  27 +-
 .../operation/RebalanceOperationPerformer.java     |  63 ++-
 .../operation/RegionOperationStateStore.java       |  13 +-
 .../geode/management/internal/util/HostUtils.java  |   2 +-
 .../main/java/org/apache/geode/pdx/PdxWriter.java  |   3 +-
 .../apache/geode/pdx/internal/PdxReaderImpl.java   |   1 -
 .../tier/sockets/CacheClientNotifierTest.java      |  28 +
 .../api/LocatorClusterManagementServiceTest.java   |  63 ++-
 .../operation/OperationStateConcurrentTest.java    |  74 +++
 .../internal/operation/OperationStateTest.java     |  32 ++
 .../operation/RebalanceOperationPerformerTest.java | 168 ++++++
 .../operation/RegionOperationStateStoreTest.java   |  12 +-
 geode-core/src/test/resources/expected-pom.xml     |  12 +-
 .../geode/cache/query/cq/internal/CqQueryImpl.java |   2 -
 .../cache/query/cq/internal/CqServiceImpl.java     |  40 +-
 .../cache/query/cq/internal/ServerCQImpl.java      | 176 ++----
 .../query/cq/internal/ServerCQResultsCache.java    |  38 +-
 .../cq/internal/ServerCQResultsCacheNoOpImpl.java  |  68 +++
 .../ServerCQResultsCachePartitionRegionImpl.java   | 133 +++++
 .../ServerCQResultsCacheReplicateRegionImpl.java   | 176 ++++++
 .../query/cq/internal/command/ExecuteCQ61.java     |   2 +-
 .../cache/query/cq/internal/ServerCQImplTest.java  |   4 +-
 .../management/internal/cli/HeadlessGfsh.java      |  43 +-
 .../geode/test/dunit/internal/DUnitLauncher.java   |   2 +-
 .../geode/test/junit/rules/MemberStarterRule.java  |  11 +-
 geode-experimental-driver/build.gradle             |   1 +
 .../experimental/driver/AuthorizationTest.java     |   3 +-
 .../experimental/driver/IntegrationTestBase.java   |   6 +-
 .../driver/PostProcessingIntegrationTest.java      |   5 +-
 .../geode/experimental/driver/FunctionService.java |   2 +-
 .../geode/experimental/driver/ProtobufDriver.java  |  10 +-
 .../experimental/driver/ProtobufFunction.java      |  11 +-
 .../driver/ProtobufFunctionService.java            |   4 +-
 .../experimental/driver/ProtobufQueryService.java  |  18 +-
 .../geode/experimental/driver/ProtobufRegion.java  |  12 +-
 .../geode/experimental/driver/QueryService.java    |   2 +-
 .../geode/experimental/driver/ValueEncoder.java    |  10 +-
 geode-gfsh/build.gradle                            |   3 +
 .../management/internal/cli/NetstatDUnitTest.java  |  12 +-
 .../cli/commands/AlterCompressorDUnitTest.java     |  20 +-
 .../cli/commands/AlterDiskStoreDUnitTest.java      |   4 +-
 .../cli/commands/ConcurrentDeployDUnitTest.java    |  12 +-
 .../CreateAsyncEventQueueCommandDUnitTest.java     |   7 +-
 .../CreateJndiBindingCommandDUnitTest.java         |   1 +
 .../commands/CreatePooledJndiBindingDUnitTest.java |   3 +-
 .../cli/commands/CreateRegionCommandDUnitTest.java |  83 +--
 ...DeployCommandFunctionRegistrationDUnitTest.java |  13 +-
 .../commands/DeployCommandRedeployDUnitTest.java   |  13 +-
 .../DescribeJndiBindingCommandDUnitTest.java       |  11 +-
 .../cli/commands/DescribeRegionDUnitTest.java      |  32 +-
 .../commands/DestroyIndexCommandsDUnitTest.java    |  31 +-
 .../DestroyJndiBindingCommandDUnitTest.java        |   1 +
 .../DestroySecondJndiBindingCommandDUnitTest.java  |   1 +
 .../cli/commands/DiskStoreCommandsDUnitTest.java   |  28 +-
 .../commands/ExecuteFunctionCommandDUnitTest.java  |  23 +-
 .../ExecuteFunctionCommandSecurityTest.java        |   3 +
 ...ImportClusterConfigurationCommandDUnitTest.java |  29 +-
 .../IndexCommandsShareConfigurationDUnitTest.java  |   6 +-
 .../ListAsyncEventQueuesCommandDUnitTest.java      |   1 +
 .../internal/cli/commands/ListRegionDUnitTest.java |  13 +-
 .../cli/commands/RebalanceCommandDUnitTest.java    |  23 +-
 ...ersistThroughClusterConfigurationDUnitTest.java |   9 +-
 .../cli/commands/RemoveCommandDUnitTest.java       |  13 +-
 .../cli/commands/RepeatedRebalanceDUnitTest.java   |   2 +-
 .../ResumeAsyncEventQueueDispatcherDUnitTest.java  |   5 +-
 .../ShowMissingDiskStoreCommandDUnitTest.java      |  13 +-
 .../cli/commands/TestCustomIdleExpiry.java         |   4 +-
 .../internal/cli/commands/TestCustomTTLExpiry.java |   4 +-
 .../GfshParserAutoCompletionIntegrationTest.java   |   5 +-
 .../internal/cli/GfshParserConverterTest.java      |  16 +-
 .../internal/cli/HeadlessGfshIntegrationTest.java  |  16 +-
 .../AlterRegionCommandIntegrationTest.java         |  18 +-
 .../CreateRegionCommandIntegrationTest.java        | 120 ++--
 .../cli/commands/ExportDataIntegrationTest.java    |  23 +-
 .../commands/HTTPServiceSSLSupportJUnitTest.java   |  16 +-
 .../commands/HistoryCommandIntegrationTest.java    |  11 +-
 .../cli/commands/ImportDataIntegrationTest.java    |  24 +-
 .../ListDiskStoreCommandIntegrationTest.java       |   3 +-
 .../cli/commands/ListRegionIntegrationTest.java    |  11 +-
 .../cli/commands/PutCommandIntegrationTest.java    |  24 +-
 .../ShowMetricsCommandIntegrationTest.java         |  25 +-
 .../cli/domain/IndexDetailsIntegrationTest.java    |   9 +-
 .../ChangeLogLevelFunctionIntegrationTest.java     |  28 +-
 .../functions/CreateJndiBindingFunctionTest.java   |   6 +-
 .../functions/DataCommandFunctionJUnitTest.java    |  21 +-
 .../DataCommandFunctionWithPDXJUnitTest.java       |   2 +-
 .../functions/DestroyJndiBindingFunctionTest.java  |  23 +-
 .../ExportLogsFunctionIntegrationTest.java         |  19 +-
 .../internal/cli/functions/Geode3544JUnitTest.java |  10 +-
 .../cli/functions/ListJndiBindingFunctionTest.java |  11 +-
 .../SizeExportLogsFunctionIntegrationTest.java     |  19 +-
 .../result/model/ResultModelIntegrationTest.java   |   8 +-
 .../cli/util/LogExporterFileIntegrationTest.java   |  24 +-
 .../cli/util/LogExporterIntegrationTest.java       |   8 +-
 .../geode/management/cli/CommandService.java       |   6 +-
 .../apache/geode/management/cli/GfshCommand.java   |  15 +-
 .../geode/management/internal/cli/CliUtil.java     |   5 +-
 .../geode/management/internal/cli/Launcher.java    |  18 +-
 .../cli/commands/AlterRuntimeConfigCommand.java    |   1 +
 .../cli/commands/ChangeLogLevelCommand.java        |   7 +-
 .../cli/commands/CloseDurableCQsCommand.java       |   1 +
 .../cli/commands/CloseDurableClientCommand.java    |   1 +
 .../internal/cli/commands/ConnectCommand.java      |   1 +
 .../cli/commands/CountDurableCQEventsCommand.java  |   1 +
 .../cli/commands/CreateDefinedIndexesCommand.java  |   1 +
 .../cli/commands/CreateGatewaySenderCommand.java   |   3 +-
 .../internal/cli/commands/CreateIndexCommand.java  |   6 +-
 .../internal/cli/commands/DataCommandsUtils.java   |  10 +-
 .../internal/cli/commands/DefineIndexCommand.java  |   6 +-
 .../internal/cli/commands/DeployCommand.java       |  33 +-
 .../cli/commands/DescribeDiskStoreCommand.java     |   1 +
 .../commands/DescribeOfflineDiskStoreCommand.java  |   6 +-
 .../commands/DestroyAsyncEventQueueCommand.java    |  12 +-
 .../cli/commands/DestroyFunctionCommand.java       |  18 +-
 .../internal/cli/commands/DestroyIndexCommand.java |   5 +-
 .../cli/commands/DestroyRegionCommand.java         |   8 +-
 .../internal/cli/commands/ExportConfigCommand.java |   3 +-
 .../internal/cli/commands/ExportDataCommand.java   |   6 +-
 .../internal/cli/commands/ExportLogsCommand.java   |  25 +-
 .../commands/ExportOfflineDiskStoreCommand.java    |   6 +-
 .../cli/commands/ExportStackTraceCommand.java      |   1 +
 .../internal/cli/commands/GCCommand.java           |   6 +-
 .../ImportClusterConfigurationCommand.java         |  15 +-
 .../internal/cli/commands/ImportDataCommand.java   |   6 +-
 .../internal/cli/commands/IndexDefinition.java     |   5 +-
 .../cli/commands/ListAsyncEventQueuesCommand.java  |  12 +-
 .../internal/cli/commands/ListDeployedCommand.java |   1 +
 .../cli/commands/ListDurableClientCQsCommand.java  |   1 +
 .../internal/cli/commands/ListFunctionCommand.java |   1 +
 .../internal/cli/commands/ListIndexCommand.java    |   9 +-
 .../cli/commands/ListJndiBindingCommand.java       |  17 +-
 .../internal/cli/commands/QueryCommand.java        |  11 +-
 .../internal/cli/commands/ShowMetricsCommand.java  |   6 +-
 .../cli/commands/StartGatewaySenderCommand.java    |   6 +-
 .../internal/cli/commands/StartLocatorCommand.java |  20 +-
 .../internal/cli/commands/StartServerCommand.java  |   9 +-
 .../StatusClusterConfigServiceCommand.java         |   2 +-
 .../internal/cli/commands/UndeployCommand.java     |   1 +
 .../cli/commands/lifecycle/StopLocatorCommand.java |   5 +-
 .../cli/commands/lifecycle/StopServerCommand.java  |   6 +-
 .../cli/converters/IndexTypeConverter.java         |  15 +-
 .../internal/cli/domain/IndexDetails.java          |  53 +-
 .../cli/functions/AlterQueryServiceFunction.java   |   2 +
 .../cli/functions/AlterRuntimeConfigFunction.java  |  40 +-
 .../cli/functions/ChangeLogLevelFunction.java      |   6 +-
 .../cli/functions/CloseDurableClientFunction.java  |   6 +-
 .../cli/functions/CloseDurableCqFunction.java      |   6 +-
 .../cli/functions/ContinuousQueryFunction.java     |  20 +-
 .../functions/CreateAsyncEventQueueFunction.java   |  21 +-
 .../functions/CreateDefinedIndexesFunction.java    |   9 +-
 .../cli/functions/CreateIndexFunction.java         |   7 +-
 .../cli/functions/CreateJndiBindingFunction.java   |   4 +-
 .../cli/functions/DataCommandFunction.java         |  29 +-
 .../internal/cli/functions/DeployFunction.java     |  14 +-
 .../cli/functions/DescribeDiskStoreFunction.java   |  15 +-
 .../functions/DestroyAsyncEventQueueFunction.java  |  22 +-
 .../cli/functions/DestroyDiskStoreFunction.java    |  11 +-
 .../functions/DestroyGatewayReceiverFunction.java  |   4 +-
 .../cli/functions/ExportConfigFunction.java        |   6 +-
 .../internal/cli/functions/ExportDataFunction.java |  12 +-
 .../internal/cli/functions/ExportLogsFunction.java |  19 +-
 .../FetchSharedConfigurationStatusFunction.java    |   4 +-
 .../cli/functions/GarbageCollectionFunction.java   |   4 +-
 .../functions/GatewayReceiverCreateFunction.java   |   6 +-
 .../cli/functions/GatewaySenderCreateFunction.java |  13 +-
 .../functions/GatewaySenderDestroyFunction.java    |   8 +-
 .../GetMemberConfigInformationFunction.java        |  21 +-
 .../functions/GetRegionDescriptionFunction.java    |   6 +-
 .../cli/functions/GetStackTracesFunction.java      |   4 +-
 .../GetSubscriptionQueueSizeFunction.java          |   6 +-
 .../internal/cli/functions/ImportDataFunction.java |  12 +-
 .../cli/functions/ListDeployedFunction.java        |   4 +-
 .../cli/functions/ListDiskStoresFunction.java      |   7 +-
 .../cli/functions/ListDurableCqNamesFunction.java  |   7 +-
 .../cli/functions/ListFunctionFunction.java        |   9 +-
 .../internal/cli/functions/ListIndexFunction.java  |   7 +-
 .../cli/functions/ListJndiBindingFunction.java     |  12 +-
 .../internal/cli/functions/NetstatFunction.java    |  22 +-
 .../cli/functions/RegionAlterFunction.java         |  10 +-
 .../cli/functions/RegionCreateFunction.java        |   9 +-
 .../cli/functions/RegionDestroyFunction.java       |  11 +-
 .../internal/cli/functions/RegionFunctionArgs.java |   5 +-
 .../functions/ShowMissingDiskStoresFunction.java   |   8 +-
 .../internal/cli/functions/ShutDownFunction.java   |   6 +-
 .../cli/functions/SizeExportLogsFunction.java      |  17 +-
 .../internal/cli/functions/UndeployFunction.java   |   9 +-
 .../internal/cli/functions/UnregisterFunction.java |   6 +-
 .../cli/functions/UserFunctionExecution.java       |  25 +-
 .../internal/cli/remote/CommandExecutor.java       |  13 +-
 .../internal/cli/remote/CommandStatementImpl.java  |   6 +-
 .../internal/cli/remote/MemberCommandService.java  |  12 +-
 .../geode/management/internal/cli/shell/Gfsh.java  |  16 +-
 .../internal/cli/shell/MXBeanProvider.java         |   7 +-
 .../internal/cli/util/DiskStoreCompacter.java      |  23 +-
 .../internal/cli/util/DiskStoreUpgrader.java       |  32 +-
 .../internal/cli/util/ExportLogsCacheWriter.java   |  22 +-
 .../management/internal/cli/util/JdkTool.java      |   3 +-
 .../internal/cli/util/ReadWriteFile.java           |  64 ++-
 .../sanctioned-geode-gfsh-serializables.txt        |   1 +
 .../internal/cli/commands/ConnectCommandTest.java  |  66 ++-
 .../commands/CreateAsyncEventQueueCommandTest.java |   6 +-
 .../commands/CreateDefinedIndexesCommandTest.java  |  36 +-
 .../cli/commands/CreateIndexCommandTest.java       |  36 +-
 .../cli/commands/CreateJndiBindingCommandTest.java |   7 +-
 .../cli/commands/CreateRegionCommandTest.java      |  41 +-
 .../DestroyAsyncEventQueueCommandTest.java         |  27 +-
 .../DestroyGatewayReceiverCommandTest.java         |  10 +-
 .../commands/DestroyGatewaySenderCommandTest.java  |  19 +-
 .../commands/DestroyJndiBindingCommandTest.java    |  11 +-
 .../cli/commands/DestroyRegionCommandTest.java     |   8 +-
 .../cli/commands/ExportLogsCommandTest.java        |  32 +-
 .../cli/commands/ListAsyncEventQueuesTest.java     |   8 +-
 .../cli/commands/ListGatewayCommandTest.java       |  16 +-
 ...ResumeAsyncEventQueueDispatcherCommandTest.java |   4 +-
 .../cli/converters/IndexTypeConverterTest.java     |  39 +-
 .../cli/functions/CliFunctionResultTest.java       |  12 +-
 .../CreateDefinedIndexesFunctionTest.java          |  43 +-
 .../DescribeDiskStoreFunctionJUnitTest.java        | 184 +++++--
 .../DestroyAsyncEventQueueFunctionTest.java        |  14 +-
 .../DestroyGatewayReceiverFunctionTest.java        |   8 +-
 .../GatewayReceiverCreateFunctionTest.java         |   6 +-
 .../GatewaySenderDestroyFunctionTest.java          |  11 +-
 .../functions/ListDiskStoresFunctionJUnitTest.java |   5 +-
 .../cli/functions/ListIndexFunctionJUnitTest.java  |  25 +-
 .../cli/functions/RegionAlterFunctionTest.java     |  20 +-
 .../functions/RegionCreateFunctionJUnitTest.java   |   2 +
 .../cli/functions/RegionDestroyFunctionTest.java   |  26 +-
 .../ShowMissingDiskStoresFunctionTest.java         |  11 +-
 .../cli/remote/MemberCommandServiceTest.java       |   1 +
 .../cli/remote/OnlineCommandProcessorTest.java     |  16 +-
 .../cli/result/model/TabularResultModelTest.java   |   2 +-
 .../cli/shell/GfshExecutionStrategyTest.java       |   2 +-
 .../cli/util/ExportLogsCacheWriterTest.java        |   3 +-
 .../web/http/support/HttpRequesterTest.java        |   1 +
 geode-http-service/build.gradle                    |   1 +
 .../geode/internal/cache/InternalHttpService.java  |  15 +-
 .../AnalyzeDataSerializablesJUnitTestBase.java     | 322 +++++++++++
 .../AnalyzeSerializablesJUnitTestBase.java         | 430 ++++-----------
 .../geode/codeAnalysis/decode/CompiledClass.java   | 116 ++--
 .../geode/codeAnalysis/decode/CompiledMethod.java  |  16 +
 .../geode/codeAnalysis/decode/cp/CpFieldref.java   |   8 +
 .../geode/codeAnalysis/decode/cp/CpMethodref.java  |   1 +
 .../codeAnalysis/decode/cp/CpNameAndType.java      |  10 +
 .../geode/test/junit/rules/ClassAnalysisRule.java  | 142 +++++
 .../org/apache/geode/test/version/TestVersion.java |  29 +-
 geode-log4j/build.gradle                           |   1 +
 .../impl/AlertAppenderIntegrationTest.java         |   1 +
 .../AlertListenerRegistrationIntegrationTest.java  |   1 +
 ...mWithBothLogWriterAppendersIntegrationTest.java |   9 +-
 ...edSystemWithLogLevelChangesIntegrationTest.java |   1 +
 ...reVerboseMarkerFilterAcceptIntegrationTest.java |   1 +
 ...fireVerboseMarkerFilterDenyIntegrationTest.java |   1 +
 ...deVerboseMarkerFilterAcceptIntegrationTest.java |   1 +
 ...eodeVerboseMarkerFilterDenyIntegrationTest.java |   1 +
 ...oggingWithDistributedSystemIntegrationTest.java |   6 +-
 .../log4j/internal/impl/AlertAppender.java         |   3 +-
 .../log4j/internal/impl/GeodeConsoleAppender.java  |  13 +-
 .../log4j/internal/impl/LogWriterAppender.java     |   3 +-
 geode-logging/build.gradle                         |   1 +
 .../apache/geode/logging/internal/OSProcess.java   |  33 +-
 geode-lucene/build.gradle                          |   5 +-
 geode-management/build.gradle                      |   9 +
 .../AnalyzeManagementSerializablesJUnitTest.java   |   5 -
 .../apache/geode/codeAnalysis/excludedClasses.txt  |   0
 .../org/apache/geode/codeAnalysis/openBugs.txt     |   0
 .../codeAnalysis/sanctionedDataSerializables.txt   |   0
 .../api/ClusterManagementListResult.java           |  15 +-
 ...tTemplateClusterManagementServiceTransport.java |   4 +-
 .../internal/api/BaseManagementServiceBuilder.java |  22 +-
 .../geode/management/runtime/OperationResult.java  |  15 +-
 .../geode/management/runtime/RebalanceResult.java  |  11 -
 .../sanctioned-geode-management-serializables.txt  |   2 +
 .../api/ClusterManagementGetResultTest.java        |   1 +
 .../api/ClusterManagementListResultTest.java       |   4 +-
 .../api/ClusterManagementOperationResultTest.java  |   6 +-
 .../geode/management/api/EntityInfoTest.java       |   1 +
 ...plateClusterManagementServiceTransportTest.java |  18 +-
 .../ClusterManagementServiceBuilderTest.java       |   3 +-
 .../ClientClusterManagementServiceTest.java        |   4 +
 .../AnalyzeMembershipSerializablesJUnitTest.java   |  68 +++
 .../locator/GMSLocatorRecoveryIntegrationTest.java |   2 +-
 .../apache/geode/codeAnalysis/excludedClasses.txt  |   1 +
 .../org/apache/geode/codeAnalysis/openBugs.txt     |   0
 .../codeAnalysis/sanctionedDataSerializables.txt   |  72 +++
 .../internal/membership/api/MembershipConfig.java  |   4 +-
 .../internal/membership/api/MembershipLocator.java |   2 +-
 .../gms/locator/MembershipLocatorImpl.java         |  11 +-
 geode-protobuf-messages/build.gradle               |   1 +
 geode-protobuf/build.gradle                        |   1 +
 .../v1/ValueSerializerIntegrationTest.java         |   9 +-
 .../protobuf/security/SecureCacheImpl.java         |  35 +-
 .../security/SecureFunctionServiceImpl.java        |  27 +-
 .../statistics/ProtobufClientStatistics.java       |  42 +-
 .../protocol/protobuf/v1/ProtobufOpsProcessor.java |  26 +-
 .../OqlQueryRequestOperationHandler.java           |   4 +-
 .../protocol/protobuf/v1/state/RequireVersion.java |   7 +-
 .../protobuf/security/SecureCacheImplTest.java     |  41 +-
 .../security/SecureFunctionServiceImplTest.java    |  37 +-
 .../ClearRequestOperationHandlerJUnitTest.java     |  21 +-
 ...ionOnGroupRequestOperationHandlerJUnitTest.java |  45 +-
 ...onOnMemberRequestOperationHandlerJUnitTest.java |  58 +-
 ...onOnRegionRequestOperationHandlerJUnitTest.java |  43 +-
 .../GetAllRequestOperationHandlerJUnitTest.java    |  79 +--
 ...egionNamesRequestOperationHandlerJUnitTest.java |  52 +-
 .../GetRequestOperationHandlerJUnitTest.java       |  65 +--
 .../GetServerOperationHandlerJUnitTest.java        |  54 +-
 .../GetSizeRequestOperationHandlerJUnitTest.java   |  19 +-
 .../KeySetOperationHandlerJUnitTest.java           |  26 +-
 .../v1/operations/OperationHandlerJUnitTest.java   |   4 +-
 ...ueryRequestOperationHandlerIntegrationTest.java |  51 +-
 .../OqlQueryRequestOperationHandlerJUnitTest.java  |  11 +-
 .../PutAllRequestOperationHandlerJUnitTest.java    |  74 +--
 .../PutRequestOperationHandlerJUnitTest.java       |  28 +-
 .../RemoveRequestOperationHandlerJUnitTest.java    |  57 +-
 .../codec/JsonPdxConverterJUnitTest.java           |  16 +-
 geode-pulse/build.gradle                           |  20 +-
 geode-pulse/geode-pulse-test/build.gradle          |   1 +
 .../controllers/PulseControllerJUnitTest.java      | 185 +++----
 .../pulse/security/CustomSecurityConfigTest.java   |  70 +++
 .../pulse/security/DefaultSecurityConfigTest.java  |  70 +++
 .../resources/pulse-authentication-custom.xml      |  38 ++
 .../internal/controllers/PulseController.java      |  95 ++--
 .../geode/tools/pulse/internal/data/Cluster.java   | 602 ++++++++++-----------
 .../tools/pulse/internal/data/DataBrowser.java     |   4 +-
 .../tools/pulse/internal/data/JMXDataUpdater.java  | 304 +++++------
 .../internal/security/CustomSecurityConfig.java    |  41 ++
 .../internal/security/DefaultSecurityConfig.java   | 108 ++++
 .../internal/security/GemfireSecurityConfig.java   |  43 ++
 .../service/ClusterDiskThroughputService.java      |   4 +-
 .../internal/service/ClusterGCPausesService.java   |   2 +-
 .../service/ClusterKeyStatisticsService.java       |   6 +-
 .../internal/service/ClusterMemberService.java     |  14 +-
 .../service/ClusterMembersRGraphService.java       | 148 +++--
 .../service/ClusterMemoryUsageService.java         |   2 +-
 .../internal/service/ClusterRegionService.java     |  38 +-
 .../internal/service/ClusterRegionsService.java    |  47 +-
 .../service/ClusterSelectedRegionService.java      |  31 +-
 .../ClusterSelectedRegionsMemberService.java       |  31 +-
 .../internal/service/ClusterWANInfoService.java    |   4 +-
 .../service/MemberAsynchEventQueuesService.java    |   2 +-
 .../internal/service/MemberClientsService.java     |  16 +-
 .../service/MemberDiskThroughputService.java       |   6 +-
 .../internal/service/MemberGCPausesService.java    |   4 +-
 .../internal/service/MemberGatewayHubService.java  |  11 +-
 .../internal/service/MemberHeapUsageService.java   |   4 +-
 .../service/MemberKeyStatisticsService.java        |   8 +-
 .../internal/service/MemberRegionsService.java     |  28 +-
 .../pulse/internal/service/MembersListService.java |   2 +-
 .../internal/service/QueryStatisticsService.java   |  90 +--
 .../internal/service/SystemAlertsService.java      |  14 +-
 ...vc-dispatcher-servlet.xml => pulse-servlet.xml} |   3 +-
 .../src/main/webapp/WEB-INF/spring-security.xml    | 108 ----
 geode-pulse/src/main/webapp/WEB-INF/web.xml        |  19 +-
 .../ClusterSelectedRegionsMemberServiceTest.java   |  55 +-
 .../geode/tools/pulse/tests/ui/PulseBase.java      | 117 +---
 geode-rebalancer/build.gradle                      |   1 +
 geode-redis/README.md                              |  81 +--
 geode-redis/build.gradle                           |   1 +
 .../org/apache/geode/redis/PubSubDUnitTest.java    |   2 +-
 .../org/apache/geode/redis/RedisDistDUnitTest.java |  37 +-
 .../redis/ConcurrentStartIntegrationTest.java      |  31 +-
 .../apache/geode/redis/HashesIntegrationTest.java  |  60 +-
 .../java/org/apache/geode/redis/RenameTest.java    | 187 +++++++
 .../org/apache/geode/redis/GeodeRedisServer.java   | 149 +++--
 .../redis/internal/ExecutionHandlerContext.java    |  42 +-
 .../java/org/apache/geode/redis/internal/Pair.java |  30 -
 .../apache/geode/redis/internal/PubSubImpl.java    |  11 +-
 .../geode/redis/internal/RedisCommandType.java     |  25 +
 .../geode/redis/internal/RedisLockService.java     |   7 +-
 .../redis/internal/RedisLockServiceMBean.java      |  33 ++
 .../geode/redis/internal/RegionProvider.java       | 142 +++--
 .../redis/internal/executor/AuthExecutor.java      |   6 +-
 .../redis/internal/executor/RenameExecutor.java    |  89 +++
 .../internal/executor/hash/HGetAllExecutor.java    |  34 +-
 .../redis/internal/executor/hash/HashExecutor.java |   2 +-
 .../internal/executor/list/LIndexExecutor.java     |   6 +-
 .../redis/internal/executor/list/LLenExecutor.java |   8 +-
 .../internal/executor/list/LRangeExecutor.java     |  13 +-
 .../redis/internal/executor/list/LRemExecutor.java |  11 +-
 .../redis/internal/executor/list/LSetExecutor.java |  13 +-
 .../internal/executor/list/LTrimExecutor.java      |  15 +-
 .../redis/internal/executor/list/ListExecutor.java |  20 +-
 .../redis/internal/executor/list/PopExecutor.java  |   8 +-
 .../redis/internal/executor/list/PushExecutor.java |   4 +-
 .../internal/executor/list/PushXExecutor.java      |   4 +-
 .../redis/internal/executor/set/SetExecutor.java   |   2 +-
 .../executor/sortedset/GeoSortedSetExecutor.java   |   1 +
 .../executor/sortedset/ZRangeByLexExecutor.java    |   9 +-
 .../executor/sortedset/ZRangeByScoreExecutor.java  |  14 +-
 .../internal/executor/string/GetExecutor.java      |   6 +-
 .../internal/executor/string/StringExecutor.java   |   2 +-
 .../sanctioned-geode-redis-serializables.txt       |   1 +
 geode-redis/src/performanceTest/aggregator.sh      |  91 ----
 geode-redis/src/performanceTest/benchmark.sh       | 115 ++--
 .../{benchmark.sh => environment-setup.sh}         |  12 +-
 .../{shacompare.sh => execute-operation.sh}        |  52 +-
 geode-redis/src/performanceTest/shacompare.sh      |   6 +-
 .../src/performanceTest/summarize-batch-results.sh |  72 +++
 .../performanceTest/summarize-operation-results.sh |  61 +++
 .../internal/ExecutionHandlerContextJUnitTest.java |   6 +-
 .../internal/executor/string/DelExecutorTest.java  |  71 +++
 .../string/StringSetExecutorJUnitTest.java         |   4 +-
 geode-serialization/build.gradle                   |   1 +
 .../serialization/BufferDataOutputStream.java      | 333 ++++++------
 .../internal/serialization/ByteArrayDataInput.java | 134 ++---
 .../internal/serialization/DSFIDSerializer.java    |   2 +-
 .../serialization/StaticSerialization.java         |  14 +-
 .../internal/DSFIDSerializerImpl.java              |  22 +-
 geode-tcp-server/build.gradle                      |   1 +
 .../tcpserver/TcpServerGossipVersionDUnitTest.java | 162 ------
 .../TcpServerProductVersionDUnitTest.java          |  52 +-
 .../tcpserver/AdvancedSocketCreatorImpl.java       |   6 +-
 .../tcpserver/ClientSocketCreatorImpl.java         |   4 +
 ...atorImpl.java => ClusterSocketCreatorImpl.java} |   8 +-
 .../internal/tcpserver/ConnectionWatcher.java      |   5 +-
 .../internal/tcpserver/HostAndPort.java            |  27 +-
 .../internal/tcpserver/InfoRequest.java            |   4 +-
 .../internal/tcpserver/InfoResponse.java           |   5 +-
 .../internal/tcpserver/ProtocolChecker.java        |   6 +
 .../internal/tcpserver/ShutdownRequest.java        |   3 +-
 .../internal/tcpserver/ShutdownResponse.java       |   4 +-
 .../distributed/internal/tcpserver/TcpClient.java  |  26 +-
 .../distributed/internal/tcpserver/TcpHandler.java |   4 +
 .../distributed/internal/tcpserver/TcpServer.java  | 109 ++--
 .../internal/tcpserver/TcpSocketCreatorImpl.java   |   6 +-
 .../internal/tcpserver/VersionRequest.java         |   3 +-
 .../internal/tcpserver/VersionResponse.java        |   4 +-
 .../{ConnectionWatcher.java => package-info.java}  |  32 +-
 .../internal/tcpserver/HostAndPortTest.java        |  94 +++-
 .../tcpserver/TcpServerDependenciesTest.java       |   3 +
 .../internal/tcpserver/TcpServerJUnitTest.java     | 231 +++-----
 geode-unsafe/build.gradle                          |   1 +
 .../WANHostNameVerificationDistributedTest.java    |   6 +-
 .../internal/cache/wan/GatewayReceiverImpl.java    |   2 +-
 .../cache/wan/GatewayReceiverImplTest.java         |   2 +-
 geode-web-api/build.gradle                         |   1 +
 .../web/controllers/AddFreeItemToOrders.java       |  45 +-
 .../web/controllers/EchoArgumentFunction.java      |   4 +-
 .../controllers/GetOrderDescriptionFunction.java   |  23 +-
 .../geode/rest/internal/web/controllers/Order.java |  39 +-
 .../web/controllers/RestAccessControllerTest.java  |  21 +-
 .../web/controllers/AbstractBaseController.java    |  54 +-
 .../web/controllers/CommonCrudController.java      |  19 +-
 .../web/controllers/FunctionAccessController.java  | 122 +++--
 .../web/controllers/PdxBasedCrudController.java    |  12 +-
 .../web/controllers/QueryAccessController.java     |   9 +-
 .../web/controllers/support/RegionData.java        |  13 +-
 .../support/RestServersResultCollector.java        |   6 +-
 .../geode/rest/internal/web/util/ArrayUtils.java   |  11 +-
 .../geode/rest/internal/web/util/JSONUtils.java    |  39 +-
 geode-web-management/build.gradle                  |   1 +
 .../internal/rest/RebalanceIntegrationTest.java    |   2 +-
 .../rest/security/RestSecurityConfiguration.java   |   2 +-
 geode-web/build.gradle                             |   1 +
 .../cli/commands/ConnectCommandWithSSLTest.java    | 109 ++--
 ...xecuteFunctionCommandWithSecurityDUnitTest.java |   6 +-
 .../ExportLogsStatsOverHttpDistributedTest.java    |   8 +-
 ...rverSerializableObjectHttpMessageConverter.java |   2 +-
 .../build.gradle => gradle/warnings.gradle         |   5 +-
 static-analysis/build.gradle                       |   1 +
 static-analysis/pmd-rules/build.gradle             |   1 +
 689 files changed, 8738 insertions(+), 6182 deletions(-)
 create mode 100644 geode-assembly/src/integrationTest/java/org/apache/geode/tools/pulse/PulseSecurityConfigDefaultProfileTest.java
 create mode 100644 geode-assembly/src/integrationTest/java/org/apache/geode/tools/pulse/PulseSecurityConfigGemfireProfileTest.java
 create mode 100644 geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
 create mode 100644 geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TestServerStartupWhenAsyncDistributionTimeoutIsSet.java
 rename geode-core/src/integrationTest/java/org/apache/geode/codeAnalysis/{AnalyzeSerializablesJUnitTest.java => AnalyzeCoreSerializablesJUnitTest.java} (92%)
 create mode 100644 geode-core/src/integrationTest/java/org/apache/geode/codeAnalysis/RestrictUseOfInetAddressJUnitTest.java
 rename geode-core/src/main/java/org/apache/geode/{internal/net => admin/internal}/InetAddressUtils.java (63%)
 rename geode-core/src/main/java/org/apache/geode/{internal/net => admin/internal}/InetAddressUtilsWithLogging.java (95%)
 rename geode-core/src/main/java/org/apache/geode/internal/net/{SCServerSocketCreator.java => SCClusterSocketCreator.java} (95%)
 create mode 100644 geode-core/src/test/java/org/apache/geode/management/internal/operation/OperationStateConcurrentTest.java
 create mode 100644 geode-core/src/test/java/org/apache/geode/management/internal/operation/RebalanceOperationPerformerTest.java
 copy geode-core/src/main/java/org/apache/geode/cache/query/internal/PlanInfo.java => geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCache.java (54%)
 create mode 100644 geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheNoOpImpl.java
 create mode 100644 geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCachePartitionRegionImpl.java
 create mode 100644 geode-cq/src/main/java/org/apache/geode/cache/query/cq/internal/ServerCQResultsCacheReplicateRegionImpl.java
 create mode 100644 geode-junit/src/main/java/org/apache/geode/codeAnalysis/AnalyzeDataSerializablesJUnitTestBase.java
 create mode 100644 geode-junit/src/main/java/org/apache/geode/test/junit/rules/ClassAnalysisRule.java
 rename {geode-core => geode-management}/src/integrationTest/java/org/apache/geode/codeAnalysis/AnalyzeManagementSerializablesJUnitTest.java (85%)
 copy {geode-memcached => geode-management}/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt (100%)
 copy {geode-web-api => geode-management}/src/integrationTest/resources/org/apache/geode/codeAnalysis/openBugs.txt (100%)
 copy {geode-web-api => geode-management}/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt (100%)
 rename {geode-core => geode-management}/src/main/resources/org/apache/geode/internal/sanctioned-geode-management-serializables.txt (94%)
 create mode 100644 geode-membership/src/integrationTest/java/org/apache/geode/codeAnalysis/AnalyzeMembershipSerializablesJUnitTest.java
 create mode 100644 geode-membership/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
 copy {geode-web-api => geode-membership}/src/integrationTest/resources/org/apache/geode/codeAnalysis/openBugs.txt (100%)
 create mode 100644 geode-membership/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
 create mode 100644 geode-pulse/src/integrationTest/java/org/apache/geode/tools/pulse/security/CustomSecurityConfigTest.java
 create mode 100644 geode-pulse/src/integrationTest/java/org/apache/geode/tools/pulse/security/DefaultSecurityConfigTest.java
 create mode 100644 geode-pulse/src/integrationTest/resources/pulse-authentication-custom.xml
 create mode 100644 geode-pulse/src/main/java/org/apache/geode/tools/pulse/internal/security/CustomSecurityConfig.java
 create mode 100644 geode-pulse/src/main/java/org/apache/geode/tools/pulse/internal/security/DefaultSecurityConfig.java
 create mode 100644 geode-pulse/src/main/java/org/apache/geode/tools/pulse/internal/security/GemfireSecurityConfig.java
 rename geode-pulse/src/main/webapp/WEB-INF/{mvc-dispatcher-servlet.xml => pulse-servlet.xml} (99%)
 delete mode 100644 geode-pulse/src/main/webapp/WEB-INF/spring-security.xml
 create mode 100644 geode-redis/src/integrationTest/java/org/apache/geode/redis/RenameTest.java
 delete mode 100644 geode-redis/src/main/java/org/apache/geode/redis/internal/Pair.java
 create mode 100644 geode-redis/src/main/java/org/apache/geode/redis/internal/RedisLockServiceMBean.java
 create mode 100644 geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RenameExecutor.java
 delete mode 100644 geode-redis/src/performanceTest/aggregator.sh
 mode change 100644 => 100755 geode-redis/src/performanceTest/benchmark.sh
 copy geode-redis/src/performanceTest/{benchmark.sh => environment-setup.sh} (88%)
 mode change 100644 => 100755
 copy geode-redis/src/performanceTest/{shacompare.sh => execute-operation.sh} (50%)
 mode change 100644 => 100755
 mode change 100644 => 100755 geode-redis/src/performanceTest/shacompare.sh
 create mode 100755 geode-redis/src/performanceTest/summarize-batch-results.sh
 create mode 100755 geode-redis/src/performanceTest/summarize-operation-results.sh
 create mode 100644 geode-redis/src/test/java/org/apache/geode/redis/internal/executor/string/DelExecutorTest.java
 delete mode 100644 geode-tcp-server/src/distributedTest/java/org/apache/geode/distributed/internal/tcpserver/TcpServerGossipVersionDUnitTest.java
 rename geode-tcp-server/src/main/java/org/apache/geode/distributed/internal/tcpserver/{ServerSocketCreatorImpl.java => ClusterSocketCreatorImpl.java} (91%)
 copy geode-tcp-server/src/main/java/org/apache/geode/distributed/internal/tcpserver/{ConnectionWatcher.java => package-info.java} (55%)
 mode change 100755 => 100644
 rename geode-tcp-server/src/{distributedTest => test}/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java (52%)
 copy static-analysis/build.gradle => gradle/warnings.gradle (87%)


[geode] 01/02: GEODE-7683: introduce BR.cmnClearRegion

Posted by nn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 7d557d5f96165e533fa62121d44b868186ae0566
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Mon Jan 27 17:02:48 2020 -0800

    GEODE-7683: introduce BR.cmnClearRegion
    
    Co-authored-by: Xiaojian Zhou <gz...@pivotal.io>
    
    GEODE-7684: Create messaging class for PR Clear (#4689)
    
    * Added new message class and test
    
    Co-authored-by: Benjamin Ross <br...@pivotal.io>
    Co-authored-by: Donal Evans <do...@pivotal.io>
---
 .../codeAnalysis/sanctionedDataSerializables.txt   |   8 +
 .../apache/geode/internal/cache/BucketRegion.java  |  38 +-
 .../geode/internal/cache/DistributedRegion.java    |  23 +-
 .../internal/cache/partitioned/ClearPRMessage.java | 388 +++++++++++++++++++++
 .../internal/cache/BucketRegionJUnitTest.java      |  77 ++++
 .../internal/cache/DistributedRegionJUnitTest.java |  18 +
 .../cache/partitioned/ClearPRMessageTest.java      | 288 +++++++++++++++
 .../serialization/DataSerializableFixedID.java     |   3 +
 8 files changed, 832 insertions(+), 11 deletions(-)

diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 3076db7..d6806f2 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1435,6 +1435,14 @@ org/apache/geode/internal/cache/partitioned/BucketSizeMessage$BucketSizeReplyMes
 fromData,27
 toData,27
 
+org/apache/geode/internal/cache/partitioned/ClearPRMessage,2
+fromData,30
+toData,44
+
+org/apache/geode/internal/cache/partitioned/ClearPRMessage$ClearReplyMessage,2
+fromData,17
+toData,17
+
 org/apache/geode/internal/cache/partitioned/ColocatedRegionDetails,2
 fromData,81
 toData,133
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index d91786f..e4fa7ef 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -557,6 +557,36 @@ public class BucketRegion extends DistributedRegion implements Bucket {
     }
   }
 
+  @Override
+  public void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
+    if (!getBucketAdvisor().isPrimary()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Not primary bucket when doing clear, do nothing");
+      }
+      return;
+    }
+
+    boolean enableRVV = useRVV && getConcurrencyChecksEnabled();
+    RegionVersionVector rvv = null;
+    if (enableRVV) {
+      rvv = getVersionVector().getCloneForTransmission();
+    }
+
+    // get rvvLock
+    Set<InternalDistributedMember> participants =
+        getCacheDistributionAdvisor().adviseInvalidateRegion();
+    try {
+      obtainWriteLocksForClear(regionEvent, participants);
+      // no need to dominate my own rvv.
+      // Clear is ongoing here; there won't be a GII for this member
+      clearRegionLocally(regionEvent, cacheWrite, null);
+      distributeClearOperation(regionEvent, rvv, participants);
+
+      // TODO: call reindexUserDataRegion if there are Lucene indexes
+    } finally {
+      releaseWriteLocksForClear(regionEvent, participants);
+    }
+  }
 
   long generateTailKey() {
     long key = eventSeqNum.addAndGet(partitionedRegion.getTotalNumberOfBuckets());
@@ -2093,11 +2123,9 @@ public class BucketRegion extends DistributedRegion implements Bucket {
       // if GII has failed, because there is not primary. So it's safe to set these
       // counters to 0.
       oldMemValue = bytesInMemory.getAndSet(0);
-    }
-
-    else {
-      throw new InternalGemFireError(
-          "Trying to clear a bucket region that was not destroyed or in initialization.");
+    } else {
+      // BucketRegion's clear is supported now
+      oldMemValue = bytesInMemory.getAndSet(0);
     }
     if (oldMemValue != BUCKET_DESTROYED) {
       partitionedRegion.getPrStats().incDataStoreEntryCount(-sizeBeforeClear);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index dd73b20..1465eef 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2003,6 +2003,10 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
     super.basicClear(regionEvent, cacheWrite);
   }
 
+  void distributeClearOperation(RegionEventImpl regionEvent, RegionVersionVector rvv,
+      Set<InternalDistributedMember> participants) {
+    DistributedClearOperation.clear(regionEvent, rvv, participants);
+  }
 
   @Override
   void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) {
@@ -2025,7 +2029,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
             obtainWriteLocksForClear(regionEvent, participants);
             clearRegionLocally(regionEvent, cacheWrite, null);
             if (!regionEvent.isOriginRemote() && regionEvent.getOperation().isDistributed()) {
-              DistributedClearOperation.clear(regionEvent, null, participants);
+              distributeClearOperation(regionEvent, null, participants);
             }
           } finally {
             releaseWriteLocksForClear(regionEvent, participants);
@@ -2081,10 +2085,12 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
   /**
    * obtain locks preventing generation of new versions in other members
    */
-  private void obtainWriteLocksForClear(RegionEventImpl regionEvent,
+  protected void obtainWriteLocksForClear(RegionEventImpl regionEvent,
       Set<InternalDistributedMember> participants) {
     lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent);
-    DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants);
+    if (!isUsedForPartitionedRegionBucket()) {
+      DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants);
+    }
   }
 
   /**
@@ -2121,7 +2127,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
   /**
    * releases the locks obtained in obtainWriteLocksForClear
    */
-  private void releaseWriteLocksForClear(RegionEventImpl regionEvent,
+  protected void releaseWriteLocksForClear(RegionEventImpl regionEvent,
       Set<InternalDistributedMember> participants) {
 
     ARMLockTestHook armLockTestHook = getRegionMap().getARMLockTestHook();
@@ -2129,8 +2135,13 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
       armLockTestHook.beforeRelease(this, regionEvent);
     }
 
-    getVersionVector().unlockForClear(getMyId());
-    DistributedClearOperation.releaseLocks(regionEvent, participants);
+    RegionVersionVector rvv = getVersionVector();
+    if (rvv != null) {
+      rvv.unlockForClear(getMyId());
+    }
+    if (!isUsedForPartitionedRegionBucket()) {
+      DistributedClearOperation.releaseLocks(regionEvent, participants);
+    }
 
     if (armLockTestHook != null) {
       armLockTestHook.afterRelease(this, regionEvent);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
new file mode 100644
index 0000000..1a8aba1
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.partitioned;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DirectReplyProcessor;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.distributed.internal.ReplySender;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Assert;
+import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.NanoTimer;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.internal.cache.RegionEventImpl;
+import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+
+public class ClearPRMessage extends PartitionMessageWithDirectReply {
+  private static final Logger logger = LogService.getLogger();
+
+  private RegionEventImpl regionEvent;
+
+  private Integer bucketId;
+
+  /** The time in ms to wait for a lock to be obtained during doLocalClear() */
+  public static final int LOCK_WAIT_TIMEOUT_MS = 1000;
+  public static final String BUCKET_NON_PRIMARY_MESSAGE =
+      "The bucket region on target member is no longer primary";
+  public static final String BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE =
+      "A lock for the bucket region could not be obtained.";
+  public static final String EXCEPTION_THROWN_DURING_CLEAR_OPERATION =
+      "An exception was thrown during the local clear operation: ";
+
+  /**
+   * state from operateOnRegion that must be preserved for transmission from the waiting pool
+   */
+  transient boolean result = false;
+
+  /**
+   * Empty constructor to satisfy {@link DataSerializer} requirements.
+   */
+  public ClearPRMessage() {}
+
+  public ClearPRMessage(int bucketId) {
+    this.bucketId = bucketId;
+
+    // These are both used by the parent class, but don't apply to this message type
+    this.notificationOnly = false;
+    this.posDup = false;
+  }
+
+  public void setRegionEvent(RegionEventImpl event) {
+    regionEvent = event;
+  }
+
+  /**
+   * Prepares this message for sending: resets the recipients, records the PR id of the target
+   * region and binds the (optional) reply processor.
+   *
+   * @param region the partitioned region whose PR id is stored in this message
+   * @param recipients the members to send to; when {@code null} the recipients are only reset
+   * @param replyProcessor the processor awaiting the reply; when {@code null} the processor id is
+   *        set to 0 and severe alert processing is not enabled
+   */
+  public void initMessage(PartitionedRegion region, Set<InternalDistributedMember> recipients,
+      DirectReplyProcessor replyProcessor) {
+    resetRecipients();
+    if (recipients != null) {
+      setRecipients(recipients);
+    }
+    regionId = region.getPRId();
+    processor = replyProcessor;
+    if (replyProcessor == null) {
+      processorId = 0;
+    } else {
+      processorId = replyProcessor.getProcessorId();
+      replyProcessor.enableSevereAlertProcessing();
+    }
+  }
+
+  @Override
+  public boolean isSevereAlertCompatible() {
+    // allow forced-disconnect processing for all cache op messages
+    return true;
+  }
+
+  public RegionEventImpl getRegionEvent() {
+    return regionEvent;
+  }
+
+  /**
+   * Sends this message to a single member and returns the processor registered for the reply.
+   *
+   * @param recipient the member to send to
+   * @param region the partitioned region the message refers to
+   * @return the {@link ClearResponse} that was initialized as this message's reply processor
+   * @throws ForceReattemptException if the distribution manager reports any failed recipient
+   */
+  public ClearResponse send(DistributedMember recipient, PartitionedRegion region)
+      throws ForceReattemptException {
+    Set<InternalDistributedMember> target =
+        Collections.singleton((InternalDistributedMember) recipient);
+    ClearResponse response = new ClearResponse(region.getSystem(), target);
+    initMessage(region, target, response);
+    if (logger.isDebugEnabled()) {
+      logger.debug("ClearPRMessage.send: recipient is {}, msg is {}", recipient, this);
+    }
+
+    Set<InternalDistributedMember> unsent = region.getDistributionManager().putOutgoing(this);
+    if (unsent == null || unsent.isEmpty()) {
+      return response;
+    }
+    throw new ForceReattemptException("Failed sending <" + this + ">");
+  }
+
+  @Override
+  public int getDSFID() {
+    return PR_CLEAR_MESSAGE;
+  }
+
+  /**
+   * Writes the parent state, then the bucket id as a signed variable-length value (-1 stands in
+   * for a missing id), then the region event.
+   */
+  @Override
+  public void toData(DataOutput out, SerializationContext context) throws IOException {
+    super.toData(out, context);
+    int encodedBucketId = bucketId == null ? -1 : bucketId;
+    InternalDataSerializer.writeSignedVL(encodedBucketId, out);
+    DataSerializer.writeObject(regionEvent, out);
+  }
+
+  @Override
+  public void fromData(DataInput in, DeserializationContext context)
+      throws IOException, ClassNotFoundException {
+    super.fromData(in, context);
+    this.bucketId = (int) InternalDataSerializer.readSignedVL(in);
+    this.regionEvent = DataSerializer.readObject(in);
+  }
+
+  /**
+   * Returns the id of the region event carried by this message.
+   *
+   * @return the event id, or {@code null} when no region event has been set on this message
+   */
+  @Override
+  public EventID getEventID() {
+    // Neither constructor sets regionEvent (it arrives via setRegionEvent or fromData), so guard
+    // against dereferencing null.
+    return regionEvent == null ? null : regionEvent.getEventId();
+  }
+
+  /**
+   * This method is called upon receipt and makes the desired changes to the PartitionedRegion.
+   * Note: It is very important that this message does NOT cause any deadlocks as the sender will
+   * wait indefinitely for the acknowledgement.
+   *
+   * <p>
+   * The outcome of {@link #doLocalClear(PartitionedRegion)} is stored in {@code result} and the
+   * reply is sent from here on both paths: with a {@link ReplyException} wrapping the
+   * {@link ForceReattemptException} on failure, and with no exception on success.
+   *
+   * @return always {@code false}; presumably this tells the caller that the reply has already
+   *         been sent here (TODO confirm against the parent class contract)
+   */
+  @Override
+  @VisibleForTesting
+  protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager,
+      PartitionedRegion region, long startTime) {
+    try {
+      result = doLocalClear(region);
+    } catch (ForceReattemptException ex) {
+      sendReply(getSender(), getProcessorId(), distributionManager, new ReplyException(ex), region,
+          startTime);
+      return false;
+    }
+    sendReply(getSender(), getProcessorId(), distributionManager, null, region, startTime);
+    return false;
+  }
+
+  /**
+   * Clears the local bucket identified by this message's bucket id while holding the partitioned
+   * region lock named after the bucket's full path.
+   *
+   * @param region the partitioned region that owns the bucket
+   * @return {@code true} when the bucket has been cleared
+   * @throws ForceReattemptException if the bucket is not primary (before or after the lock is
+   *         taken), if the lock cannot be obtained within {@link #LOCK_WAIT_TIMEOUT_MS}, or if the
+   *         clear operation itself throws
+   */
+  public boolean doLocalClear(PartitionedRegion region) throws ForceReattemptException {
+    // Retrieve local bucket region which matches target bucketId
+    BucketRegion bucketRegion = region.getDataStore().getInitializedBucketForId(null, bucketId);
+
+    // Check if we are primary, throw exception if not
+    if (!bucketRegion.isPrimary()) {
+      throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
+    }
+
+    DistributedLockService lockService = getPartitionRegionLockService();
+    String lockName = bucketRegion.getFullPath();
+
+    // Acquire the lock before entering the try block: the finally clause must only release a lock
+    // that was actually obtained. Otherwise a failed acquisition would attempt an unlock, and an
+    // error raised by that unlock would replace the ForceReattemptException thrown here.
+    if (!lockService.lock(lockName, LOCK_WAIT_TIMEOUT_MS, -1)) {
+      throw new ForceReattemptException(BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE);
+    }
+
+    try {
+      // Double check if we are still primary, as this could have changed between our first check
+      // and obtaining the lock
+      if (!bucketRegion.isPrimary()) {
+        throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
+      }
+
+      try {
+        bucketRegion.cmnClearRegion(regionEvent, true, true);
+      } catch (Exception ex) {
+        throw new ForceReattemptException(
+            EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex);
+      }
+    } finally {
+      lockService.unlock(lockName);
+    }
+
+    return true;
+  }
+
+  // Extracted for testing
+  protected DistributedLockService getPartitionRegionLockService() {
+    return DistributedLockService
+        .getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
+  }
+
+  @Override
+  public boolean canStartRemoteTransaction() {
+    return false;
+  }
+
+  /**
+   * Ends the partition-message timing (only when a region is given and {@code startTime} is
+   * positive) and sends a {@link ClearReplyMessage} carrying {@code result} and {@code ex}.
+   */
+  @Override
+  protected void sendReply(InternalDistributedMember member, int processorId,
+      DistributionManager distributionManager, ReplyException ex,
+      PartitionedRegion partitionedRegion, long startTime) {
+    boolean processingWasTimed = partitionedRegion != null && startTime > 0;
+    if (processingWasTimed) {
+      partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime);
+    }
+    ReplySender replySender = getReplySender(distributionManager);
+    ClearReplyMessage.send(member, processorId, replySender, this.result, ex);
+  }
+
+  @Override
+  protected void appendFields(StringBuilder buff) {
+    super.appendFields(buff);
+    buff.append("; bucketId=").append(this.bucketId);
+  }
+
+  /**
+   * Renders the class name with everything up to and including PN_TOKEN removed, followed by the
+   * PR id, the region path when the region can be resolved from that id, the fields added by
+   * {@link #appendFields(StringBuilder)} and the distributed-transaction flag.
+   */
+  @Override
+  public String toString() {
+    String fqcn = getClass().getName();
+    String shortName = fqcn.substring(fqcn.indexOf(PN_TOKEN) + PN_TOKEN.length());
+    // prid must stay the first field
+    StringBuilder text = new StringBuilder(shortName).append("(prid=").append(this.regionId);
+
+    // Best effort: add the region path only if the lookup succeeds
+    String regionPath = null;
+    try {
+      PartitionedRegion pr = PartitionedRegion.getPRFromId(this.regionId);
+      regionPath = pr == null ? null : pr.getFullPath();
+    } catch (Exception ignore) {
+      /* ignored */
+    }
+    if (regionPath != null) {
+      text.append(" (name = \"").append(regionPath).append("\")");
+    }
+
+    appendFields(text);
+    return text.append(" ,distTx=").append(this.isTransactionDistributed).append(")").toString();
+  }
+
+  public static class ClearReplyMessage extends ReplyMessage {
+    /** Result of the Clear operation */
+    boolean result;
+
+    @Override
+    public boolean getInlineProcess() {
+      return true;
+    }
+
+    /**
+     * Empty constructor to conform to DataSerializable interface
+     */
+    @SuppressWarnings("unused")
+    public ClearReplyMessage() {}
+
+    private ClearReplyMessage(int processorId, boolean result, ReplyException ex) {
+      super();
+      this.result = result;
+      setProcessorId(processorId);
+      setException(ex);
+    }
+
+    /**
+     * Sends an ack: builds a reply carrying {@code result} and {@code ex}, addresses it to
+     * {@code recipient} and passes it to {@code replySender.putOutgoing}.
+     *
+     * @param recipient the member the reply is addressed to; asserted to be non-null
+     * @param processorId the processor id set on the reply message
+     * @param replySender the sender that puts the reply out
+     * @param result the outcome of the clear operation
+     * @param ex the exception to report back, or {@code null} when there is none
+     */
+    public static void send(InternalDistributedMember recipient, int processorId,
+        ReplySender replySender,
+        boolean result, ReplyException ex) {
+      Assert.assertTrue(recipient != null, "ClearReplyMessage NULL reply message");
+      ClearReplyMessage message = new ClearReplyMessage(processorId, result, ex);
+      message.setRecipient(recipient);
+      replySender.putOutgoing(message);
+    }
+
+    /**
+     * Processes this message. This method is invoked by the receiver of the message.
+     *
+     * @param distributionManager the distribution manager that is processing the message.
+     * @param replyProcessor the processor waiting for this reply; when {@code null} the message
+     *        is only traced and nothing else happens
+     */
+    @Override
+    public void process(final DistributionManager distributionManager,
+        final ReplyProcessor21 replyProcessor) {
+      final long startTime = getTimestamp();
+      if (replyProcessor == null) {
+        if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
+          logger.trace(LogMarker.DM_VERBOSE, "{}: processor not found", this);
+        }
+        return;
+      }
+      // Hand the result to the ClearResponse before the generic reply processing runs
+      if (replyProcessor instanceof ClearResponse) {
+        ((ClearResponse) replyProcessor).setResponse(this);
+      }
+      replyProcessor.process(this);
+
+      if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
+        logger.trace(LogMarker.DM_VERBOSE, "{} processed {}", replyProcessor, this);
+      }
+      // Record the time elapsed since this message's timestamp in the reply-message stats
+      distributionManager.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime);
+    }
+
+    @Override
+    public int getDSFID() {
+      return PR_CLEAR_REPLY_MESSAGE;
+    }
+
+    @Override
+    public void fromData(DataInput in,
+        DeserializationContext context) throws IOException, ClassNotFoundException {
+      super.fromData(in, context);
+      this.result = in.readBoolean();
+    }
+
+    @Override
+    public void toData(DataOutput out,
+        SerializationContext context) throws IOException {
+      super.toData(out, context);
+      out.writeBoolean(this.result);
+    }
+
+    @Override
+    public String toString() {
+      return "ClearReplyMessage " + "processorid=" + this.processorId + " returning " + this.result
+          + " exception=" + getException();
+    }
+  }
+
+  /**
+   * A processor to capture the value returned by {@link ClearPRMessage}
+   */
+  public static class ClearResponse extends PartitionResponse {
+    private volatile boolean returnValue;
+
+    public ClearResponse(InternalDistributedSystem distributedSystem,
+        Set<InternalDistributedMember> recipients) {
+      super(distributedSystem, recipients, false);
+    }
+
+    public void setResponse(ClearReplyMessage response) {
+      this.returnValue = response.result;
+    }
+
+    /**
+     * @return the result of the remote clear operation
+     * @throws ForceReattemptException if the peer is no longer available
+     * @throws CacheException if the peer generates an error
+     */
+    public boolean waitForResult() throws CacheException, ForceReattemptException {
+      waitForCacheException();
+      return this.returnValue;
+    }
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
index 72e6657..c7cf5a6 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java
@@ -14,7 +14,9 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.anyLong;
@@ -31,7 +33,10 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.junit.Test;
+
 import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.statistics.StatisticsClock;
 
 public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
@@ -128,4 +133,76 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest {
     }
   }
 
+  /**
+   * A non-primary bucket must not be cleared locally, whatever region version vector would have
+   * been passed along.
+   */
+  @Test
+  public void cmnClearRegionWillDoNothingIfNotPrimary() {
+    RegionEventImpl event = createClearRegionEvent();
+    BucketRegion region = (BucketRegion) event.getRegion();
+    BucketAdvisor ba = mock(BucketAdvisor.class);
+    RegionVersionVector rvv = mock(RegionVersionVector.class);
+    doReturn(rvv).when(region).getVersionVector();
+    doReturn(ba).when(region).getBucketAdvisor();
+    when(ba.isPrimary()).thenReturn(false);
+    region.cmnClearRegion(event, true, true);
+    // Match any RVV, including null: the primary path clears with a null RVV, so verifying only
+    // against eq(rvv) would pass even if a non-primary bucket were cleared.
+    verify(region, never()).clearRegionLocally(eq(event), eq(true), any());
+  }
+
+  @Test
+  public void cmnClearRegionCalledOnPrimary() {
+    RegionEventImpl event = createClearRegionEvent();
+    BucketRegion region = (BucketRegion) event.getRegion();
+    BucketAdvisor ba = mock(BucketAdvisor.class);
+    RegionVersionVector rvv = mock(RegionVersionVector.class);
+    doReturn(rvv).when(region).getVersionVector();
+    doReturn(true).when(region).getConcurrencyChecksEnabled();
+    doReturn(ba).when(region).getBucketAdvisor();
+    doNothing().when(region).distributeClearOperation(any(), any(), any());
+    doNothing().when(region).lockLocallyForClear(any(), any(), any());
+    doNothing().when(region).clearRegionLocally(event, true, null);
+    when(ba.isPrimary()).thenReturn(true);
+    region.cmnClearRegion(event, true, true);
+    verify(region, times(1)).clearRegionLocally(eq(event), eq(true), eq(null));
+  }
+
+  @Test
+  public void clearWillUseNullAsRVVWhenConcurrencyCheckDisabled() {
+    RegionEventImpl event = createClearRegionEvent();
+    BucketRegion region = (BucketRegion) event.getRegion();
+    BucketAdvisor ba = mock(BucketAdvisor.class);
+    doReturn(false).when(region).getConcurrencyChecksEnabled();
+    doReturn(ba).when(region).getBucketAdvisor();
+    doNothing().when(region).distributeClearOperation(any(), any(), any());
+    doNothing().when(region).lockLocallyForClear(any(), any(), any());
+    doNothing().when(region).clearRegionLocally(event, true, null);
+    when(ba.isPrimary()).thenReturn(true);
+    region.cmnClearRegion(event, true, true);
+    verify(region, times(1)).clearRegionLocally(eq(event), eq(true), eq(null));
+  }
+
+  @Test
+  public void obtainWriteLocksForClearInBRShouldNotDistribute() {
+    RegionEventImpl event = createClearRegionEvent();
+    BucketRegion region = (BucketRegion) event.getRegion();
+    doNothing().when(region).lockLocallyForClear(any(), any(), any());
+    region.obtainWriteLocksForClear(event, null);
+    assertTrue(region.isUsedForPartitionedRegionBucket());
+  }
+
+  @Test
+  public void updateSizeToZeroOnClearBucketRegion() {
+    RegionEventImpl event = createClearRegionEvent();
+    BucketRegion region = (BucketRegion) event.getRegion();
+    PartitionedRegion pr = region.getPartitionedRegion();
+    PartitionedRegionDataStore prds = mock(PartitionedRegionDataStore.class);
+    PartitionedRegionStats prStats = mock(PartitionedRegionStats.class);
+    when(pr.getPrStats()).thenReturn(prStats);
+    doNothing().when(prStats).incDataStoreEntryCount(anyInt());
+    doNothing().when(prds).updateMemoryStats(anyInt());
+    when(pr.getDataStore()).thenReturn(prds);
+    region.updateSizeOnCreate("key1", 20);
+    long sizeBeforeClear = region.getTotalBytes();
+    assertEquals(20, sizeBeforeClear);
+    region.updateSizeOnClearRegion((int) sizeBeforeClear);
+    long sizeAfterClear = region.getTotalBytes();
+    assertEquals(0, sizeAfterClear);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
index 9fbd8fc..ca53ced 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.cache;
 
+import static org.apache.geode.internal.Assert.fail;
 import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertFalse;
@@ -53,6 +54,14 @@ public class DistributedRegionJUnitTest
   @Override
   protected void setInternalRegionArguments(InternalRegionArguments ira) {}
 
+  protected RegionEventImpl createClearRegionEvent() {
+    DistributedRegion region = prepare(true, true);
+    DistributedMember member = mock(DistributedMember.class);
+    RegionEventImpl regionEvent = new RegionEventImpl(region, Operation.REGION_CLEAR, null, false,
+        member, true);
+    return regionEvent;
+  }
+
   @Override
   protected DistributedRegion createAndDefineRegion(boolean isConcurrencyChecksEnabled,
       RegionAttributes ra, InternalRegionArguments ira, GemFireCacheImpl cache,
@@ -246,4 +255,13 @@ public class DistributedRegionJUnitTest
     region.basicBridgeReplace("key1", "value1", false, null, client, true, clientEvent);
     assertThat(clientEvent.getVersionTag().equals(tag));
   }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void localClearIsNotSupportedOnReplicatedRegion() {
+    RegionEventImpl event = createClearRegionEvent();
+    DistributedRegion region = (DistributedRegion) event.getRegion();
+    region.basicLocalClear(event);
+    fail("Expect UnsupportedOperationException");
+  }
+
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
new file mode 100644
index 0000000..2cf5231
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.ReplySender;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionDataStore;
+import org.apache.geode.internal.cache.PartitionedRegionStats;
+
+public class ClearPRMessageTest {
+
+  ClearPRMessage message;
+  PartitionedRegion region;
+  PartitionedRegionDataStore dataStore;
+  BucketRegion bucketRegion;
+
+  @Before
+  public void setup() throws ForceReattemptException {
+    message = spy(new ClearPRMessage());
+    region = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
+    dataStore = mock(PartitionedRegionDataStore.class);
+    when(region.getDataStore()).thenReturn(dataStore);
+    bucketRegion = mock(BucketRegion.class);
+    when(dataStore.getInitializedBucketForId(any(), any())).thenReturn(bucketRegion);
+  }
+
+  @Test
+  public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAtFirstCheck() {
+    when(bucketRegion.isPrimary()).thenReturn(false);
+
+    assertThatThrownBy(() -> message.doLocalClear(region))
+        .isInstanceOf(ForceReattemptException.class)
+        .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
+  }
+
+  @Test
+  public void doLocalClearThrowsExceptionWhenLockCannotBeObtained() {
+    DistributedLockService mockLockService = mock(DistributedLockService.class);
+    doReturn(mockLockService).when(message).getPartitionRegionLockService();
+
+    when(mockLockService.lock(anyString(), anyLong(), anyLong())).thenReturn(false);
+    when(bucketRegion.isPrimary()).thenReturn(true);
+
+    assertThatThrownBy(() -> message.doLocalClear(region))
+        .isInstanceOf(ForceReattemptException.class)
+        .hasMessageContaining(ClearPRMessage.BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE);
+  }
+
+  @Test
+  public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAfterObtainingLock() {
+    DistributedLockService mockLockService = mock(DistributedLockService.class);
+    doReturn(mockLockService).when(message).getPartitionRegionLockService();
+
+    // Be primary on the first check, then be not primary on the second check
+    when(bucketRegion.isPrimary()).thenReturn(true).thenReturn(false);
+    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+
+    assertThatThrownBy(() -> message.doLocalClear(region))
+        .isInstanceOf(ForceReattemptException.class)
+        .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
+    // Confirm that we actually obtained and released the lock
+    verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong());
+    verify(mockLockService, times(1)).unlock(any());
+  }
+
+  @Test
+  public void doLocalClearThrowsForceReattemptExceptionWhenAnExceptionIsThrownDuringClearOperation() {
+    DistributedLockService mockLockService = mock(DistributedLockService.class);
+    doReturn(mockLockService).when(message).getPartitionRegionLockService();
+    NullPointerException exception = new NullPointerException("Error encountered");
+    doThrow(exception).when(bucketRegion).cmnClearRegion(any(), anyBoolean(), anyBoolean());
+
+    // Be primary on both checks and obtain the lock, so that cmnClearRegion is reached
+    when(bucketRegion.isPrimary()).thenReturn(true);
+    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+
+    assertThatThrownBy(() -> message.doLocalClear(region))
+        .isInstanceOf(ForceReattemptException.class)
+        .hasMessageContaining(ClearPRMessage.EXCEPTION_THROWN_DURING_CLEAR_OPERATION);
+
+    // Confirm that cmnClearRegion was called
+    verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean());
+  }
+
+  @Test
+  public void doLocalClearInvokesCmnClearRegionWhenBucketIsPrimaryAndLockIsObtained()
+      throws ForceReattemptException {
+    DistributedLockService mockLockService = mock(DistributedLockService.class);
+    doReturn(mockLockService).when(message).getPartitionRegionLockService();
+
+
+    // Be primary on both checks and obtain the lock, so that the clear goes through
+    when(bucketRegion.isPrimary()).thenReturn(true);
+    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+    assertThat(message.doLocalClear(region)).isTrue();
+
+    // Confirm that cmnClearRegion was called
+    verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean());
+
+    // Confirm that we actually obtained and released the lock
+    verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong());
+    verify(mockLockService, times(1)).unlock(any());
+  }
+
+  @Test
+  public void initMessageSetsReplyProcessorCorrectlyWithDefinedReplyProcessor() {
+    InternalDistributedMember sender = mock(InternalDistributedMember.class);
+
+    Set<InternalDistributedMember> recipients = new HashSet<>();
+    recipients.add(sender);
+
+    ClearPRMessage.ClearResponse mockProcessor = mock(ClearPRMessage.ClearResponse.class);
+    int mockProcessorId = 5;
+    when(mockProcessor.getProcessorId()).thenReturn(mockProcessorId);
+
+    message.initMessage(region, recipients, mockProcessor);
+
+    verify(mockProcessor, times(1)).enableSevereAlertProcessing();
+    assertThat(message.getProcessorId()).isEqualTo(mockProcessorId);
+  }
+
+  @Test
+  public void initMessageSetsProcessorIdToZeroWithNullProcessor() {
+    message.initMessage(region, null, null);
+
+    assertThat(message.getProcessorId()).isEqualTo(0);
+  }
+
+  @Test
+  public void sendThrowsExceptionIfPutOutgoingMethodReturnsNonNullSetOfFailures() {
+    InternalDistributedMember recipient = mock(InternalDistributedMember.class);
+
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    when(region.getDistributionManager()).thenReturn(distributionManager);
+
+    doNothing().when(message).initMessage(any(), any(), any());
+    Set<InternalDistributedMember> failures = new HashSet<>();
+    failures.add(recipient);
+
+    when(distributionManager.putOutgoing(message)).thenReturn(failures);
+
+    assertThatThrownBy(() -> message.send(recipient, region))
+        .isInstanceOf(ForceReattemptException.class)
+        .hasMessageContaining("Failed sending <" + message + ">");
+  }
+
+  @SuppressWarnings("ResultOfMethodCallIgnored")
+  @Test
+  public void operateOnPartitionedRegionCallsSendReplyWithNoExceptionWhenDoLocalClearSucceeds()
+      throws ForceReattemptException {
+    ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
+    InternalDistributedMember sender = mock(InternalDistributedMember.class);
+    int processorId = 1000;
+    int startTime = 0;
+
+    doReturn(true).when(message).doLocalClear(region);
+    doReturn(sender).when(message).getSender();
+    doReturn(processorId).when(message).getProcessorId();
+
+    // We don't want to deal with mocking the behavior of sendReply() in this test, so we mock it to
+    // do nothing and verify later that it was called with proper input
+    doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong());
+
+    message.operateOnPartitionedRegion(distributionManager, region, startTime);
+
+    verify(message, times(1)).sendReply(sender, processorId, distributionManager, null, region,
+        startTime);
+  }
+
+  @SuppressWarnings("ResultOfMethodCallIgnored")
+  @Test
+  public void operateOnPartitionedRegionCallsSendReplyWithExceptionWhenDoLocalClearFailsWithException()
+      throws ForceReattemptException {
+    ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class);
+    InternalDistributedMember sender = mock(InternalDistributedMember.class);
+    int processorId = 1000;
+    int startTime = 0;
+    ForceReattemptException exception =
+        new ForceReattemptException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
+
+    doThrow(exception).when(message).doLocalClear(region);
+    doReturn(sender).when(message).getSender();
+    doReturn(processorId).when(message).getProcessorId();
+
+    // We don't want to deal with mocking the behavior of sendReply() in this test, so we mock it to
+    // do nothing and verify later that it was called with proper input
+    doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong());
+
+    message.operateOnPartitionedRegion(distributionManager, region, startTime);
+
+    verify(message, times(1)).sendReply(any(), anyInt(), any(), notNull(), any(), anyLong());
+  }
+
+  @Test
+  public void sendReplyEndsMessageProcessingIfWeHaveARegionAndHaveStartedProcessing() {
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    InternalDistributedMember recipient = mock(InternalDistributedMember.class);
+    PartitionedRegionStats partitionedRegionStats = mock(PartitionedRegionStats.class);
+    when(region.getPrStats()).thenReturn(partitionedRegionStats);
+
+    int processorId = 1000;
+    int startTime = 10000;
+    ReplyException exception = new ReplyException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
+
+    ReplySender replySender = mock(ReplySender.class);
+    doReturn(replySender).when(message).getReplySender(distributionManager);
+
+    message.sendReply(recipient, processorId, distributionManager, exception, region, startTime);
+
+    verify(partitionedRegionStats, times(1)).endPartitionMessagesProcessing(startTime);
+  }
+
+  @Test
+  public void sendReplyDoesNotEndMessageProcessingIfStartTimeIsZero() {
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    InternalDistributedMember recipient = mock(InternalDistributedMember.class);
+    PartitionedRegionStats partitionedRegionStats = mock(PartitionedRegionStats.class);
+    when(region.getPrStats()).thenReturn(partitionedRegionStats);
+
+    int processorId = 1000;
+    int startTime = 0;
+    ReplyException exception = new ReplyException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
+
+    ReplySender replySender = mock(ReplySender.class);
+    doReturn(replySender).when(message).getReplySender(distributionManager);
+
+    message.sendReply(recipient, processorId, distributionManager, exception, region, startTime);
+
+    verify(partitionedRegionStats, times(0)).endPartitionMessagesProcessing(startTime);
+  }
+
+  @Test
+  public void clearReplyMessageProcessCallsSetResponseIfReplyProcessorIsInstanceOfClearResponse() {
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    DMStats mockStats = mock(DMStats.class);
+    when(distributionManager.getStats()).thenReturn(mockStats);
+    ClearPRMessage.ClearReplyMessage clearReplyMessage = new ClearPRMessage.ClearReplyMessage();
+    ClearPRMessage.ClearResponse mockProcessor = mock(ClearPRMessage.ClearResponse.class);
+
+    clearReplyMessage.process(distributionManager, mockProcessor);
+
+    verify(mockProcessor, times(1)).setResponse(clearReplyMessage);
+  }
+}
diff --git a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
index e00dd64..0f5ee97 100644
--- a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
+++ b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
@@ -57,6 +57,9 @@ public interface DataSerializableFixedID extends SerializationVersions, BasicSer
   // NOTE, codes < -65536 will take 4 bytes to serialize
   // NOTE, codes < -128 will take 2 bytes to serialize
 
+  short PR_CLEAR_REPLY_MESSAGE = -161;
+  short PR_CLEAR_MESSAGE = -160;
+
   short CREATE_REGION_MESSAGE_LUCENE = -159;
   short FINAL_CHECK_PASSED_MESSAGE = -158;
   short NETWORK_PARTITION_MESSAGE = -157;


[geode] 02/02: GEODE-7682: add PR.clear API (#4755)

Posted by nn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 7b62741483b7a71c5e9af2f88d37a43c32290748
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Thu Mar 5 23:46:36 2020 -0800

    GEODE-7682: add PR.clear  API (#4755)
    
    * GEODE-7683: introduce BR.cmnClearRegion
    
    Co-authored-by: Xiaojian Zhou <gz...@pivotal.io>
---
 .../cache/PartitionedRegionClearDUnitTest.java     | 218 +++++++++++++++++++++
 .../PartitionedRegionPersistentClearDUnitTest.java |  26 +++
 ...itionedRegionSingleNodeOperationsJUnitTest.java |  66 -------
 .../codeAnalysis/sanctionedDataSerializables.txt   |   6 +-
 .../org/apache/geode/internal/DSFIDFactory.java    |   3 +
 .../geode/internal/cache/DistributedRegion.java    |   9 -
 .../apache/geode/internal/cache/LocalRegion.java   |  10 +
 .../geode/internal/cache/PartitionedRegion.java    | 214 ++++++++++++++++++--
 .../geode/internal/cache/RegionEventImpl.java      |   5 +
 .../internal/cache/partitioned/ClearPRMessage.java | 166 +++++-----------
 .../cache/partitioned/ClearPRMessageTest.java      |  50 ++---
 11 files changed, 523 insertions(+), 250 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
new file mode 100644
index 0000000..fb2a81b
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.internal.Assert.fail;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.test.dunit.SerializableCallableIF;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Distributed test of {@code clear()} on a partitioned region.
+ *
+ * <p>
+ * {@link #setUp()} starts a locator (VM 0), three data stores (VMs 1-3), one accessor (VM 4) and
+ * two subscription-enabled clients (VMs 5-6). Each test feeds {@link #NUM_ENTRIES} entries, clears
+ * the region from one member, and then checks the region size on the servers and how many times
+ * the {@code afterRegionClear} callback fired.
+ *
+ * <p>
+ * Subclasses select a different region type by overriding {@link #getRegionShortCut()}.
+ */
+public class PartitionedRegionClearDUnitTest implements Serializable {
+  protected static final String REGION_NAME = "testPR";
+  protected static final int NUM_ENTRIES = 1000;
+
+  protected int locatorPort;
+  protected MemberVM locator;
+  protected MemberVM dataStore1, dataStore2, dataStore3, accessor;
+  protected ClientVM client1, client2;
+
+  private static final Logger logger = LogManager.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule(7);
+
+  /**
+   * Starts the locator, the four servers and the two clients, then creates the test region on
+   * every member.
+   */
+  @Before
+  public void setUp() throws Exception {
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+    dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort);
+    dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort);
+    dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort);
+    accessor = cluster.startServerVM(4, getProperties(), locatorPort);
+    client1 = cluster.startClientVM(5,
+        c -> c.withPoolSubscription(true).withLocatorConnection(locatorPort));
+    client2 = cluster.startClientVM(6,
+        c -> c.withPoolSubscription(true).withLocatorConnection(locatorPort));
+    dataStore1.invoke(this::initDataStore);
+    dataStore2.invoke(this::initDataStore);
+    dataStore3.invoke(this::initDataStore);
+    accessor.invoke(this::initAccessor);
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+  }
+
+  /**
+   * Region type used by the data stores; the accessor derives its own type from it.
+   */
+  protected RegionShortcut getRegionShortCut() {
+    return RegionShortcut.PARTITION_REDUNDANT;
+  }
+
+  /**
+   * Properties passed to every server VM on startup.
+   */
+  protected Properties getProperties() {
+    Properties properties = new Properties();
+    properties.setProperty("log-level", "info");
+    return properties;
+  }
+
+  /**
+   * Looks up the test region in the client cache or in the server cache of the current VM.
+   */
+  private Region getRegion(boolean isClient) {
+    if (isClient) {
+      return getClientCache().getRegion(REGION_NAME);
+    } else {
+      return getCache().getRegion(REGION_NAME);
+    }
+  }
+
+  private void verifyRegionSize(boolean isClient, int expectedNum) {
+    assertThat(getRegion(isClient).size()).isEqualTo(expectedNum);
+  }
+
+  /**
+   * Creates a caching-proxy client region and registers interest in all keys.
+   */
+  private void initClientCache() {
+    Region region = getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+        .create(REGION_NAME);
+    region.registerInterestForAllKeys(InterestResultPolicy.KEYS);
+  }
+
+  /**
+   * Creates the partitioned region (10 buckets) with a clear-counting listener on a data store.
+   */
+  private void initDataStore() {
+    getCache().createRegionFactory(getRegionShortCut())
+        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create())
+        .addCacheListener(new CountingCacheListener())
+        .create(REGION_NAME);
+  }
+
+  /**
+   * Creates the partitioned region on the accessor member.
+   *
+   * <p>
+   * Persistent shortcuts are mapped to their non-persistent counterparts first (presumably
+   * because a member configured with no local storage cannot be persistent -- confirm).
+   */
+  private void initAccessor() {
+    RegionShortcut shortcut = getRegionShortCut();
+    if (shortcut.isPersistent()) {
+      if (shortcut == RegionShortcut.PARTITION_PERSISTENT) {
+        shortcut = RegionShortcut.PARTITION;
+      } else if (shortcut == RegionShortcut.PARTITION_PERSISTENT_OVERFLOW) {
+        shortcut = RegionShortcut.PARTITION_OVERFLOW;
+      } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT) {
+        shortcut = RegionShortcut.PARTITION_REDUNDANT;
+      } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW) {
+        shortcut = RegionShortcut.PARTITION_REDUNDANT_OVERFLOW;
+      } else {
+        fail("Wrong region type:" + shortcut);
+      }
+    }
+    // Partition attributes must be set exactly once: a later setPartitionAttributes call would
+    // replace these and drop localMaxMemory(0), turning the accessor into another data store.
+    getCache().createRegionFactory(shortcut)
+        .setPartitionAttributes(
+            new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create())
+        .addCacheListener(new CountingCacheListener())
+        .create(REGION_NAME);
+  }
+
+  /**
+   * Puts {@link #NUM_ENTRIES} entries (key {@code i}, value {@code "value" + i}) into the region.
+   */
+  private void feed(boolean isClient) {
+    Region region = getRegion(isClient);
+    IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i));
+  }
+
+  private void verifyServerRegionSize(int expectedNum) {
+    accessor.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore1.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore2.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore3.invoke(() -> verifyRegionSize(false, expectedNum));
+  }
+
+  private void verifyClientRegionSize(int expectedNum) {
+    client1.invoke(() -> verifyRegionSize(true, expectedNum));
+    // TODO: notify register clients
+    // client2.invoke(()->verifyRegionSize(true, expectedNum));
+  }
+
+  /**
+   * Asserts that the clear callback fired exactly once across the four servers and, when
+   * {@code serverVM} is not null, that it fired on that particular member.
+   *
+   * @param serverVM the member expected to have received the callback, or null to skip that check
+   */
+  private void verifyCacheListenerTriggerCount(MemberVM serverVM) {
+    SerializableCallableIF<Integer> getListenerTriggerCount = () -> {
+      CountingCacheListener countingCacheListener =
+          (CountingCacheListener) getRegion(false).getAttributes()
+              .getCacheListeners()[0];
+      return countingCacheListener.getClears();
+    };
+
+    int count = accessor.invoke(getListenerTriggerCount)
+        + dataStore1.invoke(getListenerTriggerCount)
+        + dataStore2.invoke(getListenerTriggerCount)
+        + dataStore3.invoke(getListenerTriggerCount);
+    assertThat(count).isEqualTo(1);
+
+    if (serverVM != null) {
+      assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1);
+    }
+  }
+
+  @Test
+  public void normalClearFromDataStore() {
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+    dataStore1.invoke(() -> getRegion(false).clear());
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(dataStore1);
+  }
+
+  @Test
+  public void normalClearFromAccessor() {
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+    accessor.invoke(() -> getRegion(false).clear());
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(accessor);
+  }
+
+  @Test
+  public void normalClearFromClient() {
+    client1.invoke(() -> feed(true));
+    verifyClientRegionSize(NUM_ENTRIES);
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    client1.invoke(() -> getRegion(true).clear());
+    verifyServerRegionSize(0);
+    verifyClientRegionSize(0);
+    verifyCacheListenerTriggerCount(null);
+  }
+
+  /**
+   * Cache listener that counts how many times {@code afterRegionClear} has been invoked.
+   */
+  private static class CountingCacheListener extends CacheListenerAdapter {
+    private final AtomicInteger clears = new AtomicInteger();
+
+    @Override
+    public void afterRegionClear(RegionEvent event) {
+      Region region = event.getRegion();
+      logger.info("Region " + region.getFullPath() + " is cleared.");
+      clears.incrementAndGet();
+    }
+
+    int getClears() {
+      return clears.get();
+    }
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java
new file mode 100644
index 0000000..847699b
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+
+
+import org.apache.geode.cache.RegionShortcut;
+
+/**
+ * Runs the scenarios inherited from {@link PartitionedRegionClearDUnitTest} with a redundant,
+ * persistent, overflow partitioned region instead of the default region type.
+ */
+public class PartitionedRegionPersistentClearDUnitTest extends PartitionedRegionClearDUnitTest {
+
+  // @Override makes the compiler reject this class if the base-class hook is ever renamed,
+  // instead of silently running the non-persistent configuration again.
+  @Override
+  protected RegionShortcut getRegionShortCut() {
+    return RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW;
+  }
+}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
index e8de2b5..e311ad4 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.fail;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.Set;
@@ -1297,71 +1296,6 @@ public class PartitionedRegionSingleNodeOperationsJUnitTest {
     }
   }
 
-  @Test
-  public void test023UnsupportedOps() throws Exception {
-    Region pr = null;
-    try {
-      pr = PartitionedRegionTestHelper.createPartitionedRegion("testUnsupportedOps",
-          String.valueOf(200), 0);
-
-      pr.put(new Integer(1), "one");
-      pr.put(new Integer(2), "two");
-      pr.put(new Integer(3), "three");
-      pr.getEntry("key");
-
-      try {
-        pr.clear();
-        fail(
-            "PartitionedRegionSingleNodeOperationTest:testUnSupportedOps() operation failed on a blank PartitionedRegion");
-      } catch (UnsupportedOperationException expected) {
-      }
-
-      // try {
-      // pr.entries(true);
-      // fail();
-      // }
-      // catch (UnsupportedOperationException expected) {
-      // }
-
-      // try {
-      // pr.entrySet(true);
-      // fail();
-      // }
-      // catch (UnsupportedOperationException expected) {
-      // }
-
-      try {
-        HashMap data = new HashMap();
-        data.put("foo", "bar");
-        data.put("bing", "bam");
-        data.put("supper", "hero");
-        pr.putAll(data);
-        // fail("testPutAll() does NOT throw UnsupportedOperationException");
-      } catch (UnsupportedOperationException onse) {
-      }
-
-
-      // try {
-      // pr.values();
-      // fail("testValues() does NOT throw UnsupportedOperationException");
-      // }
-      // catch (UnsupportedOperationException expected) {
-      // }
-
-
-      try {
-        pr.containsValue("foo");
-      } catch (UnsupportedOperationException ex) {
-        fail("PartitionedRegionSingleNodeOperationTest:testContainsValue() operation failed");
-      }
-
-    } finally {
-      if (pr != null) {
-        pr.destroyRegion();
-      }
-    }
-  }
-
   /**
    * This method validates size operations. It verifies that it returns correct size of the
    * PartitionedRegion.
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index d6806f2..cacc46b 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1436,8 +1436,8 @@ fromData,27
 toData,27
 
 org/apache/geode/internal/cache/partitioned/ClearPRMessage,2
-fromData,30
-toData,44
+fromData,19
+toData,36
 
 org/apache/geode/internal/cache/partitioned/ClearPRMessage$ClearReplyMessage,2
 fromData,17
@@ -2101,4 +2101,4 @@ toData,105
 
 org/apache/geode/pdx/internal/PdxType,2
 fromData,109
-toData,124
\ No newline at end of file
+toData,124
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index 993ad3e..51b350c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -285,6 +285,7 @@ import org.apache.geode.internal.cache.partitioned.BucketCountLoadProbe;
 import org.apache.geode.internal.cache.partitioned.BucketProfileUpdateMessage;
 import org.apache.geode.internal.cache.partitioned.BucketSizeMessage;
 import org.apache.geode.internal.cache.partitioned.BucketSizeMessage.BucketSizeReplyMessage;
+import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueReplyMessage;
 import org.apache.geode.internal.cache.partitioned.CreateBucketMessage;
@@ -973,6 +974,8 @@ public class DSFIDFactory implements DataSerializableFixedID {
     serializer.registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY,
         GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class);
     serializer.registerDSFID(ABORT_BACKUP_REQUEST, AbortBackupRequest.class);
+    serializer.registerDSFID(PR_CLEAR_MESSAGE, ClearPRMessage.class);
+    serializer.registerDSFID(PR_CLEAR_REPLY_MESSAGE, ClearPRMessage.ClearReplyMessage.class);
     serializer.registerDSFID(HOST_AND_PORT, HostAndPort.class);
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 1465eef..5c8d264 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -189,10 +189,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
   @MutableForTesting
   public static boolean ignoreReconnect = false;
 
-  /**
-   * Lock to prevent multiple threads on this member from performing a clear at the same time.
-   */
-  private final Object clearLock = new Object();
   private final ReentrantReadWriteLock failedInitialImageLock = new ReentrantReadWriteLock(true);
 
   @MakeNotStatic
@@ -927,11 +923,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
     }
   }
 
-  private void lockCheckReadiness() {
-    cache.getCancelCriterion().checkCancelInProgress(null);
-    checkReadiness();
-  }
-
   @Override
   Object validatedDestroy(Object key, EntryEventImpl event)
       throws TimeoutException, EntryNotFoundException, CacheWriterException {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 6a7e2d2..d5f9156 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -473,6 +473,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   private final Lock clientMetaDataLock = new ReentrantLock();
 
   /**
+   * Lock to prevent multiple threads on this member from performing a clear at the same time.
+   */
+  protected final Object clearLock = new Object();
+
+  /**
    * Lock for updating the cache service profile for the region.
    */
   private final Lock cacheServiceProfileUpdateLock = new ReentrantLock();
@@ -2750,6 +2755,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     checkRegionDestroyed(true);
   }
 
+  /**
+   * Checks the cache's cancel criterion and then this region's readiness. Callers in this change
+   * use it after a lock-service call has failed with an {@code IllegalStateException}.
+   */
+  protected void lockCheckReadiness() {
+    cache.getCancelCriterion().checkCancelInProgress(null);
+    checkReadiness();
+  }
+
   /**
    * This method should be called when the caller cannot locate an entry and that condition is
    * unexpected. This will first double check the cache and region state before throwing an
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 2c1ec04..9f61202 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -179,6 +179,7 @@ import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultWa
 import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
 import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
+import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse;
 import org.apache.geode.internal.cache.partitioned.DestroyMessage;
@@ -2144,18 +2145,202 @@ public class PartitionedRegion extends LocalRegion
     throw new UnsupportedOperationException();
   }
 
-  /**
-   * @since GemFire 5.0
-   * @throws UnsupportedOperationException OVERRIDES
-   */
   @Override
-  public void clear() {
-    throw new UnsupportedOperationException();
+  void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
+    // Clears the partitioned region bucket by bucket: one ClearPRMessage per bucket is sent while
+    // holding both the member-local clearLock and a distributed "_clearOperation" lock; listeners
+    // and bridge clients are notified once at the end. The cacheWrite flag is not consulted here.
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    synchronized (clearLock) {
+      final DistributedLockService lockService = getPartitionedRegionLockService();
+      // Computed once so that lock() and unlock() are guaranteed to use the same name.
+      final String clearLockName = "_clearOperation" + this.getFullPath().replace('/', '_');
+      try {
+        lockService.lock(clearLockName, -1, -1);
+      } catch (IllegalStateException e) {
+        // Give the readiness checks a chance to throw their own exception first.
+        lockCheckReadiness();
+        throw e;
+      }
+      try {
+        if (cache.isCacheAtShutdownAll()) {
+          throw cache.getCacheClosedException("Cache is shutting down");
+        }
+
+        // create ClearPRMessage per bucket
+        List<ClearPRMessage> clearMsgList = createClearPRMessages();
+        for (ClearPRMessage clearPRMessage : clearMsgList) {
+          int bucketId = clearPRMessage.getBucketId();
+          checkReadiness();
+          long sendMessagesStartTime = 0;
+          if (isDebugEnabled) {
+            sendMessagesStartTime = System.currentTimeMillis();
+          }
+          try {
+            sendClearMsgByBucket(bucketId, clearPRMessage);
+          } catch (PartitionOfflineException poe) {
+            // TODO add a PartialResultException
+            logger.info("PR.sendClearMsgByBucket encountered PartitionOfflineException at bucket "
+                + bucketId, poe);
+          } catch (Exception e) {
+            // A failure on one bucket is logged and the remaining buckets are still attempted.
+            logger.info("PR.sendClearMsgByBucket encountered exception at bucket " + bucketId, e);
+          }
+
+          if (isDebugEnabled) {
+            long now = System.currentTimeMillis();
+            logger.debug("PR.sendClearMsgByBucket for bucket {} took {} ms", bucketId,
+                (now - sendMessagesStartTime));
+          }
+          // TODO add psStats
+        }
+      } finally {
+        try {
+          lockService.unlock(clearLockName);
+        } catch (IllegalStateException e) {
+          lockCheckReadiness();
+        }
+      }
+
+      // notify bridge clients at PR level
+      regionEvent.setEventType(EnumListenerEvent.AFTER_REGION_CLEAR);
+      boolean hasListener = hasListener();
+      if (hasListener) {
+        dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_CLEAR, regionEvent);
+      }
+      notifyBridgeClients(regionEvent);
+      logger.info("Partitioned region {} finished clear operation.", this.getFullPath());
+    }
   }
 
-  @Override
-  void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
-    throw new UnsupportedOperationException();
+  /**
+   * Delivers the clear request for one bucket to the member returned by
+   * {@code getNodeForBucketWrite}, retrying until the clear reports success, the bucket turns out
+   * not to exist, or one of the readiness / cancellation / timeout checks throws.
+   *
+   * @param bucketId id of the bucket to clear
+   * @param clearPRMessage the message that performs the clear for that bucket, either locally or
+   *        on the remote target
+   */
+  void sendClearMsgByBucket(final Integer bucketId, ClearPRMessage clearPRMessage) {
+    RetryTimeKeeper retryTime = null;
+    InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId, null);
+    if (logger.isDebugEnabled()) {
+      logger.debug("PR.sendClearMsgByBucket:bucket {}'s currentTarget is {}", bucketId,
+          currentTarget);
+    }
+
+    long timeOut = 0;
+    int count = 0;
+    while (true) {
+      // count is the number of passes already made through this loop.
+      switch (count) {
+        case 0:
+          // Note we don't check for DM cancellation in common case.
+          // First time. Assume success, keep going.
+          break;
+        case 1:
+          this.cache.getCancelCriterion().checkCancelInProgress(null);
+          // Second time (first failure). Calculate timeout and keep going.
+          timeOut = System.currentTimeMillis() + this.retryTimeout;
+          break;
+        default:
+          this.cache.getCancelCriterion().checkCancelInProgress(null);
+          // test for timeout
+          long timeLeft = timeOut - System.currentTimeMillis();
+          if (timeLeft < 0) {
+            PRHARedundancyProvider.timedOut(this, null, null, "clear bucket " + bucketId,
+                this.retryTimeout);
+            // NOTREACHED
+          }
+
+          // Didn't time out. Sleep a bit and then continue
+          // Thread.interrupted() clears the flag so sleep() can run; it is restored in finally.
+          boolean interrupted = Thread.interrupted();
+          try {
+            Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION);
+          } catch (InterruptedException ignore) {
+            interrupted = true;
+          } finally {
+            if (interrupted) {
+              Thread.currentThread().interrupt();
+            }
+          }
+          break;
+      } // switch
+      count++;
+
+      if (currentTarget == null) { // pick target
+        checkReadiness();
+        if (retryTime == null) {
+          retryTime = new RetryTimeKeeper(this.retryTimeout);
+        }
+
+        // createIfNotExist is false: a clear must not create a bucket that does not exist yet.
+        currentTarget = waitForNodeOrCreateBucket(retryTime, null, bucketId, false);
+        if (currentTarget == null) {
+          // the bucket does not exist, no need to clear
+          logger.info("Bucket " + bucketId + " does not contain data, no need to clear");
+          return;
+        } else {
+          if (logger.isDebugEnabled()) {
+            logger.debug("PR.sendClearMsgByBucket: new currentTarget is {}", currentTarget);
+          }
+        }
+
+        // It's possible this is a GemFire thread e.g. ServerConnection
+        // which got to this point because of a distributed system shutdown or
+        // region closure which uses interrupt to break any sleep() or wait() calls
+        // e.g. waitForPrimary or waitForBucketRecovery in which case throw exception
+        checkShutdown();
+        continue;
+      } // pick target
+
+      boolean result = false;
+      try {
+        // A member with localMaxMemory == 0 never clears locally, even if it is the target.
+        final boolean isLocal = (this.localMaxMemory > 0) && currentTarget.equals(getMyId());
+        if (isLocal) {
+          result = clearPRMessage.doLocalClear(this);
+        } else {
+          ClearPRMessage.ClearResponse response = clearPRMessage.send(currentTarget, this);
+          if (response != null) {
+            this.prStats.incPartitionMessagesSent();
+            result = response.waitForResult();
+          }
+        }
+        if (result) {
+          return;
+        }
+      } catch (ForceReattemptException fre) {
+        // Re-resolve the target; wait before retrying only if it is the same member as before.
+        checkReadiness();
+        InternalDistributedMember lastTarget = currentTarget;
+        if (retryTime == null) {
+          retryTime = new RetryTimeKeeper(this.retryTimeout);
+        }
+        currentTarget = getNodeForBucketWrite(bucketId, retryTime);
+        if (lastTarget.equals(currentTarget)) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("PR.sendClearMsgByBucket: Retrying at the same node:{} due to {}",
+                currentTarget, fre.getMessage());
+          }
+          if (retryTime.overMaximum()) {
+            PRHARedundancyProvider.timedOut(this, null, null, "clear bucket " + bucketId,
+                this.retryTimeout);
+            // NOTREACHED
+          }
+          retryTime.waitToRetryNode();
+        } else {
+          if (logger.isDebugEnabled()) {
+            logger.debug("PR.sendClearMsgByBucket: Old target was {}, Retrying {}", lastTarget,
+                currentTarget);
+          }
+        }
+      }
+
+      // It's possible this is a GemFire thread e.g. ServerConnection
+      // which got to this point because of a distributed system shutdown or
+      // region closure which uses interrupt to break any sleep() or wait()
+      // calls
+      // e.g. waitForPrimary or waitForBucketRecovery in which case throw
+      // exception
+      checkShutdown();
+
+      // If we get here, the attempt failed...
+      if (count == 1) {
+        // TODO prStats add ClearPRMsg retried
+        this.prStats.incPutAllMsgsRetried();
+      }
+    }
+  }
+
+  List<ClearPRMessage> createClearPRMessages() {
+    ArrayList<ClearPRMessage> clearMsgList = new ArrayList<>();
+    for (int bucketId = 0; bucketId < this.totalNumberOfBuckets; bucketId++) {
+      ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId);
+      clearMsgList.add(clearPRMessage);
+    }
+    return clearMsgList;
   }
 
   @Override
@@ -2574,7 +2759,7 @@ public class PartitionedRegion extends LocalRegion
             retryTime = new RetryTimeKeeper(this.retryTimeout);
           }
 
-          currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId);
+          currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true);
           if (isDebugEnabled) {
             logger.debug("PR.sendMsgByBucket: event size is {}, new currentTarget is {}",
                 getEntrySize(event), currentTarget);
@@ -2715,7 +2900,7 @@ public class PartitionedRegion extends LocalRegion
             retryTime = new RetryTimeKeeper(this.retryTimeout);
           }
 
-          currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId);
+          currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true);
           if (logger.isDebugEnabled()) {
             logger.debug("PR.sendMsgByBucket: event size is {}, new currentTarget is {}",
                 getEntrySize(event), currentTarget);
@@ -2960,7 +3145,7 @@ public class PartitionedRegion extends LocalRegion
         if (retryTime == null) {
           retryTime = new RetryTimeKeeper(this.retryTimeout);
         }
-        currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId);
+        currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true);
 
         // It's possible this is a GemFire thread e.g. ServerConnection
         // which got to this point because of a distributed system shutdown or
@@ -3119,10 +3304,11 @@ public class PartitionedRegion extends LocalRegion
    * @param retryTime the RetryTimeKeeper to track retry times
    * @param event the event used to get the entry size in the event a new bucket should be created
    * @param bucketId the identity of the bucket should it be created
+   * @param createIfNotExist whether to create the bucket when no node currently hosts it
    * @return a Node which contains the bucket, potentially null
    */
   private InternalDistributedMember waitForNodeOrCreateBucket(RetryTimeKeeper retryTime,
-      EntryEventImpl event, Integer bucketId) {
+      EntryEventImpl event, Integer bucketId, boolean createIfNotExist) {
     InternalDistributedMember newNode;
     if (retryTime.overMaximum()) {
       PRHARedundancyProvider.timedOut(this, null, null, "allocate a bucket",
@@ -3132,7 +3318,7 @@ public class PartitionedRegion extends LocalRegion
 
     retryTime.waitForBucketsRecovery();
     newNode = getNodeForBucketWrite(bucketId, retryTime);
-    if (newNode == null) {
+    if (newNode == null && createIfNotExist) {
       newNode = createBucket(bucketId, getEntrySize(event), retryTime);
     }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
index 402b7f2..f155a7e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
@@ -119,6 +119,11 @@ public class RegionEventImpl
     return region;
   }
 
+  public void setRegion(LocalRegion region) {
+    this.region = region;
+    this.distributedMember = region.getMyId();
+  }
+
   @Override
   public Operation getOperation() {
     return this.op;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
index 1a8aba1..9fa8057 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
@@ -26,7 +26,8 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.CacheException;
-import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.persistence.PartitionOfflineException;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DirectReplyProcessor;
@@ -44,7 +45,6 @@ import org.apache.geode.internal.cache.BucketRegion;
 import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.cache.RegionEventImpl;
 import org.apache.geode.internal.logging.log4j.LogMarker;
 import org.apache.geode.internal.serialization.DeserializationContext;
@@ -54,16 +54,10 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
 public class ClearPRMessage extends PartitionMessageWithDirectReply {
   private static final Logger logger = LogService.getLogger();
 
-  private RegionEventImpl regionEvent;
-
   private Integer bucketId;
 
-  /** The time in ms to wait for a lock to be obtained during doLocalClear() */
-  public static final int LOCK_WAIT_TIMEOUT_MS = 1000;
   public static final String BUCKET_NON_PRIMARY_MESSAGE =
       "The bucket region on target member is no longer primary";
-  public static final String BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE =
-      "A lock for the bucket region could not be obtained.";
   public static final String EXCEPTION_THROWN_DURING_CLEAR_OPERATION =
       "An exception was thrown during the local clear operation: ";
 
@@ -79,14 +73,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
 
   public ClearPRMessage(int bucketId) {
     this.bucketId = bucketId;
-
-    // These are both used by the parent class, but don't apply to this message type
-    this.notificationOnly = false;
-    this.posDup = false;
-  }
-
-  public void setRegionEvent(RegionEventImpl event) {
-    regionEvent = event;
   }
 
   public void initMessage(PartitionedRegion region, Set<InternalDistributedMember> recipients,
@@ -103,16 +89,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     }
   }
 
-  @Override
-  public boolean isSevereAlertCompatible() {
-    // allow forced-disconnect processing for all cache op messages
-    return true;
-  }
-
-  public RegionEventImpl getRegionEvent() {
-    return regionEvent;
-  }
-
   public ClearResponse send(DistributedMember recipient, PartitionedRegion region)
       throws ForceReattemptException {
     Set<InternalDistributedMember> recipients =
@@ -125,7 +101,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
 
     Set<InternalDistributedMember> failures = region.getDistributionManager().putOutgoing(this);
     if (failures != null && failures.size() > 0) {
-      throw new ForceReattemptException("Failed sending <" + this + ">");
+      throw new ForceReattemptException("Failed sending <" + this + "> due to " + failures);
     }
     return clearResponse;
   }
@@ -143,7 +119,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     } else {
       InternalDataSerializer.writeSignedVL(bucketId, out);
     }
-    DataSerializer.writeObject(regionEvent, out);
   }
 
   @Override
@@ -151,12 +126,11 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
       throws IOException, ClassNotFoundException {
     super.fromData(in, context);
     this.bucketId = (int) InternalDataSerializer.readSignedVL(in);
-    this.regionEvent = DataSerializer.readObject(in);
   }
 
   @Override
   public EventID getEventID() {
-    return regionEvent.getEventId();
+    return null;
   }
 
   /**
@@ -169,60 +143,51 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
   protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager,
       PartitionedRegion region, long startTime) {
     try {
-      result = doLocalClear(region);
+      this.result = doLocalClear(region);
     } catch (ForceReattemptException ex) {
       sendReply(getSender(), getProcessorId(), distributionManager, new ReplyException(ex), region,
           startTime);
       return false;
     }
-    sendReply(getSender(), getProcessorId(), distributionManager, null, region, startTime);
-    return false;
+    return this.result;
   }
 
-  public boolean doLocalClear(PartitionedRegion region) throws ForceReattemptException {
+  public Integer getBucketId() {
+    return this.bucketId;
+  }
+
+  public boolean doLocalClear(PartitionedRegion region)
+      throws ForceReattemptException {
     // Retrieve local bucket region which matches target bucketId
-    BucketRegion bucketRegion = region.getDataStore().getInitializedBucketForId(null, bucketId);
+    BucketRegion bucketRegion =
+        region.getDataStore().getInitializedBucketForId(null, this.bucketId);
 
-    // Check if we are primary, throw exception if not
-    if (!bucketRegion.isPrimary()) {
+    boolean lockedForPrimary = bucketRegion.doLockForPrimary(false);
+    // Check if we obtained primary lock, throw exception if not
+    if (!lockedForPrimary) {
       throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
     }
-
-    DistributedLockService lockService = getPartitionRegionLockService();
-    String lockName = bucketRegion.getFullPath();
     try {
-      boolean locked = lockService.lock(lockName, LOCK_WAIT_TIMEOUT_MS, -1);
-
-      if (!locked) {
-        throw new ForceReattemptException(BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE);
-      }
-
-      // Double check if we are still primary, as this could have changed between our first check
-      // and obtaining the lock
-      if (!bucketRegion.isPrimary()) {
-        throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
-      }
-
-      try {
-        bucketRegion.cmnClearRegion(regionEvent, true, true);
-      } catch (Exception ex) {
-        throw new ForceReattemptException(
-            EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex);
-      }
-
+      RegionEventImpl regionEvent = new RegionEventImpl();
+      regionEvent.setOperation(Operation.REGION_CLEAR);
+      regionEvent.setRegion(bucketRegion);
+      bucketRegion.cmnClearRegion(regionEvent, true, true);
+    } catch (PartitionOfflineException poe) {
+      logger.info(
+          "All members holding data for bucket {} are offline, no more retries will be attempted",
+          this.bucketId,
+          poe);
+      throw poe;
+    } catch (Exception ex) {
+      throw new ForceReattemptException(
+          EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex);
     } finally {
-      lockService.unlock(lockName);
+      bucketRegion.doUnlockForPrimary();
     }
 
     return true;
   }
 
-  // Extracted for testing
-  protected DistributedLockService getPartitionRegionLockService() {
-    return DistributedLockService
-        .getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
-  }
-
   @Override
   public boolean canStartRemoteTransaction() {
     return false;
@@ -247,39 +212,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     buff.append("; bucketId=").append(this.bucketId);
   }
 
-  @Override
-  public String toString() {
-    StringBuilder buff = new StringBuilder();
-    String className = getClass().getName();
-    buff.append(className.substring(className.indexOf(PN_TOKEN) + PN_TOKEN.length())); // partition.<foo>
-    buff.append("(prid="); // make sure this is the first one
-    buff.append(this.regionId);
-
-    // Append name, if we have it
-    String name = null;
-    try {
-      PartitionedRegion region = PartitionedRegion.getPRFromId(this.regionId);
-      if (region != null) {
-        name = region.getFullPath();
-      }
-    } catch (Exception ignore) {
-      /* ignored */
-    }
-    if (name != null) {
-      buff.append(" (name = \"").append(name).append("\")");
-    }
-
-    appendFields(buff);
-    buff.append(" ,distTx=");
-    buff.append(this.isTransactionDistributed);
-    buff.append(")");
-    return buff.toString();
-  }
-
   public static class ClearReplyMessage extends ReplyMessage {
-    /** Result of the Clear operation */
-    boolean result;
-
     @Override
     public boolean getInlineProcess() {
       return true;
@@ -293,16 +226,21 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
 
     private ClearReplyMessage(int processorId, boolean result, ReplyException ex) {
       super();
-      this.result = result;
       setProcessorId(processorId);
-      setException(ex);
+      if (ex != null) {
+        setException(ex);
+      } else {
+        setReturnValue(result);
+      }
     }
 
-    /** Send an ack */
+    /**
+     * Send an ack
+     */
     public static void send(InternalDistributedMember recipient, int processorId,
         ReplySender replySender,
         boolean result, ReplyException ex) {
-      Assert.assertTrue(recipient != null, "ClearReplyMessage NULL reply message");
+      Assert.assertNotNull(recipient, "ClearReplyMessage recipient was NULL.");
       ClearReplyMessage message = new ClearReplyMessage(processorId, result, ex);
       message.setRecipient(recipient);
       replySender.putOutgoing(message);
@@ -340,23 +278,11 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     }
 
     @Override
-    public void fromData(DataInput in,
-        DeserializationContext context) throws IOException, ClassNotFoundException {
-      super.fromData(in, context);
-      this.result = in.readBoolean();
-    }
-
-    @Override
-    public void toData(DataOutput out,
-        SerializationContext context) throws IOException {
-      super.toData(out, context);
-      out.writeBoolean(this.result);
-    }
-
-    @Override
     public String toString() {
-      return "ClearReplyMessage " + "processorid=" + this.processorId + " returning " + this.result
-          + " exception=" + getException();
+      StringBuilder stringBuilder = new StringBuilder(super.toString());
+      stringBuilder.append(" returnValue=");
+      stringBuilder.append(getReturnValue());
+      return stringBuilder.toString();
     }
   }
 
@@ -372,7 +298,9 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     }
 
     public void setResponse(ClearReplyMessage response) {
-      this.returnValue = response.result;
+      if (response.getException() == null) {
+        this.returnValue = (boolean) response.getReturnValue();
+      }
     }
 
     /**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
index 2cf5231..acdd4fc 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
@@ -20,7 +20,6 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.notNull;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doNothing;
@@ -38,7 +37,6 @@ import java.util.Set;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionManager;
@@ -50,6 +48,7 @@ import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.PartitionedRegionDataStore;
 import org.apache.geode.internal.cache.PartitionedRegionStats;
+import org.apache.geode.internal.cache.RegionEventImpl;
 
 public class ClearPRMessageTest {
 
@@ -61,11 +60,14 @@ public class ClearPRMessageTest {
   @Before
   public void setup() throws ForceReattemptException {
     message = spy(new ClearPRMessage());
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
     region = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
     dataStore = mock(PartitionedRegionDataStore.class);
     when(region.getDataStore()).thenReturn(dataStore);
+    when(region.getFullPath()).thenReturn("/test");
     bucketRegion = mock(BucketRegion.class);
     when(dataStore.getInitializedBucketForId(any(), any())).thenReturn(bucketRegion);
+    RegionEventImpl bucketRegionEventImpl = mock(RegionEventImpl.class);
   }
 
   @Test
@@ -79,44 +81,19 @@ public class ClearPRMessageTest {
 
   @Test
   public void doLocalClearThrowsExceptionWhenLockCannotBeObtained() {
-    DistributedLockService mockLockService = mock(DistributedLockService.class);
-    doReturn(mockLockService).when(message).getPartitionRegionLockService();
-
-    when(mockLockService.lock(anyString(), anyLong(), anyLong())).thenReturn(false);
-    when(bucketRegion.isPrimary()).thenReturn(true);
-
-    assertThatThrownBy(() -> message.doLocalClear(region))
-        .isInstanceOf(ForceReattemptException.class)
-        .hasMessageContaining(ClearPRMessage.BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE);
-  }
-
-  @Test
-  public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAfterObtainingLock() {
-    DistributedLockService mockLockService = mock(DistributedLockService.class);
-    doReturn(mockLockService).when(message).getPartitionRegionLockService();
-
-    // Be primary on the first check, then be not primary on the second check
-    when(bucketRegion.isPrimary()).thenReturn(true).thenReturn(false);
-    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+    when(bucketRegion.doLockForPrimary(false)).thenReturn(false);
 
     assertThatThrownBy(() -> message.doLocalClear(region))
         .isInstanceOf(ForceReattemptException.class)
         .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
-    // Confirm that we actually obtained and released the lock
-    verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong());
-    verify(mockLockService, times(1)).unlock(any());
   }
 
   @Test
   public void doLocalClearThrowsForceReattemptExceptionWhenAnExceptionIsThrownDuringClearOperation() {
-    DistributedLockService mockLockService = mock(DistributedLockService.class);
-    doReturn(mockLockService).when(message).getPartitionRegionLockService();
     NullPointerException exception = new NullPointerException("Error encountered");
     doThrow(exception).when(bucketRegion).cmnClearRegion(any(), anyBoolean(), anyBoolean());
 
-    // Be primary on the first check, then be not primary on the second check
-    when(bucketRegion.isPrimary()).thenReturn(true);
-    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+    when(bucketRegion.doLockForPrimary(false)).thenReturn(true);
 
     assertThatThrownBy(() -> message.doLocalClear(region))
         .isInstanceOf(ForceReattemptException.class)
@@ -129,21 +106,13 @@ public class ClearPRMessageTest {
   @Test
   public void doLocalClearInvokesCmnClearRegionWhenBucketIsPrimaryAndLockIsObtained()
       throws ForceReattemptException {
-    DistributedLockService mockLockService = mock(DistributedLockService.class);
-    doReturn(mockLockService).when(message).getPartitionRegionLockService();
-
 
     // Be primary on the first check, then be not primary on the second check
-    when(bucketRegion.isPrimary()).thenReturn(true);
-    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+    when(bucketRegion.doLockForPrimary(false)).thenReturn(true);
     assertThat(message.doLocalClear(region)).isTrue();
 
     // Confirm that cmnClearRegion was called
     verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean());
-
-    // Confirm that we actually obtained and released the lock
-    verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong());
-    verify(mockLockService, times(1)).unlock(any());
   }
 
   @Test
@@ -197,6 +166,7 @@ public class ClearPRMessageTest {
     int processorId = 1000;
     int startTime = 0;
 
+    doReturn(0).when(message).getBucketId();
     doReturn(true).when(message).doLocalClear(region);
     doReturn(sender).when(message).getSender();
     doReturn(processorId).when(message).getProcessorId();
@@ -206,8 +176,9 @@ public class ClearPRMessageTest {
     doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong());
 
     message.operateOnPartitionedRegion(distributionManager, region, startTime);
+    assertThat(message.result).isTrue();
 
-    verify(message, times(1)).sendReply(sender, processorId, distributionManager, null, region,
+    verify(message, times(0)).sendReply(sender, processorId, distributionManager, null, region,
         startTime);
   }
 
@@ -222,6 +193,7 @@ public class ClearPRMessageTest {
     ForceReattemptException exception =
         new ForceReattemptException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
 
+    doReturn(0).when(message).getBucketId();
     doThrow(exception).when(message).doLocalClear(region);
     doReturn(sender).when(message).getSender();
     doReturn(processorId).when(message).getProcessorId();