You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/22 19:35:58 UTC
[36/83] [abbrv] [partial] incubator-geode git commit: Merge
remote-tracking branch 'origin/develop' into feature/GEODE-917
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
index 98e72bd,0000000..a6649c3
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
@@@ -1,3002 -1,0 +1,3002 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.client.internal.locator.SerializationHelper;
+import com.gemstone.gemfire.cache.partition.PartitionListener;
+import com.gemstone.gemfire.distributed.DistributedLockService;
+import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
+import com.gemstone.gemfire.distributed.LockNotHeldException;
+import com.gemstone.gemfire.distributed.LockServiceDestroyedException;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.MembershipListener;
+import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
+import com.gemstone.gemfire.distributed.internal.locks.DLockService;
+import com.gemstone.gemfire.distributed.internal.locks.DistributedMemberLock;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.partitioned.Bucket;
+import com.gemstone.gemfire.internal.cache.partitioned.BucketProfileUpdateMessage;
+import com.gemstone.gemfire.internal.cache.partitioned.DeposePrimaryBucketMessage;
+import com.gemstone.gemfire.internal.cache.partitioned.DeposePrimaryBucketMessage.DeposePrimaryBucketResponse;
+import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.util.StopWatch;
+
+/**
+ * Specialized {@link CacheDistributionAdvisor} for {@link BucketRegion
+ * BucketRegions}. The <code>BucketAdvisor</code> is owned by a {@link
+ * ProxyBucketRegion} and may outlive a <code>BucketRegion</code>.
+ *
+ * @author Kirk Lund
+ */
+@SuppressWarnings("synthetic-access")
- public final class BucketAdvisor extends CacheDistributionAdvisor {
++public class BucketAdvisor extends CacheDistributionAdvisor {
+ private static final Logger logger = LogService.getLogger();
+
+ public static final boolean ENFORCE_SAFE_CLOSE = false;
+ //TODO: Boolean.getBoolean("gemfire.BucketAdvisor.debug.enforceSafeClose");
+
+ /** Reference to the InternalDistributedMember that is primary. */
+ private final AtomicReference primaryMember = new AtomicReference();
+
+ /**
+ * Advice requests for {@link #adviseProfileUpdate()} delegate to the
+ * partitioned region's <code>RegionAdvisor</code> to include members with
+ * {@link ProxyBucketRegion}s as well as real {@link BucketRegion}s.
+ */
+ protected final RegionAdvisor regionAdvisor;
+
+ /**
+ * The bucket primary will be holding this distributed lock. Protected by
+ * synchronized(this).
+ */
+ private DistributedMemberLock primaryLock;
+
+ //private static final byte MASK_HOSTING = 1; // 0001
+ //private static final byte MASK_VOLUNTEERING = 2; // 0010
+ //private static final byte MASK_OTHER_PRIMARY = 4; // 0100
+ //private static final byte MASK_IS_PRIMARY = 8; // 1000
+
+ private static final byte NO_PRIMARY_NOT_HOSTING = 0; // 0000_0000
+ private static final byte NO_PRIMARY_HOSTING = 1; // 0000_0001
+ private static final byte OTHER_PRIMARY_NOT_HOSTING = 4; // 0000_0100
+ private static final byte OTHER_PRIMARY_HOSTING = 5; // 0000_0101
+ private static final byte VOLUNTEERING_HOSTING = 3; // 0000_0011
+ private static final byte BECOMING_HOSTING = 15; // 0000_1111
+ private static final byte IS_PRIMARY_HOSTING = 9; // 0000_1001
+ private static final byte CLOSED = 16; // 0001_0000
+
+ /**
+ * The current state of this BucketAdvisor which tracks which member is
+ * primary and whether or not this member is hosting a real Bucket.
+ */
+ private byte primaryState = NO_PRIMARY_NOT_HOSTING;
+
+ /**
+ * This delegate handles all volunteering for primary status. Lazily created.
+ * Protected by synchronization(this).
+ */
+ private VolunteeringDelegate volunteeringDelegate;
+
+ /**
+ * A random number generator
+ *
+ * @see #getPreferredNode()
+ */
+ static private final Random myRand = new Random();
+
+ /**
+ * Used by {@link #updateRedundancy()} to determine if stat change is
+ * required. Access and mutation are done while synchronized on this
+ * advisor.
+ *
+ * @guarded.By this
+ */
+ private boolean redundancySatisfied = true;
+
+ /**
+ * Used by {@link #incLowRedundancyBucketCount(int)} to determine if
+ * redundancy for this bucket has ever been satisfied. Only buckets which
+ * lose redundancy after having redundancy will generate a redundancy loss
+ * alert.
+ * <p>
+ * Access and mutation are done while synchronized on this advisor.
+ *
+ * @guarded.By this
+ */
+ private boolean redundancyEverSatisfied = false;
+
+ /**
+ * A read/write lock to prevent making this bucket not primary while a write
+ * is in progress on the bucket.
+ */
+ private final ReadWriteLock primaryMoveLock = new ReentrantReadWriteLock();
+ private final Lock activeWriteLock = primaryMoveLock.readLock();
+ private final Lock activePrimaryMoveLock = primaryMoveLock.writeLock();
+
+ /**
+ * The advisor for the bucket region that we are colocated with, if this region
+ * is a colocated region.
+ */
+ private BucketAdvisor parentAdvisor;
+
+ private volatile int redundancy = -1;
+
+ /**
+ * The member that is responsible for choosing the primary
+ * for this bucket. While this field is set and this member
+ * exists, this bucket won't try to become primary.
+ */
+ private volatile InternalDistributedMember primaryElector;
+
+ private volatile BucketProfile localProfile;
+
+ private volatile boolean everHadPrimary = false;
+
+ private BucketAdvisor startingBucketAdvisor;
+
+ private PartitionedRegion pRegion;
+
+ private volatile boolean shadowBucketDestroyed;
+
+ /**
+ * Constructs a new BucketAdvisor for the Bucket owned by RegionAdvisor.
+ *
+ * @param bucket the bucket to provide metadata and advice for
+ * @param regionAdvisor advisor for the PartitionedRegion
+ */
+ private BucketAdvisor(Bucket bucket,
+ RegionAdvisor regionAdvisor) {
+ super(bucket);
+ this.regionAdvisor = regionAdvisor;
+ this.pRegion = this.regionAdvisor.getPartitionedRegion();
+ resetParentAdvisor(bucket.getId());
+ }
+
+ public static BucketAdvisor createBucketAdvisor(Bucket bucket, RegionAdvisor regionAdvisor) {
+ BucketAdvisor advisor = new BucketAdvisor(bucket, regionAdvisor);
+ advisor.initialize();
+ return advisor;
+ }
+
+ // For SQLFabric ALTER TABLE that may change colocation
+ public void resetParentAdvisor(int bucketId) {
+ PartitionedRegion colocatedRegion = ColocationHelper
+ .getColocatedRegion(this.pRegion);
+ if (colocatedRegion != null) {
+ if (colocatedRegion.isFixedPartitionedRegion()) {
+ List<FixedPartitionAttributesImpl> fpas = colocatedRegion
+ .getFixedPartitionAttributesImpl();
+ if (fpas != null) {
+ for (FixedPartitionAttributesImpl fpa : fpas) {
+ if (fpa.hasBucket(bucketId)) {
+ this.parentAdvisor = colocatedRegion.getRegionAdvisor()
+ .getBucketAdvisor(fpa.getStartingBucketID());
+ break;
+ }
+ }
+ }
+ }
+ else {
+ this.parentAdvisor = colocatedRegion.getRegionAdvisor()
+ .getBucketAdvisor(bucketId);
+ }
+ }
+ else {
+ this.parentAdvisor = null;
+ }
+ }
+
+ private void assignStartingBucketAdvisor() {
+ if (this.pRegion.isFixedPartitionedRegion()) {
+ List<FixedPartitionAttributesImpl> fpas = this.pRegion
+ .getFixedPartitionAttributesImpl();
+ if (fpas != null) {
+ int bucketId = getBucket().getId();
+ for (FixedPartitionAttributesImpl fpa : fpas) {
+ if (fpa.hasBucket(bucketId) && bucketId != fpa.getStartingBucketID()) {
+ startingBucketAdvisor = this.regionAdvisor.getBucketAdvisor(
+ fpa.getStartingBucketID());
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns the lock that prevents the primary from moving while active writes
+ * are in progress. This should be locked before checking if the local bucket
+ * is primary.
+ *
+ * @return the lock for in-progress write operations
+ */
+ public Lock getActiveWriteLock() {
+ return this.activeWriteLock;
+ }
+
+ /**
+ * Returns the lock that prevents the parent's primary from moving while
+ * active writes are in progress. This should be locked before checking if the
+ * local bucket is primary.
+ *
+ * @return the lock for in-progress write operations
+ */
+ Lock getParentActiveWriteLock() {
+ if (this.parentAdvisor != null) {
+ return this.parentAdvisor.getActiveWriteLock();
+ }
+ return null;
+ }
+
+ /**
+ * Try to lock the primary bucket to make sure no operation is on-going at
+ * current bucket.
+ *
+ */
+ public void tryLockIfPrimary() {
+ if (isPrimary()) {
+ try {
+ this.activePrimaryMoveLock.lock();
+ } finally {
+ this.activePrimaryMoveLock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Makes this <code>BucketAdvisor</code> give up being a primary and become
+ * a secondary. Does nothing if not currently the primary.
+ *
+ * @return true if this advisor has been deposed as primary
+ */
+ public boolean deposePrimary() {
+ if (isPrimary()) {
+ this.activePrimaryMoveLock.lock();
+ boolean needToSendProfileUpdate = false;
+ try {
+ removePrimary(getDistributionManager().getId());
+ synchronized(this) {
+ if (!isPrimary()) {
+ //releasePrimaryLock();
+ needToSendProfileUpdate = true;
+ return true;
+ } else {
+ return false; // failed for some reason
+ }
+ }
+ } finally {
+ this.activePrimaryMoveLock.unlock();
+ if (needToSendProfileUpdate) {
+ if (this.getBucket() instanceof BucketRegionQueue) {
+ BucketRegionQueue brq = (BucketRegionQueue)this.getBucket();
+ brq.decQueueSize(brq.size());
+ }
+ sendProfileUpdate();
+ }
+ }
+ } else {
+ sendProfileUpdate();
+ return true;
+ }
+ }
+
+ /**
+ * This calls deposePrimary on every colocated child that is directly
+ * colocated to this bucket's PR. Those each in turn do the same to their
+ * child buckets and so on before returning. Each depose will send a dlock
+ * release message to the grantor, wait for reply, and then also send a
+ * profile update.
+ * <p>
+ * Caller must synchronize on this BucketAdvisor.
+
+ * @return true if children were all deposed as primaries
+ * @guarded.By this
+ */
+ private boolean deposePrimaryForColocatedChildren() {
+ boolean deposedChildPrimaries = true;
+
+ // getColocatedChildRegions returns only the child PRs directly colocated
+ // with thisPR...
+ List<PartitionedRegion> colocatedChildPRs =
+ ColocationHelper.getColocatedChildRegions(this.pRegion);
+ if (colocatedChildPRs != null) {
+ for (PartitionedRegion pr : colocatedChildPRs) {
+ Bucket b = pr.getRegionAdvisor().getBucket(getBucket().getId());
+ if (b != null) {
+ BucketAdvisor ba = b.getBucketAdvisor();
+ deposedChildPrimaries = ba.deposePrimary() && deposedChildPrimaries;
+ }
+ }
+ }
+ return deposedChildPrimaries;
+ }
+
+ private boolean deposeOtherPrimaryBucketForFixedPartition() {
+ boolean deposedOtherPrimaries = true;
+ int bucketId = getBucket().getId();
+ List<FixedPartitionAttributesImpl> fpas = this.pRegion
+ .getFixedPartitionAttributesImpl();
+ if (fpas != null) {
+ for (FixedPartitionAttributesImpl fpa : fpas) {
+ if (fpa.getStartingBucketID() == bucketId) {
+ for (int i = (bucketId+1); i <= fpa.getLastBucketID(); i++) {
+ Bucket b = regionAdvisor.getBucket(i);
+ if (b != null) {
+ BucketAdvisor ba = b.getBucketAdvisor();
+ deposedOtherPrimaries = ba.deposePrimary()
+ && deposedOtherPrimaries;
+ }
+ }
+ }
+ else {
+ continue;
+ }
+ }
+ }
+ return deposedOtherPrimaries;
+ }
+
+ void removeBucket() {
+ setHosting(false);
+ }
+
+ /**
+ * Increment or decrement lowRedundancyBucketCount stat and generate
+ * alert only once per loss of redundancy for PR but only if redundancy
+ * has ever been satisfied.
+ * <p>
+ * Caller must synchronize on this BucketAdvisor.
+ *
+ * @param val the value to increment or decrement by
+ * @guarded.By this
+ */
+ private void incLowRedundancyBucketCount(int val) {
+ final int HAS_LOW_REDUNDANCY = 0;
+ final int ALREADY_GENERATED_WARNING = 1;
+
+ final PartitionedRegionStats stats = getPartitionedRegionStats();
+ final boolean[] lowRedundancyFlags =
+ this.regionAdvisor.getLowRedundancyFlags();
+ final int configuredRedundancy =
+ this.pRegion.getRedundantCopies();
+
+ synchronized (lowRedundancyFlags) {
+ stats.incLowRedundancyBucketCount(val);
+
+ if (stats.getLowRedundancyBucketCount() == 0) {
+ // all buckets are fully redundant
+ lowRedundancyFlags[HAS_LOW_REDUNDANCY] = false; // reset
+ lowRedundancyFlags[ALREADY_GENERATED_WARNING] = false; // reset
+ stats.setActualRedundantCopies(configuredRedundancy);
+ }
+
+ else {
+ // one or more buckets are not fully redundant
+ int numBucketHosts = getBucketRedundancy() + 1;
+ int actualRedundancy = Math.max(numBucketHosts - 1, 0); // zero or more
+
+ if (actualRedundancy < stats.getActualRedundantCopies()) {
+ // need to generate an alert for this lower actual redundancy
+ lowRedundancyFlags[ALREADY_GENERATED_WARNING] = false;
+ }
+
+ if (!lowRedundancyFlags[HAS_LOW_REDUNDANCY] ||
+ !lowRedundancyFlags[ALREADY_GENERATED_WARNING]) {
+ // either we have lower redundancy or we never generated an alert
+
+ lowRedundancyFlags[HAS_LOW_REDUNDANCY] = true;
+ stats.setActualRedundantCopies(actualRedundancy);
+
+ // this bucket will only generate alert if redundancyEverSatisfied
+ if (!lowRedundancyFlags[ALREADY_GENERATED_WARNING] &&
+ this.redundancyEverSatisfied) {
+
+ lowRedundancyFlags[ALREADY_GENERATED_WARNING] = true;
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.BucketAdvisor_REDUNDANCY_HAS_DROPPED_BELOW_0_CONFIGURED_COPIES_TO_1_ACTUAL_COPIES_FOR_2,
+ new Object[] { Integer.valueOf(configuredRedundancy), Integer.valueOf(actualRedundancy), this.pRegion.getFullPath()}));
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Return (and possibly choose) a thread-sticky member from whose data store
+ * this bucket's values should be read
+ * @return member to use for reads, null if none available
+ */
+ public InternalDistributedMember getPreferredNode() {
+
+ if (isHosting()) {
+ getPartitionedRegionStats().incPreferredReadLocal();
+ return getDistributionManager().getId();
+ }
+
+ Profile locProfiles[] = this.profiles; // volatile read
+ if (locProfiles.length == 0) {
+ return null;
+ }
+ getPartitionedRegionStats().incPreferredReadRemote();
+
+ if (locProfiles.length == 1) { // only one choice!
+ return locProfiles[0].peerMemberId;
+ }
+
+ // Pick one at random.
+ int i = myRand.nextInt(locProfiles.length);
+ return locProfiles[i].peerMemberId;
+ }
+
+ /**
+ * Returns the thread-safe queue of primary volunteering tasks for the
+ * parent Partitioned Region.
+ *
+ * @return the queue of primary volunteering tasks
+ */
+ Queue getVolunteeringQueue() {
+ return this.regionAdvisor.getVolunteeringQueue();
+ }
+
+ /**
+ * Returns the semaphore which controls the number of threads allowed to
+ * consume from the {@link #getVolunteeringQueue volunteering queue}.
+ *
+ * @return the semaphore which controls the number of volunteering threads
+ */
+ Semaphore getVolunteeringSemaphore() {
+ return this.regionAdvisor.getVolunteeringSemaphore();
+ }
+
+ /**
+ * Returns the PartitionedRegionStats.
+ *
+ * @return the PartitionedRegionStats
+ */
+ PartitionedRegionStats getPartitionedRegionStats() {
+ return this.regionAdvisor.getPartitionedRegionStats();
+ }
+
+ /**
+ * Concurrency: protected by synchronizing on *this*
+ */
+ @Override
+ protected void profileCreated(Profile profile) {
+ this.regionAdvisor.incrementBucketCount(profile);
+ super.profileCreated(profile);
+ if (updateRedundancy() > 0) {
+ // wake up any threads in waitForRedundancy or waitForPrimary
+ this.notifyAll();
+ }
+ this.regionAdvisor.updateBucketStatus(this.getBucket().getId(),
+ profile.peerMemberId, false);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Profile added {} Profile : {}", getBucket().getFullPath(), profile);
+ }
+ synchronized (this){
+ updateServerBucketProfile();
+ }
+ }
+
+ /**
+ * Concurrency: protected by synchronizing on *this*
+ */
+ @Override
+ protected void profileUpdated(Profile profile) {
+ super.profileUpdated(profile);
+ if (updateRedundancy() > 0) {
+ // wake up any threads in waitForRedundancy or waitForPrimary
+ this.notifyAll();
+ }
+ this.regionAdvisor.updateBucketStatus(this.getBucket().getId(),
+ profile.peerMemberId, false);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Profile updated {} Profile : {}", getBucket().getFullPath(), profile);
+ }
+ synchronized (this) {
+ updateServerBucketProfile();
+ }
+ }
+
+ /**
+ * Concurrency: protected by synchronizing on *this*
+ */
+ @Override
+ protected void profileRemoved(Profile profile) {
+ if (profile != null) {
+ this.regionAdvisor.updateBucketStatus(this.getBucket().getId(),
+ profile.getDistributedMember(), true);
+ this.regionAdvisor.decrementsBucketCount(profile);
+ }
+ updateRedundancy();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Profile removed {} the member lost {} Profile : {}", getBucket().getFullPath(),
+ profile.getDistributedMember(), profile);
+ }
+ synchronized (this) {
+ updateServerBucketProfile();
+ }
+ }
+
+ @Override
+ public boolean shouldSyncForCrashedMember(InternalDistributedMember id) {
+ BucketProfile profile = (BucketProfile)getProfile(id);
+ return (profile != null) && (profile.isPrimary);
+ }
+
+ @Override
+ public DistributedRegion getRegionForDeltaGII() {
+ DistributedRegion result = super.getRegionForDeltaGII();
+ if (result == null && getAdvisee() instanceof ProxyBucketRegion) {
+ result = ((ProxyBucketRegion)getAdvisee()).getHostedBucketRegion();
+ }
+ return result;
+ }
+
+
+
+
+ /**
+ * Called by the RegionAdvisor.profileRemoved, this method
+ * tests to see if the missing member is the primary elector
+ * for this bucket.
+ *
+ * We can't call this method from BucketAdvisor.profileRemoved,
+ * because the primaryElector may not actually host the bucket.
+ * @param profile
+ */
+ public void checkForLostPrimaryElector(Profile profile) {
+ //If the member that went away was in the middle of creating
+ //the bucket, finish the bucket creation.
+ if(this.primaryElector != null && this.primaryElector.equals(profile.getDistributedMember())) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Bucket {} lost the member responsible for electing the primary. Finishing bucket creation", getBucket().getFullPath());
+ }
+ this.primaryElector = getBucket().getDistributionManager().getId();
+ this.getBucket().getDistributionManager().getWaitingThreadPool().execute(new Runnable() {
+ public void run() {
+ getBucket().getPartitionedRegion().getRedundancyProvider().finishIncompleteBucketCreation(getBucket().getId());
+ }
+ });
+ }
+ }
+
+ /**
+ * Only allows profiles that actually hosting this bucket. If the profile is
+ * primary, then primaryMember will be set to that member but only
+ * if we are not already the primary.
+ *
+ * @param profile the profile to add (must be a BucketProfile)
+ * @param forceProfile true will force profile to be added even if member is
+ * not in distributed view
+ *
+ * @see #adviseProfileUpdate()
+ */
+ @Override
+ public boolean putProfile(Profile profile, boolean forceProfile) {
+ assert profile instanceof BucketProfile;
+ BucketProfile bp = (BucketProfile) profile;
+
+ // Only hosting buckets will be initializing, the isInitializing boolean is to
+ // allow for early entry into the advisor for GII purposes
+ if ( ! bp.isHosting && ! bp.isInitializing) {
+ if (logger.isTraceEnabled(LogMarker.DA)) {
+ logger.trace(LogMarker.DA, "BucketAdvisor#putProfile early out");
+ }
+ return false; // Do not allow introduction of proxy profiles, they don't provide anything useful
+ // isHosting = false, isInitializing = false
+ }
+ if (logger.isTraceEnabled(LogMarker.DA)) {
+ logger.trace(LogMarker.DA, "BucketAdvisor#putProfile profile=<{}> force={}; profile = {}", profile, forceProfile, bp);
+ }
+ // isHosting = false, isInitializing = true
+ // isHosting = true, isInitializing = false
+ // isHosting = true, isInitializing = true... (false state)
+
+ final boolean applied;
+ synchronized(this) {
+ // force new membership version in the advisor so that the
+ // state flush mechanism can capture any updates to the bucket
+ // MIN_VALUE is intended as a somewhat unique value for potential debug purposes
+ profile.initialMembershipVersion = Long.MIN_VALUE;
+ applied = super.putProfile(profile, forceProfile);
+ // skip following block if isPrimary to avoid race where we process late
+ // arriving OTHER_PRIMARY profile after we've already become primary
+ if (applied && !isPrimary()) { // TODO is it safe to change the bucket state if the profile was not applied? -- mthomas 2/13/08
+ if (bp.isPrimary) {
+ setPrimaryMember(bp.getDistributedMember());
+ } else {
+ notPrimary(bp.getDistributedMember());
+ }
+ } // if: !isPrimary
+
+ } // synchronized
+ return applied;
+ }
+
+ private static <E> Set<E> newSetFromMap(Map<E, Boolean> map) {
+ if (map.isEmpty()) {
+ return new SetFromMap<E>(map);
+ }
+ throw new IllegalArgumentException();
+ }
+
+ private static class SetFromMap<E> extends AbstractSet<E> implements
+ Serializable {
+ private static final long serialVersionUID = 2454657854757543876L;
+
+ // must named as it, to pass serialization compatibility test.
+ private Map<E, Boolean> m;
+
+ private transient Set<E> backingSet;
+
+ SetFromMap(final Map<E, Boolean> map) {
+ super();
+ m = map;
+ backingSet = map.keySet();
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ return backingSet.equals(object);
+ }
+
+ @Override
+ public int hashCode() {
+ return backingSet.hashCode();
+ }
+
+ @Override
+ public boolean add(E object) {
+ return m.put(object, Boolean.TRUE) == null;
+ }
+
+ @Override
+ public void clear() {
+ m.clear();
+ }
+
+ @Override
+ public String toString() {
+ return backingSet.toString();
+ }
+
+ @Override
+ public boolean contains(Object object) {
+ return backingSet.contains(object);
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> collection) {
+ return backingSet.containsAll(collection);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return m.isEmpty();
+ }
+
+ @Override
+ public boolean remove(Object object) {
+ return m.remove(object) != null;
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> collection) {
+ return backingSet.retainAll(collection);
+ }
+
+ @Override
+ public Object[] toArray() {
+ return backingSet.toArray();
+ }
+
+ @Override
+ public <T> T[] toArray(T[] contents) {
+ return backingSet.toArray(contents);
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ return backingSet.iterator();
+ }
+
+ @Override
+ public int size() {
+ return m.size();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void readObject(ObjectInputStream stream) throws IOException,
+ ClassNotFoundException {
+ stream.defaultReadObject();
+ backingSet = m.keySet();
+ }
+ }
+
+ private void updateServerBucketProfile() {
+ // check to see if it is clientBucketProfile add it to the RegionAdvisor
+ // data structure which maintains the newly created clientBucketProfile
+ // don't add more than once, but replace existing profile
+ int bucketId = this.getBucket().getId();
+ Set<ServerBucketProfile> serverProfiles = this.regionAdvisor
+ .getClientBucketProfiles(bucketId);
+ if (serverProfiles == null) {
+ serverProfiles = newSetFromMap(new ConcurrentHashMap<ServerBucketProfile, Boolean>());
+ this.regionAdvisor.setClientBucketProfiles(bucketId, serverProfiles);
+ }
+ serverProfiles.clear();
+ for (Profile p : this.profiles) {
+ if (p instanceof ServerBucketProfile) {
+ serverProfiles.add((ServerBucketProfile)p);
+ }
+ }
+ }
+
+ /**
+ * Only for local profile.
+ * @param p
+ */
+ public synchronized void updateServerBucketProfile(BucketProfile p) {
+ this.localProfile = p;
+ }
+
+ public BucketProfile getLocalProfile() {
+ return this.localProfile;
+ }
+
+ @Override
+ public boolean removeId(ProfileId memberId,
+ boolean crashed,
+ boolean destroyed,
+ boolean fromMembershipListener) {
+ boolean hadBucketRegion = super.removeId(memberId, crashed, destroyed, fromMembershipListener);
+ if (hadBucketRegion) {
+ // do NOT call notPrimary under synchronization
+ try {
+ notPrimary((InternalDistributedMember)memberId);
+ } catch (CancelException e) {
+ // must be closing the cache - no need to try to become primary
+ }
+ }
+ return hadBucketRegion;
+ }
+
+ /**
+ * Removes the profile for the specified member. If that profile is marked
+ * as primary, this will call {@link #notPrimary(InternalDistributedMember)}.
+ *
+ * @param memberId the member to remove the profile for
+ * @param serialNum specific serial number to remove
+ * @return true if a matching profile for the member was found
+ */
+ @Override
+ public boolean removeIdWithSerial(InternalDistributedMember memberId,
+ int serialNum, boolean regionDestroyed) {
+ boolean hadBucketRegion = super.removeIdWithSerial(memberId, serialNum, regionDestroyed);
+ if (hadBucketRegion) {
+ // do NOT call notPrimary under synchronization
+ notPrimary(memberId);
+ }
+ forceNewMembershipVersion();
+ return hadBucketRegion;
+ }
+
+ @Override
+ public Set adviseProfileExchange() {
+ // delegate up to RegionAdvisor to include members that might have
+ // ProxyBucketRegion without a real BucketRegion
+ Assert.assertTrue(this.regionAdvisor.isInitialized());
+ return this.regionAdvisor.adviseBucketProfileExchange();
+ }
+
+ @Override
+ public Set adviseProfileUpdate() {
+ // delegate up to RegionAdvisor to include members that might have
+ // ProxyBucketRegion without a real BucketRegion
+ return this.regionAdvisor.adviseGeneric();
+ }
+
+ /**
+ * Sets hosting to false and returns without closing. Calling closeAdvisor
+ * will actually close this advisor.
+ */
+ @Override
+ public void close() {
+ // ok, so BucketRegion extends DistributedRegion which calls close on the
+ // advisor, BUT the ProxyBucketRegion truly owns the advisor and only it
+ // should be able to close the advisor...
+ //
+ // if a BucketRegion closes, it will call this method and we want to change
+ // our state to NOT_HOSTING
+ //
+ // see this.closeAdvisor()
+ setHosting(false);
+ }
+
+ /**
+ * Blocks until there is a known primary and return that member, but only
+ * if there are real bucket regions that exist. If there are no real
+ * bucket regions within the distribution config's member-timeout setting
+ * * 3 (time required to eject a member) + 15000, then
+ * this returns null.
+ *
+ * kbanks: reworked this method to avoid JIT issue #40639
+ *
+ * @return the member who is primary for this bucket
+ */
+ public final InternalDistributedMember getPrimary() {
+ InternalDistributedMember primary = getExistingPrimary();
+ if(primary == null){
+ primary = waitForNewPrimary();
+ }
+ return primary;
+ }
+
+ /**
+ * This method was split out from getPrimary() due to bug #40639
+ * and is only intended to be called from within that method.
+ * @see #getPrimary()
+ * @return the existing primary (if it is still in the view) otherwise null
+ */
+ private final InternalDistributedMember getExistingPrimary() {
+ return basicGetPrimaryMember();
+ }
+
+ /**
+ * If the current member is primary for this bucket return true, otherwise,
+ * give some time for the current member to become primary and
+ * then return whether it is a primary (true/false).
+ */
+ public final boolean isPrimaryWithWait() {
+ if (this.isPrimary()) {
+ return true;
+ }
+ // wait for the current member to become primary holder
+ InternalDistributedMember primary = waitForNewPrimary();
+ if(primary != null) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * This method was split out from getPrimary() due to bug #40639
+ * and is only intended to be called from within that method.
+ * @see #getPrimary()
+ * @return the new primary
+ */
+ private final InternalDistributedMember waitForNewPrimary() {
+ DM dm = this.regionAdvisor.getDistributionManager();
+ DistributionConfig config = dm.getConfig();
+ // failure detection period
+ long timeout = config.getMemberTimeout() * 3;
+ // plus time for a new member to become primary
+ timeout += Long.getLong("gemfire.BucketAdvisor.getPrimaryTimeout",
+ 15 * 1000);
+ InternalDistributedMember newPrimary = waitForPrimaryMember(timeout);
+ return newPrimary;
+ }
+
+ /**
+ * Marks member as not primary. Initiates volunteerForPrimary if this
+ * member is hosting a real bucket. This method does nothing
+ * if the member parameter is the current member.
+ *
+ * @param member the member who is not primary
+ */
+ public void notPrimary(InternalDistributedMember member) {
+ //Fix for 43569. Only the deposePrimary call should
+ //make the local member drop the primary lock.
+ if(!member.equals(getDistributionManager().getId())) {
+ removePrimary(member);
+ }
+ }
+
+ /**
+ * Marks member as not primary. Initiates volunteerForPrimary if this
+ * member is hosting a real bucket.
+ *
+ * @param member the member who is not primary
+ */
+ public void removePrimary(InternalDistributedMember member) {
+ boolean needToVolunteerForPrimary = false;
+ if (!isClosed()) { // hole: requestPrimaryState not hosting
+ initializationGate();
+ }
+ boolean lostPrimary = false;
+ try {
+ synchronized(this) {
+ boolean wasPrimary = isPrimary() && this.getDistributionManager().getId().equals(member);
+ final InternalDistributedMember currentPrimary = (InternalDistributedMember) this.primaryMember.get();
+ if (currentPrimary != null && currentPrimary.equals(member)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("[BucketAdvisor.notPrimary] {} for {}", member, this);
+ }
+ this.primaryMember.set(null);
+ }
+ else {
+ return;
+ }
+
+ if (isClosed()) {
+ // possibly closed if caller comes from outside this advisor
+ return; // return quietly
+ }
+ // member is primary... need to change state to NO_PRIMARY_xxx
+ if (isHosting()) {
+ requestPrimaryState(NO_PRIMARY_HOSTING);
+ if (this.pRegion.isFixedPartitionedRegion()) {
+ InternalDistributedMember primaryMember = this.regionAdvisor
+ .adviseFixedPrimaryPartitionDataStore(this.getBucket().getId());
+ if (primaryMember == null || primaryMember.equals(member)) {
+ needToVolunteerForPrimary = true;
+ }
+ else {
+ needToVolunteerForPrimary = false;
+ }
+ }
+ else {
+ needToVolunteerForPrimary = true;
+ }
+ }
+ else {
+ requestPrimaryState(NO_PRIMARY_NOT_HOSTING);
+ }
+ if(wasPrimary) {
+ lostPrimary = true;
+ }
+
+ // try to fix yet another cause of bug 36881
+ findAndSetPrimaryMember();
+ }
+ } finally {
+ if (lostPrimary) {
+ Bucket br = this.regionAdvisor.getBucket(getBucket().getId());
+ if( br != null && br instanceof BucketRegion) {
+ ((BucketRegion)br).beforeReleasingPrimaryLockDuringDemotion();
+ }
+
+ releasePrimaryLock();
+ // this was a deposePrimary call so we need to depose children as well
+ deposePrimaryForColocatedChildren();
+ if (this.pRegion.isFixedPartitionedRegion()) {
+ deposeOtherPrimaryBucketForFixedPartition();
+ }
+ }
+ }
+
+ if (needToVolunteerForPrimary) {
+ volunteerForPrimary();
+ }
+ }
+
+ /**
+ * Returns the ProxyBucketRegion which owns this advisor.
+ *
+ * @return the ProxyBucketRegion which owns this advisor
+ */
+ public ProxyBucketRegion getProxyBucketRegion() {
+ return (ProxyBucketRegion)getAdvisee();
+ }
+
+ /**
+ * Actually close this advisor for real. Called by ProxyBucketRegion only.
+ * Calling this method actually closes this advisor whereas {@link #close()}
+ * only sets hosting to false.
+ */
+ protected void closeAdvisor() {
+ boolean wasPrimary;
+ synchronized(this) {
+ if (isClosed()) {
+ return;
+ }
+ wasPrimary = isPrimary();
+ super.close();
+ this.requestPrimaryState(CLOSED);
+ if (!this.redundancySatisfied){
+ incLowRedundancyBucketCount(-1);
+ this.redundancySatisfied = true;
+ }
+ this.localProfile = null;
+ }
+ if(wasPrimary) {
+ releasePrimaryLock();
+ }
+ }
+
+ /**
+ * Returns true if this advisor has been closed.
+ *
+ * @return true if this advisor has been closed
+ */
+ protected boolean isClosed() {
+ synchronized(this) {
+ return this.primaryState == CLOSED;
+ }
+ }
+
+ /**
+ * Returns true if this member is currently marked as primary.
+ *
+ * @return true if this member is currently marked as primary
+ */
+ public boolean isPrimary() {
+ synchronized(this) {
+ return this.primaryState == IS_PRIMARY_HOSTING;
+ }
+ }
+
+ /**
+ * Returns true if this member is currently volunteering for primary.
+ *
+ * @return true if this member is currently volunteering for primary
+ */
+ protected boolean isVolunteering() {
+ synchronized(this) {
+ return this.primaryState == VOLUNTEERING_HOSTING;
+ }
+ }
+
+ /**
+ * Returns true if this member is currently attempting to become primary.
+ *
+ * @return true if this member is currently attempting to become primary
+ */
+ protected boolean isBecomingPrimary() {
+ synchronized(this) {
+ return this.primaryState == BECOMING_HOSTING &&
+ this.volunteeringDelegate != null &&
+ this.volunteeringDelegate.isAggressive();
+ }
+ }
+
+ /**
+ * Returns true if this member is currently hosting real bucket.
+ *
+ * @return true if this member is currently hosting real bucket
+ */
+ public boolean isHosting() {
+ synchronized(this) {
+ return this.primaryState == NO_PRIMARY_HOSTING ||
+ this.primaryState == OTHER_PRIMARY_HOSTING ||
+ this.primaryState == VOLUNTEERING_HOSTING ||
+ this.primaryState == BECOMING_HOSTING ||
+ this.primaryState == IS_PRIMARY_HOSTING;
+ }
+ }
+
+ /**
+ * Attempt to acquire lock for primary until a primary exists. Caller hands
+ * off responsibility to an executor (waiting pool) and returns early.
+ */
+ public void volunteerForPrimary() {
+ if (primaryElector != null) {
+ return;
+ }
+ initializationGate();
+
+ synchronized (this) {
+ if (isVolunteering() || isClosed() || !isHosting()) {
+ // only one thread should be attempting to volunteer at one time
+ return;
+ }
+ // if member is still not initialized then don't volunteer for primary
+ final GemFireCacheImpl cache = (GemFireCacheImpl)getBucket().getCache();
+ if (!cache.doVolunteerForPrimary(this)) {
+ return;
+ }
+ if (this.volunteeringDelegate == null) {
+ this.volunteeringDelegate = new VolunteeringDelegate();
+ }
+ this.volunteeringDelegate.volunteerForPrimary();
+ }
+ }
+
+ /**
+ * Makes this <code>BucketAdvisor</code> become the primary if it is already
+ * a secondary.
+ *
+ * @param isRebalance true if directed to become primary by rebalancing
+ * @return true if this advisor succeeds in becoming the primary
+ */
+ public boolean becomePrimary(boolean isRebalance) {
+ initializationGate();
+
+ long startTime
+ = getPartitionedRegionStats().startPrimaryTransfer(isRebalance);
+ try {
+ long waitTime = 2000; // time each iteration will wait
+ while (!isPrimary()) {
+ this.getAdvisee().getCancelCriterion().checkCancelInProgress(null);
+ boolean attemptToBecomePrimary = false;
+ boolean attemptToDeposePrimary = false;
+
+ if (Thread.currentThread().isInterrupted()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Breaking from becomePrimary loop due to thread interrupt flag being set");
+ }
+ break;
+ }
+ if (isClosed() || !isHosting()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Breaking from becomePrimary loop because {} is closed or not hosting", this);
+ }
+ break;
+ }
+
+ VolunteeringDelegate vDelegate = null;
+ synchronized (this) {
+ if (isVolunteering()) {
+ // standard volunteering attempt already in progress...
+ if (logger.isDebugEnabled()) {
+ logger.debug("Waiting for volunteering thread {}. Time left: {} ms", this, waitTime);
+ }
+ this.wait(waitTime); // spurious wakeup ok
+ continue;
+
+ } else if (isBecomingPrimary()) {
+ // reattempt to depose otherPrimary...
+ attemptToDeposePrimary = true;
+
+ } else {
+ // invoke becomePrimary AFTER sync is released in this thread...
+ vDelegate = this.volunteeringDelegate;
+ if (vDelegate == null) {
+ vDelegate = new VolunteeringDelegate();
+ this.volunteeringDelegate = vDelegate;
+ }
+ } // else
+ } // synchronized
+
+ if (vDelegate != null) {
+ // Use the snapshot 'vDelegate' instead of 'this.volunteeringDelegate' since we are not synced here.
+ attemptToBecomePrimary = vDelegate.reserveForBecomePrimary(); // no sync!
+ }
+
+ // release synchronization and then call becomePrimary
+ if (attemptToBecomePrimary) {
+ synchronized (this) {
+ if (this.volunteeringDelegate == null) {
+ this.volunteeringDelegate = new VolunteeringDelegate();
+ }
+ this.volunteeringDelegate.volunteerForPrimary();
+ attemptToDeposePrimary = true;
+ } // synchronized
+ Thread.sleep(10);
+ } // attemptToBecomePrimary
+
+ // RACE: slight race condition with thread that's actually requesting the lock
+
+ if (attemptToDeposePrimary) {
+ InternalDistributedMember otherPrimary = getPrimary();
+ if (otherPrimary != null &&
+ !getDistributionManager().getId().equals(otherPrimary)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Attempting to depose primary on {} for {}", otherPrimary, this);
+ }
+ DeposePrimaryBucketResponse response =
+ DeposePrimaryBucketMessage.send(
+ otherPrimary,
+ this.pRegion,
+ getBucket().getId());
+ if (response != null) {
+ response.waitForRepliesUninterruptibly();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Deposed primary on {}", otherPrimary);
+ }
+ }
+ }
+ Thread.sleep(10);
+ } // attemptToDeposePrimary
+
+ } // while
+
+ } catch (InterruptedException e) {
+ // abort and return null
+ Thread.currentThread().interrupt();
+
+ } finally {
+ getPartitionedRegionStats().endPrimaryTransfer(
+ startTime, isPrimary(), isRebalance);
+ }
+
+ return isPrimary();
+ }
+
+ /**
+ * Check the primary member shortcut. Does not query the advisor. Should
+ * only be used when the advisor should not be consulted directly.
+ *
+ * @return the member or null if no primary exists
+ */
+ public final InternalDistributedMember basicGetPrimaryMember() {
+ return (InternalDistributedMember) this.primaryMember.get();
+ }
+
+ /**
+ * Invoked when the primary lock has been acquired by this VM.
+ *
+ * @return true if successfully changed state to IS_PRIMARY
+ */
+ protected boolean acquiredPrimaryLock() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Acquired primary lock for BucketID {} PR : {}", getBucket().getId(), regionAdvisor.getPartitionedRegion().getFullPath());
+ }
+ boolean changedStateToIsPrimary = false;
+ //Hold the primary move lock until we send a
+ //profile update. This will prevent writes
+ //from occurring until all members know that
+ //this member is now the primary.
+ boolean shouldInvokeListeners = false;
+ activePrimaryMoveLock.lock();
+ try {
+ synchronized (this) {
+ if (isHosting() && (isVolunteering() || isBecomingPrimary())) {
+ Bucket br = this.regionAdvisor.getBucket(getBucket().getId());
+ if (br != null && br instanceof BucketRegion) {
+ ((BucketRegion)br).beforeAcquiringPrimaryState();
+ }
+ if (requestPrimaryState(IS_PRIMARY_HOSTING)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Acquired primary lock for setting primary now BucketID {} PR : {}",
+ getBucket().getId(), regionAdvisor.getPartitionedRegion().getFullPath());
+ }
+ setPrimaryMember(getDistributionManager().getId());
+ changedStateToIsPrimary = true;
+ if (hasPrimary() && isPrimary()) {
+ shouldInvokeListeners = true;
+ }
+ }
+ }
+ }
+
+ if(shouldInvokeListeners) {
+ invokePartitionListeners();
+ }
+ return changedStateToIsPrimary;
+ }
+ finally {
+ try {
+ if (changedStateToIsPrimary) {
+ // send profile update AFTER releasing sync
+ sendProfileUpdate();
+
+ Bucket br = this.regionAdvisor.getBucket(getBucket().getId());
+ if( br != null && br instanceof BucketRegion) {
+ ((BucketRegion)br).processPendingSecondaryExpires();
+ }
+ if (br instanceof BucketRegionQueue) { // Shouldn't it be AbstractBucketRegionQueue
+ // i.e. this stats is not getting incremented for HDFSBucketRegionQueue!!
+ BucketRegionQueue brq = (BucketRegionQueue)br;
+ brq.incQueueSize(brq.size());
+ }
+ if( br != null && br instanceof BucketRegion) {
+ ((BucketRegion)br).afterAcquiringPrimaryState();
+ }
+ }
+ else {
+ // release primary lock AFTER releasing sync
+ releasePrimaryLock();
+ }
+ } finally {
+ activePrimaryMoveLock.unlock();
+ }
+ }
+ }
+
+ private void invokePartitionListeners() {
+ PartitionListener[] listeners = this.pRegion
+ .getPartitionListeners();
+ if (listeners == null || listeners.length == 0) {
+ return;
+ }
+ for (int i = 0; i < listeners.length; i++) {
+ PartitionListener listener = listeners[i];
+ if (listener != null) {
+ listener.afterPrimary(getBucket().getId());
+ }
+ }
+ }
+
+ /**
+ * Lazily gets the lock for acquiring primary lock. Caller must handle null.
+ * If DLS, Cache, or DistributedSystem are shutting down then null will be
+ * returned. If DLS does not yet exist and createDLS is false then null will
+ * be returned.
+ *
+ * @param createDLS true will create DLS if it does not exist
+ * @return distributed lock indicating primary member or null
+ */
+ DistributedMemberLock getPrimaryLock(boolean createDLS) {
+ synchronized(this) {
+ if (this.primaryLock == null) {
+ DistributedLockService dls = DistributedLockService.getServiceNamed(
+ PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
+ if (dls == null) {
+ if (!createDLS || getProxyBucketRegion().getCache().isClosed()) {
+ return null; // cache closure has destroyed the DLS
+ }
+ try { // TODO: call GemFireCache#getPartitionedRegionLockService
+ dls = DLockService.create(
+ PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME,
+ getAdvisee().getSystem(),
+ true /*distributed*/,
+ true /*destroyOnDisconnect*/,
+ true /*automateFreeResources*/);
+ }
+ catch (IllegalArgumentException e) {
+ // indicates that the DLS is already created
+ dls = DistributedLockService.getServiceNamed(
+ PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
+ if (dls == null) {
+ // another thread destroyed DLS after this thread called create
+ return null; // ok, caller will loop if necessary
+ }
+ }
+ // TODO: we need a good NotConnectedException to replace
+ // IllegalStateException and ShutdownException
+ // perhaps: DistributedSystemUnavailableException
+ catch (IllegalStateException e) {
+ // create still throws IllegalStateException if isDisconnecting is true
+ return null;
+ }
+ catch (DistributedSystemDisconnectedException e) {
+ // this would certainly prevent us from creating a DLS... messy
+ return null;
+ }
+ }
+ this.primaryLock = new DistributedMemberLock(
+ dls,
+ getAdvisee().getName(), DistributedMemberLock.NON_EXPIRING_LEASE,
+ DistributedMemberLock.LockReentryPolicy.PREVENT_SILENTLY);
+ }
+ return this.primaryLock;
+ }
+ }
+
+ protected void acquirePrimaryRecursivelyForColocated() {
+ final List<PartitionedRegion> colocatedWithList = ColocationHelper
+ .getColocatedChildRegions(regionAdvisor.getPartitionedRegion());
+ if (colocatedWithList != null) {
+ for (PartitionedRegion childPR : colocatedWithList) {
+ Bucket b = childPR.getRegionAdvisor().getBucket(getBucket().getId());
+ BucketAdvisor childBA = b.getBucketAdvisor();
+ Assert.assertHoldsLock(childBA, false);
+ boolean acquireForChild = false;
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("BucketAdvisor.acquirePrimaryRecursivelyForColocated: about to take lock for bucket: {} of PR: {} with isHosting={}",
+ getBucket().getId(), childPR.getFullPath(), childBA.isHosting());
+ }
+ childBA.activePrimaryMoveLock.lock();
+ try {
+ if (childBA.isHosting()) {
+ if (isPrimary()) {
+ if (!childBA.isPrimary()) {
+ childBA.setVolunteering();
+ boolean acquired = childBA.acquiredPrimaryLock();
+ acquireForChild = true;
+ if (acquired && this.pRegion.isFixedPartitionedRegion()) {
+ childBA.acquirePrimaryForRestOfTheBucket();
+ }
+ }else {
+ acquireForChild = true;
+ }
+ }
+ } // if isHosting
+ if (acquireForChild) {
+ childBA.acquirePrimaryRecursivelyForColocated();
+ }
+ }
+ finally {
+ childBA.activePrimaryMoveLock.unlock();
+ }
+ }
+ }
+ }
+
+ protected void acquirePrimaryForRestOfTheBucket() {
+ List<FixedPartitionAttributesImpl> fpas = this.pRegion
+ .getFixedPartitionAttributesImpl();
+ if (fpas != null) {
+ int bucketId = getBucket().getId();
+ for (FixedPartitionAttributesImpl fpa : fpas) {
+ if (fpa.getStartingBucketID() == bucketId) {
+ for (int i = bucketId + 1; i <= fpa.getLastBucketID();) {
+ Bucket b = regionAdvisor.getBucket(i++);
+ if (b != null) {
+ BucketAdvisor ba = b.getBucketAdvisor();
+ ba.activePrimaryMoveLock.lock();
+ try {
+ if (ba.isHosting()) {
+ if (!ba.isPrimary()) {
+ ba.setVolunteering();
+ ba.acquiredPrimaryLock();
+ }
+ }
+ } finally {
+ ba.activePrimaryMoveLock.unlock();
+ }
+ }
+ }
+ }
+ else {
+ continue;
+ }
+ }
+ }
+ }
+
+ /**
+ * Sets volunteering to true. Returns true if the state of volunteering was
+ * changed. Returns false if voluntering was already equal to true. Caller
+ * should do nothing if false is returned.
+ */
+ protected boolean setVolunteering() {
+ synchronized(this) {
+ return requestPrimaryState(VOLUNTEERING_HOSTING);
+ }
+ }
+
+ /**
+ * Sets becoming primary to true. Returns true if the state of becoming was
+ * changed. Returns false if becoming was already equal to true. Caller
+ * should do nothing if false is returned.
+ */
+ protected boolean setBecoming() {
+ synchronized(this) {
+ return requestPrimaryState(BECOMING_HOSTING);
+ }
+ }
+
+ /**
+ * Wait briefly for a primary member to be identified.
+ *
+ * @param timeout time in milliseconds to wait for a primary
+ * @return the primary bucket host
+ */
+ protected InternalDistributedMember waitForPrimaryMember(long timeout) {
+ synchronized (this) {
+ // let's park this thread and wait for a primary!
+ StopWatch timer = new StopWatch(true);
+ long warnTime = getDistributionManager().getConfig().getAckWaitThreshold() * 1000L;
+ boolean loggedWarning = false;
+ try {
+ for (;;) {
+ // bail out if the system starts closing
+ this.getAdvisee().getCancelCriterion().checkCancelInProgress(null);
+ final GemFireCacheImpl cache = (GemFireCacheImpl)getBucket().getCache();
+ if (cache != null && cache.isCacheAtShutdownAll()) {
+ throw new CacheClosedException("Cache is shutting down");
+ }
+
+ if (getBucketRedundancy() == -1 ) {
+ // there are no real buckets in other vms... no reason to wait
+ return null;
+ }
+ getProxyBucketRegion().getPartitionedRegion().checkReadiness();
+ if (isClosed()) {
+ break;
+ }
+ long elapsed = timer.elapsedTimeMillis();
+ long timeLeft = timeout - elapsed;
+ if (timeLeft <= 0) {
+ break;
+ }
+ if (getBucketRedundancy() == -1 || isClosed() ) {
+ break; // early out... all bucket regions are gone or we closed
+ }
+ InternalDistributedMember primary = basicGetPrimaryMember();
+ if (primary != null) {
+ return primary;
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Waiting for bucket {}. Time left :{} ms", this, timeLeft);
+ }
+
+ //Log a warning if we have waited for the ack wait threshold time.
+ if(!loggedWarning) {
+ long timeUntilWarning = warnTime - elapsed;
+ if(timeUntilWarning <= 0 ) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.BucketAdvisor_WAITING_FOR_PRIMARY,
+ new Object[] {warnTime / 1000L, this, this.adviseInitialized()}));
+ //log a warning;
+ loggedWarning=true;
+ } else {
+ timeLeft = timeLeft > timeUntilWarning ? timeUntilWarning : timeLeft;
+ }
+ }
+ this.wait(timeLeft); // spurious wakeup ok
+ }
+ }
+ catch (InterruptedException e) {
+ // abort and return null
+ Thread.currentThread().interrupt();
+ } finally {
+ if(loggedWarning) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.BucketAdvisor_WAITING_FOR_PRIMARY_DONE));
+ }
+ }
+ return null;
+ }
+ }
+
+ /**
+ * How long to wait, in millisecs, for redundant buckets to exist
+ */
+ private final static long BUCKET_REDUNDANCY_WAIT = 15000L; // 15 seconds
+
+ /**
+ * Wait the desired redundancy to be met.
+ * @param minRedundancy the amount of desired redundancy.
+ * @return true if desired redundancy is detected
+ */
+ public boolean waitForRedundancy(int minRedundancy) {
+ synchronized (this) {
+ // let's park this thread and wait for redundancy!
+ StopWatch timer = new StopWatch(true);
+ try {
+ for (;;) {
+ if (getBucketRedundancy() >= minRedundancy) {
+ return true;
+ }
+ getProxyBucketRegion().getPartitionedRegion().checkReadiness();
+ if (isClosed()) {
+ return false;
+ }
+ long timeLeft = BUCKET_REDUNDANCY_WAIT - timer.elapsedTimeMillis();
+ if (timeLeft <= 0) {
+ return false;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Waiting for bucket {}", this);
+ }
+ this.wait(timeLeft); // spurious wakeup ok
+ }
+ }
+ catch (InterruptedException e) {
+ // abort and return null
+ Thread.currentThread().interrupt();
+ }
+ return false;
+ }
+ }
+
+ private final static long BUCKET_STORAGE_WAIT = Long.getLong("gemfire.BUCKET_STORAGE_WAIT", 15000).longValue(); // 15 seconds
+
+ public boolean waitForStorage() {
+ synchronized (this) {
+ // let's park this thread and wait for storage!
+ StopWatch timer = new StopWatch(true);
+ try {
+ for (;;) {
+ if (this.regionAdvisor.isBucketLocal(getBucket().getId())) {
+ return true;
+ }
+ getProxyBucketRegion().getPartitionedRegion().checkReadiness();
+ if (isClosed()) {
+ return false;
+ }
+ long timeLeft = BUCKET_STORAGE_WAIT - timer.elapsedTimeMillis();
+ if (timeLeft <= 0) {
+ return false;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Waiting for bucket storage" + this);
+ }
+ this.wait(timeLeft); // spurious wakeup ok
+ }
+ }
+ catch (InterruptedException e) {
+ // abort and return null
+ Thread.currentThread().interrupt();
+ }
+ return false;
+ }
+ }
+ public void clearPrimaryElector() {
+ synchronized(this) {
+ primaryElector = null;
+ }
+ }
+
+ public void setPrimaryElector(InternalDistributedMember newPrimaryElector) {
+ synchronized(this) {
+ //Only set the new primary elector if we have not yet seen
+ //a primary for this bucket.
+ if(primaryElector != null) {
+ this.primaryElector = newPrimaryElector;
+ }
+ }
+ }
+
+
+ public synchronized void initializePrimaryElector(InternalDistributedMember primaryElector) {
+ //For child buckets, we want the parent bucket to take care'
+ //of finishing an incomplete bucket creation, so only set the elector for
+ //the leader region.
+ if(parentAdvisor == null) {
+ this.primaryElector = primaryElector;
+ }
+ }
+
+ /**
+ * Invoked when real bucket is created for hosting in this VM.
+ *
+ * @param value true to begin hosting; false to end hosting
+ */
+ protected void setHosting(boolean value) {
+ //boolean needToNotPrimarySelf = false;
+ boolean needToVolunteerForPrimary = false;
+ boolean wasPrimary = false;
+ synchronized(this) {
+ wasPrimary = isPrimary();
+ if (isClosed()) {
+ return;
+ }
+ if (value) { // setting to HOSTING...
+ if (hasPrimary()) {
+ requestPrimaryState(OTHER_PRIMARY_HOSTING);
+ }
+ else {
+ requestPrimaryState(NO_PRIMARY_HOSTING);
+ needToVolunteerForPrimary = true;
+ }
+ }
+
+ else { // setting to NOT_HOSTING...
+ if (hasPrimary()) { // has primary...
+ if (isPrimary()) {
+ requestPrimaryState(NO_PRIMARY_NOT_HOSTING);
+ this.primaryMember.set(null);
+ findAndSetPrimaryMember();
+ }
+ else {
+ requestPrimaryState(OTHER_PRIMARY_NOT_HOSTING);
+ }
+ }
+ else { // no primary...
+ // acquiredPrimaryLock will check isHosting and release if not hosting
+ requestPrimaryState(NO_PRIMARY_NOT_HOSTING);
+ }
+ }
+ this.volunteeringDelegate = null;
+
+ //Note - checkRedundancy has the side effect that it updates the stats.
+ //We need to invoke checkRedundancy here, regardless of whether we
+ //need this notify.
+ if (updateRedundancy() > 0 && isHosting()) {
+ // wake up any threads in waitForRedundancy or waitForPrimary
+ this.notifyAll();
+ }
+ }
+ if(wasPrimary) {
+ releasePrimaryLock();
+ }
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("setHosting: {} needToVolunteerForPrimary={} primaryElector: {}", this, needToVolunteerForPrimary, primaryElector);
+ }
+ /*if (needToNotPrimarySelf) {
+ notPrimary(getAdvisee().getDistributionManager().getId());
+ }*/
+ if (needToVolunteerForPrimary) {
+ if(this.primaryElector == null) {
+ volunteerForPrimary();
+ }
+ }
+
+ sendProfileUpdate();
+ }
+
+ /**
+ * Sends updated profile for this member to every member with the
+ * <code>PartitionedRegion</code>.
+ * <p>
+ * Never call this method while synchronized on this BucketAdvisor. This will
+ * result in distributed deadlocks.
+ */
+ private void sendProfileUpdate() {
+ if (this.getDistributionManager().getSystem().isLoner()) {
+ // no one to send the profile update... return to prevent bug 39760
+ return;
+ }
+ // make sure caller is not synchronized or we'll deadlock
+ Assert.assertTrue( ! Thread.holdsLock(this),
+ "Attempting to sendProfileUpdate while synchronized may result in deadlock");
+ // NOTE: if this assert fails, you COULD use the WaitingThreadPool in DM
+
+ final int partitionedRegionId = this.pRegion.getPRId();
+ final int bucketId = ((ProxyBucketRegion) getAdvisee()).getBucketId();
+
+ BucketProfile bp = (BucketProfile) createProfile();
+ updateServerBucketProfile(bp);
+ InternalDistributedMember primary = basicGetPrimaryMember();
+ HashSet hostsAndProxyMembers = new HashSet();
+ if (primary != null && !primary.equals(getDistributionManager().getId())) {
+ hostsAndProxyMembers.add(primary); // Add the primary
+ }
+ hostsAndProxyMembers.addAll(adviseGeneric()); // Add all members hosting the bucket
+ hostsAndProxyMembers.addAll(adviseProfileUpdate()); // Add all proxy instances that could use the bucket
+ ReplyProcessor21 reply = BucketProfileUpdateMessage.send(hostsAndProxyMembers, getDistributionManager(),
+ partitionedRegionId, bucketId, bp, true);
+ if(reply != null) {
+ reply.waitForRepliesUninterruptibly();
+ }
+ }
+
+ /**
+ * Returns true if the a primary is known.
+ */
+ private boolean hasPrimary() {
+ synchronized(this) {
+ return this.primaryState == OTHER_PRIMARY_NOT_HOSTING ||
+ this.primaryState == OTHER_PRIMARY_HOSTING ||
+ this.primaryState ==IS_PRIMARY_HOSTING;
+ }
+ }
+
+ @Override
+ protected Profile instantiateProfile(InternalDistributedMember memberId,
+ int version) {
+ if (!this.pRegion.isShadowPR()) {
+ GemFireCacheImpl c = getProxyBucketRegion().getCache();
+ List servers = null;
+ servers = c.getCacheServers();
+
+ HashSet<BucketServerLocation66> serverLocations = new HashSet<BucketServerLocation66>();
+ for (Object object : servers) {
+ CacheServerImpl server = (CacheServerImpl)object;
+ if (server.isRunning() && (server.getExternalAddress() != null)) {
+ BucketServerLocation66 location = new BucketServerLocation66(
+ getBucket().getId(), server.getPort(), server
+ .getExternalAddress()
+ /* .getExternalAddress(false/ checkServerRunning ) */,
+ getBucket().isPrimary(), Integer.valueOf(version).byteValue(),
+ server.getCombinedGroups());
+ serverLocations.add(location);
+ }
+ }
+ if (serverLocations.size() > 0) {
+ return new ServerBucketProfile(memberId, version, getBucket(),
+ serverLocations);
+ }
+ }
+ return new BucketProfile(memberId, version, getBucket());
+ }
+
+ /**
+ * Sets primaryMember and notifies all. Caller must be synced on this.
+ *
+ * @param id the member to use as primary for this bucket
+ */
+ void setPrimaryMember(InternalDistributedMember id) {
+ if (!getDistributionManager().getId().equals(id)) {
+ // volunteerForPrimary handles primary state change if its our id
+ if (isHosting()) {
+ requestPrimaryState(OTHER_PRIMARY_HOSTING);
+ }
+ else {
+ requestPrimaryState(OTHER_PRIMARY_NOT_HOSTING);
+ }
+ }
+ this.primaryMember.set(id);
+ this.everHadPrimary = true;
+
+ if(id != null && id.equals(primaryElector)) {
+ primaryElector = null;
+ }
+ this.notifyAll(); // wake up any threads in waitForPrimaryMember
+ }
+
+ public void setHadPrimary() {
+ this.everHadPrimary = true;
+ }
+
+ public boolean getHadPrimary() {
+ return this.everHadPrimary;
+ }
+
+ public InternalDistributedMember getPrimaryElector() {
+ return primaryElector;
+ }
+
+ /**
+ * Determine if there has been a change in redundancy and alter the
+ * lowRedundancyBucketCount stat as needed.
+ *
+ * Also updates a counter used to track the redundancy of this member
+ *
+ * @return current number of hosts for this bucket
+ * @see #redundancySatisfied
+ * @see PartitionedRegionStats#incLowRedundancyBucketCount(int)
+ * @guarded.By this
+ */
+ private int updateRedundancy() {
+ int desiredRedundancy =
+ this.pRegion.getRedundantCopies();
+ int numBucketHosts = getNumInitializedBuckets();
+ if (isClosed()) {
+ return numBucketHosts;
+ }
+ int actualRedundancy = numBucketHosts - 1;
+ this.redundancy = actualRedundancy;
+ if (this.redundancySatisfied &&
+ numBucketHosts > 0 &&
+ actualRedundancy < desiredRedundancy) {
+ incLowRedundancyBucketCount(1);
+ this.redundancySatisfied = false;
+ }
+ else if (!this.redundancySatisfied &&
+ numBucketHosts > 0 &&
+ actualRedundancy >= desiredRedundancy){
+ incLowRedundancyBucketCount(-1);
+ this.redundancySatisfied = true;
+ this.redundancyEverSatisfied = true;
+ }
+ return numBucketHosts;
+ }
+
+ /**
+ * Returns all {@link InternalDistributedMember}s currently
+ * flagged as primary.
+ * <p>
+ * Since profile messages may arrive out of order from different members,
+ * more than one member may temporarily be flagged as primary.
+ * <p>
+ * The user of this BucketAdvisor should simply assume that the first
+ * profile is primary until the dust settles, leaving only one primary
+ * profile.
+ *
+ * @return zero or greater array of primary members
+ */
+ private InternalDistributedMember[] findPrimaryMembers() {
+ Set primaryMembers = adviseFilter(new Filter() {
+ public boolean include(Profile profile) {
+ assert profile instanceof BucketProfile;
+ BucketProfile srp = (BucketProfile) profile;
+ return srp.isPrimary;
+ }
+ });
+ if (primaryMembers.size() > 1 && logger.isDebugEnabled()) {
+ logger.debug("[findPrimaryProfiles] found the following primary members for {}: {}", getAdvisee().getName(), primaryMembers);
+ }
+ return (InternalDistributedMember[]) primaryMembers.toArray(
+ new InternalDistributedMember[primaryMembers.size()]);
+ }
+
+ /**
+ * Searches through profiles to find first profile that is flagged as
+ * primary and sets {@link #primaryMember} to it. Caller must synchronize
+ * on this BucketAdvisor.
+ *
+ * @return true if a primary member was found and used
+ * @see #findAndSetPrimaryMember()
+ */
+ boolean findAndSetPrimaryMember() {
+ if(isPrimary()) {
+ setPrimaryMember(this.getDistributionManager().getDistributionManagerId());
+ return true;
+ }
+ InternalDistributedMember[] primaryMembers = findPrimaryMembers();
+ if (primaryMembers.length > 0) {
+ setPrimaryMember(primaryMembers[0]);
+ return true;
+ }
+ else {
+ return false;
+ }
+ }
+
+ /**
+ * Returns the current redundancy of the this bucket, including the locally
+ * hosted bucket if it exists.
+ *
+ * @return current number of hosts of this bucket ; -1 if there are no hosts
+ */
+ public final int getBucketRedundancy() {
+ return redundancy;
+ }
+
+ public Set<InternalDistributedMember> adviseInitialized() {
+ return adviseFilter(new Filter() {
+ public boolean include(Profile profile) {
+ assert profile instanceof BucketProfile;
+ BucketProfile bucketProfile = (BucketProfile) profile;
+ return bucketProfile.isHosting;
+ }
+ });
+
+ }
+
+ public Set<InternalDistributedMember> adviseRecoveredFromDisk() {
+ return regionAdvisor.adviseInitializedDataStore();
+ }
+
+ /**
+ * Get the number of members that are hosting the bucket, and have
+ * finished initialization.
+ *
+ * This method is currently only used to check the bucket
+ * redundancy just before creating the bucket. If it is used
+ * more frequently, it might be better to cache this count.
+ */
+ private int getNumInitializedBuckets() {
+ Profile[] locProfiles = this.profiles; // grab current profiles
+ int count = 0;
+ for(Profile profile: locProfiles) {
+ BucketProfile bucketProfile = (BucketProfile) profile;
+ if(bucketProfile.isHosting) {
+ count++;
+ }
+ }
+ if(isHosting()) {
+ count++;
+ }
+ return count;
+ }
+
+ private Bucket getBucket() {
+ return (Bucket)getAdvisee();
+ }
+
+ /**
+ * Releases the primary lock for this bucket.
+ */
+ protected void releasePrimaryLock() {
+ //We don't have a lock if we have a parent advisor
+ if(parentAdvisor != null) {
+ return;
+ }
+ if (startingBucketAdvisor == null) {
+ assignStartingBucketAdvisor();
+ if (startingBucketAdvisor != null) {
+ return;
+ }
+ } else {
+ return;
+ }
+ // TODO fix this method to not release any locks if the
+ // redundancy is zero, since no locks are grabbed.
+ try {
+ DistributedMemberLock thePrimaryLock = getPrimaryLock(false);
+ if (thePrimaryLock != null) {
+ thePrimaryLock.unlock();
+ }
+ else {
+ // InternalDistributedSystem.isDisconnecting probably prevented us from
+ // creating the DLS... hope there's a thread closing this advisor but
+ // it's probably not safe to assert that it already happened
+ }
+ }
+ catch (LockNotHeldException e) {
+ Assert.assertTrue(!isHosting(),
+ "Got LockNotHeldException for Bucket = " + this);
+ }
+ catch (LockServiceDestroyedException e) {
+ Assert.assertTrue(isClosed(),
+ "BucketAdvisor was not closed before destroying PR lock service");
+ }
+ }
+
+ private String primaryStateToString() {
+ return primaryStateToString(this.primaryState);
+ }
+
+ /**
+ * Returns string representation of the primary state value.
+ * @param value the primary state to return string for
+ * @return string representation of primaryState
+ */
+ private String primaryStateToString(byte value) {
+ switch (value) {
+ case NO_PRIMARY_NOT_HOSTING:
+ return "NO_PRIMARY_NOT_HOSTING";
+ case NO_PRIMARY_HOSTING:
+ return "NO_PRIMARY_HOSTING";
+ case OTHER_PRIMARY_NOT_HOSTING:
+ return "OTHER_PRIMARY_NOT_HOSTING";
+ case OTHER_PRIMARY_HOSTING:
+ return "OTHER_PRIMARY_HOSTING";
+ case VOLUNTEERING_HOSTING:
+ return "VOLUNTEERING_HOSTING";
+ case BECOMING_HOSTING:
+ return "BECOMING_HOSTING";
+ case IS_PRIMARY_HOSTING:
+ return "IS_PRIMARY_HOSTING";
+ case CLOSED:
+ return "CLOSED";
+ default:
+ return "<unhandled primaryState " + value + " >";
+ }
+ }
+
+ /**
+ * Requests change to the requested primary state. Controls all state
+ * changes pertaining to primary state. Caller must be synchronized on this.
+ *
+ * @param requestedState primaryState to change to
+ * @return true if the requestedState change was completed
+ * @throws IllegalStateException if an illegal state change was attempted
+ */
+ private boolean requestPrimaryState(byte requestedState) {
+ final byte fromState = this.primaryState;
+ switch (fromState) {
+ case NO_PRIMARY_NOT_HOSTING:
+ switch (requestedState) {
+ case NO_PRIMARY_NOT_HOSTING:
+ // race condition ok, return false
+ return false;
+ case NO_PRIMARY_HOSTING:
+ this.primaryState = requestedState;
+ break;
+ case OTHER_PRIMARY_NOT_HOSTING:
+ this.primaryState = requestedState;
+ break;
+ case OTHER_PRIMARY_HOSTING:
+ this.primaryState = requestedState;
+ break;
+ case BECOMING_HOSTING:
+ // race condition during close is ok, return false
+ return false;
+ case VOLUNTEERING_HOSTING:
+ // race condition during close is ok, return false
+ return false;
+ case CLOSED:
+ this.primaryState = requestedState;
+ break;
+ default:
+ throw new IllegalStateException(LocalizedStrings.BucketAdvisor_CANNOT_CHANGE_FROM_0_TO_1.toLocalizedString(new Object[] {this.primaryStateToString(), this.primaryStateToString(requestedState)}));
+ }
+ break;
+ case NO_PRIMARY_HOSTING:
+ switch (requestedState) {
+ case NO_PRIMARY_NOT_HOSTING:
+ this.primaryState = requestedState;
+ break;
+// case OTHER_PRIMARY_NOT_HOSTING: -- enable for bucket migration
+// this.primaryState = requestedState;
+// break;
+ case NO_PRIMARY_HOSTING:
+ // race condition ok, return false
+ return false;
+ case VOLUNTEERING_HOSTING:
+ this.primaryState = requestedState;
+ {
+ PartitionedRegionStats stats = getPartitionedRegionStats();
+ stats.putStartTime(this, stats.startVolunteering());
+ }
+ break;
+ case BECOMING_HOSTING:
+ this.primaryState = requestedState;
+ {
+ PartitionedRegionStats stats = getPartitionedRegionStats();
+ stats.putStartTime(this, stats.startVolunteering());
+ }
+ break;
+ case OTHER_PRIMARY_HOSTING:
+ this.primaryState = requestedState;
+ break;
+ case CLOSED:
+ this.primaryState = requestedState;
+ break;
+ default:
+ throw new IllegalStateException(LocalizedStrings.BucketAdvisor_CANNOT_CHANGE_FROM_0_TO_1.toLocalizedString(new Object[] {this.primaryStateToString(), this.primaryStateToString(requestedState)}));
+ }
+ break;
+ case OTHER_PRIMARY_NOT_HOSTING:
+ switch (requestedState) {
+ case NO_PRIMARY_NOT_HOSTING:
+ this.primaryState = requestedState;
+ break;
+ case OTHER_PRIMARY_NOT_HOSTING:
+ // race condition ok, return false
+ return false;
+ case OTHER_PRIMARY_HOSTING:
+ this.primaryState = requestedState;
+ break;
+ case BECOMING_HOSTING:
+ // race condition during close is ok, return false
+ return false;
+ case VOLUNTEERING_HOSTING:
+ // race condition during close is ok, return false
+ return false;
+ case CLOSED:
+ this.primaryState = requestedState;
+ break;
+ default:
+ throw new IllegalStateException(LocalizedStrings.BucketAdvisor_CANNOT_CHANGE_FROM_0_TO_1.toLocalizedString(new Object[] {this.primaryStateToString(), this.primaryStateToString(requestedState)}));
+ }
+ break;
+ case OTHER_PRIMARY_HOSTING:
+ switch (requestedState) {
+// case NO_PRIMARY_NOT_HOSTING: -- enable for bucket migration
+// this.primaryState = requestedState;
+// break;
+ case OTHER_PRIMARY_NOT_HOSTING:
+ // May occur when setHosting(false) is called
+ this.primaryState = requestedState;
+ break;
+ case OTHER_PRIMARY_HOSTING:
+ // race condition ok, return false
+ return false;
+ case NO_PRIMARY_HOSTING:
+ this.primaryState = requestedState;
+ break;
+ case CLOSED:
+ this.primaryState = requestedState;
+ break;
+ case VOLUNTEERING_HOSTING:
+ // race condition ok, return false to abort volunteering
+ return false;
+ case BECOMING_HOSTING:
+ this.primaryState = requestedState;
+ {
+ PartitionedRegionStats stats = getPartitionedRegionStats();
+ stats.putStartTime(this, stats.startVolunteering());
+ }
+ break;
+ case IS_PRIMARY_HOSTING:
+ // race condition ok, probably race in HA where other becomes
+ // primary and immediately leaves while we have try-lock message
+ // enroute to grantor
+ this.primaryState = requestedState;
+ break;
+ default:
+ throw new IllegalStateException(LocalizedStrings.BucketAdvisor_CANNOT_CHANGE_FROM_0_TO_1.toLocalizedString(new Object[] {this.primaryStateToString(), this.primaryStateToString(requestedState)}));
+ }
+ break;
+ case VOLUNTEERING_HOSTING:
+ switch (requestedState) {
+ case NO_PRIMARY_NOT_HOSTING:
+ // May occur when setHosting(false) is called
+ this.primaryState = requestedState;
+ {
+ PartitionedRegionStats stats = getPartitionedRegionStats();
+ stats.endVolunteeringClosed(stats.removeStartTime(this));
+ }
+ break;
+ case OTHER_PRIMARY_NOT_HOSTING:
+ // May occur when setHosting(false) is called
+ // Profile update for other primary may have slipped in
+ this.primaryState = requestedState;
+ {
+ PartitionedRegionStats stats = getPartitionedRegionStats();
+ stats.endVolunteeringClosed(stats.removeStartTime(this));
+ }
+ break;
+ case NO_PRIMARY_HOSTING:
+ // race condition occurred, return false and stay in volunteering
+ return false;
+ case IS_PRIMARY_HOSTING:
+ this.primaryState = requestedState;
+ {
+ PartitionedRegionStats stats = getPartitionedRegionStats();
+ stats.incPrimaryBucketCount(1);
+ stats.endVolunteeringBecamePrimary(stats.removeStartTime(this));
+ }
+ break;
+ case OTHER_PRIMARY_HOSTING:
+ this.primaryState = requestedState;
+ {
+ PartitionedRegionStats stats = getPartitionedRegionStats();
+ stats.endVolunteeringOtherPrimary(stats.removeStartTime(this));
+ }
+ break;
+ case VOLUNTEERING_HOSTING:
+ // race condition ok, return false to abort volunteering
+ return false;
+ case BECOMING_HOSTING:
+ // race condition ok, return false to abort volunteering
+ return false;
+ case CLOSED:
+ this.primaryState = requestedState;
+ {
+ PartitionedRegionStats stats = getPartitionedRegionStats();
+ stats.endVolunteeringClosed(stats.removeStartTime(this));
+ }
+ break;
+ default:
+ throw new IllegalStateException(LocalizedStrings.BucketAdvisor_CANNOT_CHANGE_FROM_0_TO_1.toLocalizedString(new Object[] {this.primaryStateToString(), this.primaryStateToString(requestedState)}));
+ }
+ break;
+ case BECOMING_HOSTING:
+ switch (requestedState) {
+ case NO_PRIMARY_NOT_HOSTING:
+ // May occur when setHosting(false) is called
+ this.primaryState = requestedState;
+ {
+ PartitionedRegionStats stats = getPartitionedRegionStats();
+ stats.endVolunteeringClosed(stats.removeStartTime(this));
+ }
+ break;
+ case OTHER_PRIMARY_NOT_HOSTING:
+ // May occur when setHosting(false) is called
+ // Profile update for other primary may have slipped in
+ this.primaryState = requestedState;
+ {
+ PartitionedRegionStats stats = getPartitionedRegionStats();
+ stats.endVolunteeringClosed(stats.removeStartTime(this));
+ }
+ break;
+ case NO_PRIMARY_HOSTING:
+ // race condition occurred, return false and stay in volunteering
+ return false;
+ case IS_PRIMARY_HOSTING:
+ this.primaryState = requestedState;
+ {
+ PartitionedRegionStats stats = getPartitionedRegionStats();
+ stats.incPrimaryBucketCount(1);
+ stats.endVolunteeringBecamePrimary(stats.removeStartTime(this));
+ }
+ break;
+ case OTHER_PRIMARY_HOSTING:
+ return false;
+ case VOLUNTEERING_HOSTING:
+ // race condition ok, return false to abort volunteering
+ return false;
+ case BECOMING_HOSTING:
+ // race condition ok, return false to abort volunteering
+ return false;
+ case CLOSED:
+ this.primaryState = requestedState;
+ {
+ PartitionedRegionStats stats = getPartitionedRegionStats();
+ stats.endVolunteeringClosed(stats.removeStartTime(this));
+ }
+ break;
+ default:
+ throw new IllegalStateException(LocalizedStrings.BucketAdvisor_CANNOT_CHANGE_FROM_0_TO_1.toLocalizedString(new Object[] {this.primaryStateToString(), this.primaryStateToString(requestedState)}));
+ }
+ break;
+ case IS_PRIMARY_HOSTING:
+ switch (requestedState) {
+ case NO_PRIMARY_HOSTING:
+ // rebalancing must have moved the primary
+ changeFromPrimaryTo(requestedState);
+ break;
+// case OTHER_PRIMARY_HOSTING: -- enable for bucket migration
+// // rebalancing must have moved the primary
+// changeFromPrimaryTo(requestedState);
+// break;
+ case OTHER_PRIMARY_NOT_HOSTING:
+ // rebalancing must have moved the primary and primary
+ changeFromPrimaryTo(requestedState);
+ break;
+ case NO_PRIMARY_NOT_HOSTING:
+ // May occur when setHosting(false) is called due to closing
+ changeFromPrimaryTo(requestedState);
+ break;
+ case VOLUNTEERING_HOSTING:
+ // race condition ok, return false to abort volunteering
+ return false;
+ case BECOMING_HOSTING:
+ // race condition ok, return false to abort volunteering
+ return false;
+ case CLOSED:
+ changeFromPrimaryTo(requestedState);
+ break;
+ default:
+ throw new IllegalStateException(
+ "Cannot change from " + this.primaryStateToString() +
+ " to " + this.primaryStateToString(requestedState));
+ }
+ break;
+ case CLOSED:
+ switch (requestedState) {
+ case CLOSED:
+ Exception e = new Exception(LocalizedStrings.BucketAdvisor_ATTEMPTED_TO_CLOSE_BUCKETADVISOR_THAT_IS_ALREADY_CLOSED.toLocalizedString());
+ logger.warn(LocalizedMessage.create(LocalizedStrings.BucketAdvisor_ATTEMPTED_TO_CLOSE_BUCKETADVISOR_THAT_IS_ALREADY_CLOSED), e);
+ break;
+ case VOLUNTEERING_HOSTING:
+ // race condition ok, return false to abort volunteering
+ return false;
+ case BECOMING_HOSTING:
+ // race condition ok, return false to abort volunteering
+ return false;
+ case IS_PRIMARY_HOSTING:
+ // Commonly occurs when closing and volunteering thread is still running
+ return false;
+ case OTHER_PRIMARY_NOT_HOSTING:
+ // Commonly occurs when a putProfile occurs during closure
+ return false;
+ default:
+ throw new IllegalStateException(LocalizedStrings.BucketAdvisor_CANNOT_CHANGE_FROM_0_TO_1_FOR_BUCKET_2.toLocalizedString(new Object[] {this.primaryStateToString(), this.primaryStateToString(requestedState), getAdvisee().getName()}));
+ }
+ }
+ return this.primaryState == requestedState;
+ }
+
+ private void changeFromPrimaryTo(byte requestedState) {
+ try {
+ this.primaryState = requestedState;
+ }
+ finally {
+ getPartitionedRegionStats().incPrimaryBucketCount(-1);
+ }
+ }
+
+ @Override
+ public Set adviseDestroyRegion() {
+ // fix for bug 37604 - tell all owners of the pr that the bucket is being
+ // destroyed. This is needed when bucket cleanup is performed
+ return this.regionAdvisor.adviseAllPRNodes();
+ }
+
+ /**
+ * returns the set of all the members in the system which require both
+ * DistributedCacheOperation messages and notification-only partition
+ * messages
+
<TRUNCATED>