You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2018/01/08 19:49:34 UTC
[geode] 01/01: Merge pull request #1239 from
dschneider-pivotal/feature/GEODE-4051
This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
commit f905ea2bd178d4301787b6d123780408202533b3
Merge: 83edc8d 61077fb
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Mon Jan 8 11:49:29 2018 -0800
Merge pull request #1239 from dschneider-pivotal/feature/GEODE-4051
GEODE-4051: change StateMarkerMessage to always reply
.../geode/internal/cache/StateFlushOperation.java | 60 ++++++++++------------
.../internal/cache/StateMarkerMessageTest.java | 32 ++++++++++++
2 files changed, 60 insertions(+), 32 deletions(-)
diff --cc geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
index 8d95adf,c56fa26..25ddb40
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
@@@ -343,27 -344,40 +343,40 @@@ public class StateFlushOperation
}
@Override
- protected void process(DistributionManager dm) {
+ protected void process(ClusterDistributionManager dm) {
logger.trace(LogMarker.STATE_FLUSH_OP, "Processing {}", this);
if (dm.getDistributionManagerId().equals(relayRecipient)) {
- // wait for inflight operations to the aeqs even if the recipient is the primary
- Set<DistributedRegion> regions = getRegions(dm);
- for (DistributedRegion r : regions) {
- if (r != null) {
- if (this.allRegions && r.doesNotDistribute()) {
- // no need to flush a region that does no distribution
- continue;
+ try {
+ // wait for inflight operations to the aeqs even if the recipient is the primary
+ Set<DistributedRegion> regions = getRegions(dm);
+ for (DistributedRegion r : regions) {
+ if (r != null) {
+ if (this.allRegions && r.doesNotDistribute()) {
+ // no need to flush a region that does no distribution
+ continue;
+ }
+ waitForCurrentOperations(r, r.isInitialized());
}
- waitForCurrentOperations(r, r.isInitialized());
}
+ } catch (CancelException ignore) {
+ // cache is closed - no distribution advisor available for the region so nothing to do but
+ // send the stabilization message
+ } catch (Exception e) {
+ logger.fatal(LocalizedMessage.create(
+ LocalizedStrings.StateFlushOperation_0__EXCEPTION_CAUGHT_WHILE_DETERMINING_CHANNEL_STATE,
+ this), e);
+ } finally {
+ // no need to send a relay request to this process - just send the
+ // ack back to the sender
+ StateStabilizedMessage ga = new StateStabilizedMessage();
+ ga.sendingMember = relayRecipient;
+ ga.setRecipient(this.getSender());
+ ga.setProcessorId(processorId);
+ if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP)) {
+ logger.trace(LogMarker.STATE_FLUSH_OP, "Sending {}", ga);
+ }
+ dm.putOutgoing(ga);
}
- // no need to send a relay request to this process - just send the
- // ack back to the sender
- StateStabilizedMessage ga = new StateStabilizedMessage();
- ga.sendingMember = relayRecipient;
- ga.setRecipient(this.getSender());
- ga.setProcessorId(processorId);
- dm.putOutgoing(ga);
} else {
// 1) wait for all messages based on the membership version (or older)
// at which the sender "joined" this region to be put on the pipe
--
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.