You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/20 00:59:53 UTC
[35/51] [abbrv] [partial] incubator-geode git commit: Merge
remote-tracking branch 'origin/develop' into feature/GEODE-917
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index fae381f,0000000..69f61c4
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@@ -1,2614 -1,0 +1,2622 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.CopyHelper;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.DeltaSerializationException;
+import com.gemstone.gemfire.GemFireIOException;
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.InvalidDeltaException;
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.CacheWriter;
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.CustomEvictionAttributes;
+import com.gemstone.gemfire.cache.DiskAccessException;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.EvictionAlgorithm;
+import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.EvictionCriteria;
+import com.gemstone.gemfire.cache.ExpirationAction;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
+import com.gemstone.gemfire.cache.partition.PartitionListener;
+import com.gemstone.gemfire.cache.query.internal.IndexUpdater;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.AtomicLongWithTerminalState;
+import com.gemstone.gemfire.distributed.internal.DirectReplyProcessor;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+import com.gemstone.gemfire.distributed.internal.DistributionStats;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.BucketAdvisor.BucketProfile;
+import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo;
+import com.gemstone.gemfire.internal.cache.control.MemoryEvent;
+import com.gemstone.gemfire.internal.cache.delta.Delta;
+import com.gemstone.gemfire.internal.cache.partitioned.Bucket;
+import com.gemstone.gemfire.internal.cache.partitioned.DestroyMessage;
+import com.gemstone.gemfire.internal.cache.partitioned.InvalidateMessage;
+import com.gemstone.gemfire.internal.cache.partitioned.LockObject;
+import com.gemstone.gemfire.internal.cache.partitioned.PRTombstoneMessage;
+import com.gemstone.gemfire.internal.cache.partitioned.PartitionMessage;
+import com.gemstone.gemfire.internal.cache.partitioned.PutAllPRMessage;
+import com.gemstone.gemfire.internal.cache.partitioned.PutMessage;
+import com.gemstone.gemfire.internal.cache.partitioned.RemoveAllPRMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientTombstoneMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessage;
+import com.gemstone.gemfire.internal.cache.versions.VersionSource;
+import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import com.gemstone.gemfire.internal.concurrent.Atomics;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+import com.gemstone.gemfire.internal.concurrent.AtomicLong5;
+
+
+/**
+ * The storage used for a Partitioned Region.
+ * This class asserts distributed scope as well as a replicate data policy
+ * It does not support transactions
+ *
+ * Primary election for a BucketRegion can be found in the
+ * {@link com.gemstone.gemfire.internal.cache.BucketAdvisor} class
+ *
+ * @author Mitch Thomas
+ * @since 5.1
+ *
+ */
+public class BucketRegion extends DistributedRegion
+implements Bucket
+{
+ private static final Logger logger = LogService.getLogger();
+
+ public static final RawValue NULLVALUE = new RawValue(null);
+ public static final RawValue REQUIRES_ENTRY_LOCK = new RawValue(null);
+ /**
+ * A special value for the bucket size indicating that this bucket
+ * has been destroyed.
+ */
+ private static final long BUCKET_DESTROYED = Long.MIN_VALUE;
+ private AtomicLong counter = new AtomicLong();
+ private AtomicLong limit;
+ private final AtomicLong numOverflowOnDisk = new AtomicLong();
+ private final AtomicLong numOverflowBytesOnDisk = new AtomicLong();
+ private final AtomicLong numEntriesInVM = new AtomicLong();
+ private final AtomicLong evictions = new AtomicLong();
+
+ /**
+ * Contains size in bytes of the values stored
+ * in theRealMap. Sizes are tallied during put and remove operations.
+ */
+ private final AtomicLongWithTerminalState bytesInMemory = new AtomicLongWithTerminalState();
+
+ public static final class RawValue {
+ private final Object rawValue;
+ public RawValue(Object rawVal) {
+ this.rawValue = rawVal;
+ }
+
+ public final boolean isValueByteArray() {
+ return this.rawValue instanceof byte[];
+ }
+
+ public Object getRawValue() {
+ return this.rawValue;
+ }
+
+ public void writeAsByteArray(DataOutput out) throws IOException {
+ if (isValueByteArray()) {
+ DataSerializer.writeByteArray((byte[]) this.rawValue, out);
+ } else if (this.rawValue instanceof CachedDeserializable) {
+ ((CachedDeserializable)this.rawValue).writeValueAsByteArray(out);
+ } else if (Token.isInvalid(this.rawValue)) {
+ DataSerializer.writeByteArray(null, out);
+ } else if (this.rawValue == Token.TOMBSTONE) {
+ DataSerializer.writeByteArray(null, out);
+ } else {
+ DataSerializer.writeObjectAsByteArray(this.rawValue, out);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "RawValue("+this.rawValue+")";
+ }
+
+ /**
+ * Return the de-serialized value without changing the stored form
+ * in the heap. This causes local access to create a de-serialized copy (extra work)
+ * in favor of keeping values in serialized form which is important because
+ * it makes remote access more efficient. This assumption is that remote
+ * access is much more frequent.
+ * TODO Unused, but keeping for potential performance boost when local Bucket
+ * access de-serializes the entry (which could hurt perf.)
+ *
+ * @return the de-serialized value
+ */
+ public Object getDeserialized(boolean copyOnRead) {
+ if (isValueByteArray()) {
+ if (copyOnRead) {
+ // TODO move this code to CopyHelper.copy?
+ byte[] src = (byte[])this.rawValue;
+ byte[] dest = new byte[src.length];
+ System.arraycopy(this.rawValue, 0, dest, 0, dest.length);
+ return dest;
+ } else {
+ return this.rawValue;
+ }
+ } else if (this.rawValue instanceof CachedDeserializable) {
+ if (copyOnRead) {
+ return ((CachedDeserializable)this.rawValue).getDeserializedWritableCopy(null, null);
+ } else {
+ return ((CachedDeserializable)this.rawValue).getDeserializedForReading();
+ }
+ } else if (Token.isInvalid(this.rawValue)) {
+ return null;
+ } else {
+ if (copyOnRead) {
+ return CopyHelper.copy(this.rawValue);
+ } else {
+ return this.rawValue;
+ }
+ }
+ }
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ private final int redundancy;
+
+ /** the partitioned region to which this bucket belongs */
+ private final PartitionedRegion partitionedRegion;
+ private final Map<Object, ExpiryTask> pendingSecondaryExpires = new HashMap<Object, ExpiryTask>();
+
+ /* one map per bucket region */
+ public HashMap allKeysMap = new HashMap();
+
+ static final boolean FORCE_LOCAL_LISTENERS_INVOCATION =
+ Boolean.getBoolean("gemfire.BucketRegion.alwaysFireLocalListeners");
+ // gemfire.BucktRegion.alwaysFireLocalListeners=true
+
+ private volatile AtomicLong5 eventSeqNum = null;
+
+ public AtomicLong5 getEventSeqNum() {
+ return eventSeqNum;
+ }
+
+ protected final AtomicReference<HoplogOrganizer> hoplog = new AtomicReference<HoplogOrganizer>();
+
+ public BucketRegion(String regionName, RegionAttributes attrs,
+ LocalRegion parentRegion, GemFireCacheImpl cache,
+ InternalRegionArguments internalRegionArgs) {
+ super(regionName, attrs, parentRegion, cache, internalRegionArgs);
+ if(PartitionedRegion.DISABLE_SECONDARY_BUCKET_ACK) {
+ Assert.assertTrue(attrs.getScope().isDistributedNoAck());
+ }
+ else {
+ Assert.assertTrue(attrs.getScope().isDistributedAck());
+ }
+ Assert.assertTrue(attrs.getDataPolicy().withReplication());
+ Assert.assertTrue( ! attrs.getEarlyAck());
+ Assert.assertTrue(isUsedForPartitionedRegionBucket());
+ Assert.assertTrue( ! isUsedForPartitionedRegionAdmin());
+ Assert.assertTrue(internalRegionArgs.getBucketAdvisor() != null);
+ Assert.assertTrue(internalRegionArgs.getPartitionedRegion() != null);
+ this.redundancy = internalRegionArgs.getPartitionedRegionBucketRedundancy();
+ this.partitionedRegion = internalRegionArgs.getPartitionedRegion();
+ }
+
+ // Attempt to direct the GII process to the primary first
+ @Override
+ protected void initialize(InputStream snapshotInputStream,
+ InternalDistributedMember imageTarget,
+ InternalRegionArguments internalRegionArgs)
+ throws TimeoutException, IOException, ClassNotFoundException
+ {
+ // Set this region in the ProxyBucketRegion early so that profile exchange will
+ // perform the correct fillInProfile method
+ getBucketAdvisor().getProxyBucketRegion().setBucketRegion(this);
+ boolean success = false;
+ try {
+ if (this.partitionedRegion.isShadowPR()
+ && this.partitionedRegion.getColocatedWith() != null) {
+ PartitionedRegion parentPR = ColocationHelper
+ .getLeaderRegion(this.partitionedRegion);
+ BucketRegion parentBucket = parentPR.getDataStore().getLocalBucketById(
+ getId());
+ // needs to be set only once.
+ if (parentBucket.eventSeqNum == null) {
+ parentBucket.eventSeqNum = new AtomicLong5(getId());
+ }
+ }
+ if (this.partitionedRegion.getColocatedWith() == null) {
+ this.eventSeqNum = new AtomicLong5(getId());
+ } else {
+ PartitionedRegion parentPR = ColocationHelper
+ .getLeaderRegion(this.partitionedRegion);
+ BucketRegion parentBucket = parentPR.getDataStore().getLocalBucketById(
+ getId());
+ if (parentBucket == null && logger.isDebugEnabled()) {
+ logger.debug("The parentBucket of region {} bucketId {} is NULL", this.partitionedRegion.getFullPath(), getId());
+ }
+ Assert.assertTrue(parentBucket != null);
+ this.eventSeqNum = parentBucket.eventSeqNum;
+ }
+
+ final InternalDistributedMember primaryHolder =
+ getBucketAdvisor().basicGetPrimaryMember();
+ if (primaryHolder != null && ! primaryHolder.equals(getMyId())) {
+ // Ignore the provided image target, use an existing primary (if any)
+ super.initialize(snapshotInputStream, primaryHolder, internalRegionArgs);
+ } else {
+ super.initialize(snapshotInputStream, imageTarget, internalRegionArgs);
+ }
+
+ success = true;
+ } finally {
+ if(!success) {
+ removeFromPeersAdvisors(false);
+ getBucketAdvisor().getProxyBucketRegion().clearBucketRegion(this);
+ }
+ }
+ }
+
+
+
+ @Override
+ public void initialized() {
+ //announce that the bucket is ready
+ //setHosting performs a profile exchange, so there
+ //is no need to call super.initialized() here.
+ }
+
+ @Override
+ protected DiskStoreImpl findDiskStore(RegionAttributes ra, InternalRegionArguments internalRegionArgs) {
+ return internalRegionArgs.getPartitionedRegion().getDiskStore();
+ }
+
+ @Override
+ public void createEventTracker() {
+ this.eventTracker = new EventTracker(this);
+ this.eventTracker.start();
+ }
+
+ @Override
+ protected CacheDistributionAdvisor createDistributionAdvisor(
+ InternalRegionArguments internalRegionArgs){
+ return internalRegionArgs.getBucketAdvisor();
+ }
+
+ public BucketAdvisor getBucketAdvisor() {
+ return (BucketAdvisor) getDistributionAdvisor();
+ }
+
+ public boolean isHosting() {
+ return getBucketAdvisor().isHosting();
+ }
+
+ @Override
+ protected EventID distributeTombstoneGC(Set<Object> keysRemoved) {
+ EventID eventId = super.distributeTombstoneGC(keysRemoved);
+ if (keysRemoved != null && keysRemoved.size() > 0 && getFilterProfile() != null) {
+ // send the GC to members that don't have the bucket but have the PR so they
+ // can forward the event to clients
+ PRTombstoneMessage.send(this, keysRemoved, eventId);
+ }
+ return eventId;
+ }
+
+ @Override
+ protected void notifyClientsOfTombstoneGC(Map<VersionSource, Long> regionGCVersions, Set<Object>removedKeys, EventID eventID, FilterInfo routing) {
+ if (CacheClientNotifier.getInstance() != null) {
+ // Only route the event to clients interested in the partitioned region.
+ // We do this by constructing a region-level event and then use it to
+ // have the filter profile ferret out all of the clients that have interest
+ // in this region
+ FilterProfile fp = getFilterProfile();
+ if ((removedKeys != null && removedKeys.size() > 0) // bug #51877 - NPE in clients
+ && (routing != null || fp != null)) { // fix for bug #46309 - don't send null/empty key set to clients
+ RegionEventImpl regionEvent = new RegionEventImpl(getPartitionedRegion(), Operation.REGION_DESTROY, null, true, getMyId());
+ FilterInfo clientRouting = routing;
+ if (clientRouting == null) {
+ clientRouting = fp.getLocalFilterRouting(regionEvent);
+ }
+ regionEvent.setLocalFilterInfo(clientRouting);
+
+ ClientUpdateMessage clientMessage = ClientTombstoneMessage.gc(getPartitionedRegion(), removedKeys,
+ eventID);
+ CacheClientNotifier.notifyClients(regionEvent, clientMessage);
+ }
+ }
+ }
+
+ /**
+ * Search the CM for keys. If found any, return the first found one
+ * Otherwise, save the keys into the CM, and return null
+ * The thread will acquire the lock before searching.
+ *
+ * @param keys
+ * @return first key found in CM
+ * null means not found
+ */
+ private LockObject searchAndLock(Object keys[]) {
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+
+ LockObject foundLock = null;
+
+ synchronized(allKeysMap) {
+ // check if there's any key in map
+ for (int i=0; i<keys.length; i++) {
+ if (allKeysMap.containsKey(keys[i])) {
+ foundLock = (LockObject)allKeysMap.get(keys[i]);
+ if (isDebugEnabled) {
+ logger.debug("LockKeys: found key: {}:{}", keys[i], foundLock.lockedTimeStamp);
+ }
+ break;
+ }
+ }
+
+ // save the keys when still locked
+ if (foundLock == null) {
+ for (int i=0; i<keys.length; i++) {
+ LockObject lockValue = new LockObject(keys[i], isDebugEnabled?System.currentTimeMillis():0);
+ allKeysMap.put(keys[i], lockValue);
+ if (isDebugEnabled) {
+ logger.debug("LockKeys: add key: {}:{}", keys[i], lockValue.lockedTimeStamp);
+ }
+ }
+ }
+ }
+
+ return foundLock;
+ }
+
+ /**
+ * After processed the keys, this method will remove them from CM.
+ * And notifyAll for each key.
+ * The thread needs to acquire lock of CM first.
+ *
+ * @param keys
+ */
+ public void removeAndNotifyKeys(Object keys[]) {
+ final boolean isTraceEnabled = logger.isTraceEnabled();
+
+ synchronized(allKeysMap) {
+ for (int i=0; i<keys.length; i++) {
+ LockObject lockValue = (LockObject)allKeysMap.remove(keys[i]);
+ if (lockValue != null) {
+ // let current thread become the monitor of the key object
+ synchronized (lockValue) {
+ lockValue.setRemoved();
+ if (isTraceEnabled) {
+ long waitTime = System.currentTimeMillis()-lockValue.lockedTimeStamp;
+ logger.trace("LockKeys: remove key {}, notifyAll for {}. It waited", keys[i], lockValue, waitTime);
+ }
+ lockValue.notifyAll();
+ }
+ }
+ } // for
+ }
+ }
+
+ /**
+ * Keep checking if CM has contained any key in keys. If yes, wait for notify,
+ * then retry again. This method will block current thread for long time.
+ * It only exits when current thread successfully save its keys into CM.
+ *
+ * @param keys
+ */
+ public void waitUntilLocked(Object keys[]) {
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+
+ final String title = "BucketRegion.waitUntilLocked:";
+ while (true) {
+ LockObject foundLock = searchAndLock(keys);
+
+ if (foundLock != null) {
+ synchronized(foundLock) {
+ try {
+ while (!foundLock.isRemoved()) {
+ this.partitionedRegion.checkReadiness();
+ foundLock.wait(1000);
+ // primary could be changed by prRebalancing while waiting here
+ checkForPrimary();
+ }
+ }
+ catch (InterruptedException e) {
+ // TODO this isn't a localizable string and it's being logged at info level
+ if (isDebugEnabled) {
+ logger.debug("{} interrupted while waiting for {}", title, foundLock, e.getMessage());
+ }
+ }
+ if (isDebugEnabled) {
+ long waitTime = System.currentTimeMillis()-foundLock.lockedTimeStamp;
+ logger.debug("{} waited {} ms to lock", title, waitTime, foundLock);
+ }
+ }
+ } else {
+ // now the keys have been locked by this thread
+ break;
+ } // to lock and process
+ } // while
+ }
+
+ // Entry (Put/Create) rules
+ // If this is a primary for the bucket
+ // 1) apply op locally, aka update or create entry
+ // 2) distribute op to bucket secondaries and bridge servers with synchrony on local entry
+ // 3) cache listener with synchrony on entry
+ // Else not a primary
+ // 1) apply op locally
+ // 2) update local bs, gateway
+ @Override
+ protected
+ boolean virtualPut(EntryEventImpl event,
+ boolean ifNew,
+ boolean ifOld,
+ Object expectedOldValue,
+ boolean requireOldValue,
+ long lastModified,
+ boolean overwriteDestroyed)
+ throws TimeoutException,
+ CacheWriterException {
+ beginLocalWrite(event);
+
+ try {
+ if (this.partitionedRegion.isParallelWanEnabled()) {
+ handleWANEvent(event);
+ }
+ if (!hasSeenEvent(event)) {
+ forceSerialized(event);
+ RegionEntry oldEntry = this.entries
+ .basicPut(event, lastModified, ifNew, ifOld, expectedOldValue,
+ requireOldValue, overwriteDestroyed);
+ return oldEntry != null;
+ }
+ if (event.getDeltaBytes() != null && event.getRawNewValue() == null) {
+ // This means that this event has delta bytes but no full value.
+ // Request the full value of this event.
+ // The value in this vm may not be same as this event's value.
+ throw new InvalidDeltaException(
+ "Cache encountered replay of event containing delta bytes for key "
+ + event.getKey());
+ }
+ // Forward the operation and event messages
+ // to members with bucket copies that may not have seen the event. Their
+ // EventTrackers will keep them from applying the event a second time if
+ // they've already seen it.
+ if (logger.isTraceEnabled(LogMarker.DM)) {
+ logger.trace(LogMarker.DM, "BR.virtualPut: this cache has already seen this event {}", event);
+ }
- distributeUpdateOperation(event, lastModified);
++ if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
++ distributeUpdateOperation(event, lastModified);
++ }
+ return true;
+ } finally {
+ endLocalWrite(event);
+ }
+ }
+
+
+ public long generateTailKey() {
+ long key = this.eventSeqNum.addAndGet(this.partitionedRegion
+ .getTotalNumberOfBuckets());
+ if (key < 0
+ || key % getPartitionedRegion().getTotalNumberOfBuckets() != getId()) {
+ logger
+ .error(LocalizedMessage
+ .create(
+ LocalizedStrings.GatewaySender_SEQUENCENUMBER_GENERATED_FOR_EVENT_IS_INVALID,
+ new Object[] { key, getId() }));
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("WAN: On primary bucket {}, setting the seq number as {}",
+ getId(), this.eventSeqNum.get());
+ }
+ return eventSeqNum.get();
+ }
+
+ public void handleWANEvent(EntryEventImpl event) {
+ if (this.eventSeqNum == null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("The bucket corresponding to this user bucket is not created yet. This event will not go to remote wan site. Event: {}", event);
+ }
+ }
+
+ if (!(this instanceof AbstractBucketRegionQueue)) {
+ if (getBucketAdvisor().isPrimary()) {
+ long key = this.eventSeqNum.addAndGet(this.partitionedRegion.getTotalNumberOfBuckets());
+ if (key < 0
+ || key % getPartitionedRegion().getTotalNumberOfBuckets() != getId()) {
+ logger.error(LocalizedMessage.create(LocalizedStrings.GatewaySender_SEQUENCENUMBER_GENERATED_FOR_EVENT_IS_INVALID,
+ new Object[] { key, getId() }));
+ }
+ event.setTailKey(key);
+ if (logger.isDebugEnabled()) {
+ logger.debug("WAN: On primary bucket {}, setting the seq number as {}", getId(), this.eventSeqNum.get());
+ }
+ } else {
+ // Can there be a race here? Like one thread has done put in primary but
+ // its update comes later
+ // in that case its possible that a tail key is missed.
+ // we can handle that by only incrementing the tailKey and never
+ // setting it less than the current value.
+ Atomics.setIfGreater(this.eventSeqNum, event.getTailKey());
+ if (logger.isDebugEnabled()) {
+ logger.debug("WAN: On secondary bucket {}, setting the seq number as {}", getId(), event.getTailKey());
+ }
+ }
+ }
+ }
+ /**
+ * Fix for Bug#45917
+ * We are updating the seqNumber so that new seqNumbers are
+ * generated starting from the latest in the system.
+ * @param l
+ */
+
+ public void updateEventSeqNum(long l) {
+ Atomics.setIfGreater(this.eventSeqNum, l);
+ if (logger.isDebugEnabled()) {
+ logger.debug("WAN: On bucket {}, setting the seq number as {} before GII", getId(), l);
+ }
+ }
+
+ protected void distributeUpdateOperation(EntryEventImpl event, long lastModified) {
+ if (!event.isOriginRemote()
+ && !event.isNetSearch()
+ && getBucketAdvisor().isPrimary()) {
+ if (event.isBulkOpInProgress()) {
+ // consolidate the UpdateOperation for each entry into a PutAllMessage
+ // since we did not call basicPutPart3(), so we have to explicitly addEntry here
+ event.getPutAllOperation().addEntry(event, this.getId());
+ } else {
+ new UpdateOperation(event, lastModified).distribute();
+ if (logger.isDebugEnabled()) {
+ logger.debug("sent update operation : for region : {}: with event: {}", this.getName(), event);
+ }
+ }
+ }
+ if (!event.getOperation().isPutAll()) { // putAll will invoke listeners later
+ event.invokeCallbacks(this, true, true);
+ }
+ }
+
+ /**
+ * distribute the operation in basicPutPart2 so the region entry lock is
+ * held
+ */
+ @Override
+ protected long basicPutPart2(EntryEventImpl event, RegionEntry entry, boolean isInitialized,
+ long lastModified, boolean clearConflict) {
+ // Assumed this is called with entry synchrony
+
+ // Typically UpdateOperation is called with the
+ // timestamp returned from basicPutPart2, but as a bucket we want to do
+ // distribution *before* we do basicPutPart2.
+ final long modifiedTime = event.getEventTime(lastModified);
+ // Update the get stats if necessary.
+ if (this.partitionedRegion.getDataStore().hasClientInterest(event)) {
+ updateStatsForGet(entry, true);
+ }
+ if (!event.isOriginRemote()) {
+ if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
+ boolean eventHasDelta = event.getDeltaBytes() != null;
+ VersionTag v = entry.generateVersionTag(null, eventHasDelta, this, event);
+ if (v != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("generated version tag {} in region {}", v, this.getName());
+ }
+ }
+ }
+
+ // This code assumes it is safe ignore token mode (GII in progress)
+ // because it assumes when the origin of the event is local,
+ // the GII has completed and the region is initialized and open for local
+ // ops
+ if (!event.isBulkOpInProgress()) {
+ long start = this.partitionedRegion.getPrStats().startSendReplication();
+ try {
+ UpdateOperation op = new UpdateOperation(event, modifiedTime);
+ op.distribute();
+ } finally {
+ this.partitionedRegion.getPrStats().endSendReplication(start);
+ }
+ } else {
+ // consolidate the UpdateOperation for each entry into a PutAllMessage
+ // basicPutPart3 takes care of this
+ }
+ }
+
+ return super.basicPutPart2(event, entry, isInitialized, lastModified, clearConflict);
+ }
+
+ protected void notifyGatewaySender(EnumListenerEvent operation,
+ EntryEventImpl event) {
+ // We don't need to clone the event for new Gateway Senders.
+ // Preserve the bucket reference for resetting it later.
+ LocalRegion bucketRegion = event.getRegion();
+ try {
+ event.setRegion(this.partitionedRegion);
+ this.partitionedRegion.notifyGatewaySender(operation, event);
+ }
+ finally {
+ // reset the event region back to bucket region.
+ // This should work as gateway queue create GatewaySenderEvent for
+ // queueing.
+ event.setRegion(bucketRegion);
+ }
+ }
+
+ public void checkForPrimary() {
+ final boolean isp = getBucketAdvisor().isPrimary();
+ if (! isp){
+ this.partitionedRegion.checkReadiness();
+ checkReadiness();
+ InternalDistributedMember primaryHolder = getBucketAdvisor().basicGetPrimaryMember();
+ throw new PrimaryBucketException("Bucket " + getName()
+ + " is not primary. Current primary holder is "+primaryHolder);
+ }
+ }
+
+ /**
+ * Checks to make sure that this node is primary, and locks the bucket
+ * to make sure the bucket stays the primary bucket while the write
+ * is in progress. Any call to this method must be followed with a call
+ * to endLocalWrite().
+ * @param event
+ */
+ private boolean beginLocalWrite(EntryEventImpl event) {
+ if(!needWriteLock(event)) {
+ return false;
+ }
+
+ if (cache.isCacheAtShutdownAll()) {
+ throw new CacheClosedException("Cache is shutting down");
+ }
+
+ Object keys[] = new Object[1];
+ keys[0] = event.getKey();
+ waitUntilLocked(keys); // it might wait for long time
+
+ boolean lockedForPrimary = false;
+ try {
+ doLockForPrimary(false);
+ return lockedForPrimary = true;
+ } finally {
+ if (!lockedForPrimary) {
+ removeAndNotifyKeys(keys);
+ }
+ }
+ }
+
+ /**
+ * lock this bucket and, if present, its colocated "parent"
+ * @param tryLock - whether to use tryLock (true) or a blocking lock (false)
+ * @return true if locks were obtained and are still held
+ */
+ public boolean doLockForPrimary(boolean tryLock) {
+ boolean locked = lockPrimaryStateReadLock(tryLock);
+ if(!locked) {
+ return false;
+ }
+
+ boolean isPrimary = false;
+ try {
+ // Throw a PrimaryBucketException if this VM is assumed to be the
+ // primary but isn't, preventing update and distribution
+ checkForPrimary();
+
+ if (cache.isCacheAtShutdownAll()) {
+ throw new CacheClosedException("Cache is shutting down");
+ }
+
+ isPrimary = true;
+ } finally {
+ if(!isPrimary) {
+ doUnlockForPrimary();
+ }
+ }
+
+ return true;
+ }
+
+ private boolean lockPrimaryStateReadLock(boolean tryLock) {
+ Lock activeWriteLock = this.getBucketAdvisor().getActiveWriteLock();
+ Lock parentLock = this.getBucketAdvisor().getParentActiveWriteLock();
+ for (;;) {
+ boolean interrupted = Thread.interrupted();
+ try {
+ //Get the lock. If we have to wait here, it's because
+ //this VM is actively becoming "not primary". We don't want
+ //to throw an exception until this VM is actually no longer
+ //primary, so we wait here for not primary to complete. See bug #39963
+ if (parentLock != null) {
+ if (tryLock) {
+ boolean locked = parentLock.tryLock();
+ if (!locked) {
+ return false;
+ }
+ } else {
+ parentLock.lockInterruptibly();
+ }
+ if (tryLock) {
+ boolean locked = activeWriteLock.tryLock();
+ if (!locked) {
+ parentLock.unlock();
+ return false;
+ }
+ } else {
+ activeWriteLock.lockInterruptibly();
+ }
+ }
+ else {
+ if (tryLock) {
+ boolean locked = activeWriteLock.tryLock();
+ if (!locked) {
+ return false;
+ }
+ } else {
+ activeWriteLock.lockInterruptibly();
+ }
+ }
+ break; // success
+ } catch (InterruptedException e) {
+ interrupted = true;
+ cache.getCancelCriterion().checkCancelInProgress(null);
+ // don't throw InternalGemFireError to fix bug 40102
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ return true;
+ }
+
+ public void doUnlockForPrimary() {
+ Lock activeWriteLock = this.getBucketAdvisor().getActiveWriteLock();
+ activeWriteLock.unlock();
+ Lock parentLock = this.getBucketAdvisor().getParentActiveWriteLock();
+ if(parentLock!= null){
+ parentLock.unlock();
+ }
+ }
+
+ /**
+ * Release the lock on the bucket that makes the bucket
+ * stay the primary during a write.
+ */
+ private void endLocalWrite(EntryEventImpl event) {
+ if(!needWriteLock(event)) {
+ return;
+ }
+
+
+ doUnlockForPrimary();
+
+ Object keys[] = new Object[1];
+ keys[0] = event.getKey();
+ removeAndNotifyKeys(keys);
+ }
+
+ protected boolean needWriteLock(EntryEventImpl event) {
+ return !(event.isOriginRemote()
+ || event.isNetSearch()
+ || event.getOperation().isLocal()
+ || event.getOperation().isPutAll()
+ || event.getOperation().isRemoveAll()
+ || (event.isExpiration() && isEntryEvictDestroyEnabled()
+ || event.isPendingSecondaryExpireDestroy()));
+ }
+
+ // this is stubbed out because distribution is done in basicPutPart2 while
+ // the region entry is still locked
+ @Override
+ protected void distributeUpdate(EntryEventImpl event, long lastModified, boolean ifNew, boolean ifOld, Object expectedOldValue, boolean requireOldValue) {
+ }
+
+ // Entry Invalidation rules
+ // If this is a primary for the bucket
+ // 1) apply op locally, aka update entry
+ // 2) distribute op to bucket secondaries and bridge servers with synchrony on local entry
+ // 3) cache listener with synchrony on entry
+ // 4) update local bs, gateway
+ // Else not a primary
+ // 1) apply op locally
+ // 2) update local bs, gateway
+ @Override
+ void basicInvalidate(EntryEventImpl event) throws EntryNotFoundException
+ {
+ basicInvalidate(event, isInitialized(), false);
+ }
+
+ @Override
+ void basicInvalidate(final EntryEventImpl event, boolean invokeCallbacks,
+ boolean forceNewEntry)
+ throws EntryNotFoundException {
+ // disallow local invalidation
+ Assert.assertTrue(! event.isLocalInvalid());
+ Assert.assertTrue(!isTX());
+ Assert.assertTrue(event.getOperation().isDistributed());
+
+ beginLocalWrite(event);
+ try {
+ // increment the tailKey so that invalidate operations are written to HDFS
+ if (this.partitionedRegion.hdfsStoreName != null) {
+ /* MergeGemXDHDFSToGFE Disabled this while porting. Is this required? */
+ //assert this.partitionedRegion.isLocalParallelWanEnabled();
+ handleWANEvent(event);
+ }
+ // which performs the local op.
+ // The ARM then calls basicInvalidatePart2 with the entry synchronized.
+ if ( !hasSeenEvent(event) ) {
+ if (event.getOperation().isExpiration()) { // bug 39905 - invoke listeners for expiration
+ DistributedSystem sys = cache.getDistributedSystem();
+ EventID newID = new EventID(sys);
+ event.setEventId(newID);
+ event.setInvokePRCallbacks(getBucketAdvisor().isPrimary());
+ }
+ boolean forceCallbacks = isEntryEvictDestroyEnabled();
+ boolean done = this.entries.invalidate(event, invokeCallbacks, forceNewEntry, forceCallbacks);
+ ExpirationAction expirationAction = getEntryExpirationAction();
+ if (done && !getBucketAdvisor().isPrimary() && expirationAction != null
+ && expirationAction.isInvalidate()) {
+ synchronized(pendingSecondaryExpires) {
+ pendingSecondaryExpires.remove(event.getKey());
+ }
+ }
+ return;
+ }
+ else {
+ if (logger.isTraceEnabled(LogMarker.DM)) {
+ logger.trace(LogMarker.DM, "LR.basicInvalidate: this cache has already seen this event {}", event);
+ }
- if (!event.isOriginRemote()
- && getBucketAdvisor().isPrimary()) {
- // This cache has processed the event, forward operation
- // and event messages to backup buckets
- new InvalidateOperation(event).distribute();
++ if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
++ if (!event.isOriginRemote()
++ && getBucketAdvisor().isPrimary()) {
++ // This cache has processed the event, forward operation
++ // and event messages to backup buckets
++ new InvalidateOperation(event).distribute();
++ }
++ event.invokeCallbacks(this,true, false);
+ }
- event.invokeCallbacks(this,true, false);
+ return;
+ }
+ } finally {
+ endLocalWrite(event);
+ }
+ }
+ @Override
+ void basicInvalidatePart2(final RegionEntry re, final EntryEventImpl event,
+ boolean clearConflict, boolean invokeCallbacks)
+ {
+ // Assumed this is called with the entry synchronized
+ if (!event.isOriginRemote()) {
+ if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
+ VersionTag v = re.generateVersionTag(null, false, this, event);
+ if (logger.isDebugEnabled() && v != null) {
+ logger.debug("generated version tag {} in region {}", v, this.getName());
+ }
+ event.setVersionTag(v);
+ }
+
+ // This code assumes it is safe ignore token mode (GII in progress)
+ // because it assumes when the origin of the event is local,
+ // the GII has completed and the region is initialized and open for local
+ // ops
+
+ // This code assumes that this bucket is primary
+ // distribute op to bucket secondaries and event to other listeners
+ InvalidateOperation op = new InvalidateOperation(event);
+ op.distribute();
+ }
+ super.basicInvalidatePart2(re, event, clearConflict /*Clear conflict occurred */, invokeCallbacks);
+ }
+
+ @Override
+ void distributeInvalidate(EntryEventImpl event) {
+ }
+
+ @Override
+ protected void distributeInvalidateRegion(RegionEventImpl event) {
+ // switch region in event so that we can have distributed region
+ // send InvalidateRegion message.
+ event.region = this;
+ super.distributeInvalidateRegion(event);
+ event.region = this.partitionedRegion;
+ }
+
+ @Override
+ protected boolean shouldDistributeInvalidateRegion(RegionEventImpl event) {
+ return getBucketAdvisor().isPrimary();
+ }
+
+ @Override
+ protected boolean shouldGenerateVersionTag(RegionEntry entry, EntryEventImpl event) {
+ if (event.getOperation().isLocal()) { // bug #45402 - localDestroy generated a version tag
+ return false;
+ }
+ return this.concurrencyChecksEnabled && ((event.getVersionTag() == null) || event.getVersionTag().isGatewayTag());
+ }
+
+ @Override
+ void expireDestroy(EntryEventImpl event, boolean cacheWrite) {
+
+ /* Early out before we throw a PrimaryBucketException because we're not primary */
+ if(needWriteLock(event) && !getBucketAdvisor().isPrimary()) {
+ return;
+ }
+ try {
+ super.expireDestroy(event, cacheWrite);
+ return;
+ } catch(PrimaryBucketException e) {
+ //must have concurrently removed the primary
+ return;
+ }
+ }
+
+ @Override
+ void expireInvalidate(EntryEventImpl event) {
+ if(!getBucketAdvisor().isPrimary()) {
+ return;
+ }
+ try {
+ super.expireInvalidate(event);
+ } catch (PrimaryBucketException e) {
+ //must have concurrently removed the primary
+ }
+ }
+
+ @Override
+ final void performExpiryTimeout(ExpiryTask p_task) throws CacheException
+ {
+ ExpiryTask task = p_task;
+ boolean isEvictDestroy = isEntryEvictDestroyEnabled();
+ //Fix for bug 43805 - get the primary lock before
+ //synchronizing on pendingSecondaryExpires, to match the lock
+ //ordering in other place (like acquiredPrimaryLock)
+ lockPrimaryStateReadLock(false);
+ try {
+ // Why do we care if evict destroy is configured?
+ // See bug 41096 for the answer.
+ if(!getBucketAdvisor().isPrimary() && !isEvictDestroy) {
+ synchronized (this.pendingSecondaryExpires) {
+ if (task.isPending()) {
+ Object key = task.getKey();
+ if (key != null) {
+ this.pendingSecondaryExpires.put(key, task);
+ }
+ }
+ }
+ } else {
+ super.performExpiryTimeout(task);
+ }
+ } finally {
+ doUnlockForPrimary();
+ }
+ }
+
+ protected boolean isEntryEvictDestroyEnabled() {
+ return getEvictionAttributes() != null && EvictionAction.LOCAL_DESTROY.equals(getEvictionAttributes().getAction());
+ }
+
+ protected final void processPendingSecondaryExpires()
+ {
+ ExpiryTask[] tasks;
+ while (true) {
+ // note we just keep looping until no more pendingExpires exist
+ synchronized (this.pendingSecondaryExpires) {
+ if (this.pendingSecondaryExpires.isEmpty()) {
+ return;
+ }
+ tasks = new ExpiryTask[this.pendingSecondaryExpires.size()];
+ tasks = this.pendingSecondaryExpires.values().toArray(tasks);
+ this.pendingSecondaryExpires.clear();
+ }
+ try {
+ if (isCacheClosing() || isClosed() || this.isDestroyed) {
+ return;
+ }
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ for (int i = 0; i < tasks.length; i++) {
+ try {
+ if (isDebugEnabled) {
+ logger.debug("{} fired at {}", tasks[i], System.currentTimeMillis());
+ }
+ tasks[i].basicPerformTimeout(true);
+ if (isCacheClosing() || isClosed() || isDestroyed()) {
+ return;
+ }
+ }
+ catch (EntryNotFoundException ignore) {
+ // ignore and try the next expiry task
+ }
+ }
+ }
+ catch (RegionDestroyedException re) {
+ // Ignore - our job is done
+ }
+ catch (CancelException ex) {
+ // ignore
+ }
+ catch (VirtualMachineError err) {
+ SystemFailure.initiateFailure(err);
+ // If this ever returns, rethrow the error. We're poisoned
+ // now, so don't let this thread continue.
+ throw err;
+ }
+ catch (Throwable ex) {
+ // Whenever you catch Error or Throwable, you must also
+ // catch VirtualMachineError (see above). However, there is
+ // _still_ a possibility that you are dealing with a cascading
+ // error condition, so you also need to check to see if the JVM
+ // is still usable:
+ SystemFailure.checkFailure();
+ logger.fatal(LocalizedMessage.create(LocalizedStrings.LocalRegion_EXCEPTION_IN_EXPIRATION_TASK), ex);
+ }
+ }
+ }
+
+ /**
+ * Creates an event for the EVICT_DESTROY operation so that events will fire
+ * for Partitioned Regions.
+ * @param key - the key that this event is related to
+ * @return an event for EVICT_DESTROY
+ */
+ @Override
+ protected EntryEventImpl generateEvictDestroyEvent(Object key) {
+ EntryEventImpl event = super.generateEvictDestroyEvent(key);
+ event.setInvokePRCallbacks(true); //see bug 40797
+ return event;
+ }
+
+ // Entry Destruction rules
+ // If this is a primary for the bucket
+ // 1) apply op locally, aka destroy entry (REMOVED token)
+ // 2) distribute op to bucket secondaries and bridge servers with synchrony on local entry
+ // 3) cache listener with synchrony on local entry
+ // 4) update local bs, gateway
+ // Else not a primary
+ // 1) apply op locally
+ // 2) update local bs, gateway
+ @Override
+ protected
+ void basicDestroy(final EntryEventImpl event,
+ final boolean cacheWrite,
+ Object expectedOldValue)
+ throws EntryNotFoundException, CacheWriterException, TimeoutException {
+
+ Assert.assertTrue(!isTX());
+ Assert.assertTrue(event.getOperation().isDistributed());
+
+ beginLocalWrite(event);
+ try {
+ // increment the tailKey for the destroy event
+ if (this.partitionedRegion.isParallelWanEnabled()) {
+ handleWANEvent(event);
+ }
+ // In GemFire EVICT_DESTROY is not distributed, so in order to remove the entry
+ // from memory, allow the destroy to proceed. fixes #49784
+ if (event.isLoadedFromHDFS() && !getBucketAdvisor().isPrimary()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Put the destory event in HDFS queue on secondary "
+ + "and return as event is HDFS loaded " + event);
+ }
+ notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event);
+ return;
+ }else{
+ if (logger.isDebugEnabled()) {
+ logger.debug("Going ahead with the destroy on GemFire system");
+ }
+ }
+ // This call should invoke AbstractRegionMap (aka ARM) destroy method
+ // which calls the CacheWriter, then performs the local op.
+ // The ARM then calls basicDestroyPart2 with the entry synchronized.
+ if ( !hasSeenEvent(event) ) {
+ if (event.getOperation().isExpiration()) { // bug 39905 - invoke listeners for expiration
+ DistributedSystem sys = cache.getDistributedSystem();
+ if (event.getEventId() == null) { // Fix for #47388
+ EventID newID = new EventID(sys);
+ event.setEventId(newID);
+ }
+ event.setInvokePRCallbacks(getBucketAdvisor().isPrimary());
+ }
+ boolean done = mapDestroy(event,
+ cacheWrite,
+ false, // isEviction //merge44610: In cheetah instead of false event.getOperation().isEviction() is used. We kept the cedar change as it is.
+ expectedOldValue);
+ if(done && !getBucketAdvisor().isPrimary() && isEntryExpiryPossible()) {
+ synchronized(pendingSecondaryExpires) {
+ pendingSecondaryExpires.remove(event.getKey());
+ }
+ }
+ return;
+ }
+ else {
- distributeDestroyOperation(event);
++ if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
++ distributeDestroyOperation(event);
++ }
+ return;
+ }
+ } finally {
+ endLocalWrite(event);
+ }
+ }
+
+ protected void distributeDestroyOperation (EntryEventImpl event) {
+ if (logger.isTraceEnabled(LogMarker.DM)) {
+ logger.trace(LogMarker.DM, "BR.basicDestroy: this cache has already seen this event {}", event);
+ }
+ if (!event.isOriginRemote()
+ && getBucketAdvisor().isPrimary()) {
+ if (event.isBulkOpInProgress()) {
+ // consolidate the DestroyOperation for each entry into a RemoveAllMessage
+ event.getRemoveAllOperation().addEntry(event, this.getId());
+ } else {
+ // This cache has processed the event, forward operation
+ // and event messages to backup buckets
+ event.setOldValueFromRegion();
+ new DestroyOperation(event).distribute();
+ }
+ }
+
+ if (!event.getOperation().isRemoveAll()) { // removeAll will invoke listeners later
+ event.invokeCallbacks(this,true, false);
+ }
+ }
+
+ @Override
+ protected void basicDestroyBeforeRemoval(RegionEntry entry, EntryEventImpl event) {
+ // Assumed this is called with entry synchrony
+ if (!event.isOriginRemote()
+ && !event.isBulkOpInProgress()
+ && !event.getOperation().isLocal()
+ && !Operation.EVICT_DESTROY.equals(event.getOperation())
+ && !(event.isExpiration() && isEntryEvictDestroyEnabled())) {
+
+ if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
+ VersionTag v = entry.generateVersionTag(null, false, this, event);
+ if (logger.isDebugEnabled() && v != null) {
+ logger.debug("generated version tag {} in region {}", v, this.getName());
+ }
+ }
+
+ // This code assumes it is safe ignore token mode (GII in progress)
+ // because it assume when the origin of the event is local,
+ // then GII has completed (the region has been completely initialized)
+
+ // This code assumes that this bucket is primary
+ new DestroyOperation(event).distribute();
+ }
+ super.basicDestroyBeforeRemoval(entry, event);
+ }
+
+ @Override
+ void distributeDestroy(EntryEventImpl event, Object expectedOldValue) {
+ }
+
+
+// impl removed - not needed for listener invocation alterations
+// void basicDestroyPart2(RegionEntry re, EntryEventImpl event, boolean inTokenMode, boolean invokeCallbacks)
+
+ @Override
+ protected void validateArguments(Object key, Object value, Object aCallbackArgument)
+ {
+ Assert.assertTrue(!isTX());
+ super.validateArguments(key, value, aCallbackArgument);
+ }
+
+ public void forceSerialized(EntryEventImpl event) {
+ event.makeSerializedNewValue();
+// Object obj = event.getRawNewValue();
+// if (obj instanceof byte[]
+// || obj == null
+// || obj instanceof CachedDeserializable
+// || obj == NotAvailable.NOT_AVAILABLE
+// || Token.isInvalidOrRemoved(obj)) {
+// // already serialized
+// return;
+// }
+// throw new InternalGemFireError("event did not force serialized: " + event);
+ }
+
+ /**
+ * This method is called when a miss from a get ends up
+ * finding an object through a cache loader or from a server.
+ * In that case we want to make sure that we don't move
+ * this bucket while putting the value in the ache.
+ * @see LocalRegion#basicPutEntry(EntryEventImpl, long)
+ */
+ @Override
+ protected RegionEntry basicPutEntry(final EntryEventImpl event,
+ final long lastModified) throws TimeoutException,
+ CacheWriterException {
+ beginLocalWrite(event);
+ try {
+ event.setInvokePRCallbacks(true);
+ forceSerialized(event);
+ return super.basicPutEntry(event, lastModified);
+ } finally {
+ endLocalWrite(event);
+ }
+ }
+
+ @Override
+ void basicUpdateEntryVersion(EntryEventImpl event)
+ throws EntryNotFoundException {
+
+ Assert.assertTrue(!isTX());
+ Assert.assertTrue(event.getOperation().isDistributed());
+
+ beginLocalWrite(event);
+ try {
+
+ if (!hasSeenEvent(event)) {
+ this.entries.updateEntryVersion(event);
+ } else {
+ if (logger.isTraceEnabled(LogMarker.DM)) {
+ logger.trace(LogMarker.DM, "BR.basicUpdateEntryVersion: this cache has already seen this event {}", event);
+ }
+ }
+ if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
+ // This cache has processed the event, forward operation
+ // and event messages to backup buckets
- new UpdateEntryVersionOperation(event).distribute();
++ if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
++ new UpdateEntryVersionOperation(event).distribute();
++ }
+ }
+ return;
+ } finally {
+ endLocalWrite(event);
+ }
+ }
+
+ public int getRedundancyLevel()
+ {
+ return this.redundancy;
+ }
+
+ public boolean isPrimary() {
+ throw new UnsupportedOperationException(LocalizedStrings.BucketRegion_THIS_SHOULD_NEVER_BE_CALLED_ON_0.toLocalizedString(getClass()));
+ }
+
+ @Override
+ public boolean isDestroyed() {
+ //TODO prpersist - Added this if null check for the partitioned region
+ // because we create the disk store for a bucket *before* in the constructor
+ // for local region, which is before this final field is assigned. This is why
+ // we shouldn't do some much work in the constructors! This is a temporary
+ // hack until I move must of the constructor code to region.initialize.
+ return isBucketDestroyed()
+ || (this.partitionedRegion != null
+ && this.partitionedRegion.isLocallyDestroyed && !isInDestroyingThread());
+ }
+
+ /**
+ * Return true if this bucket has been destroyed.
+ * Don't bother checking to see if the PR that owns this bucket was destroyed;
+ * that has already been checked.
+ * @since 6.0
+ */
+ public boolean isBucketDestroyed() {
+ return super.isDestroyed();
+ }
+
+ @Override
+ public boolean isHDFSRegion() {
+ return this.partitionedRegion.isHDFSRegion();
+ }
+
+ @Override
+ public boolean isHDFSReadWriteRegion() {
+ return this.partitionedRegion.isHDFSReadWriteRegion();
+ }
+
+ @Override
+ protected boolean isHDFSWriteOnly() {
+ return this.partitionedRegion.isHDFSWriteOnly();
+ }
+
+ @Override
+ public int sizeEstimate() {
+ if (isHDFSReadWriteRegion()) {
+ try {
+ checkForPrimary();
+ ConcurrentParallelGatewaySenderQueue q = getHDFSQueue();
+ if (q == null) return 0;
+ int hdfsBucketRegionSize = q.getBucketRegionQueue(
+ partitionedRegion, getId()).size();
+ int hoplogEstimate = (int) getHoplogOrganizer().sizeEstimate();
+ if (logger.isDebugEnabled()) {
+ logger.debug("for bucket " + getName() + " estimateSize returning "
+ + (hdfsBucketRegionSize + hoplogEstimate));
+ }
+ return hdfsBucketRegionSize + hoplogEstimate;
+ } catch (ForceReattemptException e) {
+ throw new PrimaryBucketException(e.getLocalizedMessage(), e);
+ }
+ }
+ return size();
+ }
+
+ @Override
+ public void checkReadiness()
+ {
+ super.checkReadiness();
+ if (isDestroyed()) {
+ throw new RegionDestroyedException(toString(), getFullPath());
+ }
+ }
+
+ @Override
+ public PartitionedRegion getPartitionedRegion(){
+ return this.partitionedRegion;
+ }
+
+ /**
+ * is the current thread involved in destroying the PR that
+ * owns this region?
+ */
+ private final boolean isInDestroyingThread() {
+ return this.partitionedRegion.locallyDestroyingThread
+ == Thread.currentThread();
+ }
+// public int getSerialNumber() {
+// String s = "This should never be called on " + getClass();
+// throw new UnsupportedOperationException(s);
+// }
+
+ @Override
+ public void fillInProfile(Profile profile) {
+ super.fillInProfile(profile);
+ BucketProfile bp = (BucketProfile) profile;
+ bp.isInitializing = this.initializationLatchAfterGetInitialImage.getCount() > 0;
+ }
+
+ /** check to see if the partitioned region is locally destroyed or closed */
+ public boolean isPartitionedRegionOpen() {
+ return !this.partitionedRegion.isLocallyDestroyed &&
+ !this.partitionedRegion.isClosed && !this.partitionedRegion.isDestroyed();
+ }
+
+ /**
+ * Horribly plagiarized from the similar method in LocalRegion
+ *
+ * @param key
+ * @param updateStats
+ * @param clientEvent holder for client version tag
+ * @param returnTombstones whether Token.TOMBSTONE should be returned for destroyed entries
+ * @return serialized form if present, null if the entry is not in the cache,
+ * or INVALID or LOCAL_INVALID re is a miss (invalid)
+ * @throws IOException
+ * if there is a serialization problem
+ * see LocalRegion#getDeserializedValue(RegionEntry, KeyInfo, boolean, boolean, boolean, EntryEventImpl, boolean, boolean, boolean)
+ */
+ private RawValue getSerialized(Object key, boolean updateStats, boolean doNotLockEntry, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
+ throws EntryNotFoundException, IOException {
+ RegionEntry re = null;
+ if (allowReadFromHDFS) {
+ re = this.entries.getEntry(key);
+ } else {
+ re = this.entries.getOperationalEntryInVM(key);
+ }
+ if (re == null) {
+ return NULLVALUE;
+ }
+ if (re.isTombstone() && !returnTombstones) {
+ return NULLVALUE;
+ }
+ Object v = null;
+
+ try {
+ v =re.getValue(this); // TODO OFFHEAP: todo v ends up in a RawValue. For now this can be a copy of the offheap onto the heap. But it might be easy to track lifetime of RawValue
+ if(doNotLockEntry) {
+ if(v == Token.NOT_AVAILABLE || v == null) {
+ return REQUIRES_ENTRY_LOCK;
+ }
+ }
+ if (clientEvent != null) {
+ VersionStamp stamp = re.getVersionStamp();
+ if (stamp != null) {
+ clientEvent.setVersionTag(stamp.asVersionTag());
+ }
+ }
+ }catch(DiskAccessException dae) {
+ this.handleDiskAccessException(dae);
+ throw dae;
+ }
+
+ if (v == null) {
+ return NULLVALUE;
+ } else {
+ if (updateStats) {
+ updateStatsForGet(re, true);
+ }
+ return new RawValue(v);
+ }
+ }
+
+ /**
+ * Return serialized form of an entry
+ * <p>
+ * Horribly plagiarized from the similar method in LocalRegion
+ *
+ * @param keyInfo
+ * @param generateCallbacks
+ * @param clientEvent holder for the entry's version information
+ * @param returnTombstones TODO
+ * @return serialized (byte) form
+ * @throws IOException if the result is not serializable
+ * @see LocalRegion#get(Object, Object, boolean, EntryEventImpl)
+ */
+ public RawValue getSerialized(KeyInfo keyInfo, boolean generateCallbacks, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws IOException {
+ checkReadiness();
+ checkForNoAccess();
+ CachePerfStats stats = getCachePerfStats();
+ long start = stats.startGet();
+
+ boolean miss = true;
+ try {
+ RawValue valueBytes = NULLVALUE;
+ boolean isCreate = false;
+ RawValue result = getSerialized(keyInfo.getKey(), true, doNotLockEntry, clientEvent, returnTombstones, allowReadFromHDFS);
+ isCreate = result == NULLVALUE || (result.getRawValue() == Token.TOMBSTONE && !returnTombstones);
+ miss = (result == NULLVALUE || Token.isInvalid(result.getRawValue()));
+ if (miss) {
+ // if scope is local and there is no loader, then
+ // don't go further to try and get value
+ if (hasServerProxy() ||
+ basicGetLoader() != null) {
+ if(doNotLockEntry) {
+ return REQUIRES_ENTRY_LOCK;
+ }
+ // TODO OFFHEAP: optimze
+ Object value = nonTxnFindObject(keyInfo, isCreate,
+ generateCallbacks, result.getRawValue(), true, true, requestingClient, clientEvent, false, allowReadFromHDFS);
+ if (value != null) {
+ result = new RawValue(value);
+ }
+ }
+ else { // local scope with no loader, still might need to update stats
+ if (isCreate) {
+ recordMiss(null, keyInfo.getKey());
+ }
+ }
+ }
+ return result; // changed in 7.0 to return RawValue(Token.INVALID) if the entry is invalid
+ }
+ finally {
+ stats.endGet(start, miss);
+ }
+
+ } // getSerialized
+
+ @Override
+ public String toString()
+ {
+ return new StringBuilder()
+ .append("BucketRegion")
+ .append("[path='").append(getFullPath())
+ .append(";serial=").append(getSerialNumber())
+ .append(";primary=").append(getBucketAdvisor().getProxyBucketRegion().isPrimary())
+ .append(";indexUpdater=").append(getIndexUpdater())
+ .append("]")
+ .toString();
+ }
+
+ @Override
+ protected void distributedRegionCleanup(RegionEventImpl event)
+ {
+ // No need to close advisor, assume its already closed
+ // However we need to remove our listener from the advisor (see bug 43950).
+ this.distAdvisor.removeMembershipListener(this.advisorListener);
+ }
+
+ /**
+ * Tell the peers that this VM has destroyed the region.
+ *
+ * Also marks the local disk files as to be deleted before
+ * sending the message to peers.
+ *
+ *
+ * @param rebalance true if this is due to a rebalance removing the bucket
+ */
+ public void removeFromPeersAdvisors(boolean rebalance) {
+ if(getPersistenceAdvisor() != null) {
+ getPersistenceAdvisor().releaseTieLock();
+ }
+
+ DiskRegion diskRegion = getDiskRegion();
+
+ //Tell our peers whether we are destroying this region
+ //or just closing it.
+ boolean shouldDestroy = rebalance || diskRegion == null
+ || !diskRegion.isRecreated();
+ Operation op = shouldDestroy ? Operation.REGION_LOCAL_DESTROY
+ : Operation.REGION_CLOSE;
+
+ RegionEventImpl event = new RegionEventImpl(this, op, null, false,
+ getMyId(), generateEventID()/* generate EventID */);
+ // When destroying the whole partitioned region, there's no need to
+ // distribute the region closure/destruction, the PR RegionAdvisor.close()
+ // has taken care of it
+ if (isPartitionedRegionOpen()) {
+
+
+ //Only delete the files on the local disk if
+ //this is a rebalance, or we are creating the bucket
+ //for the first time
+ if (diskRegion != null && shouldDestroy) {
+ diskRegion.beginDestroyDataStorage();
+ }
+
+ //Send out the destroy op to peers
+ new DestroyRegionOperation(event, true).distribute();
+ }
+ }
+
+ @Override
+ protected void distributeDestroyRegion(RegionEventImpl event,
+ boolean notifyOfRegionDeparture) {
+ //No need to do this when we actually destroy the region,
+ //we already distributed this info.
+ }
+
+ EntryEventImpl createEventForPR(EntryEventImpl sourceEvent) {
+ EntryEventImpl e2 = new EntryEventImpl(sourceEvent);
+ boolean returned = false;
+ try {
+ e2.setRegion(this.partitionedRegion);
+ if (FORCE_LOCAL_LISTENERS_INVOCATION) {
+ e2.setInvokePRCallbacks(true);
+ }
+ else {
+ e2.setInvokePRCallbacks(sourceEvent.getInvokePRCallbacks());
+ }
+ DistributedMember dm = this.getDistributionManager().getDistributionManagerId();
+ e2.setOriginRemote(!e2.getDistributedMember().equals(dm));
+ returned = true;
+ return e2;
+ } finally {
+ if (!returned) {
+ e2.release();
+ }
+ }
+ }
+
+
+
+ @Override
+ public void invokeTXCallbacks(
+ final EnumListenerEvent eventType, final EntryEventImpl event,
+ final boolean callDispatchListenerEvent)
+ {
+ if (logger.isDebugEnabled()) {
+ logger.debug("BR.invokeTXCallbacks for event {}", event);
+ }
+ // bucket events may make it to this point even though the bucket is still
+ // initializing. We can't block while initializing or a GII state flush
+ // may hang, so we avoid notifying the bucket
+ if (this.isInitialized()) {
+ boolean callThem = callDispatchListenerEvent;
+ if (event.isPossibleDuplicate()
+ && this.eventTracker.isInitialImageProvider(event.getDistributedMember())) {
+ callThem = false;
+ }
+ super.invokeTXCallbacks(eventType, event, callThem);
+ }
+ final EntryEventImpl prevent = createEventForPR(event);
+ try {
+ this.partitionedRegion.invokeTXCallbacks(eventType, prevent, this.partitionedRegion.isInitialized() ? callDispatchListenerEvent : false);
+ } finally {
+ prevent.release();
+ }
+ }
+
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.internal.cache.LocalRegion#invokeDestroyCallbacks(com.gemstone.gemfire.internal.cache.EnumListenerEvent, com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean)
+ */
+ @Override
+ public void invokeDestroyCallbacks(
+ final EnumListenerEvent eventType, final EntryEventImpl event,
+ final boolean callDispatchListenerEvent, boolean notifyGateways)
+ {
+ // bucket events may make it to this point even though the bucket is still
+ // initializing. We can't block while initializing or a GII state flush
+ // may hang, so we avoid notifying the bucket
+ if (this.isInitialized()) {
+ boolean callThem = callDispatchListenerEvent;
+ if (event.isPossibleDuplicate()
+ && this.eventTracker.isInitialImageProvider(event.getDistributedMember())) {
+ callThem = false;
+ }
+ super.invokeDestroyCallbacks(eventType, event, callThem, notifyGateways);
+ }
+ final EntryEventImpl prevent = createEventForPR(event);
+ try {
+ this.partitionedRegion.invokeDestroyCallbacks(eventType, prevent, this.partitionedRegion.isInitialized() ? callDispatchListenerEvent : false, false);
+ } finally {
+ prevent.release();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.internal.cache.LocalRegion#invokeInvalidateCallbacks(com.gemstone.gemfire.internal.cache.EnumListenerEvent, com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean)
+ */
+ @Override
+ public void invokeInvalidateCallbacks(
+ final EnumListenerEvent eventType, final EntryEventImpl event,
+ final boolean callDispatchListenerEvent)
+ {
+ // bucket events may make it to this point even though the bucket is still
+ // initializing. We can't block while initializing or a GII state flush
+ // may hang, so we avoid notifying the bucket
+ if (this.isInitialized()) {
+ boolean callThem = callDispatchListenerEvent;
+ if (event.isPossibleDuplicate()
+ && this.eventTracker.isInitialImageProvider(event.getDistributedMember())) {
+ callThem = false;
+ }
+ super.invokeInvalidateCallbacks(eventType, event, callThem);
+ }
+ final EntryEventImpl prevent = createEventForPR(event);
+ try {
+ this.partitionedRegion.invokeInvalidateCallbacks(eventType, prevent, this.partitionedRegion.isInitialized() ? callDispatchListenerEvent : false);
+ } finally {
+ prevent.release();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.internal.cache.LocalRegion#invokePutCallbacks(com.gemstone.gemfire.internal.cache.EnumListenerEvent, com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean)
+ */
+ @Override
+ public void invokePutCallbacks(
+ final EnumListenerEvent eventType, final EntryEventImpl event,
+ final boolean callDispatchListenerEvent, boolean notifyGateways)
+ {
+ if (logger.isTraceEnabled()) {
+ logger.trace("invoking put callbacks on bucket for event {}", event);
+ }
+ // bucket events may make it to this point even though the bucket is still
+ // initializing. We can't block while initializing or a GII state flush
+ // may hang, so we avoid notifying the bucket
+ if (this.isInitialized()) {
+ boolean callThem = callDispatchListenerEvent;
+ if (callThem && event.isPossibleDuplicate()
+ && this.eventTracker.isInitialImageProvider(event.getDistributedMember())) {
+ callThem = false;
+ }
+ super.invokePutCallbacks(eventType, event, callThem, notifyGateways);
+ }
+
+ final EntryEventImpl prevent = createEventForPR(event);
+ try {
+ this.partitionedRegion.invokePutCallbacks(eventType, prevent,
+ this.partitionedRegion.isInitialized() ? callDispatchListenerEvent : false, false);
+ } finally {
+ prevent.release();
+ }
+ }
+
+ /**
+ * perform adjunct messaging for the given operation and return a set of
+ * members that should be attached to the operation's reply processor (if any)
+ * @param event the event causing this messaging
+ * @param cacheOpRecipients set of receiver which got cacheUpdateOperation.
+ * @param adjunctRecipients recipients that must unconditionally get the event
+ * @param filterRoutingInfo routing information for all members having the region
+ * @param processor the reply processor, or null if there isn't one
+ * @return the set of failed recipients
+ */
+ protected Set performAdjunctMessaging(EntryEventImpl event,
+ Set cacheOpRecipients, Set adjunctRecipients,
+ FilterRoutingInfo filterRoutingInfo,
+ DirectReplyProcessor processor,
+ boolean calculateDelta,
+ boolean sendDeltaWithFullValue) {
+
+ Set failures = Collections.EMPTY_SET;
+ PartitionMessage msg = event.getPartitionMessage();
+ if (calculateDelta) {
+ setDeltaIfNeeded(event);
+ }
+ if (msg != null) {
+ // The primary bucket member which is being modified remotely by a GemFire
+ // thread via a received PartitionedMessage
+ //Asif: Some of the adjunct recepients include those members which
+ // are sqlFabricHub & would need old value along with news
+ msg = msg.getMessageForRelayToListeners(event, adjunctRecipients);
+ msg.setSender(this.partitionedRegion.getDistributionManager()
+ .getDistributionManagerId());
+ msg.setSendDeltaWithFullValue(sendDeltaWithFullValue);
+
+ failures = msg.relayToListeners(cacheOpRecipients, adjunctRecipients,
+ filterRoutingInfo, event, this.partitionedRegion, processor);
+ }
+ else {
+ // The primary bucket is being modified locally by an application thread locally
+ Operation op = event.getOperation();
+ if (op.isCreate() || op.isUpdate()) {
+ // note that at this point ifNew/ifOld have been used to update the
+ // local store, and the event operation should be correct
+ failures = PutMessage.notifyListeners(cacheOpRecipients,
+ adjunctRecipients, filterRoutingInfo, this.partitionedRegion,
+ event, op.isCreate(), !op.isCreate(), processor,
+ sendDeltaWithFullValue);
+ }
+ else if (op.isDestroy()) {
+ failures = DestroyMessage.notifyListeners(cacheOpRecipients,
+ adjunctRecipients, filterRoutingInfo,
+ this.partitionedRegion, event, processor);
+ }
+ else if (op.isInvalidate()) {
+ failures = InvalidateMessage.notifyListeners(cacheOpRecipients,
+ adjunctRecipients, filterRoutingInfo,
+ this.partitionedRegion, event, processor);
+ }
+ else {
+ failures = adjunctRecipients;
+ }
+ }
+ return failures;
+ }
+
+ private void setDeltaIfNeeded(EntryEventImpl event) {
+ if (this.partitionedRegion.getSystem().getConfig().getDeltaPropagation()
+ && event.getOperation().isUpdate() && event.getDeltaBytes() == null) {
+ @Unretained Object rawNewValue = event.getRawNewValue();
+ if (!(rawNewValue instanceof CachedDeserializable)) {
+ return;
+ }
+ if (rawNewValue instanceof StoredObject && !((StoredObject) rawNewValue).isSerialized()) {
+ // it is a byte[]; not a Delta
+ return;
+ }
+ Object instance = ((CachedDeserializable)rawNewValue).getValue();
+ if (instance instanceof com.gemstone.gemfire.Delta
+ && ((com.gemstone.gemfire.Delta)instance).hasDelta()) {
+ try {
+ HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
+ long start = DistributionStats.getStatTime();
+ ((com.gemstone.gemfire.Delta)instance).toDelta(hdos);
+ event.setDeltaBytes(hdos.toByteArray());
+ this.partitionedRegion.getCachePerfStats().endDeltaPrepared(start);
+ }
+ catch (RuntimeException re) {
+ throw re;
+ }
+ catch (Exception e) {
+ throw new DeltaSerializationException(
+ LocalizedStrings.DistributionManager_CAUGHT_EXCEPTION_WHILE_SENDING_DELTA
+ .toLocalizedString(), e);
+ }
+ }
+ }
+ }
+
+ /**
+ * create a PutAllPRMessage for notify-only and send it to all adjunct nodes.
+ * return a set of members that should be attached to the operation's reply processor (if any)
+ * @param dpao DistributedPutAllOperation object for PutAllMessage
+ * @param cacheOpRecipients set of receiver which got cacheUpdateOperation.
+ * @param adjunctRecipients recipients that must unconditionally get the event
+ * @param filterRoutingInfo routing information for all members having the region
+ * @param processor the reply processor, or null if there isn't one
+ * @return the set of failed recipients
+ */
+ public Set performPutAllAdjunctMessaging(DistributedPutAllOperation dpao,
+ Set cacheOpRecipients, Set adjunctRecipients, FilterRoutingInfo filterRoutingInfo,
+ DirectReplyProcessor processor) {
+ // create a PutAllPRMessage out of PutAllMessage to send to adjunct nodes
+ PutAllPRMessage prMsg = dpao.createPRMessagesNotifyOnly(getId());
+ prMsg.initMessage(this.partitionedRegion, adjunctRecipients, true, processor);
+ prMsg.setSender(this.partitionedRegion.getDistributionManager()
+ .getDistributionManagerId());
+
+ // find members who have clients subscribed to this event and add them
+ // to the recipients list. Also determine if there are any FilterInfo
+ // routing tables for any of the receivers
+// boolean anyWithRouting = false;
+ Set recipients = null;
+ Set membersWithRouting = filterRoutingInfo.getMembers();
+ for (Iterator it=membersWithRouting.iterator(); it.hasNext(); ) {
+ Object mbr = it.next();
+ if (!cacheOpRecipients.contains(mbr)) {
+// anyWithRouting = true;
+ if (!adjunctRecipients.contains(mbr)) {
+ if (recipients == null) {
+ recipients = new HashSet();
+ recipients.add(mbr);
+ }
+ }
+ }
+ }
+ if (recipients == null) {
+ recipients = adjunctRecipients;
+ } else {
+ recipients.addAll(adjunctRecipients);
+ }
+
+// Set failures = Collections.EMPTY_SET;
+
+// if (!anyWithRouting) {
+ Set failures = this.partitionedRegion.getDistributionManager().putOutgoing(prMsg);
+
+// } else {
+// // Send message to each member. We set a FilterRoutingInfo serialization
+// // target so that serialization of the PutAllData objects held in the
+// // message will only serialize the routing entry for the message recipient
+// Iterator rIter = recipients.iterator();
+// failures = new HashSet();
+// while (rIter.hasNext()){
+// InternalDistributedMember member = (InternalDistributedMember)rIter.next();
+// FilterRoutingInfo.setSerializationTarget(member);
+// try {
+// prMsg.resetRecipients();
+// prMsg.setRecipient(member);
+// Set fs = this.partitionedRegion.getDistributionManager().putOutgoing(prMsg);
+// if (fs != null && !fs.isEmpty()) {
+// failures.addAll(fs);
+// }
+// } finally {
+// FilterRoutingInfo.clearSerializationTarget();
+// }
+// }
+// }
+
+ return failures;
+ }
+
+ /**
+ * create a RemoveAllPRMessage for notify-only and send it to all adjunct nodes.
+ * return a set of members that should be attached to the operation's reply processor (if any)
+ * @param op DistributedRemoveAllOperation object for RemoveAllMessage
+ * @param cacheOpRecipients set of receiver which got cacheUpdateOperation.
+ * @param adjunctRecipients recipients that must unconditionally get the event
+ * @param filterRoutingInfo routing information for all members having the region
+ * @param processor the reply processor, or null if there isn't one
+ * @return the set of failed recipients
+ */
+ public Set performRemoveAllAdjunctMessaging(DistributedRemoveAllOperation op,
+ Set cacheOpRecipients, Set adjunctRecipients, FilterRoutingInfo filterRoutingInfo,
+ DirectReplyProcessor processor) {
+ // create a RemoveAllPRMessage out of RemoveAllMessage to send to adjunct nodes
+ RemoveAllPRMessage prMsg = op.createPRMessagesNotifyOnly(getId());
+ prMsg.initMessage(this.partitionedRegion, adjunctRecipients, true, processor);
+ prMsg.setSender(this.partitionedRegion.getDistributionManager()
+ .getDistributionManagerId());
+
+ // find members who have clients subscribed to this event and add them
+ // to the recipients list. Also determine if there are any FilterInfo
+ // routing tables for any of the receivers
+ Set recipients = null;
+ Set membersWithRouting = filterRoutingInfo.getMembers();
+ for (Iterator it=membersWithRouting.iterator(); it.hasNext(); ) {
+ Object mbr = it.next();
+ if (!cacheOpRecipients.contains(mbr)) {
+// anyWithRouting = true;
+ if (!adjunctRecipients.contains(mbr)) {
+ if (recipients == null) {
+ recipients = new HashSet();
+ recipients.add(mbr);
+ }
+ }
+ }
+ }
+ if (recipients == null) {
+ recipients = adjunctRecipients;
+ } else {
+ recipients.addAll(adjunctRecipients);
+ }
+
+ Set failures = this.partitionedRegion.getDistributionManager().putOutgoing(prMsg);
+ return failures;
+ }
+
+ /**
+ * return the set of recipients for adjunct operations
+ */
+ protected Set getAdjunctReceivers(EntryEventImpl event, Set cacheOpReceivers,
+ Set twoMessages, FilterRoutingInfo routing) {
+ Operation op = event.getOperation();
+ if (op.isUpdate() || op.isCreate() || op.isDestroy() || op.isInvalidate()) {
+ // this method can safely assume that the operation is being distributed from
+ // the primary bucket holder to other nodes
+ Set r = this.partitionedRegion.getRegionAdvisor()
+ .adviseRequiresNotification(event);
+
+ if (r.size() > 0) {
+ r.removeAll(cacheOpReceivers);
+ }
+
+ // buckets that are initializing may transition out of token mode during
+ // message transmission and need both cache-op and adjunct messages to
+ // ensure that listeners are invoked
+ if (twoMessages.size() > 0) {
+ if (r.size() == 0) { // can't add to Collections.EMPTY_SET
+ r = twoMessages;
+ }
+ else {
+ r.addAll(twoMessages);
+ }
+ }
+ if (routing != null) {
+ // add adjunct messages to members with client routings
+ for (InternalDistributedMember id: routing.getMembers()) {
+ if (!cacheOpReceivers.contains(id)) {
+ if (r.isEmpty()) {
+ r = new HashSet();
+ }
+ r.add(id);
+ }
+ }
+ }
+ return r;
+ }
+ else {
+ return Collections.EMPTY_SET;
+ }
+ }
+
+ public int getId() {
+ return getBucketAdvisor().getProxyBucketRegion().getId();
+ }
+
+ @Override
+ protected void cacheWriteBeforePut(EntryEventImpl event, Set netWriteRecipients,
+ CacheWriter localWriter,
+ boolean requireOldValue, Object expectedOldValue)
+ throws CacheWriterException, TimeoutException {
+
+ boolean origRemoteState = false;
+ try {
+ if (event.getPartitionMessage() != null || event.hasClientOrigin()) {
+ origRemoteState=event.isOriginRemote();
+ event.setOriginRemote(true);
+ }
+ event.setRegion(this.partitionedRegion);
+ this.partitionedRegion.cacheWriteBeforePut(event, netWriteRecipients,
+ localWriter, requireOldValue, expectedOldValue);
+ } finally {
+ if (event.getPartitionMessage() != null || event.hasClientOrigin()) {
+ event.setOriginRemote(origRemoteState);
+ }
+ event.setRegion(this);
+ }
+ }
+
+ @Override
+ boolean cacheWriteBeforeDestroy(EntryEventImpl event, Object expectedOldValue)
+ throws CacheWriterException, EntryNotFoundException, TimeoutException {
+
+ boolean origRemoteState = false;
+ boolean ret = false;
+ try {
+ if (event.getPartitionMessage() != null || event.hasClientOrigin()) {
+ origRemoteState=event.isOriginRemote();
+ event.setOriginRemote(true);
+ }
+ event.setRegion(this.partitionedRegion);
+ ret = this.partitionedRegion.cacheWriteBeforeDestroy(event, expectedOldValue);
+ } finally {
+ if (event.getPartitionMessage() != null || event.hasClientOrigin()) {
+ event.setOriginRemote(origRemoteState);
+ }
+ event.setRegion(this);
+ }
+ return ret;
+ // return super.cacheWriteBeforeDestroy(event);
+ }
+
+ @Override
+ public CacheWriter basicGetWriter() {
+ return this.partitionedRegion.basicGetWriter();
+ }
+ @Override
+ void cleanUpOnIncompleteOp(EntryEventImpl event, RegionEntry re,
+ boolean eventRecorded, boolean updateStats, boolean isReplace) {
+
+
+ if(!eventRecorded || isReplace) {
+ //No indexes updated so safe to remove.
+ this.entries.removeEntry(event.getKey(), re, updateStats) ;
+ }/*else {
+ //if event recorded is true, that means as per event tracker entry is in
+ //system. As per sqlfabric, indexes have been updated. What is not done
+ // is basicPutPart2( distribution etc). So we do nothing as PR's re-attempt
+ // will do the required basicPutPart2. If we remove the entry here, than
+ //event tracker will not allow re insertion. So either we do nothing or
+ //if we remove ,than we have to update sqlfindexes as well as undo recording
+ // of event.
+ //TODO:OQL indexes? : Hope they get updated during retry. The issue is that oql indexes
+ // get updated after distribute , so it is entirely possible that oql index are
+ // not updated. what if retry fails?
+
+ }*/
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.internal.cache.partitioned.Bucket#getBucketOwners()
+ * @since gemfire59poc
+ */
+ public Set getBucketOwners() {
+ return getBucketAdvisor().getProxyBucketRegion().getBucketOwners();
+ }
+
+ public long getCounter() {
+ return counter.get();
+ }
+
+ public void setCounter(AtomicLong counter) {
+ this.counter = counter;
+ }
+
+ public void updateCounter(long delta) {
+ if (delta != 0) {
+ this.counter.getAndAdd(delta);
+ }
+ }
+
+ public void resetCounter() {
+ if (this.counter.get() != 0) {
+ this.counter.set(0);
+ }
+ }
+
+ public long getLimit() {
+ if (this.limit == null) {
+ return 0;
+ }
+ return limit.get();
+ }
+
+ public void setLimit(long limit) {
+ // This method can be called before object of this class is created
+ if (this.limit == null) {
+ this.limit = new AtomicLong();
+ }
+ this.limit.set(limit);
+ }
+
+ static int calcMemSize(Object value) {
+ if (value != null && (value instanceof GatewaySenderEventImpl)) {
+ return ((GatewaySenderEventImpl)value).getSerializedValueSize();
+ }
+ if (value == null || value instanceof Token) {
+ return 0;
+ }
+ if (!(value instanceof byte[]) && !(value instanceof CachedDeserializable)
+ && !(value instanceof com.gemstone.gemfire.Delta) && !(value instanceof Delta)) {
+ // ezoerner:20090401 it's possible this value is a Delta
+ throw new InternalGemFireError("DEBUG: calcMemSize: weird value (class "
+ + value.getClass() + "): " + value);
+ }
+
+ try {
+ return CachedDeserializableFactory.calcMemSize(value);
+ } catch (IllegalArgumentException e) {
+ return 0;
+ }
+ }
+
+ boolean isDestroyingDiskRegion;
+
+ @Override
+ protected void updateSizeOnClearRegion(int sizeBeforeClear) {
+ // This method is only called when the bucket is destroyed. If we
+ // start supporting clear of partitioned regions, this logic needs to change
+ // we can't just set these counters to zero, because there could be
+ // concurrent operations that are also updating these stats. For example,
+ //a destroy could have already been applied to the map, and then updates
+ //the stat after we reset it, making the state negative.
+
+ final PartitionedRegionDataStore prDs = this.partitionedRegion.getDataStore();
+ long oldMemValue;
+
+ if(this.isDestroyed || this.isDestroyingDiskRegion) {
+ //If this region is destroyed, mark the stat as destroyed.
+ oldMemValue = this.bytesInMemory.getAndSet(BUCKET_DESTROYED);
+
+ } else if(!this.isInitialized()) {
+ //This case is rather special. We clear the region if the GII failed.
+ //In the case of bucket regions, we know that there will be no concurrent operations
+ //if GII has failed, because there is not primary. So it's safe to set these
+ //counters to 0.
+ oldMemValue = this.bytesInMemory.getAndSet(0);
+ }
+ // Gemfire PRs don't support clear. allowing it via a hack for tests
+ else if (LocalRegion.simulateClearForTests) {
+ oldMemValue = this.bytesInMemory.getAndSet(0);
+ }
+ else {
+ throw new InternalGemFireError("Trying to clear a bucket region that was not destroyed or in initialization.");
+ }
+ if(oldMemValue != BUCKET_DESTROYED) {
+ this.partitionedRegion.getPrStats().incDataStoreEntryCount(-sizeBeforeClear);
+ prDs.updateMemoryStats(-oldMemValue);
+ }
+ }
+
+ @Override
+ public int calculateValueSize(Object val) {
+ // Only needed by BucketRegion
+ return calcMemSize(val);
+ }
+ @Override
+ public int calculateRegionEntryValueSize(RegionEntry re) {
+ return calcMemSize(re._getValue()); // OFFHEAP _getValue ok
+ }
+
+ @Override
+ void updateSizeOnPut(Object key, int oldSize, int newSize) {
+ updateBucket2Size(oldSize, newSize, SizeOp.UPDATE);
+ }
+
+ @Override
+ void updateSizeOnCreate(Object key, int newSize) {
+ this.partitionedRegion.getPrStats().incDataStoreEntryCount(1);
+ updateBucket2Size(0, newSize, SizeOp.CREATE);
+ }
+
+ @Override
+ void updateSizeOnRemove(Object key, int oldSize) {
+ this.partitionedRegion.getPrStats().incDataStoreEntryCount(-1);
+ updateBucket2Size(oldSize, 0, SizeOp.DESTROY);
+ }
+
+ @Override
+ int updateSizeOnEvict(Object key, int oldSize) {
+ int newDiskSize = oldSize;
+ updateBucket2Size(oldSize, newDiskSize, SizeOp.EVICT);
+ return newDiskSize;
+ }
+
+ @Override
+ public void updateSizeOnFaultIn(Object key, int newMemSize, int oldDiskSize) {
+ updateBucket2Size(oldDiskSize, newMemSize, SizeOp.FAULT_IN);
+ }
+
+ @Override
+ public void initializeStats(long numEntriesInVM, long
<TRUNCATED>