You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/04/27 23:16:42 UTC
[10/22] incubator-geode git commit: GEODE-1072: Removing HDFS related code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
index 74efd51..c5b5d3a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
@@ -96,8 +96,6 @@ public interface GatewaySender {
public static final int DEFAULT_DISPATCHER_THREADS = 5;
- public static final int DEFAULT_HDFS_DISPATCHER_THREADS = 5;
-
public static final OrderPolicy DEFAULT_ORDER_POLICY = OrderPolicy.KEY;
/**
* The default maximum amount of memory (MB) to allow in the queue before
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
index 77f24a3..bd78f5a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
@@ -52,7 +52,6 @@ import com.gemstone.gemfire.cache.client.internal.locator.LocatorStatusRequest;
import com.gemstone.gemfire.cache.client.internal.locator.LocatorStatusResponse;
import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionRequest;
import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionResponse;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.cache.query.internal.CqEntry;
import com.gemstone.gemfire.cache.query.internal.CumulativeNonDistinctResults;
@@ -1023,8 +1022,6 @@ public final class DSFIDFactory implements DataSerializableFixedID {
RemoteFetchVersionMessage.FetchVersionReplyMessage.class);
registerDSFID(RELEASE_CLEAR_LOCK_MESSAGE, ReleaseClearLockMessage.class);
registerDSFID(PR_TOMBSTONE_MESSAGE, PRTombstoneMessage.class);
- registerDSFID(HDFS_GATEWAY_EVENT_IMPL, HDFSGatewayEventImpl.class);
-
registerDSFID(REQUEST_RVV_MESSAGE, InitialImageOperation.RequestRVVMessage.class);
registerDSFID(RVV_REPLY_MESSAGE, InitialImageOperation.RVVReplyMessage.class);
registerDSFID(SNAPPY_COMPRESSED_CACHED_DESERIALIZABLE, SnappyCompressedCachedDeserializable.class);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
index 5d52346..7427f90 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
@@ -103,7 +103,6 @@ public interface DataSerializableFixedID extends SerializationVersions {
public static final short JOIN_RESPONSE = -143;
public static final short JOIN_REQUEST = -142;
- public static final short HDFS_GATEWAY_EVENT_IMPL = -141;
public static final short SNAPPY_COMPRESSED_CACHED_DESERIALIZABLE = -140;
public static final short GATEWAY_EVENT_IMPL = -136;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java
index 4d4197e..f8740db 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java
@@ -33,7 +33,6 @@ import com.gemstone.gemfire.cache.CacheLoader;
import com.gemstone.gemfire.cache.CacheLoaderException;
import com.gemstone.gemfire.cache.CacheWriter;
import com.gemstone.gemfire.cache.CacheWriterException;
-import com.gemstone.gemfire.cache.CustomEvictionAttributes;
import com.gemstone.gemfire.cache.CustomExpiry;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.Declarable;
@@ -50,10 +49,7 @@ import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionEvent;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.SubscriptionAttributes;
-import com.gemstone.gemfire.compression.CompressionException;
import com.gemstone.gemfire.compression.Compressor;
-import com.gemstone.gemfire.internal.InternalDataSerializer;
-import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.EvictionAttributesImpl;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
@@ -108,8 +104,6 @@ public class RemoteRegionAttributes implements RegionAttributes,
private String[] gatewaySendersDescs;
private boolean isGatewaySenderEnabled = false;
private String[] asyncEventQueueDescs;
- private String hdfsStoreName;
- private boolean hdfsWriteOnly;
private String compressorDesc;
private boolean offHeap;
@@ -161,8 +155,6 @@ public class RemoteRegionAttributes implements RegionAttributes,
this.isDiskSynchronous = attr.isDiskSynchronous();
this.gatewaySendersDescs = getDescs(attr.getGatewaySenderIds().toArray());
this.asyncEventQueueDescs = getDescs(attr.getAsyncEventQueueIds().toArray());
- this.hdfsStoreName = attr.getHDFSStoreName();
- this.hdfsWriteOnly = attr.getHDFSWriteOnly();
this.compressorDesc = getDesc(attr.getCompressor());
this.offHeap = attr.getOffHeap();
}
@@ -419,7 +411,6 @@ public class RemoteRegionAttributes implements RegionAttributes,
DataSerializer.writeString(this.compressorDesc, out);
out.writeBoolean(this.offHeap);
- DataSerializer.writeString(this.hdfsStoreName, out);
}
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
@@ -468,7 +459,6 @@ public class RemoteRegionAttributes implements RegionAttributes,
this.compressorDesc = DataSerializer.readString(in);
this.offHeap = in.readBoolean();
- this.hdfsStoreName = DataSerializer.readString(in);
}
private String[] getDescs(Object[] l) {
@@ -636,15 +626,6 @@ public class RemoteRegionAttributes implements RegionAttributes,
return this.evictionAttributes;
}
- /**
- * {@inheritDoc}
- */
- @Override
- public CustomEvictionAttributes getCustomEvictionAttributes() {
- // TODO: HDFS: no support for custom eviction attributes from remote yet
- return null;
- }
-
public boolean getCloningEnabled() {
// TODO Auto-generated method stub
return this.cloningEnable;
@@ -653,12 +634,6 @@ public class RemoteRegionAttributes implements RegionAttributes,
public String getDiskStoreName() {
return this.diskStoreName;
}
- public String getHDFSStoreName() {
- return this.hdfsStoreName;
- }
- public boolean getHDFSWriteOnly() {
- return this.hdfsWriteOnly;
- }
public boolean isDiskSynchronous() {
return this.isDiskSynchronous;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
index 1f8da88..92eaa01 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
@@ -34,8 +34,6 @@ import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.TimeoutException;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
import com.gemstone.gemfire.internal.cache.lru.LRUStatistics;
import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
@@ -459,17 +457,8 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
}
waitIfQueueFull();
- int sizeOfHdfsEvent = -1;
try {
- if (this instanceof HDFSBucketRegionQueue) {
- // need to fetch the size before event is inserted in queue.
- // fix for #50016
- if (this.getBucketAdvisor().isPrimary()) {
- HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)event.getValue();
- sizeOfHdfsEvent = hdfsEvent.getSizeOnHDFSInBytes(!((HDFSBucketRegionQueue)this).isBucketSorted);
- }
- }
-
+
didPut = virtualPut(event, false, false, null, false, startPut, true);
checkReadiness();
@@ -492,7 +481,7 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
destroyKey(key);
didPut = false;
} else {
- addToEventQueue(key, didPut, event, sizeOfHdfsEvent);
+ addToEventQueue(key, didPut, event);
}
return didPut;
}
@@ -522,8 +511,7 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
}
protected abstract void clearQueues();
- protected abstract void addToEventQueue(Object key, boolean didPut, EntryEventImpl event,
- int sizeOfHdfsEvent);
+ protected abstract void addToEventQueue(Object key, boolean didPut, EntryEventImpl event);
@Override
public void afterAcquiringPrimaryState() {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
index 10644cb..d37f025 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
@@ -32,7 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.Logger;
@@ -46,7 +45,6 @@ import com.gemstone.gemfire.cache.CacheLoaderException;
import com.gemstone.gemfire.cache.CacheStatistics;
import com.gemstone.gemfire.cache.CacheWriter;
import com.gemstone.gemfire.cache.CacheWriterException;
-import com.gemstone.gemfire.cache.CustomEvictionAttributes;
import com.gemstone.gemfire.cache.CustomExpiry;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.DiskWriteAttributes;
@@ -54,7 +52,6 @@ import com.gemstone.gemfire.cache.EntryExistsException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.EvictionAttributes;
import com.gemstone.gemfire.cache.EvictionAttributesMutator;
-import com.gemstone.gemfire.cache.EvictionCriteria;
import com.gemstone.gemfire.cache.ExpirationAction;
import com.gemstone.gemfire.cache.ExpirationAttributes;
import com.gemstone.gemfire.cache.MembershipAttributes;
@@ -100,7 +97,6 @@ import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.util.ArrayUtils;
import com.gemstone.gemfire.pdx.internal.PeerTypeRegistration;
-import com.google.common.util.concurrent.Service.State;
/**
* Takes care of RegionAttributes, AttributesMutator, and some no-brainer method
@@ -236,8 +232,6 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
protected EvictionAttributesImpl evictionAttributes = new EvictionAttributesImpl();
- protected CustomEvictionAttributes customEvictionAttributes;
-
/** The membership attributes defining required roles functionality */
protected MembershipAttributes membershipAttributes;
@@ -260,10 +254,6 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
protected String poolName;
- protected String hdfsStoreName;
-
- protected boolean hdfsWriteOnly;
-
protected Compressor compressor;
/**
@@ -898,16 +888,6 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
return this.subscriptionAttributes;
}
- @Override
- public final String getHDFSStoreName() {
- return this.hdfsStoreName;
- }
-
- @Override
- public final boolean getHDFSWriteOnly() {
- return this.hdfsWriteOnly;
- }
-
/**
* Get IndexManger for region
*/
@@ -1728,7 +1708,6 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
this.setEvictionController(this.evictionAttributes
.createEvictionController(this, attrs.getOffHeap()));
}
- this.customEvictionAttributes = attrs.getCustomEvictionAttributes();
storeCacheListenersField(attrs.getCacheListeners());
assignCacheLoader(attrs.getCacheLoader());
assignCacheWriter(attrs.getCacheWriter());
@@ -1786,8 +1765,6 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
+ "when multiuser-authentication is true.");
}
}
- this.hdfsStoreName = attrs.getHDFSStoreName();
- this.hdfsWriteOnly = attrs.getHDFSWriteOnly();
this.diskStoreName = attrs.getDiskStoreName();
this.isDiskSynchronous = attrs.isDiskSynchronous();
@@ -1853,52 +1830,12 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
return this.evictionAttributes;
}
- /**
- * {@inheritDoc}
- */
- @Override
- public CustomEvictionAttributes getCustomEvictionAttributes() {
- return this.customEvictionAttributes;
- }
-
public EvictionAttributesMutator getEvictionAttributesMutator()
{
return this.evictionAttributes;
}
- /**
- * {@inheritDoc}
- */
- @Override
- public CustomEvictionAttributes setCustomEvictionAttributes(long newStart,
- long newInterval) {
- checkReadiness();
-
- if (this.customEvictionAttributes == null) {
- throw new IllegalArgumentException(
- LocalizedStrings.AbstractRegion_NO_CUSTOM_EVICTION_SET
- .toLocalizedString(getFullPath()));
- }
-
- if (newStart == 0) {
- newStart = this.customEvictionAttributes.getEvictorStartTime();
- }
- this.customEvictionAttributes = new CustomEvictionAttributesImpl(
- this.customEvictionAttributes.getCriteria(), newStart, newInterval,
- newStart == 0 && newInterval == 0);
-
-// if (this.evService == null) {
-// initilializeCustomEvictor();
-// } else {// we are changing the earlier one which is already started.
-// EvictorService service = getEvictorTask();
-// service.changeEvictionInterval(newInterval);
-// if (newStart != 0)
-// service.changeStartTime(newStart);
-// }
- return this.customEvictionAttributes;
- }
-
public void setEvictionController(LRUAlgorithm evictionController)
{
this.evictionController = evictionController;
@@ -2037,7 +1974,6 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
/**
* @since 8.1
- * property used to find region operations that reach out to HDFS multiple times
*/
@Override
public ExtensionPoint<Region<?, ?>> getExtensionPoint() {
@@ -2047,87 +1983,4 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
public boolean getOffHeap() {
return this.offHeap;
}
- /**
- * property used to find region operations that reach out to HDFS multiple times
- */
- private static final boolean DEBUG_HDFS_CALLS = Boolean.getBoolean("DebugHDFSCalls");
-
- /**
- * throws exception if region operation goes out to HDFS multiple times
- */
- private static final boolean THROW_ON_MULTIPLE_HDFS_CALLS = Boolean.getBoolean("throwOnMultipleHDFSCalls");
-
- private ThreadLocal<CallLog> logHDFSCalls = DEBUG_HDFS_CALLS ? new ThreadLocal<CallLog>() : null;
-
- public void hdfsCalled(Object key) {
- if (!DEBUG_HDFS_CALLS) {
- return;
- }
- logHDFSCalls.get().addStack(new Throwable());
- logHDFSCalls.get().setKey(key);
- }
- public final void operationStart() {
- if (!DEBUG_HDFS_CALLS) {
- return;
- }
- if (logHDFSCalls.get() == null) {
- logHDFSCalls.set(new CallLog());
- //InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:operationStart", new Throwable());
- } else {
- logHDFSCalls.get().incNestedCall();
- //InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:incNestedCall:", new Throwable());
- }
- }
- public final void operationCompleted() {
- if (!DEBUG_HDFS_CALLS) {
- return;
- }
- //InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:operationCompleted", new Throwable());
- if (logHDFSCalls.get() != null && logHDFSCalls.get().decNestedCall() < 0) {
- logHDFSCalls.get().assertCalls();
- logHDFSCalls.set(null);
- }
- }
-
- public static class CallLog {
- private List<Throwable> stackTraces = new ArrayList<Throwable>();
- private Object key;
- private int nestedCall = 0;
- public void incNestedCall() {
- nestedCall++;
- }
- public int decNestedCall() {
- return --nestedCall;
- }
- public void addStack(Throwable stack) {
- this.stackTraces.add(stack);
- }
- public void setKey(Object key) {
- this.key = key;
- }
- public void assertCalls() {
- if (stackTraces.size() > 1) {
- Throwable firstTrace = new Throwable();
- Throwable lastTrace = firstTrace;
- for (Throwable t : this.stackTraces) {
- lastTrace.initCause(t);
- lastTrace = t;
- }
- if (THROW_ON_MULTIPLE_HDFS_CALLS) {
- throw new RuntimeException("SWAP:For key:"+key+" HDFS get called more than once: ", firstTrace);
- } else {
- InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:For key:"+key+" HDFS get called more than once: ", firstTrace);
- }
- }
- }
- }
-
- public EvictionCriteria getEvictionCriteria() {
- EvictionCriteria criteria = null;
- if (this.customEvictionAttributes != null
- && !this.customEvictionAttributes.isEvictIncoming()) {
- criteria = this.customEvictionAttributes.getCriteria();
- }
- return criteria;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
index b936e3f..46a851d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
@@ -870,15 +870,7 @@ public abstract class AbstractRegionEntry implements RegionEntry,
removeEntry = true;
}
- // See #47887, we do not insert a tombstone for evicted HDFS
- // entries since the value is still present in HDFS
- // Check if we have to evict or just do destroy.
- boolean forceRemoveEntry =
- (event.isEviction() || event.isExpiration())
- && event.getRegion().isUsedForPartitionedRegionBucket()
- && event.getRegion().getPartitionedRegion().isHDFSRegion();
-
- if (removeEntry || forceRemoveEntry) {
+ if (removeEntry) {
boolean isThisTombstone = isTombstone();
if(inTokenMode && !event.getOperation().isEviction()) {
setValue(region, Token.DESTROYED);
@@ -1398,27 +1390,7 @@ public abstract class AbstractRegionEntry implements RegionEntry,
/**
* {@inheritDoc}
*/
- @Override
- public final boolean isMarkedForEviction() {
- return areAnyBitsSet(MARKED_FOR_EVICTION);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public final void setMarkedForEviction() {
- setBits(MARKED_FOR_EVICTION);
- }
- /**
- * {@inheritDoc}
- */
- @Override
- public final void clearMarkedForEviction() {
- clearBits(~MARKED_FOR_EVICTION);
- }
-
@Override
public final synchronized void decRefCount(NewLRUClockHand lruList, LocalRegion lr) {
if (TXManagerImpl.decRefCount(this)) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index 3286373..75a1e32 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -18,7 +18,6 @@
package com.gemstone.gemfire.internal.cache;
-import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashSet;
@@ -36,7 +35,6 @@ import com.gemstone.gemfire.InvalidDeltaException;
import com.gemstone.gemfire.cache.CacheRuntimeException;
import com.gemstone.gemfire.cache.CacheWriter;
import com.gemstone.gemfire.cache.CacheWriterException;
-import com.gemstone.gemfire.cache.CustomEvictionAttributes;
import com.gemstone.gemfire.cache.DiskAccessException;
import com.gemstone.gemfire.cache.EntryExistsException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
@@ -83,9 +81,6 @@ import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap;
-import com.gemstone.gemfire.pdx.PdxInstance;
-import com.gemstone.gemfire.pdx.PdxSerializationException;
-import com.gemstone.gemfire.pdx.internal.ConvertableToBytes;
/**
* Abstract implementation of {@link RegionMap}that has all the common
@@ -303,10 +298,6 @@ public abstract class AbstractRegionMap implements RegionMap {
public RegionEntry getEntry(Object key) {
RegionEntry re = (RegionEntry)_getMap().get(key);
- if (re != null && re.isMarkedForEviction()) {
- // entry has been faulted in from HDFS
- return null;
- }
return re;
}
@@ -337,16 +328,12 @@ public abstract class AbstractRegionMap implements RegionMap {
@Override
public final RegionEntry getOperationalEntryInVM(Object key) {
RegionEntry re = (RegionEntry)_getMap().get(key);
- if (re != null && re.isMarkedForEviction()) {
- // entry has been faulted in from HDFS
- return null;
- }
return re;
}
public final void removeEntry(Object key, RegionEntry re, boolean updateStat) {
- if (re.isTombstone() && _getMap().get(key) == re && !re.isMarkedForEviction()){
+ if (re.isTombstone() && _getMap().get(key) == re){
logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace"));
return; // can't remove tombstones except from the tombstone sweeper
}
@@ -362,7 +349,7 @@ public abstract class AbstractRegionMap implements RegionMap {
EntryEventImpl event, final LocalRegion owner,
final IndexUpdater indexUpdater) {
boolean success = false;
- if (re.isTombstone()&& _getMap().get(key) == re && !re.isMarkedForEviction()) {
+ if (re.isTombstone()&& _getMap().get(key) == re) {
logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace"));
return; // can't remove tombstones except from the tombstone sweeper
}
@@ -371,18 +358,6 @@ public abstract class AbstractRegionMap implements RegionMap {
indexUpdater.onEvent(owner, event, re);
}
- //This is messy, but custom eviction calls removeEntry
- //rather than re.destroy I think to avoid firing callbacks, etc.
- //However, the value still needs to be set to removePhase1
- //in order to remove the entry from disk.
- if(event.isCustomEviction() && !re.isRemoved()) {
- try {
- re.removePhase1(owner, false);
- } catch (RegionClearedException e) {
- //that's ok, we were just trying to do evict incoming eviction
- }
- }
-
if (_getMap().remove(key, re)) {
re.removePhase2();
success = true;
@@ -1169,7 +1144,7 @@ public abstract class AbstractRegionMap implements RegionMap {
// transaction conflict (caused by eviction) when the entry
// is being added to transaction state.
if (isEviction) {
- if (!confirmEvictionDestroy(oldRe) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) {
+ if (!confirmEvictionDestroy(oldRe)) {
opCompleted = false;
return opCompleted;
}
@@ -1424,7 +1399,7 @@ public abstract class AbstractRegionMap implements RegionMap {
// See comment above about eviction checks
if (isEviction) {
assert expectedOldValue == null;
- if (!confirmEvictionDestroy(re) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) {
+ if (!confirmEvictionDestroy(re)) {
opCompleted = false;
return opCompleted;
}
@@ -1506,12 +1481,6 @@ public abstract class AbstractRegionMap implements RegionMap {
}
} // !isRemoved
else { // already removed
- if (owner.isHDFSReadWriteRegion() && re.isRemovedPhase2()) {
- // For HDFS region there may be a race with eviction
- // so retry the operation. fixes bug 49150
- retry = true;
- continue;
- }
if (re.isTombstone() && event.getVersionTag() != null) {
// if we're dealing with a tombstone and this is a remote event
// (e.g., from cache client update thread) we need to update
@@ -2685,11 +2654,7 @@ public abstract class AbstractRegionMap implements RegionMap {
boolean onlyExisting, boolean returnTombstone) {
Object key = event.getKey();
RegionEntry retVal = null;
- if (event.isFetchFromHDFS()) {
- retVal = getEntry(event);
- } else {
- retVal = getEntryInVM(key);
- }
+ retVal = getEntry(event);
if (onlyExisting) {
if (!returnTombstone && (retVal != null && retVal.isTombstone())) {
return null;
@@ -2988,47 +2953,6 @@ public abstract class AbstractRegionMap implements RegionMap {
else if (re != null && owner.isUsedForPartitionedRegionBucket()) {
BucketRegion br = (BucketRegion)owner;
CachePerfStats stats = br.getPartitionedRegion().getCachePerfStats();
- long startTime= stats.startCustomEviction();
- CustomEvictionAttributes csAttr = br.getCustomEvictionAttributes();
- // No need to update indexes if entry was faulted in but operation did not succeed.
- if (csAttr != null && (csAttr.isEvictIncoming() || re.isMarkedForEviction())) {
-
- if (csAttr.getCriteria().doEvict(event)) {
- stats.incEvictionsInProgress();
- // set the flag on event saying the entry should be evicted
- // and not indexed
- @Released EntryEventImpl destroyEvent = EntryEventImpl.create (owner, Operation.DESTROY, event.getKey(),
- null/* newValue */, null, false, owner.getMyId());
- try {
-
- destroyEvent.setOldValueFromRegion();
- destroyEvent.setCustomEviction(true);
- destroyEvent.setPossibleDuplicate(event.isPossibleDuplicate());
- if(logger.isDebugEnabled()) {
- logger.debug("Evicting the entry " + destroyEvent);
- }
- if(result != null) {
- removeEntry(event.getKey(),re, true, destroyEvent,owner, indexUpdater);
- }
- else{
- removeEntry(event.getKey(),re, true, destroyEvent,owner, null);
- }
- //mark the region entry for this event as evicted
- event.setEvicted();
- stats.incEvictions();
- if(logger.isDebugEnabled()) {
- logger.debug("Evicted the entry " + destroyEvent);
- }
- //removeEntry(event.getKey(), re);
- } finally {
- destroyEvent.release();
- stats.decEvictionsInProgress();
- }
- } else {
- re.clearMarkedForEviction();
- }
- }
- stats.endCustomEviction(startTime);
}
} // try
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
index 3038059..c241c6b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
@@ -1316,7 +1316,6 @@ public class BucketAdvisor extends CacheDistributionAdvisor {
((BucketRegion)br).processPendingSecondaryExpires();
}
if (br instanceof BucketRegionQueue) { // Shouldn't it be AbstractBucketRegionQueue
- // i.e. this stats is not getting incremented for HDFSBucketRegionQueue!!
BucketRegionQueue brq = (BucketRegionQueue)br;
brq.incQueueSize(brq.size());
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index 6e4f426..f5ae0fb 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -26,7 +26,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import org.apache.logging.log4j.Logger;
@@ -35,7 +34,6 @@ import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.CopyHelper;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.DeltaSerializationException;
-import com.gemstone.gemfire.GemFireIOException;
import com.gemstone.gemfire.InternalGemFireError;
import com.gemstone.gemfire.InvalidDeltaException;
import com.gemstone.gemfire.SystemFailure;
@@ -43,20 +41,16 @@ import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheWriter;
import com.gemstone.gemfire.cache.CacheWriterException;
-import com.gemstone.gemfire.cache.CustomEvictionAttributes;
import com.gemstone.gemfire.cache.DiskAccessException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.EvictionAction;
import com.gemstone.gemfire.cache.EvictionAlgorithm;
import com.gemstone.gemfire.cache.EvictionAttributes;
-import com.gemstone.gemfire.cache.EvictionCriteria;
import com.gemstone.gemfire.cache.ExpirationAction;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.TimeoutException;
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
import com.gemstone.gemfire.cache.partition.PartitionListener;
import com.gemstone.gemfire.cache.query.internal.IndexUpdater;
import com.gemstone.gemfire.distributed.DistributedMember;
@@ -90,13 +84,11 @@ import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
-import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import com.gemstone.gemfire.internal.concurrent.Atomics;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
-import com.gemstone.gemfire.internal.offheap.StoredObject;
import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
@@ -233,8 +225,6 @@ implements Bucket
return eventSeqNum;
}
- protected final AtomicReference<HoplogOrganizer> hoplog = new AtomicReference<HoplogOrganizer>();
-
public BucketRegion(String regionName, RegionAttributes attrs,
LocalRegion parentRegion, GemFireCacheImpl cache,
InternalRegionArguments internalRegionArgs) {
@@ -892,12 +882,6 @@ implements Bucket
beginLocalWrite(event);
try {
- // increment the tailKey so that invalidate operations are written to HDFS
- if (this.partitionedRegion.hdfsStoreName != null) {
- /* MergeGemXDHDFSToGFE Disabled this while porting. Is this required? */
- //assert this.partitionedRegion.isLocalParallelWanEnabled();
- handleWANEvent(event);
- }
// which performs the local op.
// The ARM then calls basicInvalidatePart2 with the entry synchronized.
if ( !hasSeenEvent(event) ) {
@@ -1152,20 +1136,6 @@ implements Bucket
if (this.partitionedRegion.isParallelWanEnabled()) {
handleWANEvent(event);
}
- // In GemFire EVICT_DESTROY is not distributed, so in order to remove the entry
- // from memory, allow the destroy to proceed. fixes #49784
- if (event.isLoadedFromHDFS() && !getBucketAdvisor().isPrimary()) {
- if (logger.isDebugEnabled()) {
- logger.debug("Put the destory event in HDFS queue on secondary "
- + "and return as event is HDFS loaded " + event);
- }
- notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event);
- return;
- }else{
- if (logger.isDebugEnabled()) {
- logger.debug("Going ahead with the destroy on GemFire system");
- }
- }
// This call should invoke AbstractRegionMap (aka ARM) destroy method
// which calls the CacheWriter, then performs the local op.
// The ARM then calls basicDestroyPart2 with the entry synchronized.
@@ -1364,39 +1334,7 @@ implements Bucket
}
@Override
- public boolean isHDFSRegion() {
- return this.partitionedRegion.isHDFSRegion();
- }
-
- @Override
- public boolean isHDFSReadWriteRegion() {
- return this.partitionedRegion.isHDFSReadWriteRegion();
- }
-
- @Override
- protected boolean isHDFSWriteOnly() {
- return this.partitionedRegion.isHDFSWriteOnly();
- }
-
- @Override
public int sizeEstimate() {
- if (isHDFSReadWriteRegion()) {
- try {
- checkForPrimary();
- ConcurrentParallelGatewaySenderQueue q = getHDFSQueue();
- if (q == null) return 0;
- int hdfsBucketRegionSize = q.getBucketRegionQueue(
- partitionedRegion, getId()).size();
- int hoplogEstimate = (int) getHoplogOrganizer().sizeEstimate();
- if (logger.isDebugEnabled()) {
- logger.debug("for bucket " + getName() + " estimateSize returning "
- + (hdfsBucketRegionSize + hoplogEstimate));
- }
- return hdfsBucketRegionSize + hoplogEstimate;
- } catch (ForceReattemptException e) {
- throw new PrimaryBucketException(e.getLocalizedMessage(), e);
- }
- }
return size();
}
@@ -1453,14 +1391,14 @@ implements Bucket
* if there is a serialization problem
* see LocalRegion#getDeserializedValue(RegionEntry, KeyInfo, boolean, boolean, boolean, EntryEventImpl, boolean, boolean, boolean)
*/
- private RawValue getSerialized(Object key, boolean updateStats, boolean doNotLockEntry, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
+ private RawValue getSerialized(Object key,
+ boolean updateStats,
+ boolean doNotLockEntry,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones)
throws EntryNotFoundException, IOException {
RegionEntry re = null;
- if (allowReadFromHDFS) {
- re = this.entries.getEntry(key);
- } else {
- re = this.entries.getOperationalEntryInVM(key);
- }
+ re = this.entries.getEntry(key);
if (re == null) {
return NULLVALUE;
}
@@ -1504,13 +1442,18 @@ implements Bucket
*
* @param keyInfo
* @param generateCallbacks
- * @param clientEvent holder for the entry's version information
+ * @param clientEvent holder for the entry's version information
* @param returnTombstones TODO
* @return serialized (byte) form
* @throws IOException if the result is not serializable
* @see LocalRegion#get(Object, Object, boolean, EntryEventImpl)
*/
- public RawValue getSerialized(KeyInfo keyInfo, boolean generateCallbacks, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws IOException {
+ public RawValue getSerialized(KeyInfo keyInfo,
+ boolean generateCallbacks,
+ boolean doNotLockEntry,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones) throws IOException {
checkReadiness();
checkForNoAccess();
CachePerfStats stats = getCachePerfStats();
@@ -1520,7 +1463,7 @@ implements Bucket
try {
RawValue valueBytes = NULLVALUE;
boolean isCreate = false;
- RawValue result = getSerialized(keyInfo.getKey(), true, doNotLockEntry, clientEvent, returnTombstones, allowReadFromHDFS);
+ RawValue result = getSerialized(keyInfo.getKey(), true, doNotLockEntry, clientEvent, returnTombstones);
isCreate = result == NULLVALUE || (result.getRawValue() == Token.TOMBSTONE && !returnTombstones);
miss = (result == NULLVALUE || Token.isInvalid(result.getRawValue()));
if (miss) {
@@ -1532,7 +1475,7 @@ implements Bucket
return REQUIRES_ENTRY_LOCK;
}
Object value = nonTxnFindObject(keyInfo, isCreate,
- generateCallbacks, result.getRawValue(), true, true, requestingClient, clientEvent, false, allowReadFromHDFS);
+ generateCallbacks, result.getRawValue(), true, true, requestingClient, clientEvent, false);
if (value != null) {
result = new RawValue(value);
}
@@ -2471,36 +2414,8 @@ implements Bucket
}
public void beforeAcquiringPrimaryState() {
- try {
- createHoplogOrganizer();
- } catch (IOException e) {
- // 48990: when HDFS was down, gemfirexd should still start normally
- logger.warn(LocalizedStrings.HOPLOG_NOT_STARTED_YET, e);
- } catch(Throwable e) {
- /*MergeGemXDHDFSToGFE changed this code to checkReadiness*/
- // SystemFailure.checkThrowable(e);
- this.checkReadiness();
- //49333 - no matter what, we should elect a primary.
- logger.error(LocalizedStrings.LocalRegion_UNEXPECTED_EXCEPTION, e);
- }
- }
-
- public HoplogOrganizer<?> createHoplogOrganizer() throws IOException {
- if (getPartitionedRegion().isHDFSRegion()) {
- HoplogOrganizer<?> organizer = hoplog.get();
- if (organizer != null) {
- // hoplog is recreated by anther thread
- return organizer;
- }
-
- HoplogOrganizer hdfs = hoplog.getAndSet(getPartitionedRegion().hdfsManager.create(getId()));
- assert hdfs == null;
- return hoplog.get();
- } else {
- return null;
- }
}
-
+
public void afterAcquiringPrimaryState() {
}
@@ -2508,105 +2423,13 @@ implements Bucket
* Invoked when a primary bucket is demoted.
*/
public void beforeReleasingPrimaryLockDuringDemotion() {
- releaseHoplogOrganizer();
}
- protected void releaseHoplogOrganizer() {
- // release resources during a clean transition
- HoplogOrganizer hdfs = hoplog.getAndSet(null);
- if (hdfs != null) {
- getPartitionedRegion().hdfsManager.close(getId());
- }
- }
-
- public HoplogOrganizer<?> getHoplogOrganizer() throws HDFSIOException {
- HoplogOrganizer<?> organizer = hoplog.get();
- if (organizer == null) {
- synchronized (getBucketAdvisor()) {
- checkForPrimary();
- try {
- organizer = createHoplogOrganizer();
- } catch (IOException e) {
- throw new HDFSIOException("Failed to create Hoplog organizer due to ", e);
- }
- if (organizer == null) {
- throw new HDFSIOException("Hoplog organizer is not available for " + this);
- }
- }
- }
- return organizer;
- }
-
@Override
public RegionAttributes getAttributes() {
return this;
}
- @Override
- public void hdfsCalled(Object key) {
- this.partitionedRegion.hdfsCalled(key);
- }
-
- @Override
- protected void clearHDFSData() {
- //clear the HDFS data if present
- if (getPartitionedRegion().isHDFSReadWriteRegion()) {
- // Clear the queue
- ConcurrentParallelGatewaySenderQueue q = getHDFSQueue();
- if (q == null) return;
- q.clear(getPartitionedRegion(), this.getId());
- HoplogOrganizer organizer = hoplog.get();
- if (organizer != null) {
- try {
- organizer.clear();
- } catch (IOException e) {
- throw new GemFireIOException(LocalizedStrings.HOPLOG_UNABLE_TO_DELETE_HDFS_DATA.toLocalizedString(), e);
- }
- }
- }
- }
-
- public EvictionCriteria getEvictionCriteria() {
- return this.partitionedRegion.getEvictionCriteria();
- }
-
- public CustomEvictionAttributes getCustomEvictionAttributes() {
- return this.partitionedRegion.getCustomEvictionAttributes();
- }
-
- /**
- * @return true if the evict destroy was done; false if it was not needed
- */
- public boolean customEvictDestroy(Object key)
- {
- checkReadiness();
- @Released final EntryEventImpl event =
- generateCustomEvictDestroyEvent(key);
- event.setCustomEviction(true);
- boolean locked = false;
- try {
- locked = beginLocalWrite(event);
- return mapDestroy(event,
- false, // cacheWrite
- true, // isEviction
- null); // expectedOldValue
- }
- catch (CacheWriterException error) {
- throw new Error(LocalizedStrings.LocalRegion_CACHE_WRITER_SHOULD_NOT_HAVE_BEEN_CALLED_FOR_EVICTDESTROY.toLocalizedString(), error);
- }
- catch (TimeoutException anotherError) {
- throw new Error(LocalizedStrings.LocalRegion_NO_DISTRIBUTED_LOCK_SHOULD_HAVE_BEEN_ATTEMPTED_FOR_EVICTDESTROY.toLocalizedString(), anotherError);
- }
- catch (EntryNotFoundException yetAnotherError) {
- throw new Error(LocalizedStrings.LocalRegion_ENTRYNOTFOUNDEXCEPTION_SHOULD_BE_MASKED_FOR_EVICTDESTROY.toLocalizedString(), yetAnotherError);
- } finally {
- if (locked) {
- endLocalWrite(event);
- }
- event.release();
- }
- }
-
public boolean areSecondariesPingable() {
Set<InternalDistributedMember> hostingservers = this.partitionedRegion.getRegionAdvisor()
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java
index 0facd93..0243cde 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java
@@ -441,7 +441,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
}
}
- protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event, int sizeOfHDFSEvent) {
+ protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event) {
if (didPut) {
if (this.initialized) {
this.eventSeqNumQueue.add(key);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java
index 6f673c7..4a34771 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java
@@ -38,8 +38,6 @@ import com.gemstone.gemfire.cache.InterestPolicy;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.SubscriptionAttributes;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
import com.gemstone.gemfire.distributed.Role;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
@@ -1228,30 +1226,16 @@ public class CacheDistributionAdvisor extends DistributionAdvisor {
public boolean include(final Profile profile) {
if (profile instanceof CacheProfile) {
final CacheProfile cp = (CacheProfile)profile;
- /*Since HDFS queues are created only when a region is created, this check is
- * unnecessary. Also this check is creating problem because hdfs queue is not
- * created on an accessor. Hence removing this check for hdfs queues. */
- Set<String> allAsyncEventIdsNoHDFS = removeHDFSQueues(allAsyncEventIds);
- Set<String> profileQueueIdsNoHDFS = removeHDFSQueues(cp.asyncEventQueueIds);
- if (allAsyncEventIdsNoHDFS.equals(profileQueueIdsNoHDFS)) {
+ if (allAsyncEventIds.equals(cp.asyncEventQueueIds)) {
return true;
}else{
- differAsycnQueueIds.add(allAsyncEventIdsNoHDFS);
- differAsycnQueueIds.add(profileQueueIdsNoHDFS);
+ differAsycnQueueIds.add(allAsyncEventIds);
+ differAsycnQueueIds.add(cp.asyncEventQueueIds);
return false;
}
}
return false;
}
- private Set<String> removeHDFSQueues(Set<String> queueIds){
- Set<String> queueIdsWithoutHDFSQueues = new HashSet<String>();
- for (String queueId: queueIds){
- if (!queueId.startsWith(HDFSStoreFactoryImpl.DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS)){
- queueIdsWithoutHDFSQueues.add(queueId);
- }
- }
- return queueIdsWithoutHDFSQueues;
- }
});
return differAsycnQueueIds;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java
index 382c537..ad84963 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java
@@ -156,13 +156,6 @@ public class CachePerfStats {
protected static final int compressionPreCompressedBytesId;
protected static final int compressionPostCompressedBytesId;
- protected static final int evictByCriteria_evictionsId;// total actual evictions (entries evicted)
- protected static final int evictByCriteria_evictionTimeId;// total eviction time including product + user expr.
- protected static final int evictByCriteria_evictionsInProgressId;
- protected static final int evictByCriteria_evaluationsId;// total eviction attempts
- protected static final int evictByCriteria_evaluationTimeId;// time taken to evaluate user expression.
-
-
/** The Statistics object that we delegate most behavior to */
protected final Statistics stats;
@@ -521,12 +514,6 @@ public class CachePerfStats {
compressionDecompressionsId = type.nameToId("decompressions");
compressionPreCompressedBytesId = type.nameToId("preCompressedBytes");
compressionPostCompressedBytesId = type.nameToId("postCompressedBytes");
-
- evictByCriteria_evictionsId = type.nameToId("evictByCriteria_evictions");
- evictByCriteria_evictionTimeId = type.nameToId("evictByCriteria_evictionTime");
- evictByCriteria_evictionsInProgressId = type.nameToId("evictByCriteria_evictionsInProgress");
- evictByCriteria_evaluationsId= type.nameToId("evictByCriteria_evaluations");
- evictByCriteria_evaluationTimeId = type.nameToId("evictByCriteria_evaluationTime");
}
//////////////////////// Constructors ////////////////////////
@@ -1354,66 +1341,4 @@ public class CachePerfStats {
stats.incLong(exportTimeId, getStatTime() - start);
}
}
-
-// // used for the case of evict on incoming
- public long startCustomEviction() {
- return NanoTimer.getTime();
- }
-
- // used for the case of evict on incoming
- public void endCustomEviction(long start) {
- long ts = NanoTimer.getTime();
- stats.incLong(evictByCriteria_evictionTimeId, ts - start);
- }
-
- public void incEvictionsInProgress() {
- this.stats.incLong(evictByCriteria_evictionsInProgressId, 1);
- }
-
- public void decEvictionsInProgress() {
- this.stats.incLong(evictByCriteria_evictionsInProgressId, -1);
- }
-
- public void incEvictions() {
- this.stats.incLong(evictByCriteria_evictionsId, 1);
- }
-
- public void incEvaluations() {
- this.stats.incLong(evictByCriteria_evaluationsId, 1);
- }
-
- public void incEvaluations(int delta) {
- this.stats.incLong(evictByCriteria_evaluationsId, delta);
- }
-
- public long startEvaluation() {
- return NanoTimer.getTime();
- }
-
- public void endEvaluation(long start, long notEvaluationTime) {
- long ts = NanoTimer.getTime();
- long totalTime = ts - start;
- long evaluationTime = totalTime - notEvaluationTime;
- stats.incLong(evictByCriteria_evaluationTimeId, evaluationTime);
- }
-
- public long getEvictions() {
- return stats.getLong(evictByCriteria_evictionsId);
- }
-
- public long getEvictionsInProgress() {
- return stats.getLong(evictByCriteria_evictionsInProgressId);
- }
-
- public long getEvictionsTime() {
- return stats.getLong(evictByCriteria_evictionTimeId);
- }
-
- public long getEvaluations() {
- return stats.getLong(evictByCriteria_evaluationsId);
- }
-
- public long getEvaluationTime() {
- return stats.getLong(evictByCriteria_evaluationTimeId);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java
index 1441144..72edc10 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java
@@ -107,9 +107,6 @@ public class ColocationHelper {
}
private static PartitionedRegion getColocatedPR(
final PartitionedRegion partitionedRegion, final String colocatedWith) {
- logger.info(LocalizedMessage.create(
- LocalizedStrings.HOPLOG_0_COLOCATE_WITH_REGION_1_NOT_INITIALIZED_YET,
- new Object[] { partitionedRegion.getFullPath(), colocatedWith }));
PartitionedRegion colocatedPR = (PartitionedRegion) partitionedRegion
.getCache().getPartitionedRegion(colocatedWith, false);
assert colocatedPR != null;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CustomEvictionAttributesImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CustomEvictionAttributesImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CustomEvictionAttributesImpl.java
deleted file mode 100644
index 0c82f97..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CustomEvictionAttributesImpl.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.internal.cache;
-
-import com.gemstone.gemfire.cache.CustomEvictionAttributes;
-import com.gemstone.gemfire.cache.EvictionCriteria;
-
-/**
- * Concrete instance of {@link CustomEvictionAttributes}.
- *
- * @since gfxd 1.0
- */
-public final class CustomEvictionAttributesImpl extends
- CustomEvictionAttributes {
-
- public CustomEvictionAttributesImpl(EvictionCriteria<?, ?> criteria,
- long startTime, long interval, boolean evictIncoming) {
- super(criteria, startTime, interval, evictIncoming);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
index f8475ae..cafdb80 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
@@ -145,7 +145,7 @@ public class DistTXState extends TXState {
}
}
} // end if primary
- } // end non-hdfs buckets
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
index a6d2488..6a7b4f2 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
@@ -863,8 +863,6 @@ public abstract class DistributedCacheOperation {
private final static int INHIBIT_NOTIFICATIONS_MASK = 0x400;
- protected final static short FETCH_FROM_HDFS = 0x200;
-
protected final static short IS_PUT_DML = 0x100;
public boolean needsRouting;
@@ -1367,7 +1365,6 @@ public abstract class DistributedCacheOperation {
if ((extBits & INHIBIT_NOTIFICATIONS_MASK) != 0) {
this.inhibitAllNotifications = true;
if (this instanceof PutAllMessage) {
- ((PutAllMessage) this).setFetchFromHDFS((extBits & FETCH_FROM_HDFS) != 0);
((PutAllMessage) this).setPutDML((extBits & IS_PUT_DML) != 0);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
index 2817fdd..b6aa1b6 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
@@ -856,7 +856,6 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
PutAllMessage msg = new PutAllMessage();
msg.eventId = event.getEventId();
msg.context = event.getContext();
- msg.setFetchFromHDFS(event.isFetchFromHDFS());
msg.setPutDML(event.isPutDML());
return msg;
}
@@ -871,7 +870,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
public PutAllPRMessage createPRMessagesNotifyOnly(int bucketId) {
final EntryEventImpl event = getBaseEvent();
PutAllPRMessage prMsg = new PutAllPRMessage(bucketId, putAllDataSize, true,
- event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), false, false /*isPutDML*/);
+ event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), false /*isPutDML*/);
if (event.getContext() != null) {
prMsg.setBridgeContext(event.getContext());
}
@@ -900,7 +899,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
PutAllPRMessage prMsg = (PutAllPRMessage)prMsgMap.get(bucketId);
if (prMsg == null) {
prMsg = new PutAllPRMessage(bucketId.intValue(), putAllDataSize, false,
- event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), event.isFetchFromHDFS(), event.isPutDML());
+ event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), event.isPutDML());
prMsg.setTransactionDistributed(event.getRegion().getCache().getTxManager().isDistributed());
// set dpao's context(original sender) into each PutAllMsg
@@ -1077,9 +1076,6 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
protected EventID eventId = null;
- // By default, fetchFromHDFS == true;
- private transient boolean fetchFromHDFS = true;
-
private transient boolean isPutDML = false;
protected static final short HAS_BRIDGE_CONTEXT = UNRESERVED_FLAGS_START;
@@ -1137,12 +1133,11 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
* the region the entry is put in
*/
public void doEntryPut(PutAllEntryData entry, DistributedRegion rgn,
- boolean requiresRegionContext, boolean fetchFromHDFS, boolean isPutDML) {
+ boolean requiresRegionContext, boolean isPutDML) {
@Released EntryEventImpl ev = PutAllMessage.createEntryEvent(entry, getSender(),
this.context, rgn,
requiresRegionContext, this.possibleDuplicate,
this.needsRouting, this.callbackArg, true, skipCallbacks);
- ev.setFetchFromHDFS(fetchFromHDFS);
ev.setPutDML(isPutDML);
// we don't need to set old value here, because the msg is from remote. local old value will get from next step
try {
@@ -1237,7 +1232,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
logger.debug("putAll processing {} with {} sender={}", putAllData[i], putAllData[i].versionTag, sender);
}
putAllData[i].setSender(sender);
- doEntryPut(putAllData[i], rgn, requiresRegionContext, fetchFromHDFS, isPutDML);
+ doEntryPut(putAllData[i], rgn, requiresRegionContext, isPutDML);
}
}
}, ev.getEventId());
@@ -1366,10 +1361,6 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
return Arrays.asList(ops);
}
- public void setFetchFromHDFS(boolean val) {
- this.fetchFromHDFS = val;
- }
-
public void setPutDML(boolean val) {
this.isPutDML = val;
}
@@ -1377,9 +1368,6 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
@Override
protected short computeCompressedExtBits(short bits) {
bits = super.computeCompressedExtBits(bits);
- if (fetchFromHDFS) {
- bits |= FETCH_FROM_HDFS;
- }
if (isPutDML) {
bits |= IS_PUT_DML;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
index addba8e..226d914 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
@@ -17,8 +17,6 @@
package com.gemstone.gemfire.internal.cache;
-import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_FILL_IN_VALUE;
-
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -113,8 +111,6 @@ import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationE
import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
import com.gemstone.gemfire.internal.cache.versions.VersionSource;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueConfigurationException;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
@@ -1264,8 +1260,6 @@ public class DistributedRegion extends LocalRegion implements
private final Set<DistributedMember> memoryThresholdReachedMembers =
new CopyOnWriteArraySet<DistributedMember>();
- private ConcurrentParallelGatewaySenderQueue hdfsQueue;
-
/** Sets and returns giiMissingRequiredRoles */
private boolean checkInitialImageForReliability(
InternalDistributedMember imageTarget,
@@ -2424,9 +2418,16 @@ public class DistributedRegion extends LocalRegion implements
/** @return the deserialized value */
@Override
@Retained
- protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
- TXStateInterface txState, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead,
- boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
+ protected Object findObjectInSystem(KeyInfo keyInfo,
+ boolean isCreate,
+ TXStateInterface txState,
+ boolean generateCallbacks,
+ Object localValue,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones)
throws CacheLoaderException, TimeoutException
{
checkForLimitedOrNoAccess();
@@ -2545,18 +2546,6 @@ public class DistributedRegion extends LocalRegion implements
}
}
- protected ConcurrentParallelGatewaySenderQueue getHDFSQueue() {
- if (this.hdfsQueue == null) {
- String asyncQId = this.getPartitionedRegion().getHDFSEventQueueName();
- final AsyncEventQueueImpl asyncQ = (AsyncEventQueueImpl)this.getCache().getAsyncEventQueue(asyncQId);
- final AbstractGatewaySender gatewaySender = (AbstractGatewaySender)asyncQ.getSender();
- AbstractGatewaySenderEventProcessor ep = gatewaySender.getEventProcessor();
- if (ep == null) return null;
- hdfsQueue = (ConcurrentParallelGatewaySenderQueue)ep.getQueue();
- }
- return hdfsQueue;
- }
-
/** hook for subclasses to note that a cache load was performed
* @see BucketRegion#performedLoad
*/
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
index 2b826ce..e241622 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
@@ -193,16 +193,8 @@ public class EntryEventImpl
/** version tag for concurrency checks */
protected VersionTag versionTag;
- /** boolean to indicate that this operation should be optimized by not fetching from HDFS*/
- private transient boolean fetchFromHDFS = true;
-
private transient boolean isPutDML = false;
- /** boolean to indicate that the RegionEntry for this event was loaded from HDFS*/
- private transient boolean loadedFromHDFS= false;
-
- private transient boolean isCustomEviction = false;
-
/** boolean to indicate that the RegionEntry for this event has been evicted*/
private transient boolean isEvicted = false;
@@ -658,14 +650,6 @@ public class EntryEventImpl
return this.op.isEviction();
}
- public final boolean isCustomEviction() {
- return this.isCustomEviction;
- }
-
- public final void setCustomEviction(boolean customEvict) {
- this.isCustomEviction = customEvict;
- }
-
public final void setEvicted() {
this.isEvicted = true;
}
@@ -3047,13 +3031,6 @@ public class EntryEventImpl
public boolean isOldValueOffHeap() {
return isOffHeapReference(this.oldValue);
}
- public final boolean isFetchFromHDFS() {
- return fetchFromHDFS;
- }
-
- public final void setFetchFromHDFS(boolean fetchFromHDFS) {
- this.fetchFromHDFS = fetchFromHDFS;
- }
public final boolean isPutDML() {
return this.isPutDML;
@@ -3062,12 +3039,4 @@ public class EntryEventImpl
public final void setPutDML(boolean val) {
this.isPutDML = val;
}
-
- public final boolean isLoadedFromHDFS() {
- return loadedFromHDFS;
- }
-
- public final void setLoadedFromHDFS(boolean loadedFromHDFS) {
- this.loadedFromHDFS = loadedFromHDFS;
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java
deleted file mode 100644
index 9054d6d..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import com.gemstone.gemfire.cache.CacheClosedException;
-import com.gemstone.gemfire.cache.EvictionCriteria;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.google.common.util.concurrent.AbstractScheduledService;
-import com.gemstone.gemfire.internal.offheap.Releasable;
-/**
- * Schedules each iteration periodically. EvictorService takes absolute time and
- * a period as input and schedules Eviction at absolute times by calculating the
- * interval. For scheduling the next eviction iteration it also takes into
- * account the time taken to complete one iteration. If an iteration takes more
- * time than the specified period then another iteration is scheduled
- * immediately.
- *
- *
- */
-
-public class EvictorService extends AbstractScheduledService {
-
- private final EvictionCriteria<Object, Object> criteria;
-
- // period is always in seconds
- private long interval;
-
- private volatile boolean stopScheduling;
-
- private long nextScheduleTime;
-
- private GemFireCacheImpl cache;
-
- private Region region;
-
- private volatile ScheduledExecutorService executorService;
-
- public EvictorService(EvictionCriteria<Object, Object> criteria,
- long evictorStartTime, long evictorInterval, TimeUnit unit, Region r) {
- this.criteria = criteria;
- this.interval = unit.toSeconds(evictorInterval);
- this.region = r;
- try {
- this.cache = GemFireCacheImpl.getExisting();
- } catch (CacheClosedException cce) {
-
- }
- //TODO: Unless we revisit System.currentTimeMillis or cacheTimeMillis keep the default
-// long now = (evictorStartTime != 0 ? evictorStartTime
-// + this.cache.getDistributionManager().getCacheTimeOffset() : this.cache
-// .getDistributionManager().cacheTimeMillis()) / 1000;
- long now = this.cache.getDistributionManager().cacheTimeMillis() / 1000;
- if (this.cache.getLoggerI18n().fineEnabled()) {
- this.cache.getLoggerI18n().fine(
- "EvictorService: The startTime(now) is " + now + " evictorStartTime : " + evictorStartTime);
- }
-
- this.nextScheduleTime = now + 10;
-
- if (this.cache.getLoggerI18n().fineEnabled()) {
- this.cache.getLoggerI18n().fine(
- "EvictorService: The startTime is " + this.nextScheduleTime);
- }
- }
-
- @Override
- protected void runOneIteration() throws Exception {
- if (this.cache.getLoggerI18n().fineEnabled()) {
- this.cache.getLoggerI18n()
- .fine(
- "EvictorService: Running the iteration at "
- + cache.cacheTimeMillis());
- }
- if (stopScheduling || checkCancelled(cache)) {
- stopScheduling(); // if check cancelled
- if (this.cache.getLoggerI18n().fineEnabled()) {
- this.cache
- .getLoggerI18n()
- .fine(
- "EvictorService: Abort eviction since stopScheduling OR cancel in progress. Evicted 0 entries ");
- }
- return;
- }
- CachePerfStats stats = ((LocalRegion)this.region).getCachePerfStats();
- long startEvictionTime = stats.startCustomEviction();
- int evicted = 0;
- long startEvaluationTime = stats.startEvaluation();
- Iterator<Entry<Object, Object>> keysItr = null;
- long totalIterationsTime = 0;
-
- keysItr = this.criteria.getKeysToBeEvicted(this.cache
- .getDistributionManager().cacheTimeMillis(), this.region);
- try {
- stats.incEvaluations(this.region.size());
- // if we have been asked to stop scheduling
- // or the cache is closing stop in between.
-
-
- while (keysItr.hasNext() && !stopScheduling && !checkCancelled(cache)) {
- Map.Entry<Object, Object> entry = keysItr.next();
- long startIterationTime = this.cache
- .getDistributionManager().cacheTimeMillis();
- Object routingObj = entry.getValue();
- if (this.cache.getLoggerI18n().fineEnabled()) {
- this.cache.getLoggerI18n().fine(
- "EvictorService: Going to evict the following entry " + entry);
- }
- if (this.region instanceof PartitionedRegion) {
- try {
- PartitionedRegion pr = (PartitionedRegion)this.region;
- stats.incEvictionsInProgress();
- int bucketId = PartitionedRegionHelper.getHashKey(pr, routingObj);
- BucketRegion br = pr.getDataStore().getLocalBucketById(bucketId);
- // This has to be called on BucketRegion directly and not on the PR as
- // PR doesn't allow operation on Secondary buckets.
- if (br != null) {
- if (this.cache.getLoggerI18n().fineEnabled()) {
- this.cache.getLoggerI18n().fine(
- "EvictorService: Going to evict the following entry " + entry
- + " from bucket " + br);
- }
- if (br.getBucketAdvisor().isPrimary()) {
- boolean succ = false;
- try {
- succ = br.customEvictDestroy(entry.getKey());
- } catch (PrimaryBucketException e) {
- if (this.cache.getLoggerI18n().fineEnabled()) {
- this.cache.getLoggerI18n().warning(
- LocalizedStrings.EVICTORSERVICE_CAUGHT_EXCEPTION_0, e);
- }
- }
-
- if (succ)
- evicted++;
- if (this.cache.getLoggerI18n().fineEnabled()) {
- this.cache.getLoggerI18n()
- .fine(
- "EvictorService: Evicted the following entry " + entry
- + " from bucket " + br + " successfully " + succ
- + " the value in buk is " /*
- * +
- * br.get(entry.getKey())
- */);
- }
- }
- }
- stats.incEvictions();
- } catch (Exception e) {
- if (this.cache.getLoggerI18n().fineEnabled()) {
- this.cache.getLoggerI18n().warning(
- LocalizedStrings.EVICTORSERVICE_CAUGHT_EXCEPTION_0, e);
- }
- // TODO:
- // Do the exception handling .
- // Check if the bucket is present
- // If the entry could not be evicted then log the warning
- // Log any other exception.
- }finally{
- stats.decEvictionsInProgress();
- long endIterationTime = this.cache
- .getDistributionManager().cacheTimeMillis();
- totalIterationsTime += (endIterationTime - startIterationTime);
- }
- }
- }
- }finally {
- if(keysItr instanceof Releasable) {
- ((Releasable)keysItr).release();
- }
- }
- stats.endEvaluation(startEvaluationTime, totalIterationsTime);
-
- if (this.cache.getLoggerI18n().fineEnabled()) {
- this.cache.getLoggerI18n().fine(
- "EvictorService: Completed an iteration at time "
- + this.cache.getDistributionManager().cacheTimeMillis() / 1000
- + ". Evicted " + evicted + " entries.");
- }
- stats.endCustomEviction(startEvictionTime);
- }
-
- private boolean checkCancelled(GemFireCacheImpl cache) {
- if (cache.getCancelCriterion().cancelInProgress() != null) {
- return true;
- }
- return false;
- }
-
- @Override
- protected Scheduler scheduler() {
- return new CustomScheduler() {
- @Override
- protected Schedule getNextSchedule() throws Exception {
- // get the current time in seconds from DM.
- // it takes care of clock skew etc in different VMs
- long now = cache.getDistributionManager().cacheTimeMillis() / 1000;
- if (cache.getLoggerI18n().fineEnabled()) {
- cache.getLoggerI18n().fine("EvictorService: Now is " + now);
- }
- long delay = 0;
- if (now < nextScheduleTime) {
- delay = nextScheduleTime - now;
- }
- nextScheduleTime += interval;
- // calculate the next immediate time i.e. schedule time in seconds
- // set the schedule.delay to that scheduletime - currenttime
- if (cache.getLoggerI18n().fineEnabled()) {
- cache.getLoggerI18n().fine(
- "EvictorService: Returning the next schedule with delay " + delay
- + " next schedule is at : " + nextScheduleTime);
- }
-
- return new Schedule(delay, TimeUnit.SECONDS);
- }
- };
- }
-
- /**
- * Region.destroy and Region.close should make sure to call this method. This
- * will be called here.
- */
- public void stopScheduling() {
- this.stopScheduling = true;
- }
-
- // this will be called when we stop the service.
- // not sure if we have to do any cleanup
- // to stop the service call stop()
- @Override
- protected void shutDown() throws Exception {
- this.executorService.shutdownNow();
- this.region= null;
- this.cache = null;
- }
-
- // This will be called when we start the service.
- // not sure if we have to any intialization
- @Override
- protected void startUp() throws Exception {
-
- }
-
- public void changeEvictionInterval(long newInterval) {
- this.interval = newInterval / 1000;
- if (cache.getLoggerI18n().fineEnabled()) {
- cache.getLoggerI18n().fine(
- "EvictorService: New interval is " + this.interval);
- }
- }
-
- public void changeStartTime(long newStart) {
- this.nextScheduleTime = newStart/1000;
- if (cache.getLoggerI18n().fineEnabled()) {
- cache.getLoggerI18n().fine("EvictorService: New start time is " + this.nextScheduleTime);
- }
- }
-
- protected ScheduledExecutorService executor() {
- this.executorService = super.executor();
- return this.executorService;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index cc9727b..c477466 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -126,16 +126,6 @@ import com.gemstone.gemfire.cache.client.internal.ClientMetadataService;
import com.gemstone.gemfire.cache.client.internal.ClientRegionFactoryImpl;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.execute.FunctionService;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSIntegrationUtil;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreCreation;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSFlushQueueFunction;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionFunction;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSLastCompactionTimeFunction;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
@@ -932,9 +922,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
HARegionQueue.setMessageSyncInterval(HARegionQueue.DEFAULT_MESSAGE_SYNC_INTERVAL);
}
FunctionService.registerFunction(new PRContainsValueFunction());
- FunctionService.registerFunction(new HDFSLastCompactionTimeFunction());
- FunctionService.registerFunction(new HDFSForceCompactionFunction());
- FunctionService.registerFunction(new HDFSFlushQueueFunction());
this.expirationScheduler = new ExpirationScheduler(this.system);
// uncomment following line when debugging CacheExistsException
@@ -2185,8 +2172,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
closeDiskStores();
diskMonitor.close();
- closeHDFSStores();
-
// Close the CqService Handle.
try {
if (isDebugEnabled) {
@@ -2272,7 +2257,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
} catch (CancelException e) {
// make sure the disk stores get closed
closeDiskStores();
- closeHDFSStores();
// NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
// okay, we're taking too long to do this stuff, so let's
@@ -3119,8 +3103,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
future = (Future) this.reinitializingRegions.get(fullPath);
}
if (future == null) {
- HDFSIntegrationUtil.createAndAddAsyncQueue(regionPath, attrs, this);
- attrs = setEvictionAttributesForLargeRegion(attrs);
if (internalRegionArgs.getInternalMetaRegion() != null) {
rgn = internalRegionArgs.getInternalMetaRegion();
} else if (isPartitionedRegion) {
@@ -3245,54 +3227,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
}
- /**
- * turn on eviction by default for HDFS regions
- */
- @SuppressWarnings("deprecation")
- public <K, V> RegionAttributes<K, V> setEvictionAttributesForLargeRegion(
- RegionAttributes<K, V> attrs) {
- RegionAttributes<K, V> ra = attrs;
- if (DISABLE_AUTO_EVICTION) {
- return ra;
- }
- if (attrs.getDataPolicy().withHDFS()
- || attrs.getHDFSStoreName() != null) {
- // make the region overflow by default
- EvictionAttributes evictionAttributes = attrs.getEvictionAttributes();
- boolean hasNoEvictionAttrs = evictionAttributes == null
- || evictionAttributes.getAlgorithm().isNone();
- AttributesFactory<K, V> af = new AttributesFactory<K, V>(attrs);
- String diskStoreName = attrs.getDiskStoreName();
- // set the local persistent directory to be the same as that for
- // HDFS store
- if (attrs.getHDFSStoreName() != null) {
- HDFSStoreImpl hdfsStore = findHDFSStore(attrs.getHDFSStoreName());
- if (attrs.getPartitionAttributes().getLocalMaxMemory() != 0 && hdfsStore == null) {
- // HDFS store expected to be found at this point
- throw new IllegalStateException(
- LocalizedStrings.HOPLOG_HDFS_STORE_NOT_FOUND
- .toLocalizedString(attrs.getHDFSStoreName()));
- }
- // if there is no disk store, use the one configured for hdfs queue
- if (attrs.getPartitionAttributes().getLocalMaxMemory() != 0 && diskStoreName == null) {
- diskStoreName = hdfsStore.getDiskStoreName();
- }
- }
- // set LRU heap eviction with overflow to disk for HDFS stores with
- // local Oplog persistence
- // set eviction attributes only if not set
- if (hasNoEvictionAttrs) {
- if (diskStoreName != null) {
- af.setDiskStoreName(diskStoreName);
- }
- af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(
- ObjectSizer.DEFAULT, EvictionAction.OVERFLOW_TO_DISK));
- }
- ra = af.create();
- }
- return ra;
- }
-
public final Region getRegion(String path) {
return getRegion(path, false);
}
@@ -5403,39 +5337,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
}
- public HDFSStoreFactory createHDFSStoreFactory(HDFSStoreCreation creation) {
- return new HDFSStoreFactoryImpl(this, creation);
- }
- public void addHDFSStore(HDFSStoreImpl hsi) {
- HDFSStoreDirector.getInstance().addHDFSStore(hsi);
- //TODO:HDFS Add a resource event for hdfs store creation as well
- // like the following disk store event
- //system.handleResourceEvent(ResourceEvent.DISKSTORE_CREATE, dsi);
- }
-
- public void removeHDFSStore(HDFSStoreImpl hsi) {
- //hsi.destroy();
- HDFSStoreDirector.getInstance().removeHDFSStore(hsi.getName());
- //TODO:HDFS Add a resource event for hdfs store as well
- // like the following disk store event
- //system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, dsi);
- }
-
- public void closeHDFSStores() {
- HDFSRegionDirector.reset();
- HDFSStoreDirector.getInstance().closeHDFSStores();
- }
-
-
- public HDFSStoreImpl findHDFSStore(String name) {
- return HDFSStoreDirector.getInstance().getHDFSStore(name);
- }
-
- public Collection<HDFSStoreImpl> getHDFSStores() {
- return HDFSStoreDirector.getInstance().getAllHDFSStores();
- }
-
-
public TemporaryResultSetFactory getResultSetFactory() {
return this.resultSetFactory;
}