You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:50:01 UTC
[15/25] incubator-geode git commit: GEODE-10: Reinstating HDFS
persistence code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
index c5b5d3a..74efd51 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
@@ -96,6 +96,8 @@ public interface GatewaySender {
public static final int DEFAULT_DISPATCHER_THREADS = 5;
+ public static final int DEFAULT_HDFS_DISPATCHER_THREADS = 5;
+
public static final OrderPolicy DEFAULT_ORDER_POLICY = OrderPolicy.KEY;
/**
* The default maximum amount of memory (MB) to allow in the queue before
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
index bd78f5a..77f24a3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
@@ -52,6 +52,7 @@ import com.gemstone.gemfire.cache.client.internal.locator.LocatorStatusRequest;
import com.gemstone.gemfire.cache.client.internal.locator.LocatorStatusResponse;
import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionRequest;
import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionResponse;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.cache.query.internal.CqEntry;
import com.gemstone.gemfire.cache.query.internal.CumulativeNonDistinctResults;
@@ -1022,6 +1023,8 @@ public final class DSFIDFactory implements DataSerializableFixedID {
RemoteFetchVersionMessage.FetchVersionReplyMessage.class);
registerDSFID(RELEASE_CLEAR_LOCK_MESSAGE, ReleaseClearLockMessage.class);
registerDSFID(PR_TOMBSTONE_MESSAGE, PRTombstoneMessage.class);
+ registerDSFID(HDFS_GATEWAY_EVENT_IMPL, HDFSGatewayEventImpl.class);
+
registerDSFID(REQUEST_RVV_MESSAGE, InitialImageOperation.RequestRVVMessage.class);
registerDSFID(RVV_REPLY_MESSAGE, InitialImageOperation.RVVReplyMessage.class);
registerDSFID(SNAPPY_COMPRESSED_CACHED_DESERIALIZABLE, SnappyCompressedCachedDeserializable.class);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
index 7427f90..5d52346 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
@@ -103,6 +103,7 @@ public interface DataSerializableFixedID extends SerializationVersions {
public static final short JOIN_RESPONSE = -143;
public static final short JOIN_REQUEST = -142;
+ public static final short HDFS_GATEWAY_EVENT_IMPL = -141;
public static final short SNAPPY_COMPRESSED_CACHED_DESERIALIZABLE = -140;
public static final short GATEWAY_EVENT_IMPL = -136;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java
index f8740db..9b0446f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java
@@ -33,6 +33,7 @@ import com.gemstone.gemfire.cache.CacheLoader;
import com.gemstone.gemfire.cache.CacheLoaderException;
import com.gemstone.gemfire.cache.CacheWriter;
import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.CustomEvictionAttributes;
import com.gemstone.gemfire.cache.CustomExpiry;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.Declarable;
@@ -49,7 +50,10 @@ import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionEvent;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.SubscriptionAttributes;
+import com.gemstone.gemfire.compression.CompressionException;
import com.gemstone.gemfire.compression.Compressor;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.EvictionAttributesImpl;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
@@ -104,6 +108,8 @@ public class RemoteRegionAttributes implements RegionAttributes,
private String[] gatewaySendersDescs;
private boolean isGatewaySenderEnabled = false;
private String[] asyncEventQueueDescs;
+ private String hdfsStoreName;
+ private boolean hdfsWriteOnly;
private String compressorDesc;
private boolean offHeap;
@@ -155,6 +161,8 @@ public class RemoteRegionAttributes implements RegionAttributes,
this.isDiskSynchronous = attr.isDiskSynchronous();
this.gatewaySendersDescs = getDescs(attr.getGatewaySenderIds().toArray());
this.asyncEventQueueDescs = getDescs(attr.getAsyncEventQueueIds().toArray());
+ this.hdfsStoreName = attr.getHDFSStoreName();
+ this.hdfsWriteOnly = attr.getHDFSWriteOnly();
this.compressorDesc = getDesc(attr.getCompressor());
this.offHeap = attr.getOffHeap();
}
@@ -411,6 +419,7 @@ public class RemoteRegionAttributes implements RegionAttributes,
DataSerializer.writeString(this.compressorDesc, out);
out.writeBoolean(this.offHeap);
+ DataSerializer.writeString(this.hdfsStoreName, out);
}
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
@@ -459,6 +468,7 @@ public class RemoteRegionAttributes implements RegionAttributes,
this.compressorDesc = DataSerializer.readString(in);
this.offHeap = in.readBoolean();
+ this.hdfsStoreName = DataSerializer.readString(in);
}
private String[] getDescs(Object[] l) {
@@ -626,6 +636,15 @@ public class RemoteRegionAttributes implements RegionAttributes,
return this.evictionAttributes;
}
+ /**
+ * {@inheritDoc} This implementation always returns {@code null}.
+ */
+ @Override
+ public CustomEvictionAttributes getCustomEvictionAttributes() {
+ // TODO: HDFS: no support for custom eviction attributes from remote yet
+ return null;
+ }
+
public boolean getCloningEnabled() {
// TODO Auto-generated method stub
return this.cloningEnable;
@@ -634,6 +653,12 @@ public class RemoteRegionAttributes implements RegionAttributes,
public String getDiskStoreName() {
return this.diskStoreName;
}
+ public String getHDFSStoreName() {
+ return this.hdfsStoreName; // set from attr.getHDFSStoreName(), or read back by fromData()
+ }
+ public boolean getHDFSWriteOnly() {
+ return this.hdfsWriteOnly; // NOTE(review): toData()/fromData() in this patch only carry hdfsStoreName, not this flag - confirm
+ }
public boolean isDiskSynchronous() {
return this.isDiskSynchronous;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
index 92eaa01..1f8da88 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
@@ -34,6 +34,8 @@ import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
import com.gemstone.gemfire.internal.cache.lru.LRUStatistics;
import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
@@ -457,8 +459,17 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
}
waitIfQueueFull();
+ int sizeOfHdfsEvent = -1;
try {
-
+ if (this instanceof HDFSBucketRegionQueue) {
+ // need to fetch the size before event is inserted in queue.
+ // fix for #50016
+ if (this.getBucketAdvisor().isPrimary()) {
+ HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)event.getValue();
+ sizeOfHdfsEvent = hdfsEvent.getSizeOnHDFSInBytes(!((HDFSBucketRegionQueue)this).isBucketSorted);
+ }
+ }
+
didPut = virtualPut(event, false, false, null, false, startPut, true);
checkReadiness();
@@ -481,7 +492,7 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
destroyKey(key);
didPut = false;
} else {
- addToEventQueue(key, didPut, event);
+ addToEventQueue(key, didPut, event, sizeOfHdfsEvent);
}
return didPut;
}
@@ -511,7 +522,8 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
}
protected abstract void clearQueues();
- protected abstract void addToEventQueue(Object key, boolean didPut, EntryEventImpl event);
+ protected abstract void addToEventQueue(Object key, boolean didPut, EntryEventImpl event,
+ int sizeOfHdfsEvent);
@Override
public void afterAcquiringPrimaryState() {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
index d37f025..10644cb 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.Logger;
@@ -45,6 +46,7 @@ import com.gemstone.gemfire.cache.CacheLoaderException;
import com.gemstone.gemfire.cache.CacheStatistics;
import com.gemstone.gemfire.cache.CacheWriter;
import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.CustomEvictionAttributes;
import com.gemstone.gemfire.cache.CustomExpiry;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.DiskWriteAttributes;
@@ -52,6 +54,7 @@ import com.gemstone.gemfire.cache.EntryExistsException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.EvictionAttributes;
import com.gemstone.gemfire.cache.EvictionAttributesMutator;
+import com.gemstone.gemfire.cache.EvictionCriteria;
import com.gemstone.gemfire.cache.ExpirationAction;
import com.gemstone.gemfire.cache.ExpirationAttributes;
import com.gemstone.gemfire.cache.MembershipAttributes;
@@ -97,6 +100,7 @@ import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.util.ArrayUtils;
import com.gemstone.gemfire.pdx.internal.PeerTypeRegistration;
+import com.google.common.util.concurrent.Service.State;
/**
* Takes care of RegionAttributes, AttributesMutator, and some no-brainer method
@@ -232,6 +236,8 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
protected EvictionAttributesImpl evictionAttributes = new EvictionAttributesImpl();
+ protected CustomEvictionAttributes customEvictionAttributes;
+
/** The membership attributes defining required roles functionality */
protected MembershipAttributes membershipAttributes;
@@ -254,6 +260,10 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
protected String poolName;
+ protected String hdfsStoreName;
+
+ protected boolean hdfsWriteOnly;
+
protected Compressor compressor;
/**
@@ -888,6 +898,16 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
return this.subscriptionAttributes;
}
+ @Override
+ public final String getHDFSStoreName() {
+ return this.hdfsStoreName; // assigned from attrs.getHDFSStoreName() elsewhere in this class
+ }
+
+ @Override
+ public final boolean getHDFSWriteOnly() {
+ return this.hdfsWriteOnly; // assigned from attrs.getHDFSWriteOnly() elsewhere in this class
+ }
+
/**
* Get IndexManger for region
*/
@@ -1708,6 +1728,7 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
this.setEvictionController(this.evictionAttributes
.createEvictionController(this, attrs.getOffHeap()));
}
+ this.customEvictionAttributes = attrs.getCustomEvictionAttributes();
storeCacheListenersField(attrs.getCacheListeners());
assignCacheLoader(attrs.getCacheLoader());
assignCacheWriter(attrs.getCacheWriter());
@@ -1765,6 +1786,8 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
+ "when multiuser-authentication is true.");
}
}
+ this.hdfsStoreName = attrs.getHDFSStoreName();
+ this.hdfsWriteOnly = attrs.getHDFSWriteOnly();
this.diskStoreName = attrs.getDiskStoreName();
this.isDiskSynchronous = attrs.isDiskSynchronous();
@@ -1830,12 +1853,52 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
return this.evictionAttributes;
}
+ /**
+ * {@inheritDoc} May be {@code null}; setCustomEvictionAttributes rejects that state.
+ */
+ @Override
+ public CustomEvictionAttributes getCustomEvictionAttributes() {
+ return this.customEvictionAttributes;
+ }
+
public EvictionAttributesMutator getEvictionAttributesMutator()
{
return this.evictionAttributes;
}
+ /**
+ * {@inheritDoc} Keeps the existing criteria; throws IllegalArgumentException if none was set.
+ */
+ @Override
+ public CustomEvictionAttributes setCustomEvictionAttributes(long newStart,
+ long newInterval) {
+ checkReadiness();
+
+ if (this.customEvictionAttributes == null) { // nothing to update: no custom eviction attributes present
+ throw new IllegalArgumentException(
+ LocalizedStrings.AbstractRegion_NO_CUSTOM_EVICTION_SET
+ .toLocalizedString(getFullPath()));
+ }
+
+ if (newStart == 0) { // 0 means: keep the evictor start time already stored
+ newStart = this.customEvictionAttributes.getEvictorStartTime();
+ }
+ this.customEvictionAttributes = new CustomEvictionAttributesImpl(
+ this.customEvictionAttributes.getCriteria(), newStart, newInterval,
+ newStart == 0 && newInterval == 0); // NOTE(review): presumably the evict-incoming flag; uses newStart AFTER the fallback above - confirm intent
+
+// if (this.evService == null) {
+// initilializeCustomEvictor();
+// } else {// we are changing the earlier one which is already started.
+// EvictorService service = getEvictorTask();
+// service.changeEvictionInterval(newInterval);
+// if (newStart != 0)
+// service.changeStartTime(newStart);
+// }
+ return this.customEvictionAttributes;
+ }
+
public void setEvictionController(LRUAlgorithm evictionController)
{
this.evictionController = evictionController;
@@ -1974,6 +2037,7 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
/**
* @since 8.1
+ * @return the extension point for this region
*/
@Override
public ExtensionPoint<Region<?, ?>> getExtensionPoint() {
@@ -1983,4 +2047,87 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
public boolean getOffHeap() {
return this.offHeap;
}
+ /**
+ * property used to find region operations that reach out to HDFS multiple times
+ */
+ private static final boolean DEBUG_HDFS_CALLS = Boolean.getBoolean("DebugHDFSCalls");
+
+ /**
+ * throws exception if region operation goes out to HDFS multiple times
+ */
+ private static final boolean THROW_ON_MULTIPLE_HDFS_CALLS = Boolean.getBoolean("throwOnMultipleHDFSCalls");
+
+ private ThreadLocal<CallLog> logHDFSCalls = DEBUG_HDFS_CALLS ? new ThreadLocal<CallLog>() : null;
+
+ public void hdfsCalled(Object key) { // records one HDFS access (stack trace + key) for the current operation
+ CallLog callLog = DEBUG_HDFS_CALLS ? logHDFSCalls.get() : null;
+ if (callLog != null) { // null: tracking is off, or no operationStart() ran on this thread (previously an NPE)
+ callLog.addStack(new Throwable());
+ callLog.setKey(key);
+ }
+ }
+ public final void operationStart() { // marks the start of a (possibly nested) region operation for HDFS call tracking
+ if (!DEBUG_HDFS_CALLS) { // tracking is off unless system property DebugHDFSCalls is true
+ return;
+ }
+ if (logHDFSCalls.get() == null) { // no log on this thread yet: outermost operation, start a fresh one
+ logHDFSCalls.set(new CallLog());
+ //InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:operationStart", new Throwable());
+ } else { // a log already exists: nested operation, only bump the depth
+ logHDFSCalls.get().incNestedCall();
+ //InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:incNestedCall:", new Throwable());
+ }
+ }
+ public final void operationCompleted() { // ends one (possibly nested) region operation; checks the log at the outermost level
+ CallLog callLog = DEBUG_HDFS_CALLS ? logHDFSCalls.get() : null;
+ if (callLog == null || callLog.decNestedCall() >= 0) {
+ return; // tracking off, no operationStart() on this thread, or still inside a nested operation
+ }
+ //InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:operationCompleted", new Throwable());
+ // Clear the thread-local BEFORE asserting: a throwing assertCalls() must not leave a stale log on a reused thread.
+ logHDFSCalls.set(null);
+ callLog.assertCalls();
+ }
+
+ public static class CallLog { // per-thread log of HDFS accesses; created by operationStart(), checked by operationCompleted()
+ private List<Throwable> stackTraces = new ArrayList<Throwable>(); // one Throwable per hdfsCalled() invocation
+ private Object key; // key passed to the most recent hdfsCalled()
+ private int nestedCall = 0; // nesting depth: 0 for the outermost operation, below 0 once it has completed
+ public void incNestedCall() {
+ nestedCall++;
+ }
+ public int decNestedCall() {
+ return --nestedCall;
+ }
+ public void addStack(Throwable stack) {
+ this.stackTraces.add(stack);
+ }
+ public void setKey(Object key) {
+ this.key = key;
+ }
+ public void assertCalls() { // throws or logs a warning when more than one HDFS access was recorded
+ if (stackTraces.size() > 1) {
+ Throwable firstTrace = new Throwable();
+ Throwable lastTrace = firstTrace;
+ for (Throwable t : this.stackTraces) {
+ lastTrace.initCause(t); // chain the recorded stacks as causes so one trace shows every access
+ lastTrace = t;
+ }
+ if (THROW_ON_MULTIPLE_HDFS_CALLS) {
+ throw new RuntimeException("SWAP:For key:"+key+" HDFS get called more than once: ", firstTrace);
+ } else {
+ InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:For key:"+key+" HDFS get called more than once: ", firstTrace);
+ }
+ }
+ }
+ }
+
+ public EvictionCriteria getEvictionCriteria() {
+ // Criteria are exposed only when custom eviction attributes exist and are not evict-incoming.
+ final CustomEvictionAttributes attrs = this.customEvictionAttributes;
+ if (attrs == null || attrs.isEvictIncoming()) {
+ return null;
+ }
+ return attrs.getCriteria();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
index 46a851d..b936e3f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
@@ -870,7 +870,15 @@ public abstract class AbstractRegionEntry implements RegionEntry,
removeEntry = true;
}
- if (removeEntry) {
+ // See #47887, we do not insert a tombstone for evicted HDFS
+ // entries since the value is still present in HDFS
+ // Check if we have to evict or just do destroy.
+ boolean forceRemoveEntry =
+ (event.isEviction() || event.isExpiration())
+ && event.getRegion().isUsedForPartitionedRegionBucket()
+ && event.getRegion().getPartitionedRegion().isHDFSRegion();
+
+ if (removeEntry || forceRemoveEntry) {
boolean isThisTombstone = isTombstone();
if(inTokenMode && !event.getOperation().isEviction()) {
setValue(region, Token.DESTROYED);
@@ -1390,7 +1398,27 @@ public abstract class AbstractRegionEntry implements RegionEntry,
/**
* {@inheritDoc}
*/
+ @Override
+ public final boolean isMarkedForEviction() {
+ return areAnyBitsSet(MARKED_FOR_EVICTION); // tests the MARKED_FOR_EVICTION flag bit(s)
+ }
+
+ /**
+ * {@inheritDoc} Sets the MARKED_FOR_EVICTION flag on this entry.
+ */
+ @Override
+ public final void setMarkedForEviction() {
+ setBits(MARKED_FOR_EVICTION);
+ }
+ /**
+ * {@inheritDoc} Clears the MARKED_FOR_EVICTION flag on this entry.
+ */
+ @Override
+ public final void clearMarkedForEviction() {
+ clearBits(~MARKED_FOR_EVICTION); // NOTE(review): assumes clearBits() ANDs with its argument, hence the inverted mask - confirm
+ }
+
@Override
public final synchronized void decRefCount(NewLRUClockHand lruList, LocalRegion lr) {
if (TXManagerImpl.decRefCount(this)) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index 75a1e32..3286373 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -18,6 +18,7 @@
package com.gemstone.gemfire.internal.cache;
+import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashSet;
@@ -35,6 +36,7 @@ import com.gemstone.gemfire.InvalidDeltaException;
import com.gemstone.gemfire.cache.CacheRuntimeException;
import com.gemstone.gemfire.cache.CacheWriter;
import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.CustomEvictionAttributes;
import com.gemstone.gemfire.cache.DiskAccessException;
import com.gemstone.gemfire.cache.EntryExistsException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
@@ -81,6 +83,9 @@ import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap;
+import com.gemstone.gemfire.pdx.PdxInstance;
+import com.gemstone.gemfire.pdx.PdxSerializationException;
+import com.gemstone.gemfire.pdx.internal.ConvertableToBytes;
/**
* Abstract implementation of {@link RegionMap}that has all the common
@@ -298,6 +303,10 @@ public abstract class AbstractRegionMap implements RegionMap {
public RegionEntry getEntry(Object key) {
RegionEntry re = (RegionEntry)_getMap().get(key);
+ if (re != null && re.isMarkedForEviction()) {
+ // entry has been faulted in from HDFS and is marked for eviction: report it as absent
+ return null;
+ }
return re;
}
@@ -328,12 +337,16 @@ public abstract class AbstractRegionMap implements RegionMap {
@Override
public final RegionEntry getOperationalEntryInVM(Object key) {
RegionEntry re = (RegionEntry)_getMap().get(key);
+ if (re != null && re.isMarkedForEviction()) {
+ // entry has been faulted in from HDFS and is marked for eviction: report it as absent
+ return null;
+ }
return re;
}
public final void removeEntry(Object key, RegionEntry re, boolean updateStat) {
- if (re.isTombstone() && _getMap().get(key) == re){
+ if (re.isTombstone() && _getMap().get(key) == re && !re.isMarkedForEviction()){
logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace"));
return; // can't remove tombstones except from the tombstone sweeper
}
@@ -349,7 +362,7 @@ public abstract class AbstractRegionMap implements RegionMap {
EntryEventImpl event, final LocalRegion owner,
final IndexUpdater indexUpdater) {
boolean success = false;
- if (re.isTombstone()&& _getMap().get(key) == re) {
+ if (re.isTombstone()&& _getMap().get(key) == re && !re.isMarkedForEviction()) {
logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace"));
return; // can't remove tombstones except from the tombstone sweeper
}
@@ -358,6 +371,18 @@ public abstract class AbstractRegionMap implements RegionMap {
indexUpdater.onEvent(owner, event, re);
}
+ //This is messy, but custom eviction calls removeEntry
+ //rather than re.destroy I think to avoid firing callbacks, etc.
+ //However, the value still needs to be set to removePhase1
+ //in order to remove the entry from disk.
+ if(event.isCustomEviction() && !re.isRemoved()) {
+ try {
+ re.removePhase1(owner, false);
+ } catch (RegionClearedException e) {
+ //that's ok, we were just trying to do evict incoming eviction
+ }
+ }
+
if (_getMap().remove(key, re)) {
re.removePhase2();
success = true;
@@ -1144,7 +1169,7 @@ public abstract class AbstractRegionMap implements RegionMap {
// transaction conflict (caused by eviction) when the entry
// is being added to transaction state.
if (isEviction) {
- if (!confirmEvictionDestroy(oldRe)) {
+ if (!confirmEvictionDestroy(oldRe) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) {
opCompleted = false;
return opCompleted;
}
@@ -1399,7 +1424,7 @@ public abstract class AbstractRegionMap implements RegionMap {
// See comment above about eviction checks
if (isEviction) {
assert expectedOldValue == null;
- if (!confirmEvictionDestroy(re)) {
+ if (!confirmEvictionDestroy(re) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) {
opCompleted = false;
return opCompleted;
}
@@ -1481,6 +1506,12 @@ public abstract class AbstractRegionMap implements RegionMap {
}
} // !isRemoved
else { // already removed
+ if (owner.isHDFSReadWriteRegion() && re.isRemovedPhase2()) {
+ // For HDFS region there may be a race with eviction
+ // so retry the operation. fixes bug 49150
+ retry = true;
+ continue;
+ }
if (re.isTombstone() && event.getVersionTag() != null) {
// if we're dealing with a tombstone and this is a remote event
// (e.g., from cache client update thread) we need to update
@@ -2654,7 +2685,11 @@ public abstract class AbstractRegionMap implements RegionMap {
boolean onlyExisting, boolean returnTombstone) {
Object key = event.getKey();
RegionEntry retVal = null;
- retVal = getEntry(event);
+ if (event.isFetchFromHDFS()) {
+ retVal = getEntry(event);
+ } else {
+ retVal = getEntryInVM(key);
+ }
if (onlyExisting) {
if (!returnTombstone && (retVal != null && retVal.isTombstone())) {
return null;
@@ -2953,6 +2988,47 @@ public abstract class AbstractRegionMap implements RegionMap {
else if (re != null && owner.isUsedForPartitionedRegionBucket()) {
BucketRegion br = (BucketRegion)owner;
CachePerfStats stats = br.getPartitionedRegion().getCachePerfStats();
+ long startTime= stats.startCustomEviction();
+ CustomEvictionAttributes csAttr = br.getCustomEvictionAttributes();
+ // No need to update indexes if entry was faulted in but operation did not succeed.
+ if (csAttr != null && (csAttr.isEvictIncoming() || re.isMarkedForEviction())) {
+
+ if (csAttr.getCriteria().doEvict(event)) {
+ stats.incEvictionsInProgress();
+ // set the flag on event saying the entry should be evicted
+ // and not indexed
+ @Released EntryEventImpl destroyEvent = EntryEventImpl.create (owner, Operation.DESTROY, event.getKey(),
+ null/* newValue */, null, false, owner.getMyId());
+ try {
+
+ destroyEvent.setOldValueFromRegion();
+ destroyEvent.setCustomEviction(true);
+ destroyEvent.setPossibleDuplicate(event.isPossibleDuplicate());
+ if(logger.isDebugEnabled()) {
+ logger.debug("Evicting the entry " + destroyEvent);
+ }
+ if(result != null) {
+ removeEntry(event.getKey(),re, true, destroyEvent,owner, indexUpdater);
+ }
+ else{
+ removeEntry(event.getKey(),re, true, destroyEvent,owner, null);
+ }
+ //mark the region entry for this event as evicted
+ event.setEvicted();
+ stats.incEvictions();
+ if(logger.isDebugEnabled()) {
+ logger.debug("Evicted the entry " + destroyEvent);
+ }
+ //removeEntry(event.getKey(), re);
+ } finally {
+ destroyEvent.release();
+ stats.decEvictionsInProgress();
+ }
+ } else {
+ re.clearMarkedForEviction();
+ }
+ }
+ stats.endCustomEviction(startTime);
}
} // try
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
index c241c6b..3038059 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
@@ -1316,6 +1316,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
((BucketRegion)br).processPendingSecondaryExpires();
}
if (br instanceof BucketRegionQueue) { // Shouldn't it be AbstractBucketRegionQueue
+ // i.e. this stat is not being incremented for HDFSBucketRegionQueue!!
BucketRegionQueue brq = (BucketRegionQueue)br;
brq.incQueueSize(brq.size());
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index f5ae0fb..6e4f426 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import org.apache.logging.log4j.Logger;
@@ -34,6 +35,7 @@ import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.CopyHelper;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.DeltaSerializationException;
+import com.gemstone.gemfire.GemFireIOException;
import com.gemstone.gemfire.InternalGemFireError;
import com.gemstone.gemfire.InvalidDeltaException;
import com.gemstone.gemfire.SystemFailure;
@@ -41,16 +43,20 @@ import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheWriter;
import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.CustomEvictionAttributes;
import com.gemstone.gemfire.cache.DiskAccessException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.EvictionAction;
import com.gemstone.gemfire.cache.EvictionAlgorithm;
import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.EvictionCriteria;
import com.gemstone.gemfire.cache.ExpirationAction;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
import com.gemstone.gemfire.cache.partition.PartitionListener;
import com.gemstone.gemfire.cache.query.internal.IndexUpdater;
import com.gemstone.gemfire.distributed.DistributedMember;
@@ -84,11 +90,13 @@ import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import com.gemstone.gemfire.internal.concurrent.Atomics;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
@@ -225,6 +233,8 @@ implements Bucket
return eventSeqNum;
}
+ protected final AtomicReference<HoplogOrganizer> hoplog = new AtomicReference<HoplogOrganizer>();
+
public BucketRegion(String regionName, RegionAttributes attrs,
LocalRegion parentRegion, GemFireCacheImpl cache,
InternalRegionArguments internalRegionArgs) {
@@ -882,6 +892,12 @@ implements Bucket
beginLocalWrite(event);
try {
+ // increment the tailKey so that invalidate operations are written to HDFS
+ if (this.partitionedRegion.hdfsStoreName != null) {
+ /* MergeGemXDHDFSToGFE Disabled this while porting. Is this required? */
+ //assert this.partitionedRegion.isLocalParallelWanEnabled();
+ handleWANEvent(event);
+ }
// which performs the local op.
// The ARM then calls basicInvalidatePart2 with the entry synchronized.
if ( !hasSeenEvent(event) ) {
@@ -1136,6 +1152,20 @@ implements Bucket
if (this.partitionedRegion.isParallelWanEnabled()) {
handleWANEvent(event);
}
+ // In GemFire EVICT_DESTROY is not distributed, so in order to remove the entry
+ // from memory, allow the destroy to proceed. fixes #49784
+ if (event.isLoadedFromHDFS() && !getBucketAdvisor().isPrimary()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Put the destory event in HDFS queue on secondary "
+ + "and return as event is HDFS loaded " + event);
+ }
+ notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event);
+ return;
+ }else{
+ if (logger.isDebugEnabled()) {
+ logger.debug("Going ahead with the destroy on GemFire system");
+ }
+ }
// This call should invoke AbstractRegionMap (aka ARM) destroy method
// which calls the CacheWriter, then performs the local op.
// The ARM then calls basicDestroyPart2 with the entry synchronized.
@@ -1334,7 +1364,39 @@ implements Bucket
}
+ /**
+ * Delegates to {@code isHDFSRegion()} of this bucket's {@code partitionedRegion}.
+ */
@Override
+ public boolean isHDFSRegion() {
+ return this.partitionedRegion.isHDFSRegion();
+ }
+
+ /**
+ * Delegates to {@code isHDFSReadWriteRegion()} of this bucket's {@code partitionedRegion}.
+ */
+ @Override
+ public boolean isHDFSReadWriteRegion() {
+ return this.partitionedRegion.isHDFSReadWriteRegion();
+ }
+
+ /**
+ * Delegates to {@code isHDFSWriteOnly()} of this bucket's {@code partitionedRegion}.
+ */
+ @Override
+ protected boolean isHDFSWriteOnly() {
+ return this.partitionedRegion.isHDFSWriteOnly();
+ }
+
+ /**
+ * Estimates the number of entries in this bucket.
+ * <p>
+ * For an HDFS read-write region the estimate is the size of this bucket's
+ * region queue in the HDFS queue plus the hoplog organizer's size estimate,
+ * or 0 when the HDFS queue is not available. The sum is computed in
+ * {@code long} arithmetic and capped at {@link Integer#MAX_VALUE}, so a large
+ * hoplog estimate can neither be truncated by a narrowing cast nor wrap the
+ * int result. For every other region this is {@link #size()}.
+ *
+ * @return the estimated size, never larger than {@link Integer#MAX_VALUE}
+ * @throws PrimaryBucketException if computing the HDFS estimate fails with a
+ * ForceReattemptException
+ */
+ @Override
public int sizeEstimate() {
+ if (isHDFSReadWriteRegion()) {
+ try {
+ checkForPrimary();
+ ConcurrentParallelGatewaySenderQueue q = getHDFSQueue();
+ if (q == null) return 0;
+ long hdfsBucketRegionSize = q.getBucketRegionQueue(
+ partitionedRegion, getId()).size();
+ long hoplogEstimate = getHoplogOrganizer().sizeEstimate();
+ // cap rather than cast: (int) on a large long silently yields a wrong value
+ int estimate = (int) Math.min((long) Integer.MAX_VALUE,
+ hdfsBucketRegionSize + hoplogEstimate);
+ if (logger.isDebugEnabled()) {
+ logger.debug("for bucket " + getName() + " estimateSize returning "
+ + estimate);
+ }
+ return estimate;
+ } catch (ForceReattemptException e) {
+ throw new PrimaryBucketException(e.getLocalizedMessage(), e);
+ }
+ }
return size();
}
@@ -1391,14 +1453,14 @@ implements Bucket
* if there is a serialization problem
* see LocalRegion#getDeserializedValue(RegionEntry, KeyInfo, boolean, boolean, boolean, EntryEventImpl, boolean, boolean, boolean)
*/
- private RawValue getSerialized(Object key,
- boolean updateStats,
- boolean doNotLockEntry,
- EntryEventImpl clientEvent,
- boolean returnTombstones)
+ private RawValue getSerialized(Object key, boolean updateStats, boolean doNotLockEntry, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
throws EntryNotFoundException, IOException {
RegionEntry re = null;
- re = this.entries.getEntry(key);
+ if (allowReadFromHDFS) {
+ re = this.entries.getEntry(key);
+ } else {
+ re = this.entries.getOperationalEntryInVM(key);
+ }
if (re == null) {
return NULLVALUE;
}
@@ -1442,18 +1504,13 @@ implements Bucket
*
* @param keyInfo
* @param generateCallbacks
- * @param clientEvent holder for the entry's version information
+ * @param clientEvent holder for the entry's version information
* @param returnTombstones TODO
* @return serialized (byte) form
* @throws IOException if the result is not serializable
* @see LocalRegion#get(Object, Object, boolean, EntryEventImpl)
*/
- public RawValue getSerialized(KeyInfo keyInfo,
- boolean generateCallbacks,
- boolean doNotLockEntry,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones) throws IOException {
+ public RawValue getSerialized(KeyInfo keyInfo, boolean generateCallbacks, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws IOException {
checkReadiness();
checkForNoAccess();
CachePerfStats stats = getCachePerfStats();
@@ -1463,7 +1520,7 @@ implements Bucket
try {
RawValue valueBytes = NULLVALUE;
boolean isCreate = false;
- RawValue result = getSerialized(keyInfo.getKey(), true, doNotLockEntry, clientEvent, returnTombstones);
+ RawValue result = getSerialized(keyInfo.getKey(), true, doNotLockEntry, clientEvent, returnTombstones, allowReadFromHDFS);
isCreate = result == NULLVALUE || (result.getRawValue() == Token.TOMBSTONE && !returnTombstones);
miss = (result == NULLVALUE || Token.isInvalid(result.getRawValue()));
if (miss) {
@@ -1475,7 +1532,7 @@ implements Bucket
return REQUIRES_ENTRY_LOCK;
}
Object value = nonTxnFindObject(keyInfo, isCreate,
- generateCallbacks, result.getRawValue(), true, true, requestingClient, clientEvent, false);
+ generateCallbacks, result.getRawValue(), true, true, requestingClient, clientEvent, false, allowReadFromHDFS);
if (value != null) {
result = new RawValue(value);
}
@@ -2414,8 +2471,36 @@ implements Bucket
}
+ /**
+ * Tries to create the hoplog organizer for this bucket and never lets a
+ * creation failure escape as such: an {@link IOException} is logged as a
+ * warning, any other {@link Throwable} is logged as an error after
+ * {@code checkReadiness()} has been called (which may itself throw).
+ */
public void beforeAcquiringPrimaryState() {
+ try {
+ createHoplogOrganizer();
+ } catch (IOException e) {
+ // 48990: when HDFS was down, gemfirexd should still start normally
+ logger.warn(LocalizedStrings.HOPLOG_NOT_STARTED_YET, e);
+ } catch(Throwable e) {
+ /*MergeGemXDHDFSToGFE changed this code to checkReadiness*/
+ // SystemFailure.checkThrowable(e);
+ this.checkReadiness();
+ //49333 - no matter what, we should elect a primary.
+ logger.error(LocalizedStrings.LocalRegion_UNEXPECTED_EXCEPTION, e);
+ }
+ }
+
+ /**
+ * Returns the hoplog organizer held in {@code hoplog}, creating it through
+ * {@code hdfsManager.create(getId())} when none is set yet.
+ *
+ * NOTE(review): the {@code get()} check and the later {@code getAndSet()} are
+ * two separate steps; only the assert guards against another thread having
+ * stored an organizer in between. Presumably callers serialize this call
+ * (getHoplogOrganizer does so on the bucket advisor) - confirm for all callers.
+ *
+ * @return the organizer, or null when the partitioned region is not an HDFS region
+ * @throws IOException declared for the organizer creation path
+ */
+ public HoplogOrganizer<?> createHoplogOrganizer() throws IOException {
+ if (getPartitionedRegion().isHDFSRegion()) {
+ HoplogOrganizer<?> organizer = hoplog.get();
+ if (organizer != null) {
+ // hoplog was already (re)created by another thread
+ return organizer;
+ }
+
+ // store the new organizer; the previous value is expected to be null
+ HoplogOrganizer hdfs = hoplog.getAndSet(getPartitionedRegion().hdfsManager.create(getId()));
+ assert hdfs == null;
+ return hoplog.get();
+ } else {
+ return null;
+ }
}
-
+
public void afterAcquiringPrimaryState() {
}
@@ -2423,13 +2508,105 @@ implements Bucket
* Invoked when a primary bucket is demoted.
*/
public void beforeReleasingPrimaryLockDuringDemotion() {
+ releaseHoplogOrganizer();
}
+ /**
+ * Clears the {@code hoplog} reference and, if an organizer was set, asks the
+ * HDFS manager to close the one registered for this bucket id. Used to
+ * release resources during a clean transition.
+ */
+ protected void releaseHoplogOrganizer() {
+ final HoplogOrganizer previous = hoplog.getAndSet(null);
+ if (previous == null) {
+ // nothing was created, so there is nothing to close
+ return;
+ }
+ getPartitionedRegion().hdfsManager.close(getId());
+ }
+
+ /**
+ * Returns the hoplog organizer of this bucket, creating it on demand.
+ * When none is set yet, creation happens while holding the monitor of the
+ * bucket advisor and after {@code checkForPrimary()}.
+ *
+ * @return the organizer, never null
+ * @throws HDFSIOException if creation fails with an IOException or yields no organizer
+ */
+ public HoplogOrganizer<?> getHoplogOrganizer() throws HDFSIOException {
+ final HoplogOrganizer<?> existing = hoplog.get();
+ if (existing != null) {
+ return existing;
+ }
+ synchronized (getBucketAdvisor()) {
+ checkForPrimary();
+ final HoplogOrganizer<?> created;
+ try {
+ created = createHoplogOrganizer();
+ } catch (IOException e) {
+ throw new HDFSIOException("Failed to create Hoplog organizer due to ", e);
+ }
+ if (created == null) {
+ throw new HDFSIOException("Hoplog organizer is not available for " + this);
+ }
+ return created;
+ }
+ }
+
@Override
public RegionAttributes getAttributes() {
return this;
}
+ /**
+ * Forwards the call for {@code key} to {@code hdfsCalled} of this bucket's
+ * {@code partitionedRegion}.
+ */
+ @Override
+ public void hdfsCalled(Object key) {
+ this.partitionedRegion.hdfsCalled(key);
+ }
+
+ /**
+ * Clears this bucket's HDFS data for an HDFS read-write region: first the
+ * bucket's part of the HDFS queue, then the hoplog organizer if one is set.
+ * Does nothing when the region is not HDFS read-write or when the HDFS queue
+ * is not available.
+ *
+ * @throws GemFireIOException if clearing the hoplog organizer fails with an IOException
+ */
+ @Override
+ protected void clearHDFSData() {
+ if (!getPartitionedRegion().isHDFSReadWriteRegion()) {
+ return;
+ }
+ final ConcurrentParallelGatewaySenderQueue queue = getHDFSQueue();
+ if (queue == null) {
+ return;
+ }
+ // queue first, then the persisted hoplogs
+ queue.clear(getPartitionedRegion(), this.getId());
+ final HoplogOrganizer organizer = hoplog.get();
+ if (organizer == null) {
+ return;
+ }
+ try {
+ organizer.clear();
+ } catch (IOException e) {
+ throw new GemFireIOException(LocalizedStrings.HOPLOG_UNABLE_TO_DELETE_HDFS_DATA.toLocalizedString(), e);
+ }
+ }
+
+ /**
+ * @return the eviction criteria reported by this bucket's {@code partitionedRegion}
+ */
+ public EvictionCriteria getEvictionCriteria() {
+ return this.partitionedRegion.getEvictionCriteria();
+ }
+
+ /**
+ * @return the custom eviction attributes reported by this bucket's {@code partitionedRegion}
+ */
+ public CustomEvictionAttributes getCustomEvictionAttributes() {
+ return this.partitionedRegion.getCustomEvictionAttributes();
+ }
+
+ /**
+ * Destroys the entry for {@code key} through {@code mapDestroy}, using an
+ * event that is flagged as a custom eviction. The event is always released,
+ * and the local write is ended if it was begun.
+ *
+ * @param key the key of the entry to evict
+ * @return true if the evict destroy was done; false if it was not needed
+ * @throws Error if a CacheWriterException, TimeoutException or
+ * EntryNotFoundException surfaces, none of which is expected here
+ */
+ public boolean customEvictDestroy(Object key) {
+ checkReadiness();
+ @Released final EntryEventImpl evictEvent = generateCustomEvictDestroyEvent(key);
+ evictEvent.setCustomEviction(true);
+ boolean writeBegun = false;
+ try {
+ writeBegun = beginLocalWrite(evictEvent);
+ final boolean destroyed = mapDestroy(evictEvent,
+ false, // cacheWrite
+ true, // isEviction
+ null); // expectedOldValue
+ return destroyed;
+ } catch (CacheWriterException writerFailure) {
+ throw new Error(LocalizedStrings.LocalRegion_CACHE_WRITER_SHOULD_NOT_HAVE_BEEN_CALLED_FOR_EVICTDESTROY.toLocalizedString(), writerFailure);
+ } catch (TimeoutException lockTimeout) {
+ throw new Error(LocalizedStrings.LocalRegion_NO_DISTRIBUTED_LOCK_SHOULD_HAVE_BEEN_ATTEMPTED_FOR_EVICTDESTROY.toLocalizedString(), lockTimeout);
+ } catch (EntryNotFoundException missingEntry) {
+ throw new Error(LocalizedStrings.LocalRegion_ENTRYNOTFOUNDEXCEPTION_SHOULD_BE_MASKED_FOR_EVICTDESTROY.toLocalizedString(), missingEntry);
+ } finally {
+ if (writeBegun) {
+ endLocalWrite(evictEvent);
+ }
+ evictEvent.release();
+ }
+ }
+
public boolean areSecondariesPingable() {
Set<InternalDistributedMember> hostingservers = this.partitionedRegion.getRegionAdvisor()
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java
index 0243cde..0facd93 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java
@@ -441,7 +441,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
}
}
- protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event) {
+ protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event, int sizeOfHDFSEvent) {
if (didPut) {
if (this.initialized) {
this.eventSeqNumQueue.add(key);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java
index 4a34771..6f673c7 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java
@@ -38,6 +38,8 @@ import com.gemstone.gemfire.cache.InterestPolicy;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.SubscriptionAttributes;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
import com.gemstone.gemfire.distributed.Role;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
@@ -1226,16 +1228,30 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
public boolean include(final Profile profile) {
if (profile instanceof CacheProfile) {
final CacheProfile cp = (CacheProfile)profile;
- if (allAsyncEventIds.equals(cp.asyncEventQueueIds)) {
+ /* Since HDFS queues are created only when a region is created, this check is
+ * unnecessary for them. It also causes problems because the HDFS queue is not
+ * created on an accessor. Hence HDFS queues are excluded from this comparison. */
+ Set<String> allAsyncEventIdsNoHDFS = removeHDFSQueues(allAsyncEventIds);
+ Set<String> profileQueueIdsNoHDFS = removeHDFSQueues(cp.asyncEventQueueIds);
+ if (allAsyncEventIdsNoHDFS.equals(profileQueueIdsNoHDFS)) {
return true;
}else{
- differAsycnQueueIds.add(allAsyncEventIds);
- differAsycnQueueIds.add(cp.asyncEventQueueIds);
+ differAsycnQueueIds.add(allAsyncEventIdsNoHDFS);
+ differAsycnQueueIds.add(profileQueueIdsNoHDFS);
return false;
}
}
return false;
}
+ /**
+ * Returns a new set holding every id of {@code queueIds} that does not start
+ * with {@code HDFSStoreFactoryImpl.DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS}. The
+ * given set is not modified.
+ */
+ private Set<String> removeHDFSQueues(Set<String> queueIds){
+ final Set<String> nonHdfsIds = new HashSet<String>();
+ for (String id : queueIds) {
+ if (id.startsWith(HDFSStoreFactoryImpl.DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS)) {
+ continue;
+ }
+ nonHdfsIds.add(id);
+ }
+ return nonHdfsIds;
+ }
});
return differAsycnQueueIds;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java
index ad84963..382c537 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java
@@ -156,6 +156,13 @@ public class CachePerfStats {
protected static final int compressionPreCompressedBytesId;
protected static final int compressionPostCompressedBytesId;
+ protected static final int evictByCriteria_evictionsId;// total actual evictions (entries evicted)
+ protected static final int evictByCriteria_evictionTimeId;// total eviction time including product + user expr.
+ protected static final int evictByCriteria_evictionsInProgressId;
+ protected static final int evictByCriteria_evaluationsId;// total eviction attempts
+ protected static final int evictByCriteria_evaluationTimeId;// time taken to evaluate user expression.
+
+
/** The Statistics object that we delegate most behavior to */
protected final Statistics stats;
@@ -514,6 +521,12 @@ public class CachePerfStats {
compressionDecompressionsId = type.nameToId("decompressions");
compressionPreCompressedBytesId = type.nameToId("preCompressedBytes");
compressionPostCompressedBytesId = type.nameToId("postCompressedBytes");
+
+ evictByCriteria_evictionsId = type.nameToId("evictByCriteria_evictions");
+ evictByCriteria_evictionTimeId = type.nameToId("evictByCriteria_evictionTime");
+ evictByCriteria_evictionsInProgressId = type.nameToId("evictByCriteria_evictionsInProgress");
+ evictByCriteria_evaluationsId= type.nameToId("evictByCriteria_evaluations");
+ evictByCriteria_evaluationTimeId = type.nameToId("evictByCriteria_evaluationTime");
}
//////////////////////// Constructors ////////////////////////
@@ -1341,4 +1354,66 @@ public class CachePerfStats {
stats.incLong(exportTimeId, getStatTime() - start);
}
}
+
+ /**
+ * Used for the case of evict on incoming.
+ *
+ * @return the current {@code NanoTimer} time, to be handed to
+ * {@code endCustomEviction(long)}
+ */
+ public long startCustomEviction() {
+ return NanoTimer.getTime();
+ }
+
+ /**
+ * Used for the case of evict on incoming: adds the time elapsed since
+ * {@code start} to the evictByCriteria eviction-time statistic.
+ *
+ * @param start a {@code NanoTimer} reading taken when the eviction started
+ */
+ public void endCustomEviction(long start) {
+ final long elapsed = NanoTimer.getTime() - start;
+ stats.incLong(evictByCriteria_evictionTimeId, elapsed);
+ }
+
+ /** Adds 1 to the evictByCriteria evictions-in-progress statistic. */
+ public void incEvictionsInProgress() {
+ this.stats.incLong(evictByCriteria_evictionsInProgressId, 1);
+ }
+
+ /** Subtracts 1 from the evictByCriteria evictions-in-progress statistic. */
+ public void decEvictionsInProgress() {
+ this.stats.incLong(evictByCriteria_evictionsInProgressId, -1);
+ }
+
+ /** Adds 1 to the evictByCriteria evictions statistic. */
+ public void incEvictions() {
+ this.stats.incLong(evictByCriteria_evictionsId, 1);
+ }
+
+ /** Adds 1 to the evictByCriteria evaluations statistic. */
+ public void incEvaluations() {
+ this.stats.incLong(evictByCriteria_evaluationsId, 1);
+ }
+
+ /**
+ * Adds {@code delta} to the evictByCriteria evaluations statistic.
+ *
+ * @param delta the amount to add
+ */
+ public void incEvaluations(int delta) {
+ this.stats.incLong(evictByCriteria_evaluationsId, delta);
+ }
+
+ /**
+ * @return the current {@code NanoTimer} time, to be handed to
+ * {@code endEvaluation(long, long)}
+ */
+ public long startEvaluation() {
+ return NanoTimer.getTime();
+ }
+
+ /**
+ * Adds the evaluation time to the evictByCriteria evaluation-time statistic.
+ * The evaluation time is the time elapsed since {@code start} minus
+ * {@code notEvaluationTime}.
+ *
+ * @param start a {@code NanoTimer} reading taken when the evaluation started
+ * @param notEvaluationTime the part of the elapsed time to exclude
+ */
+ public void endEvaluation(long start, long notEvaluationTime) {
+ final long elapsed = NanoTimer.getTime() - start;
+ stats.incLong(evictByCriteria_evaluationTimeId, elapsed - notEvaluationTime);
+ }
+
+ /** @return the current value of the evictByCriteria evictions statistic */
+ public long getEvictions() {
+ return stats.getLong(evictByCriteria_evictionsId);
+ }
+
+ /** @return the current value of the evictByCriteria evictions-in-progress statistic */
+ public long getEvictionsInProgress() {
+ return stats.getLong(evictByCriteria_evictionsInProgressId);
+ }
+
+ /** @return the current value of the evictByCriteria eviction-time statistic */
+ public long getEvictionsTime() {
+ return stats.getLong(evictByCriteria_evictionTimeId);
+ }
+
+ /** @return the current value of the evictByCriteria evaluations statistic */
+ public long getEvaluations() {
+ return stats.getLong(evictByCriteria_evaluationsId);
+ }
+
+ /** @return the current value of the evictByCriteria evaluation-time statistic */
+ public long getEvaluationTime() {
+ return stats.getLong(evictByCriteria_evaluationTimeId);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java
index 72edc10..1441144 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java
@@ -107,6 +107,9 @@ public class ColocationHelper {
}
private static PartitionedRegion getColocatedPR(
final PartitionedRegion partitionedRegion, final String colocatedWith) {
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.HOPLOG_0_COLOCATE_WITH_REGION_1_NOT_INITIALIZED_YET,
+ new Object[] { partitionedRegion.getFullPath(), colocatedWith }));
PartitionedRegion colocatedPR = (PartitionedRegion) partitionedRegion
.getCache().getPartitionedRegion(colocatedWith, false);
assert colocatedPR != null;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CustomEvictionAttributesImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CustomEvictionAttributesImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CustomEvictionAttributesImpl.java
new file mode 100644
index 0000000..0c82f97
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CustomEvictionAttributesImpl.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.internal.cache;
+
+import com.gemstone.gemfire.cache.CustomEvictionAttributes;
+import com.gemstone.gemfire.cache.EvictionCriteria;
+
+/**
+ * Concrete instance of {@link CustomEvictionAttributes}.
+ *
+ * @since gfxd 1.0
+ */
+public final class CustomEvictionAttributesImpl extends
+ CustomEvictionAttributes {
+
+ /**
+ * Passes all four arguments unchanged to the {@link CustomEvictionAttributes}
+ * constructor; this class adds no state or behavior of its own.
+ */
+ public CustomEvictionAttributesImpl(EvictionCriteria<?, ?> criteria,
+ long startTime, long interval, boolean evictIncoming) {
+ super(criteria, startTime, interval, evictIncoming);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
index cafdb80..f8475ae 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
@@ -145,7 +145,7 @@ public class DistTXState extends TXState {
}
}
} // end if primary
- }
+ } // end non-hdfs buckets
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
index 6a7b4f2..a6d2488 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
@@ -863,6 +863,8 @@ public abstract class DistributedCacheOperation {
private final static int INHIBIT_NOTIFICATIONS_MASK = 0x400;
+ protected final static short FETCH_FROM_HDFS = 0x200;
+
protected final static short IS_PUT_DML = 0x100;
public boolean needsRouting;
@@ -1365,6 +1367,7 @@ public abstract class DistributedCacheOperation {
if ((extBits & INHIBIT_NOTIFICATIONS_MASK) != 0) {
this.inhibitAllNotifications = true;
if (this instanceof PutAllMessage) {
+ ((PutAllMessage) this).setFetchFromHDFS((extBits & FETCH_FROM_HDFS) != 0);
((PutAllMessage) this).setPutDML((extBits & IS_PUT_DML) != 0);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
index b6aa1b6..2817fdd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
@@ -856,6 +856,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
PutAllMessage msg = new PutAllMessage();
msg.eventId = event.getEventId();
msg.context = event.getContext();
+ msg.setFetchFromHDFS(event.isFetchFromHDFS());
msg.setPutDML(event.isPutDML());
return msg;
}
@@ -870,7 +871,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
public PutAllPRMessage createPRMessagesNotifyOnly(int bucketId) {
final EntryEventImpl event = getBaseEvent();
PutAllPRMessage prMsg = new PutAllPRMessage(bucketId, putAllDataSize, true,
- event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), false /*isPutDML*/);
+ event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), false, false /*isPutDML*/);
if (event.getContext() != null) {
prMsg.setBridgeContext(event.getContext());
}
@@ -899,7 +900,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
PutAllPRMessage prMsg = (PutAllPRMessage)prMsgMap.get(bucketId);
if (prMsg == null) {
prMsg = new PutAllPRMessage(bucketId.intValue(), putAllDataSize, false,
- event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), event.isPutDML());
+ event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), event.isFetchFromHDFS(), event.isPutDML());
prMsg.setTransactionDistributed(event.getRegion().getCache().getTxManager().isDistributed());
// set dpao's context(original sender) into each PutAllMsg
@@ -1076,6 +1077,9 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
protected EventID eventId = null;
+ // By default, fetchFromHDFS == true;
+ private transient boolean fetchFromHDFS = true;
+
private transient boolean isPutDML = false;
protected static final short HAS_BRIDGE_CONTEXT = UNRESERVED_FLAGS_START;
@@ -1133,11 +1137,12 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
* the region the entry is put in
*/
public void doEntryPut(PutAllEntryData entry, DistributedRegion rgn,
- boolean requiresRegionContext, boolean isPutDML) {
+ boolean requiresRegionContext, boolean fetchFromHDFS, boolean isPutDML) {
@Released EntryEventImpl ev = PutAllMessage.createEntryEvent(entry, getSender(),
this.context, rgn,
requiresRegionContext, this.possibleDuplicate,
this.needsRouting, this.callbackArg, true, skipCallbacks);
+ ev.setFetchFromHDFS(fetchFromHDFS);
ev.setPutDML(isPutDML);
// we don't need to set old value here, because the msg is from remote. local old value will get from next step
try {
@@ -1232,7 +1237,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
logger.debug("putAll processing {} with {} sender={}", putAllData[i], putAllData[i].versionTag, sender);
}
putAllData[i].setSender(sender);
- doEntryPut(putAllData[i], rgn, requiresRegionContext, isPutDML);
+ doEntryPut(putAllData[i], rgn, requiresRegionContext, fetchFromHDFS, isPutDML);
}
}
}, ev.getEventId());
@@ -1361,6 +1366,10 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
return Arrays.asList(ops);
}
+ /**
+ * Sets the {@code fetchFromHDFS} flag of this message. While the flag is
+ * true, {@code computeCompressedExtBits} adds the {@code FETCH_FROM_HDFS} bit.
+ *
+ * @param val the new value of the flag
+ */
+ public void setFetchFromHDFS(boolean val) {
+ this.fetchFromHDFS = val;
+ }
+
public void setPutDML(boolean val) {
this.isPutDML = val;
}
@@ -1368,6 +1377,9 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
@Override
protected short computeCompressedExtBits(short bits) {
bits = super.computeCompressedExtBits(bits);
+ if (fetchFromHDFS) {
+ bits |= FETCH_FROM_HDFS;
+ }
if (isPutDML) {
bits |= IS_PUT_DML;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
index 226d914..addba8e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
@@ -17,6 +17,8 @@
package com.gemstone.gemfire.internal.cache;
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_FILL_IN_VALUE;
+
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -111,6 +113,8 @@ import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationE
import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueConfigurationException;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
@@ -1260,6 +1264,8 @@ public class DistributedRegion extends LocalRegion implements
private final Set<DistributedMember> memoryThresholdReachedMembers =
new CopyOnWriteArraySet<DistributedMember>();
+ private ConcurrentParallelGatewaySenderQueue hdfsQueue;
+
/** Sets and returns giiMissingRequiredRoles */
private boolean checkInitialImageForReliability(
InternalDistributedMember imageTarget,
@@ -2418,16 +2424,9 @@ public class DistributedRegion extends LocalRegion implements
/** @return the deserialized value */
@Override
@Retained
- protected Object findObjectInSystem(KeyInfo keyInfo,
- boolean isCreate,
- TXStateInterface txState,
- boolean generateCallbacks,
- Object localValue,
- boolean disableCopyOnRead,
- boolean preferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones)
+ protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
+ TXStateInterface txState, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead,
+ boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
throws CacheLoaderException, TimeoutException
{
checkForLimitedOrNoAccess();
@@ -2546,6 +2545,18 @@ public class DistributedRegion extends LocalRegion implements
}
}
+ /**
+ * Returns the parallel gateway sender queue behind this region's HDFS event
+ * queue. The queue is looked up through the async event queue named by
+ * {@code getHDFSEventQueueName()} and kept in {@code hdfsQueue} once found.
+ *
+ * @return the queue, or null when the async event queue does not exist on
+ * this member or its sender has no event processor
+ */
+ protected ConcurrentParallelGatewaySenderQueue getHDFSQueue() {
+ if (this.hdfsQueue == null) {
+ String asyncQId = this.getPartitionedRegion().getHDFSEventQueueName();
+ final AsyncEventQueueImpl asyncQ = (AsyncEventQueueImpl)this.getCache().getAsyncEventQueue(asyncQId);
+ // the HDFS queue is not created on every member (e.g. not on an accessor);
+ // report "not available" the same way as a missing event processor
+ if (asyncQ == null) return null;
+ final AbstractGatewaySender gatewaySender = (AbstractGatewaySender)asyncQ.getSender();
+ AbstractGatewaySenderEventProcessor ep = gatewaySender.getEventProcessor();
+ if (ep == null) return null;
+ hdfsQueue = (ConcurrentParallelGatewaySenderQueue)ep.getQueue();
+ }
+ return hdfsQueue;
+ }
+
/** hook for subclasses to note that a cache load was performed
* @see BucketRegion#performedLoad
*/
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
index e241622..2b826ce 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
@@ -193,8 +193,16 @@ public class EntryEventImpl
/** version tag for concurrency checks */
protected VersionTag versionTag;
+ /** whether this operation may fetch from HDFS; set to false to optimize the operation by not fetching from HDFS */
+ private transient boolean fetchFromHDFS = true;
+
private transient boolean isPutDML = false;
+ /** boolean to indicate that the RegionEntry for this event was loaded from HDFS*/
+ private transient boolean loadedFromHDFS= false;
+
+ private transient boolean isCustomEviction = false;
+
/** boolean to indicate that the RegionEntry for this event has been evicted*/
private transient boolean isEvicted = false;
@@ -650,6 +658,14 @@ public class EntryEventImpl
return this.op.isEviction();
}
+ /** @return the value of the {@code isCustomEviction} flag of this event */
+ public final boolean isCustomEviction() {
+ return this.isCustomEviction;
+ }
+
+ /**
+ * Sets the {@code isCustomEviction} flag of this event.
+ *
+ * @param customEvict the new value of the flag
+ */
+ public final void setCustomEviction(boolean customEvict) {
+ this.isCustomEviction = customEvict;
+ }
+
public final void setEvicted() {
this.isEvicted = true;
}
@@ -3031,6 +3047,13 @@ public class EntryEventImpl
public boolean isOldValueOffHeap() {
return isOffHeapReference(this.oldValue);
}
+ /** @return the value of the {@code fetchFromHDFS} flag of this event */
+ public final boolean isFetchFromHDFS() {
+ return fetchFromHDFS;
+ }
+
+ /**
+ * Sets the {@code fetchFromHDFS} flag of this event.
+ *
+ * @param fetchFromHDFS the new value of the flag
+ */
+ public final void setFetchFromHDFS(boolean fetchFromHDFS) {
+ this.fetchFromHDFS = fetchFromHDFS;
+ }
public final boolean isPutDML() {
return this.isPutDML;
@@ -3039,4 +3062,12 @@ public class EntryEventImpl
/**
* Sets the isPutDML flag of this event.
*
* @param val the new value of the flag
*/
public final void setPutDML(boolean val) {
this.isPutDML = val;
}
+
+ /**
+ * Returns whether the RegionEntry for this event was loaded from HDFS.
+ *
+ * @return the value last passed to setLoadedFromHDFS(boolean), or false if
+ * it was never called
+ */
+ public final boolean isLoadedFromHDFS() {
+ return loadedFromHDFS;
+ }
+
+ /**
+ * Records whether the RegionEntry for this event was loaded from HDFS.
+ *
+ * @param loadedFromHDFS true if the entry was loaded from HDFS
+ */
+ public final void setLoadedFromHDFS(boolean loadedFromHDFS) {
+ this.loadedFromHDFS = loadedFromHDFS;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java
new file mode 100644
index 0000000..9054d6d
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.EvictionCriteria;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.gemstone.gemfire.internal.offheap.Releasable;
+/**
+ * Periodically evicts entries from a region according to an
+ * {@link EvictionCriteria}. Iterations are scheduled against the cache time
+ * reported by the distribution manager, in whole seconds: every iteration is
+ * due one interval after the previous due time, so the time taken by an
+ * iteration is deducted from the delay before the next one. If an iteration
+ * takes longer than the interval, the next one is scheduled immediately.
+ * <p>
+ * The first iteration is currently always due 10 seconds after construction;
+ * the start time handed to the constructor is only logged (see the TODO in
+ * the constructor). {@link #changeStartTime(long)} moves the next due time.
+ */
+public class EvictorService extends AbstractScheduledService {
+
+  private final EvictionCriteria<Object, Object> criteria;
+
+  // Period between two iterations, always in seconds. Written by
+  // changeEvictionInterval() and read by the scheduler callback, so it is
+  // volatile like the other cross-thread fields of this class.
+  private volatile long interval;
+
+  // Raised by stopScheduling(); checked before and during every iteration.
+  private volatile boolean stopScheduling;
+
+  // Cache time, in seconds, at which the next iteration is due. Written by
+  // changeStartTime() and advanced by the scheduler callback, hence volatile.
+  private volatile long nextScheduleTime;
+
+  private GemFireCacheImpl cache;
+
+  private Region region;
+
+  // Executor handed to the base class; remembered so shutDown() can stop it.
+  private volatile ScheduledExecutorService executorService;
+
+  /**
+   * Creates the service for one region. The service does not run until it is
+   * started through the base-class lifecycle.
+   *
+   * @param criteria supplies the entries to evict on every iteration
+   * @param evictorStartTime requested start time; currently only logged
+   * @param evictorInterval period between iterations, expressed in unit
+   * @param unit time unit of evictorInterval; the period is kept in seconds
+   * @param r region whose entries are evicted
+   * @throws CacheClosedException if there is no existing cache. The previous
+   *         code swallowed this exception and then failed with a
+   *         NullPointerException on the very next statement.
+   */
+  public EvictorService(EvictionCriteria<Object, Object> criteria,
+      long evictorStartTime, long evictorInterval, TimeUnit unit, Region r) {
+    this.criteria = criteria;
+    this.interval = unit.toSeconds(evictorInterval);
+    this.region = r;
+    // Everything below needs the cache, so a missing cache is reported as
+    // the CacheClosedException it is instead of being hidden.
+    this.cache = GemFireCacheImpl.getExisting();
+    //TODO: Unless we revisit System.currentTimeMillis or cacheTimeMillis keep the default
+//    long now = (evictorStartTime != 0 ? evictorStartTime
+//        + this.cache.getDistributionManager().getCacheTimeOffset() : this.cache
+//        .getDistributionManager().cacheTimeMillis()) / 1000;
+    long now = this.cache.getDistributionManager().cacheTimeMillis() / 1000;
+    if (this.cache.getLoggerI18n().fineEnabled()) {
+      this.cache.getLoggerI18n().fine(
+          "EvictorService: The startTime(now) is " + now + " evictorStartTime : " + evictorStartTime);
+    }
+
+    // First iteration is due 10 seconds from now.
+    this.nextScheduleTime = now + 10;
+
+    if (this.cache.getLoggerI18n().fineEnabled()) {
+      this.cache.getLoggerI18n().fine(
+          "EvictorService: The startTime is " + this.nextScheduleTime);
+    }
+  }
+
+  /**
+   * Runs one eviction pass: asks the criteria for the entries to evict and,
+   * when the region is a PartitionedRegion, destroys each of them in its
+   * local bucket if that bucket is primary. Returns without evicting when
+   * scheduling was stopped or the cache is being cancelled, and stops early
+   * when either happens during the pass.
+   */
+  @Override
+  protected void runOneIteration() throws Exception {
+    if (this.cache.getLoggerI18n().fineEnabled()) {
+      this.cache.getLoggerI18n()
+          .fine(
+              "EvictorService: Running the iteration at "
+                  + cache.cacheTimeMillis());
+    }
+    if (stopScheduling || checkCancelled(cache)) {
+      stopScheduling(); // latch the flag when only the cancel check fired
+      if (this.cache.getLoggerI18n().fineEnabled()) {
+        this.cache
+            .getLoggerI18n()
+            .fine(
+                "EvictorService: Abort eviction since stopScheduling OR cancel in progress. Evicted 0 entries ");
+      }
+      return;
+    }
+    CachePerfStats stats = ((LocalRegion)this.region).getCachePerfStats();
+    long startEvictionTime = stats.startCustomEviction();
+    int evicted = 0;
+    long startEvaluationTime = stats.startEvaluation();
+    long totalIterationsTime = 0;
+
+    Iterator<Entry<Object, Object>> keysItr = this.criteria.getKeysToBeEvicted(
+        this.cache.getDistributionManager().cacheTimeMillis(), this.region);
+    try {
+      stats.incEvaluations(this.region.size());
+      // Stop in between if we have been asked to stop scheduling or the
+      // cache is closing.
+      while (keysItr.hasNext() && !stopScheduling && !checkCancelled(cache)) {
+        Map.Entry<Object, Object> entry = keysItr.next();
+        long startIterationTime = this.cache
+            .getDistributionManager().cacheTimeMillis();
+        // The entry's value is what the bucket id is computed from.
+        Object routingObj = entry.getValue();
+        if (this.cache.getLoggerI18n().fineEnabled()) {
+          this.cache.getLoggerI18n().fine(
+              "EvictorService: Going to evict the following entry " + entry);
+        }
+        if (this.region instanceof PartitionedRegion) {
+          try {
+            PartitionedRegion pr = (PartitionedRegion)this.region;
+            stats.incEvictionsInProgress();
+            int bucketId = PartitionedRegionHelper.getHashKey(pr, routingObj);
+            BucketRegion br = pr.getDataStore().getLocalBucketById(bucketId);
+            // This has to be called on BucketRegion directly and not on the PR as
+            // PR doesn't allow operation on Secondary buckets.
+            if (br != null) {
+              if (this.cache.getLoggerI18n().fineEnabled()) {
+                this.cache.getLoggerI18n().fine(
+                    "EvictorService: Going to evict the following entry " + entry
+                        + " from bucket " + br);
+              }
+              if (br.getBucketAdvisor().isPrimary()) {
+                boolean succ = false;
+                try {
+                  succ = br.customEvictDestroy(entry.getKey());
+                } catch (PrimaryBucketException e) {
+                  // NOTE(review): the warning is only emitted when fine
+                  // logging is enabled -- confirm this gating is intended.
+                  if (this.cache.getLoggerI18n().fineEnabled()) {
+                    this.cache.getLoggerI18n().warning(
+                        LocalizedStrings.EVICTORSERVICE_CAUGHT_EXCEPTION_0, e);
+                  }
+                }
+
+                if (succ) {
+                  evicted++;
+                }
+                if (this.cache.getLoggerI18n().fineEnabled()) {
+                  this.cache.getLoggerI18n()
+                      .fine(
+                          "EvictorService: Evicted the following entry " + entry
+                              + " from bucket " + br + " successfully " + succ
+                              + " the value in buk is ");
+                }
+              }
+            }
+            // NOTE(review): reached for every entry that did not throw,
+            // whether or not it was destroyed -- confirm this statistic is
+            // meant to count attempts rather than successes.
+            stats.incEvictions();
+          } catch (Exception e) {
+            // NOTE(review): as above, the warning is gated on fine logging,
+            // so failures are silent at the default log level.
+            if (this.cache.getLoggerI18n().fineEnabled()) {
+              this.cache.getLoggerI18n().warning(
+                  LocalizedStrings.EVICTORSERVICE_CAUGHT_EXCEPTION_0, e);
+            }
+            // TODO:
+            // Do the exception handling .
+            // Check if the bucket is present
+            // If the entry could not be evicted then log the warning
+            // Log any other exception.
+          } finally {
+            stats.decEvictionsInProgress();
+            long endIterationTime = this.cache
+                .getDistributionManager().cacheTimeMillis();
+            totalIterationsTime += (endIterationTime - startIterationTime);
+          }
+        }
+      }
+    } finally {
+      // Iterators that are Releasable are released once the pass is over.
+      if (keysItr instanceof Releasable) {
+        ((Releasable)keysItr).release();
+      }
+    }
+    stats.endEvaluation(startEvaluationTime, totalIterationsTime);
+
+    if (this.cache.getLoggerI18n().fineEnabled()) {
+      this.cache.getLoggerI18n().fine(
+          "EvictorService: Completed an iteration at time "
+              + this.cache.getDistributionManager().cacheTimeMillis() / 1000
+              + ". Evicted " + evicted + " entries.");
+    }
+    stats.endCustomEviction(startEvictionTime);
+  }
+
+  /**
+   * Tells whether the given cache reports a cancellation in progress.
+   */
+  private boolean checkCancelled(GemFireCacheImpl cache) {
+    return cache.getCancelCriterion().cancelInProgress() != null;
+  }
+
+  /**
+   * Supplies a scheduler whose delay is the number of seconds left until the
+   * next due time (zero when that time has already passed) and which then
+   * moves the due time forward by one interval.
+   */
+  @Override
+  protected Scheduler scheduler() {
+    return new CustomScheduler() {
+      @Override
+      protected Schedule getNextSchedule() throws Exception {
+        // get the current time in seconds from DM.
+        // it takes care of clock skew etc in different VMs
+        long now = cache.getDistributionManager().cacheTimeMillis() / 1000;
+        if (cache.getLoggerI18n().fineEnabled()) {
+          cache.getLoggerI18n().fine("EvictorService: Now is " + now);
+        }
+        long due = nextScheduleTime;
+        long delay = 0;
+        if (now < due) {
+          delay = due - now;
+        }
+        // The following iteration is due one interval after this one.
+        long next = due + interval;
+        nextScheduleTime = next;
+        if (cache.getLoggerI18n().fineEnabled()) {
+          cache.getLoggerI18n().fine(
+              "EvictorService: Returning the next schedule with delay " + delay
+                  + " next schedule is at : " + next);
+        }
+
+        return new Schedule(delay, TimeUnit.SECONDS);
+      }
+    };
+  }
+
+  /**
+   * Asks the service to stop evicting: a running iteration ends at its next
+   * check and later iterations return without evicting. Region.destroy and
+   * Region.close should make sure to call this method; runOneIteration also
+   * calls it when it finds the cache being cancelled. This only raises a
+   * flag; it does not stop the service itself.
+   */
+  public void stopScheduling() {
+    this.stopScheduling = true;
+  }
+
+  /**
+   * Called when the service is stopped (to stop the service call stop()).
+   * Shuts down the executor obtained through {@link #executor()}, if any,
+   * and drops the references to the region and the cache.
+   */
+  @Override
+  protected void shutDown() throws Exception {
+    ScheduledExecutorService executor = this.executorService;
+    if (executor != null) {
+      executor.shutdownNow();
+    }
+    this.region = null;
+    this.cache = null;
+  }
+
+  /**
+   * Called when the service is started. No initialization is needed.
+   */
+  @Override
+  protected void startUp() throws Exception {
+
+  }
+
+  /**
+   * Changes the period between iterations.
+   *
+   * @param newInterval new period in milliseconds; it is truncated to whole
+   *        seconds, so a value below 1000 yields a period of 0
+   */
+  public void changeEvictionInterval(long newInterval) {
+    this.interval = newInterval / 1000;
+    if (cache.getLoggerI18n().fineEnabled()) {
+      cache.getLoggerI18n().fine(
+          "EvictorService: New interval is " + this.interval);
+    }
+  }
+
+  /**
+   * Changes the time at which the next iteration is due.
+   *
+   * @param newStart new due time in milliseconds; it is truncated to whole
+   *        seconds and compared against the cache time by the scheduler
+   */
+  public void changeStartTime(long newStart) {
+    this.nextScheduleTime = newStart/1000;
+    if (cache.getLoggerI18n().fineEnabled()) {
+      cache.getLoggerI18n().fine("EvictorService: New start time is " + this.nextScheduleTime);
+    }
+  }
+
+  /**
+   * Returns the executor created by the base class, remembering it so that
+   * {@link #shutDown()} can stop it.
+   */
+  @Override
+  protected ScheduledExecutorService executor() {
+    this.executorService = super.executor();
+    return this.executorService;
+  }
+
+}