You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:49:59 UTC
[13/25] incubator-geode git commit: GEODE-10: Reinstating HDFS
persistence code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
index 328c196..c75286e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
@@ -92,12 +93,22 @@ import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
import com.gemstone.gemfire.cache.TransactionDataRebalancedException;
import com.gemstone.gemfire.cache.TransactionException;
import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
import com.gemstone.gemfire.cache.execute.EmtpyRegionFunctionException;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.CompactionStatus;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSFlushQueueFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionArgs;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionResultCollector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSLastCompactionTimeFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
import com.gemstone.gemfire.cache.partition.PartitionListener;
import com.gemstone.gemfire.cache.partition.PartitionNotAvailableException;
import com.gemstone.gemfire.cache.query.FunctionDomainException;
@@ -213,6 +224,7 @@ import com.gemstone.gemfire.internal.cache.partitioned.PutAllPRMessage;
import com.gemstone.gemfire.internal.cache.partitioned.PutMessage;
import com.gemstone.gemfire.internal.cache.partitioned.PutMessage.PutResult;
import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor;
+import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.BucketVisitor;
import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.PartitionProfile;
import com.gemstone.gemfire.internal.cache.partitioned.RemoveAllPRMessage;
import com.gemstone.gemfire.internal.cache.partitioned.RemoveIndexesMessage;
@@ -244,6 +256,7 @@ import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import com.gemstone.gemfire.internal.sequencelog.RegionLogger;
import com.gemstone.gemfire.internal.util.TransformUtils;
+import com.gemstone.gemfire.internal.util.concurrent.FutureResult;
import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch;
import com.gemstone.gemfire.i18n.StringId;
@@ -695,9 +708,17 @@ public class PartitionedRegion extends LocalRegion implements
private final PartitionListener[] partitionListeners;
private boolean isShadowPR = false;
-
+ private boolean isShadowPRForHDFS = false;
+
private AbstractGatewaySender parallelGatewaySender = null;
+ /**
+  * Per-thread flag whose initial value is false. It is written by
+  * setQueryHDFS(boolean) and read by includeHDFSResults().
+  */
+ private final ThreadLocal<Boolean> queryHDFS = new ThreadLocal<Boolean>() {
+ @Override
+ protected Boolean initialValue() {
+ return false;
+ }
+ };
+
public PartitionedRegion(String regionname, RegionAttributes ra,
LocalRegion parentRegion, GemFireCacheImpl cache,
InternalRegionArguments internalRegionArgs) {
@@ -717,6 +738,12 @@ public class PartitionedRegion extends LocalRegion implements
// (which prevents pridmap cleanup).
cache.getDistributedSystem().addDisconnectListener(dsPRIdCleanUpListener);
+ // add an async queue for the region if the store name is not null.
+ if (this.getHDFSStoreName() != null) {
+ String eventQueueName = getHDFSEventQueueName();
+ super.addAsyncEventQueueId(eventQueueName);
+ }
+
// this.userScope = ra.getScope();
this.partitionAttributes = ra.getPartitionAttributes();
this.localMaxMemory = this.partitionAttributes.getLocalMaxMemory();
@@ -795,6 +822,8 @@ public class PartitionedRegion extends LocalRegion implements
if (internalRegionArgs.isUsedForParallelGatewaySenderQueue()) {
this.isShadowPR = true;
this.parallelGatewaySender = internalRegionArgs.getParallelGatewaySender();
+ if (internalRegionArgs.isUsedForHDFSParallelGatewaySenderQueue())
+ this.isShadowPRForHDFS = true;
}
@@ -838,10 +867,38 @@ public class PartitionedRegion extends LocalRegion implements
});
}
+  /**
+   * Tells whether this region has an HDFS store configured.
+   *
+   * @return true when {@link #getHDFSStoreName()} is not null
+   */
+  @Override
+  public final boolean isHDFSRegion() {
+    if (this.getHDFSStoreName() == null) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Tells whether this region is an HDFS region that is not configured as
+   * write-only.
+   *
+   * @return true when the region is an HDFS region and its write-only flag is off
+   */
+  @Override
+  public final boolean isHDFSReadWriteRegion() {
+    if (!isHDFSRegion()) {
+      // the write-only flag is only consulted for HDFS regions
+      return false;
+    }
+    return !getHDFSWriteOnly();
+  }
+
+  /**
+   * Tells whether this region is an HDFS region configured as write-only.
+   *
+   * @return true when the region is an HDFS region and its write-only flag is on
+   */
+  @Override
+  protected final boolean isHDFSWriteOnly() {
+    if (!isHDFSRegion()) {
+      // the write-only flag is only consulted for HDFS regions
+      return false;
+    }
+    return getHDFSWriteOnly();
+  }
+
+  /**
+   * Sets the calling thread's flag that {@link #includeHDFSResults()} reports.
+   *
+   * @param includeHDFS the value to store for the current thread
+   */
+  public final void setQueryHDFS(boolean includeHDFS) {
+    this.queryHDFS.set(Boolean.valueOf(includeHDFS));
+  }
+
+  /**
+   * Reports the calling thread's flag as last stored by
+   * {@link #setQueryHDFS(boolean)}; false when the thread never set it.
+   *
+   * @return the current thread's value of the flag
+   */
+  @Override
+  public final boolean includeHDFSResults() {
+    final Boolean threadFlag = this.queryHDFS.get();
+    return threadFlag.booleanValue();
+  }
+
public final boolean isShadowPR() {
return isShadowPR;
}
+  /**
+   * Returns the flag the constructor sets when the region arguments report
+   * that the region is used for an HDFS parallel gateway sender queue.
+   *
+   * @return the value of the {@code isShadowPRForHDFS} field
+   */
+  public final boolean isShadowPRForHDFS() {
+    return this.isShadowPRForHDFS;
+  }
+
public AbstractGatewaySender getParallelGatewaySender() {
return parallelGatewaySender;
}
@@ -1607,7 +1664,7 @@ public class PartitionedRegion extends LocalRegion implements
try {
final boolean loc = (this.localMaxMemory > 0) && retryNode.equals(getMyId());
if (loc) {
- ret = this.dataStore.getEntryLocally(bucketId, key, access, allowTombstones);
+ ret = this.dataStore.getEntryLocally(bucketId, key, access, allowTombstones, true);
} else {
ret = getEntryRemotely(retryNode, bucketIdInt, key, access, allowTombstones);
// TODO:Suranjan&Yogesh : there should be better way than this one
@@ -2066,7 +2123,8 @@ public class PartitionedRegion extends LocalRegion implements
bucketStorageAssigned=false;
// if this is a Delta update, then throw exception since the key doesn't
// exist if there is no bucket for it yet
- if (event.hasDelta()) {
+ // For HDFS region, we will recover key, so allow bucket creation
+ if (!this.dataPolicy.withHDFS() && event.hasDelta()) {
throw new EntryNotFoundException(LocalizedStrings.
PartitionedRegion_CANNOT_APPLY_A_DELTA_WITHOUT_EXISTING_ENTRY
.toLocalizedString());
@@ -3261,9 +3319,9 @@ public class PartitionedRegion extends LocalRegion implements
*/
@Override
public Object get(Object key, Object aCallbackArgument,
- boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent, boolean returnTombstones) throws TimeoutException, CacheLoaderException
+ boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws TimeoutException, CacheLoaderException
{
validateKey(key);
validateCallbackArg(aCallbackArgument);
@@ -3277,7 +3335,7 @@ public class PartitionedRegion extends LocalRegion implements
// if scope is local and there is no loader, then
// don't go further to try and get value
Object value = getDataView().findObject(getKeyInfo(key, aCallbackArgument), this, true/*isCreate*/, generateCallbacks,
- null /*no local value*/, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+ null /*no local value*/, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
if (value != null && !Token.isInvalid(value)) {
miss = false;
}
@@ -3323,7 +3381,7 @@ public class PartitionedRegion extends LocalRegion implements
if (primary == null) {
return null;
}
- if (isTX()) {
+ if (isTX() || this.hdfsStoreName != null) {
return getNodeForBucketWrite(bucketId, null);
}
InternalDistributedMember result = getRegionAdvisor().getPreferredNode(bucketId);
@@ -3337,7 +3395,7 @@ public class PartitionedRegion extends LocalRegion implements
*/
private InternalDistributedMember getNodeForBucketReadOrLoad(int bucketId) {
InternalDistributedMember targetNode;
- if (!this.haveCacheLoader) {
+ if (!this.haveCacheLoader && (this.hdfsStoreName == null)) {
targetNode = getNodeForBucketRead(bucketId);
}
else {
@@ -3470,16 +3528,9 @@ public class PartitionedRegion extends LocalRegion implements
}
@Override
- protected Object findObjectInSystem(KeyInfo keyInfo,
- boolean isCreate,
- TXStateInterface tx,
- boolean generateCallbacks,
- Object localValue,
- boolean disableCopyOnRead,
- boolean preferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones)
+ protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
+ TXStateInterface tx, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
throws CacheLoaderException, TimeoutException
{
Object obj = null;
@@ -3515,7 +3566,7 @@ public class PartitionedRegion extends LocalRegion implements
return null;
}
- obj = getFromBucket(targetNode, bucketId, key, aCallbackArgument, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowRetry);
+ obj = getFromBucket(targetNode, bucketId, key, aCallbackArgument, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowRetry, allowReadFromHDFS);
}
finally {
this.prStats.endGet(startTime);
@@ -4098,22 +4149,15 @@ public class PartitionedRegion extends LocalRegion implements
/**
* no docs
- * @param preferCD
+ * @param preferCD
* @param requestingClient the client requesting the object, or null if not from a client
* @param clientEvent TODO
* @param returnTombstones TODO
* @param allowRetry if false then do not retry
*/
private Object getFromBucket(final InternalDistributedMember targetNode,
- int bucketId,
- final Object key,
- final Object aCallbackArgument,
- boolean disableCopyOnRead,
- boolean preferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones,
- boolean allowRetry) {
+ int bucketId, final Object key, final Object aCallbackArgument,
+ boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowRetry, boolean allowReadFromHDFS) {
final boolean isDebugEnabled = logger.isDebugEnabled();
final int retryAttempts = calcRetry();
@@ -4143,7 +4187,7 @@ public class PartitionedRegion extends LocalRegion implements
try {
if (isLocal) {
obj = this.dataStore.getLocally(bucketId, key, aCallbackArgument,
- disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, false);
+ disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, false, allowReadFromHDFS);
}
else {
if (localCacheEnabled && null != (obj = localCacheGet(key))) { // OFFHEAP: copy into heap cd; TODO optimize for preferCD case
@@ -4152,14 +4196,14 @@ public class PartitionedRegion extends LocalRegion implements
}
return obj;
}
- else if (this.haveCacheLoader) {
+ else if (this.haveCacheLoader || this.hdfsStoreName != null) {
// If the region has a cache loader,
// the target node is the primary server of the bucket. But, if the
// value can be found in a local bucket, we should first try there.
/* MergeGemXDHDFSToGFE -readoing from local bucket was disabled in GemXD*/
if (null != ( obj = getFromLocalBucket(bucketId, key, aCallbackArgument,
- disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones))) {
+ disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS))) {
return obj;
}
}
@@ -4167,7 +4211,7 @@ public class PartitionedRegion extends LocalRegion implements
// Test hook
if (((LocalRegion)this).isTest())
((LocalRegion)this).incCountNotFoundInLocal();
- obj = getRemotely(retryNode, bucketId, key, aCallbackArgument, preferCD, requestingClient, clientEvent, returnTombstones);
+ obj = getRemotely(retryNode, bucketId, key, aCallbackArgument, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
// TODO:Suranjan&Yogesh : there should be better way than this one
String name = Thread.currentThread().getName();
@@ -4265,9 +4309,9 @@ public class PartitionedRegion extends LocalRegion implements
*
*/
public Object getFromLocalBucket(int bucketId, final Object key,
- final Object aCallbackArgument, boolean disableCopyOnRead,
- boolean preferCD, ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent, boolean returnTombstones)
+ final Object aCallbackArgument, boolean disableCopyOnRead,
+ boolean preferCD, ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
throws ForceReattemptException, PRLocallyDestroyedException {
Object obj;
// try reading locally.
@@ -4276,7 +4320,7 @@ public class PartitionedRegion extends LocalRegion implements
return null; // fixes 51657
}
if (readNode.equals(getMyId()) && null != ( obj = this.dataStore.getLocally(bucketId, key, aCallbackArgument,
- disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, true))) {
+ disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, true, allowReadFromHDFS))) {
if (logger.isTraceEnabled()) {
logger.trace("getFromBucket: Getting key {} ({}) locally - success", key, key.hashCode());
}
@@ -5072,13 +5116,7 @@ public class PartitionedRegion extends LocalRegion implements
* if the peer is no longer available
*/
public Object getRemotely(InternalDistributedMember targetNode,
- int bucketId,
- final Object key,
- final Object aCallbackArgument,
- boolean preferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones) throws PrimaryBucketException,
+ int bucketId, final Object key, final Object aCallbackArgument, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws PrimaryBucketException,
ForceReattemptException {
Object value;
if (logger.isDebugEnabled()) {
@@ -5086,7 +5124,7 @@ public class PartitionedRegion extends LocalRegion implements
getPRId(), BUCKET_ID_SEPARATOR, bucketId, key);
}
GetResponse response = GetMessage.send(targetNode, this, key,
- aCallbackArgument, requestingClient, returnTombstones);
+ aCallbackArgument, requestingClient, returnTombstones, allowReadFromHDFS);
this.prStats.incPartitionMessagesSent();
value = response.waitForResponse(preferCD);
if (clientEvent != null) {
@@ -7040,6 +7078,9 @@ public class PartitionedRegion extends LocalRegion implements
public int entryCount(Set<Integer> buckets,
boolean estimate) {
Map<Integer, SizeEntry> bucketSizes = null;
+ if (isHDFSReadWriteRegion() && (includeHDFSResults() || estimate)) {
+ bucketSizes = getSizeForHDFS( buckets, estimate);
+ } else {
if (buckets != null) {
if (this.dataStore != null) {
List<Integer> list = new ArrayList<Integer>();
@@ -7071,6 +7112,7 @@ public class PartitionedRegion extends LocalRegion implements
}
}
}
+ }
int size = 0;
if (bucketSizes != null) {
@@ -7093,7 +7135,81 @@ public class PartitionedRegion extends LocalRegion implements
return 0;
}
}
+  /**
+   * Collects a {@link SizeEntry} for every requested bucket of this region,
+   * asking the local data store first and the remote data stores afterwards,
+   * and retrying until every requested bucket has an entry.
+   *
+   * @param buckets the bucket ids to report on, or null for every bucket the
+   *        region advisor visits
+   * @param estimate true to use the size-estimate variants of the local and
+   *        remote size calls
+   * @return a map from bucket id to its size entry; returned only once no
+   *         value in the map is null
+   */
+  private Map<Integer, SizeEntry> getSizeForHDFS(final Set<Integer> buckets, boolean estimate) {
+    // figure out which buckets to include; a null value marks "size not known yet"
+    Map<Integer, SizeEntry> bucketSizes = new HashMap<Integer, SizeEntry>();
+    getRegionAdvisor().accept(new BucketVisitor<Map<Integer, SizeEntry>>() {
+      @Override
+      public boolean visit(RegionAdvisor advisor, ProxyBucketRegion pbr,
+          Map<Integer, SizeEntry> map) {
+        if (buckets == null || buckets.contains(pbr.getBucketId())) {
+          map.put(pbr.getBucketId(), null);
+          // ensure that the bucket has been created
+          pbr.getPartitionedRegion().getOrCreateNodeForBucketWrite(pbr.getBucketId(), null);
+        }
+        return true;
+      }
+    }, bucketSizes);
+
+    RetryTimeKeeper retry = new RetryTimeKeeper(retryTimeout);
+
+    while (true) {
+      // get the size from local buckets
+      if (dataStore != null) {
+        Map<Integer, SizeEntry> localSizes;
+        if (estimate) {
+          localSizes = dataStore.getSizeEstimateForLocalPrimaryBuckets();
+        } else {
+          localSizes = dataStore.getSizeForLocalPrimaryBuckets();
+        }
+        for (Map.Entry<Integer, SizeEntry> me : localSizes.entrySet()) {
+          // only record sizes for the buckets that were requested
+          if (bucketSizes.containsKey(me.getKey())) {
+            bucketSizes.put(me.getKey(), me.getValue());
+          }
+        }
+      }
+
+      // all done once every requested bucket has a size
+      boolean complete = true;
+      for (SizeEntry known : bucketSizes.values()) {
+        if (known == null) {
+          complete = false;
+          break;
+        }
+      }
+      if (complete) {
+        return bucketSizes;
+      }
+
+      Set<InternalDistributedMember> remotes = getRegionAdvisor().adviseDataStore(true);
+      remotes.remove(getMyId());
+
+      // collect remote sizes
+      if (!remotes.isEmpty()) {
+        Map<Integer, SizeEntry> remoteSizes = new HashMap<Integer, SizeEntry>();
+        try {
+          remoteSizes = getSizeRemotely(remotes, estimate);
+        } catch (ReplyException e) {
+          // Remote member will never throw ForceReattemptException or
+          // PrimaryBucketException, so any exception on the remote member
+          // should be re-thrown
+          e.handleAsUnexpected();
+        }
+        for (Map.Entry<Integer, SizeEntry> me : remoteSizes.entrySet()) {
+          Integer k = me.getKey();
+          // accept a remote answer only from the member reporting itself as primary
+          if (bucketSizes.containsKey(k) && me.getValue().isPrimary()) {
+            bucketSizes.put(k, me.getValue());
+          }
+        }
+      }
+
+      if (retry.overMaximum()) {
+        checkReadiness();
+        // NOTE(review): presumably timedOut(...) throws and ends the loop - confirm
+        PRHARedundancyProvider.timedOut(this, null, null, "calculate size", retry.getRetryTime());
+      }
+
+      // throttle subsequent attempts
+      retry.waitForBucketsRecovery();
+    }
+  }
+
/**
* This method gets a PartitionServerSocketConnection to targetNode and sends
* size request to the node. It returns size of all the buckets "primarily"
@@ -7491,7 +7607,9 @@ public class PartitionedRegion extends LocalRegion implements
.append("; isClosed=").append(this.isClosed)
.append("; retryTimeout=").append(this.retryTimeout)
.append("; serialNumber=").append(getSerialNumber())
-
+ .append("; hdfsStoreName=").append(getHDFSStoreName())
+ .append("; hdfsWriteOnly=").append(getHDFSWriteOnly())
+
.append("; partition attributes=").append(getPartitionAttributes().toString())
.append("; on VM ").append(getMyId())
.append("]")
@@ -7634,6 +7752,18 @@ public class PartitionedRegion extends LocalRegion implements
@Override
public void destroyRegion(Object aCallbackArgument)
throws CacheWriterException, TimeoutException {
+ //For HDFS regions, we need a data store
+ //to do the global destroy so that it can delete
+ //the data from HDFS as well.
+ if(!isDataStore() && this.dataPolicy.withHDFS()) {
+ if(destroyOnDataStore(aCallbackArgument)) {
+ //If we were able to find a data store to do the destroy,
+ //stop here.
+ //otherwise go ahead and destroy the region from this member
+ return;
+ }
+ }
+
checkForColocatedChildren();
getDataView().checkSupportsRegionDestroy();
checkForLimitedOrNoAccess();
@@ -7681,6 +7811,7 @@ public class PartitionedRegion extends LocalRegion implements
boolean keepWaiting = true;
+ AsyncEventQueueImpl hdfsQueue = getHDFSEventQueue();
while(true) {
List<String> pausedSenders = new ArrayList<String>();
List<ConcurrentParallelGatewaySenderQueue> parallelQueues = new ArrayList<ConcurrentParallelGatewaySenderQueue>();
@@ -7798,6 +7929,11 @@ public class PartitionedRegion extends LocalRegion implements
}
}
}
+
+ if(hdfsQueue != null) {
+ hdfsQueue.destroy();
+ cache.removeAsyncEventQueue(hdfsQueue);
+ }
}
@Override
@@ -7978,6 +8114,9 @@ public class PartitionedRegion extends LocalRegion implements
final boolean isClose = event.getOperation().isClose();
destroyPartitionedRegionLocally(!isClose);
destroyCleanUp(event, serials);
+ if(!isClose) {
+ destroyHDFSData();
+ }
return true;
}
@@ -8270,6 +8409,8 @@ public class PartitionedRegion extends LocalRegion implements
}
}
+ HDFSRegionDirector.getInstance().clear(getFullPath());
+
RegionLogger.logDestroy(getName(), cache.getMyId(), null, op.isClose());
}
@@ -10914,6 +11055,11 @@ public class PartitionedRegion extends LocalRegion implements
}
}
+ //hoplogs - pause HDFS dispatcher while we
+ //clear the buckets to avoid missing some files
+ //during the clear
+ pauseHDFSDispatcher();
+
try {
// now clear the bucket regions; we go through the primary bucket
// regions so there is distribution for every bucket but that
@@ -10929,6 +11075,7 @@ public class PartitionedRegion extends LocalRegion implements
}
}
} finally {
+ resumeHDFSDispatcher();
// release the bucket locks
for (BucketRegion br : lockedRegions) {
try {
@@ -10944,6 +11091,247 @@ public class PartitionedRegion extends LocalRegion implements
}
}
+
+  /**
+   * Destroy all data in HDFS, if this region is using HDFS persistence.
+   * Does nothing when no HDFS store name is configured or when this member
+   * has no {@code hdfsManager}. An {@link IOException} from the manager is
+   * logged as a warning and not propagated.
+   */
+  private void destroyHDFSData() {
+    // hdfsManager may be null: forceLocalHDFSCompaction and
+    // lastLocalMajorHDFSCompaction both guard against that before using it
+    if (getHDFSStoreName() == null || hdfsManager == null) {
+      return;
+    }
+
+    try {
+      hdfsManager.destroyData();
+    } catch (IOException e) {
+      logger.warn(LocalizedStrings.HOPLOG_UNABLE_TO_DELETE_HDFS_DATA, e);
+    }
+  }
+
+ private void pauseHDFSDispatcher() {
+ if(!isHDFSRegion()) {
+ return;
+ }
+ AbstractGatewaySenderEventProcessor eventProcessor = getHDFSEventProcessor();
+ if (eventProcessor == null) return;
+ eventProcessor.pauseDispatching();
+ eventProcessor.waitForDispatcherToPause();
+ }
+
+  /**
+   * Returns the statistics of the HDFS event queue associated with this
+   * region.
+   *
+   * @return the queue statistics, or null when the region has no HDFS event queue
+   */
+  public AsyncEventQueueStats getHDFSEventQueueStats() {
+    final AsyncEventQueueImpl queue = getHDFSEventQueue();
+    return (queue != null) ? queue.getStatistics() : null;
+  }
+
+  /**
+   * Looks up the event processor of the gateway sender behind this region's
+   * HDFS event queue.
+   *
+   * @return the sender's event processor, or null when the region has no HDFS
+   *         event queue (callers such as pauseHDFSDispatcher and
+   *         resumeHDFSDispatcher already treat null as "nothing to do")
+   */
+  protected AbstractGatewaySenderEventProcessor getHDFSEventProcessor() {
+    final AsyncEventQueueImpl asyncQ = getHDFSEventQueue();
+    if (asyncQ == null) {
+      // getHDFSEventQueue() returns null when there is no queue name for this region
+      return null;
+    }
+    final AbstractGatewaySender gatewaySender = (AbstractGatewaySender) asyncQ.getSender();
+    return gatewaySender.getEventProcessor();
+  }
+
+  /**
+   * Fetches from the cache the async event queue registered under this
+   * region's HDFS event queue name.
+   *
+   * @return the queue the cache returns for that name, or null when this
+   *         region has no HDFS event queue name
+   */
+  public AsyncEventQueueImpl getHDFSEventQueue() {
+    final String queueId = getHDFSEventQueueName();
+    if (queueId != null) {
+      return (AsyncEventQueueImpl) this.getCache().getAsyncEventQueue(queueId);
+    }
+    return null;
+  }
+
+ private void resumeHDFSDispatcher() {
+ if(!isHDFSRegion()) {
+ return;
+ }
+ AbstractGatewaySenderEventProcessor eventProcessor = getHDFSEventProcessor();
+ if (eventProcessor == null) return;
+ eventProcessor.resumeDispatching();
+ }
+
+ protected String getHDFSEventQueueName() {
+ if (!this.getDataPolicy().withHDFS()) return null;
+ String colocatedWith = this.getPartitionAttributes().getColocatedWith();
+ String eventQueueName;
+ if (colocatedWith != null) {
+ PartitionedRegion leader = ColocationHelper.getLeaderRegionName(this);
+ eventQueueName = HDFSStoreFactoryImpl.getEventQueueName(leader
+ .getFullPath());
+ }
+ else {
+ eventQueueName = HDFSStoreFactoryImpl.getEventQueueName(getFullPath());
+ }
+ return eventQueueName;
+ }
+
+  /**
+   * schedules compaction on all members where this region is hosted.
+   *
+   * @param isMajor
+   *          true for major compaction
+   * @param maxWaitTime
+   *          time to wait for the operation to complete, 0 will wait forever;
+   *          null is treated like 0
+   * @throws UnsupportedOperationException
+   *           if the region is write-only or does not use an HDFS store
+   * @throws FunctionException
+   *           if some buckets could not be compacted after all retries
+   */
+  @Override
+  public void forceHDFSCompaction(boolean isMajor, Integer maxWaitTime) {
+    if (!this.isHDFSReadWriteRegion()) {
+      if (this.isHDFSRegion()) {
+        throw new UnsupportedOperationException(
+            LocalizedStrings.HOPLOG_CONFIGURED_AS_WRITEONLY
+                .toLocalizedString(getName()));
+      }
+      throw new UnsupportedOperationException(
+          LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE
+              .toLocalizedString(getName()));
+    }
+    // Unboxing a null Integer would throw a NullPointerException; treat a
+    // missing limit like 0.
+    final int maxWaitSeconds = (maxWaitTime == null) ? 0 : maxWaitTime.intValue();
+    // send request to remote data stores
+    long start = System.currentTimeMillis();
+    // multiply in long and clamp so that a large limit cannot overflow int
+    int waitTime = (int) Math.min(maxWaitSeconds * 1000L, (long) Integer.MAX_VALUE);
+    HDFSForceCompactionArgs args = new HDFSForceCompactionArgs(getRegionAdvisor().getBucketSet(), isMajor, waitTime);
+    HDFSForceCompactionResultCollector rc = new HDFSForceCompactionResultCollector();
+    AbstractExecution execution = (AbstractExecution) FunctionService.onRegion(this).withArgs(args).withCollector(rc);
+    execution.setWaitOnExceptionFlag(true); // wait for all exceptions
+    if (logger.isDebugEnabled()) {
+      logger.debug("HDFS: ForceCompat invoking function with arguments "+args);
+    }
+    execution.execute(HDFSForceCompactionFunction.ID);
+    // the returned list is not needed; the outcome is read from the collector
+    rc.getResult();
+    Set<Integer> successfulBuckets = rc.getSuccessfulBucketIds();
+    if (rc.shouldRetry()) {
+      int retries = 0;
+      while (retries < HDFSForceCompactionFunction.FORCE_COMPACTION_MAX_RETRIES) {
+        waitTime -= System.currentTimeMillis() - start;
+        if (maxWaitSeconds > 0 && waitTime < 0) {
+          break;
+        }
+        start = System.currentTimeMillis();
+        retries++;
+        Set<Integer> retryBuckets = new HashSet<Integer>(getRegionAdvisor().getBucketSet());
+        retryBuckets.removeAll(successfulBuckets);
+        if (retryBuckets.isEmpty()) {
+          // every bucket has been compacted; nothing is left to retry
+          break;
+        }
+
+        for (int bucketId : retryBuckets) {
+          getNodeForBucketWrite(bucketId, new PartitionedRegion.RetryTimeKeeper(waitTime));
+          // charge the time spent locating the bucket's node against the budget
+          long now = System.currentTimeMillis();
+          waitTime -= now - start;
+          start = now;
+        }
+
+        args = new HDFSForceCompactionArgs(retryBuckets, isMajor, waitTime);
+        rc = new HDFSForceCompactionResultCollector();
+        execution = (AbstractExecution) FunctionService.onRegion(this).withArgs(args).withCollector(rc);
+        execution.setWaitOnExceptionFlag(true); // wait for all exceptions
+        if (logger.isDebugEnabled()) {
+          logger.debug("HDFS: ForceCompat re-invoking function with arguments "+args+" filter:"+retryBuckets);
+        }
+        execution.execute(HDFSForceCompactionFunction.ID);
+        rc.getResult();
+        successfulBuckets.addAll(rc.getSuccessfulBucketIds());
+      }
+    }
+    if (successfulBuckets.size() != getRegionAdvisor().getBucketSet().size()) {
+      checkReadiness();
+      Set<Integer> unsuccessfulBuckets = new HashSet<Integer>(getRegionAdvisor().getBucketSet());
+      unsuccessfulBuckets.removeAll(successfulBuckets);
+      throw new FunctionException("Could not run compaction on following buckets:"+unsuccessfulBuckets);
+    }
+  }
+
+  /**
+   * Schedules compaction on the given local buckets.
+   *
+   * @param buckets the set of buckets to compact
+   * @param isMajor true for major compaction
+   * @param time currently unused (TODO use this)
+   * @return one future per scheduled compaction task; empty when this member
+   *         is not a data store, has no HDFS manager, or no buckets were given
+   */
+  public List<Future<CompactionStatus>> forceLocalHDFSCompaction(Set<Integer> buckets, boolean isMajor, long time) {
+    final List<Future<CompactionStatus>> scheduled = new ArrayList<Future<CompactionStatus>>();
+    final boolean nothingToDo =
+        !isDataStore() || hdfsManager == null || buckets == null || buckets.isEmpty();
+    if (nothingToDo) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "HDFS: did not schedule local " + (isMajor ? "Major" : "Minor") + " compaction");
+      }
+      return scheduled;
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "HDFS: scheduling local " + (isMajor ? "Major" : "Minor") + " compaction for buckets:"+buckets);
+    }
+    // ask the organizer of each requested bucket to start a compaction
+    for (HoplogOrganizer organizer : hdfsManager.getBucketOrganizers(buckets)) {
+      final Future<CompactionStatus> task = organizer.forceCompaction(isMajor);
+      scheduled.add(task);
+    }
+    return scheduled;
+  }
+
+  /**
+   * Flushes the HDFS queue of this region through
+   * {@code HDFSFlushQueueFunction.flushQueue}.
+   *
+   * @param maxWaitTime passed through unchanged to the flush function
+   * @throws UnsupportedOperationException if this region does not use an HDFS store
+   */
+  @Override
+  public void flushHDFSQueue(int maxWaitTime) {
+    if (this.isHDFSRegion()) {
+      HDFSFlushQueueFunction.flushQueue(this, maxWaitTime);
+      return;
+    }
+    throw new UnsupportedOperationException(
+        LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE
+            .toLocalizedString(getName()));
+  }
+
+  /**
+   * Runs {@code HDFSLastCompactionTimeFunction} on this region and returns the
+   * smallest non-zero timestamp it reports.
+   *
+   * @return the smallest non-zero reported timestamp, or 0 when none was reported
+   * @throws UnsupportedOperationException if the region is write-only or does
+   *         not use an HDFS store
+   */
+  @Override
+  public long lastMajorHDFSCompaction() {
+    if (!this.isHDFSReadWriteRegion()) {
+      final String reason = this.isHDFSRegion()
+          ? LocalizedStrings.HOPLOG_CONFIGURED_AS_WRITEONLY.toLocalizedString(getName())
+          : LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE.toLocalizedString(getName());
+      throw new UnsupportedOperationException(reason);
+    }
+    List<Long> timestamps = (List<Long>) FunctionService.onRegion(this)
+        .execute(HDFSLastCompactionTimeFunction.ID)
+        .getResult();
+    if (logger.isDebugEnabled()) {
+      logger.debug("HDFS: Result of LastCompactionTimeFunction "+timestamps);
+    }
+    // Long.MAX_VALUE serves as the "nothing seen yet" sentinel
+    long oldest = Long.MAX_VALUE;
+    for (long reported : timestamps) {
+      if (reported != 0) {
+        oldest = Math.min(oldest, reported);
+      }
+    }
+    if (oldest == Long.MAX_VALUE) {
+      return 0;
+    }
+    return oldest;
+  }
+
+  /**
+   * Returns the smallest non-zero last-major-compaction timestamp among the
+   * bucket organizers of this member's HDFS manager.
+   *
+   * @return the smallest non-zero timestamp; 0 when there is none, when this
+   *         member is not a data store, or when it has no HDFS manager
+   */
+  public long lastLocalMajorHDFSCompaction() {
+    final boolean nothingToDo = !isDataStore() || hdfsManager == null;
+    if (nothingToDo) {
+      return 0;
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "HDFS: getting local Major compaction time");
+    }
+    // Long.MAX_VALUE serves as the "nothing seen yet" sentinel
+    long oldest = Long.MAX_VALUE;
+    for (HoplogOrganizer organizer : hdfsManager.getBucketOrganizers()) {
+      final long reported = organizer.getLastMajorCompactionTimestamp();
+      if (reported != 0) {
+        oldest = Math.min(oldest, reported);
+      }
+    }
+    final long minTS = (oldest == Long.MAX_VALUE) ? 0 : oldest;
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "HDFS: local Major compaction time: "+minTS);
+    }
+    return minTS;
+  }
+
public void shadowPRWaitForBucketRecovery() {
assert this.isShadowPR();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
index bda68e3..57b1e71 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
@@ -64,6 +64,7 @@ import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.ResultSender;
import com.gemstone.gemfire.cache.query.QueryInvalidException;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
import com.gemstone.gemfire.cache.query.internal.IndexUpdater;
import com.gemstone.gemfire.cache.query.internal.QCompiler;
import com.gemstone.gemfire.cache.query.internal.index.IndexCreationData;
@@ -2058,13 +2059,13 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
ForceReattemptException, PRLocallyDestroyedException
{
return getLocally(bucketId, key,aCallbackArgument, disableCopyOnRead, preferCD, requestingClient,
- clientEvent, returnTombstones, false);
+ clientEvent, returnTombstones, false, false);
}
/**
* Returns value corresponding to this key.
* @param key
* the key to look for
- * @param preferCD
+ * @param preferCD
* @param requestingClient the client making the request, or null
* @param clientEvent client's event (for returning version tag)
* @param returnTombstones whether tombstones should be returned
@@ -2075,28 +2076,21 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
* @throws PrimaryBucketException if the locally managed bucket is not primary
* @throws PRLocallyDestroyedException if the PartitionRegion is locally destroyed
*/
- public Object getLocally(int bucketId,
- final Object key,
- final Object aCallbackArgument,
- boolean disableCopyOnRead,
- boolean preferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones,
- boolean opScopeIsLocal) throws PrimaryBucketException,
+ public Object getLocally(int bucketId, final Object key,
+ final Object aCallbackArgument, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
+ boolean returnTombstones, boolean opScopeIsLocal, boolean allowReadFromHDFS) throws PrimaryBucketException,
ForceReattemptException, PRLocallyDestroyedException
{
final BucketRegion bucketRegion = getInitializedBucketForId(key, Integer.valueOf(bucketId));
// check for primary (when a loader is present) done deeper in the BucketRegion
Object ret=null;
if (logger.isDebugEnabled()) {
- logger.debug("getLocally: key {}) bucketId={}{}{} region {} returnTombstones {} ", key,
- this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, bucketRegion.getName(), returnTombstones);
+ logger.debug("getLocally: key {}) bucketId={}{}{} region {} returnTombstones {} allowReadFromHDFS {}", key,
+ this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, bucketRegion.getName(), returnTombstones, allowReadFromHDFS);
}
invokeBucketReadHook();
try {
- ret = bucketRegion.get(key, aCallbackArgument, true, disableCopyOnRead , preferCD, requestingClient, clientEvent, returnTombstones, opScopeIsLocal,
- false);
+ ret = bucketRegion.get(key, aCallbackArgument, true, disableCopyOnRead , preferCD, requestingClient, clientEvent, returnTombstones, opScopeIsLocal, allowReadFromHDFS, false);
checkIfBucketMoved(bucketRegion);
}
catch (RegionDestroyedException rde) {
@@ -2128,11 +2122,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
* @throws PrimaryBucketException if the locally managed bucket is not primary
* @see #getLocally(int, Object, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean)
*/
- public RawValue getSerializedLocally(KeyInfo keyInfo,
- boolean doNotLockEntry,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones) throws PrimaryBucketException,
+ public RawValue getSerializedLocally(KeyInfo keyInfo, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws PrimaryBucketException,
ForceReattemptException {
final BucketRegion bucketRegion = getInitializedBucketForId(keyInfo.getKey(), keyInfo.getBucketId());
// check for primary (when loader is present) done deeper in the BucketRegion
@@ -2143,7 +2133,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
invokeBucketReadHook();
try {
- RawValue result = bucketRegion.getSerialized(keyInfo, true, doNotLockEntry, requestingClient, clientEvent, returnTombstones);
+ RawValue result = bucketRegion.getSerialized(keyInfo, true, doNotLockEntry, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
checkIfBucketMoved(bucketRegion);
return result;
} catch (RegionDestroyedException rde) {
@@ -2167,7 +2157,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
* @param access
* true if caller wants last accessed time updated
* @param allowTombstones whether a tombstoned entry can be returned
- *
+ *
* @throws ForceReattemptException
* if bucket region is not present in this process
* @return a RegionEntry for the given key, which will be null if the key is
@@ -2178,7 +2168,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
* if the PartitionRegion is locally destroyed
*/
public EntrySnapshot getEntryLocally(int bucketId, final Object key,
- boolean access, boolean allowTombstones)
+ boolean access, boolean allowTombstones, boolean allowReadFromHDFS)
throws EntryNotFoundException, PrimaryBucketException,
ForceReattemptException, PRLocallyDestroyedException
{
@@ -2191,7 +2181,12 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
EntrySnapshot res = null;
RegionEntry ent = null;
try {
- ent = bucketRegion.entries.getEntry(key);
+ if (allowReadFromHDFS) {
+ ent = bucketRegion.entries.getEntry(key);
+ }
+ else {
+ ent = bucketRegion.entries.getOperationalEntryInVM(key);
+ }
if (ent == null) {
this.getPartitionedRegion().checkReadiness();
@@ -2301,8 +2296,14 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
try{
if (r != null) {
Set keys = r.keySet(allowTombstones);
+ if (getPartitionedRegion().isHDFSReadWriteRegion()) {
+ // HDFS regions can't copy all keys into memory; return the key set without copying
+ ret = keys;
+
+ } else {
// A copy is made so that the bucket is free to move
ret = new HashSet(r.keySet(allowTombstones));
+ }
checkIfBucketMoved(r);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java
index de1f7d8..f083268 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java
@@ -65,19 +65,12 @@ public class PartitionedRegionDataView extends LocalRegionDataView {
}
@Override
- public Object findObject(KeyInfo key,
- LocalRegion r,
- boolean isCreate,
- boolean generateCallbacks,
- Object value,
- boolean disableCopyOnRead,
- boolean preferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones) {
+ public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate,
+ boolean generateCallbacks, Object value, boolean disableCopyOnRead,
+ boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
TXStateProxy tx = r.cache.getTXMgr().internalSuspend();
try {
- return r.findObjectInSystem(key, isCreate, tx, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+ return r.findObjectInSystem(key, isCreate, tx, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
} finally {
r.cache.getTXMgr().resume(tx);
}
@@ -89,14 +82,10 @@ public class PartitionedRegionDataView extends LocalRegionDataView {
return pr.nonTXContainsKey(keyInfo);
}
@Override
- public Object getSerializedValue(LocalRegion localRegion,
- KeyInfo keyInfo,
- boolean doNotLockEntry,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones) throws DataLocationException {
+ public Object getSerializedValue(LocalRegion localRegion, KeyInfo keyInfo, boolean doNotLockEntry, ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException {
PartitionedRegion pr = (PartitionedRegion)localRegion;
- return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, requestingClient, clientEvent, returnTombstones);
+ return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
}
@Override
public boolean putEntryOnRemote(EntryEventImpl event, boolean ifNew,
@@ -129,7 +118,7 @@ public class PartitionedRegionDataView extends LocalRegionDataView {
boolean allowTombstones) throws DataLocationException {
PartitionedRegion pr = (PartitionedRegion)localRegion;
return pr.getDataStore().getEntryLocally(keyInfo.getBucketId(),
- keyInfo.getKey(), false, allowTombstones);
+ keyInfo.getKey(), false, allowTombstones, true);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java
index 6ce783a..a3ed32a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java
@@ -122,6 +122,8 @@ public class PartitionedRegionHelper
Set policies = new HashSet();
policies.add(DEFAULT_DATA_POLICY);
policies.add(DataPolicy.PERSISTENT_PARTITION);
+ policies.add(DataPolicy.HDFS_PARTITION);
+ policies.add(DataPolicy.HDFS_PERSISTENT_PARTITION);
// policies.add(DataPolicy.NORMAL);
ALLOWED_DATA_POLICIES = Collections.unmodifiableSet(policies);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
index 74c134b..f0a6543 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
@@ -626,6 +626,27 @@ final class ProxyRegionMap implements RegionMap {
}
@Override
+ public boolean isMarkedForEviction() {
+ throw new UnsupportedOperationException(LocalizedStrings
+ .ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0
+ .toLocalizedString(DataPolicy.EMPTY));
+ }
+
+ @Override
+ public void setMarkedForEviction() {
+ throw new UnsupportedOperationException(LocalizedStrings
+ .ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0
+ .toLocalizedString(DataPolicy.EMPTY));
+ }
+
+ @Override
+ public void clearMarkedForEviction() {
+ throw new UnsupportedOperationException(LocalizedStrings
+ .ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0
+ .toLocalizedString(DataPolicy.EMPTY));
+ }
+
+ @Override
public boolean isValueNull() {
throw new UnsupportedOperationException(LocalizedStrings.ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0.toLocalizedString(DataPolicy.EMPTY));
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
index bedbf81..5838ead 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
@@ -35,6 +35,7 @@ import com.gemstone.gemfire.internal.offheap.StoredObject;
import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+import com.gemstone.gemfire.cache.EvictionCriteria;
/**
* Internal interface for a region entry.
@@ -414,6 +415,25 @@ public interface RegionEntry {
public void setUpdateInProgress(final boolean underUpdate);
/**
+ * Returns true if this entry has been marked for eviction by custom eviction
+ * via {@link EvictionCriteria}.
+ */
+ public boolean isMarkedForEviction();
+
+ /**
+ * Marks this entry for eviction by custom eviction via
+ * {@link EvictionCriteria}.
+ */
+ public void setMarkedForEviction();
+
+ /**
+ * Clears this entry's marked-for-eviction state, either by custom eviction via
+ * {@link EvictionCriteria} or when an update is done after it was marked for
+ * eviction.
+ */
+ public void clearMarkedForEviction();
+
+ /**
* Event containing this RegionEntry is being passed through
* dispatchListenerEvent for CacheListeners under RegionEntry lock. This is
* used during deserialization for a VMCacheSerializable value contained by
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java
index 7a97408..2a7f0c4 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java
@@ -39,6 +39,12 @@ class RegionMapFactory {
//.getDataPolicy().withPartitioning());
if (owner.isProxy() /*|| owner instanceof PartitionedRegion*/) { // TODO enabling this causes eviction tests to fail
return new ProxyRegionMap(owner, attrs, internalRegionArgs);
+ } else if (internalRegionArgs.isReadWriteHDFSRegion()) {
+ if (owner.getEvictionController() == null) {
+ return new HDFSRegionMapImpl(owner, attrs, internalRegionArgs);
+ }
+ return new HDFSLRURegionMap(owner, attrs, internalRegionArgs);
+ //else if (owner.getEvictionController() != null && isNotPartitionedRegion) {
} else if (owner.getEvictionController() != null ) {
return new VMLRURegionMap(owner, attrs,internalRegionArgs);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java
index b565a2c..c754339 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java
@@ -122,7 +122,7 @@ public final class RemoteGetMessage extends RemoteOperationMessageWithDirectRepl
((KeyWithRegionContext)this.key).setRegionContext(r);
}
KeyInfo keyInfo = r.getKeyInfo(key, cbArg);
- val = r.getDataView().getSerializedValue(r, keyInfo, false, this.context, null, false /*for replicate regions*/);
+ val = r.getDataView().getSerializedValue(r, keyInfo, false, this.context, null, false, false/*for replicate regions*/);
valueBytes = val instanceof RawValue ? (RawValue)val : new RawValue(val);
if (logger.isTraceEnabled(LogMarker.DM)) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java
index 2906ff6..983f928 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java
@@ -113,8 +113,7 @@ public class TXEntry implements Region.Entry
{
checkTX();
// Object value = this.localRegion.getDeserialized(this.key, false, this.myTX, this.rememberReads);
- @Unretained Object value = this.myTX.getDeserializedValue(keyInfo, this.localRegion, false, false, false, null, false,
- false);
+ @Unretained Object value = this.myTX.getDeserializedValue(keyInfo, this.localRegion, false, false, false, null, false, false, false);
if (value == null) {
throw new EntryDestroyedException(this.keyInfo.getKey().toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
index 617873c..a67d3cc 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
@@ -1407,14 +1407,7 @@ public class TXState implements TXStateInterface {
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
*/
- public Object getDeserializedValue(KeyInfo keyInfo,
- LocalRegion localRegion,
- boolean updateStats,
- boolean disableCopyOnRead,
- boolean preferCD,
- EntryEventImpl clientEvent,
- boolean returnTombstones,
- boolean retainResult) {
+ public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
TXEntryState tx = txReadEntry(keyInfo, localRegion, true, true/*create txEntry is absent*/);
if (tx != null) {
Object v = tx.getValue(keyInfo, localRegion, preferCD);
@@ -1423,8 +1416,7 @@ public class TXState implements TXStateInterface {
}
return v;
} else {
- return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones,
- retainResult);
+ return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadFromHDFS, retainResult);
}
}
@@ -1433,19 +1425,15 @@ public class TXState implements TXStateInterface {
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object)
*/
@Retained
- public Object getSerializedValue(LocalRegion localRegion,
- KeyInfo keyInfo,
- boolean doNotLockEntry,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones) throws DataLocationException {
+ public Object getSerializedValue(LocalRegion localRegion, KeyInfo keyInfo, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
+ boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException {
final Object key = keyInfo.getKey();
TXEntryState tx = txReadEntry(keyInfo, localRegion, true,true/*create txEntry is absent*/);
if (tx != null) {
Object val = tx.getPendingValue();
if(val==null || Token.isInvalidOrRemoved(val)) {
val = findObject(keyInfo,localRegion, val!=Token.INVALID,
- true, val, false, false, requestingClient, clientEvent, false);
+ true, val, false, false, requestingClient, clientEvent, false, allowReadFromHDFS);
}
return val;
} else {
@@ -1453,7 +1441,7 @@ public class TXState implements TXStateInterface {
// so we should never come here
assert localRegion instanceof PartitionedRegion;
PartitionedRegion pr = (PartitionedRegion)localRegion;
- return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, null, null, returnTombstones);
+ return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, null, null, returnTombstones, allowReadFromHDFS);
}
}
@@ -1531,17 +1519,9 @@ public class TXState implements TXStateInterface {
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
*/
- public Object findObject(KeyInfo key,
- LocalRegion r,
- boolean isCreate,
- boolean generateCallbacks,
- Object value,
- boolean disableCopyOnRead,
- boolean preferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones) {
- return r.findObjectInSystem(key, isCreate, this, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+ public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate,
+ boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
+ return r.findObjectInSystem(key, isCreate, this, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
}
private boolean readEntryAndCheckIfDestroyed(KeyInfo keyInfo, LocalRegion localRegion,
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java
index 3fa9351..5da20d8 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java
@@ -123,14 +123,8 @@ public interface TXStateInterface extends Synchronization, InternalDataView {
* @param localRegion
* @param updateStats TODO
*/
- public Object getDeserializedValue(KeyInfo keyInfo,
- LocalRegion localRegion,
- boolean updateStats,
- boolean disableCopyOnRead,
- boolean preferCD,
- EntryEventImpl clientEvent,
- boolean returnTombstones,
- boolean retainResult);
+ public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
+ boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadsFromHDFS, boolean retainResult);
public TXEvent getEvent();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
index 0939ab0..e66302e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
@@ -341,16 +341,9 @@ public class TXStateProxyImpl implements TXStateProxy {
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
*/
- public Object getDeserializedValue(KeyInfo keyInfo,
- LocalRegion localRegion,
- boolean updateStats,
- boolean disableCopyOnRead,
- boolean preferCD,
- EntryEventImpl clientEvent,
- boolean returnTombstones,
- boolean retainResult) {
- Object val = getRealDeal(keyInfo, localRegion).getDeserializedValue(keyInfo, localRegion, updateStats, disableCopyOnRead, preferCD, null, false,
- retainResult);
+ public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
+ boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
+ Object val = getRealDeal(keyInfo, localRegion).getDeserializedValue(keyInfo, localRegion, updateStats, disableCopyOnRead, preferCD, null, false, allowReadFromHDFS, retainResult);
if (val != null) {
// fixes bug 51057: TXStateStub on client always returns null, so do not increment
// the operation count it will be incremented in findObject()
@@ -606,13 +599,13 @@ public class TXStateProxyImpl implements TXStateProxy {
* @see com.gemstone.gemfire.internal.cache.InternalDataView#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
*/
public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate,
- boolean generateCallbacks, Object value, boolean disableCopyOnRead,
- boolean preferCD, ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent, boolean returnTombstones) {
+ boolean generateCallbacks, Object value, boolean disableCopyOnRead,
+ boolean preferCD, ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
try {
this.operationCount++;
Object retVal = getRealDeal(key, r).findObject(key, r, isCreate, generateCallbacks,
- value, disableCopyOnRead, preferCD, requestingClient, clientEvent, false);
+ value, disableCopyOnRead, preferCD, requestingClient, clientEvent, false, allowReadFromHDFS);
trackBucketForTx(key);
return retVal;
} catch (TransactionDataRebalancedException | PrimaryBucketException re) {
@@ -727,14 +720,9 @@ public class TXStateProxyImpl implements TXStateProxy {
* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object)
*/
- public Object getSerializedValue(LocalRegion localRegion,
- KeyInfo key,
- boolean doNotLockEntry,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones) throws DataLocationException {
+ public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException {
this.operationCount++;
- return getRealDeal(key, localRegion).getSerializedValue(localRegion, key, doNotLockEntry, requestingClient, clientEvent, returnTombstones);
+ return getRealDeal(key, localRegion).getSerializedValue(localRegion, key, doNotLockEntry, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
}
/* (non-Javadoc)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java
index 0b226e0..ac35425 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java
@@ -184,14 +184,8 @@ public abstract class TXStateStub implements TXStateInterface {
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
*/
- public Object getDeserializedValue(KeyInfo keyInfo,
- LocalRegion localRegion,
- boolean updateStats,
- boolean disableCopyOnRead,
- boolean preferCD,
- EntryEventImpl clientEvent,
- boolean returnTombstones,
- boolean retainResult) {
+ public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
+ boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
// We never have a local value if we are a stub...
return null;
}
@@ -379,17 +373,10 @@ public abstract class TXStateStub implements TXStateInterface {
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
*/
- public Object findObject(KeyInfo keyInfo,
- LocalRegion r,
- boolean isCreate,
- boolean generateCallbacks,
- Object value,
- boolean disableCopyOnRead,
- boolean preferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones) {
- return getTXRegionStub(r).findObject(keyInfo,isCreate,generateCallbacks,value, preferCD, requestingClient, clientEvent);
+ public Object findObject(KeyInfo keyInfo, LocalRegion r, boolean isCreate,
+ boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
+ return getTXRegionStub(r).findObject(keyInfo,isCreate,generateCallbacks,value, preferCD, requestingClient, clientEvent, allowReadFromHDFS);
}
/* (non-Javadoc)
@@ -445,12 +432,7 @@ public abstract class TXStateStub implements TXStateInterface {
* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object)
*/
- public Object getSerializedValue(LocalRegion localRegion,
- KeyInfo key,
- boolean doNotLockEntry,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent,
- boolean returnTombstones) {
+ public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java
index 269f891..a17650c 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java
@@ -114,6 +114,10 @@ public abstract class UserSpecifiedRegionAttributes<K,V> implements RegionAttrib
*/
private boolean hasCloningEnabled = false;
+ private boolean hasHDFSStoreName = false;
+
+ private boolean hasHDFSWriteOnly = false;
+
/**
* Whether this region has entry value compression.
*
@@ -522,7 +526,7 @@ public abstract class UserSpecifiedRegionAttributes<K,V> implements RegionAttrib
{
this.hasDiskSynchronous = val;
}
- private static final int HAS_COUNT = 41;
+ private static final int HAS_COUNT = 43;
public void initHasFields(UserSpecifiedRegionAttributes<K,V> other)
{
@@ -598,4 +602,22 @@ public abstract class UserSpecifiedRegionAttributes<K,V> implements RegionAttrib
public List getIndexes() {
return this.indexes;
}
+
+ public boolean hasHDFSStoreName()
+ {
+ return this.hasHDFSStoreName;
+ }
+ public void setHasHDFSStoreName(boolean val)
+ {
+ this.hasHDFSStoreName = val;
+ }
+
+ public void setHasHDFSWriteOnly(boolean val)
+ {
+ this.hasHDFSWriteOnly = val;
+ }
+ public boolean hasHDFSWriteOnly()
+ {
+ return this.hasHDFSWriteOnly;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java
index 54133cc..f587e39 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java
@@ -408,6 +408,19 @@ public class ValidatingDiskRegion extends DiskRegion implements DiskRecoveryStor
// TODO Auto-generated method stub
}
@Override
+ public boolean isMarkedForEviction() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+ @Override
+ public void setMarkedForEviction() {
+ // TODO Auto-generated method stub
+ }
+ @Override
+ public void clearMarkedForEviction() {
+ // TODO Auto-generated method stub
+ }
+ @Override
public boolean isInvalid() {
// TODO Auto-generated method stub
return false;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java
index ea47e91..d3078a9 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java
@@ -299,7 +299,7 @@ public final class FetchBulkEntriesMessage extends PartitionMessage
Object key = it.next();
VersionTagHolder clientEvent = new VersionTagHolder();
Object value = map.get(key, null, true, true, true, null,
- clientEvent, allowTombstones);
+ clientEvent, allowTombstones, false);
if (needToWriteBucketInfo) {
DataSerializer.writePrimitiveInt(map.getId(), mos);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java
index 3fef790..d7e50f1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java
@@ -93,9 +93,11 @@ public final class GetMessage extends PartitionMessageWithDirectReply
private boolean returnTombstones;
+ private boolean allowReadFromHDFS;
// reuse some flags
protected static final int HAS_LOADER = NOTIFICATION_ONLY;
protected static final int CAN_START_TX = IF_NEW;
+ protected static final int READ_FROM_HDFS = IF_OLD;
/**
* Empty constructor to satisfy {@link DataSerializer} requirements
@@ -104,14 +106,15 @@ public final class GetMessage extends PartitionMessageWithDirectReply
}
private GetMessage(InternalDistributedMember recipient, int regionId,
- DirectReplyProcessor processor,
- final Object key, final Object aCallbackArgument, ClientProxyMembershipID context,
- boolean returnTombstones) {
+ DirectReplyProcessor processor,
+ final Object key, final Object aCallbackArgument, ClientProxyMembershipID context,
+ boolean returnTombstones, boolean allowReadFromHDFS) {
super(recipient, regionId, processor);
this.key = key;
this.cbArg = aCallbackArgument;
this.context = context;
this.returnTombstones = returnTombstones;
+ this.allowReadFromHDFS = allowReadFromHDFS;
}
private static final boolean ORDER_PR_GETS = Boolean.getBoolean("gemfire.order-pr-gets");
@@ -188,7 +191,7 @@ public final class GetMessage extends PartitionMessageWithDirectReply
KeyInfo keyInfo = r.getKeyInfo(key, cbArg);
boolean lockEntry = forceUseOfPRExecutor || isDirectAck();
- val = r.getDataView().getSerializedValue(r, keyInfo, !lockEntry, this.context, event, returnTombstones);
+ val = r.getDataView().getSerializedValue(r, keyInfo, !lockEntry, this.context, event, returnTombstones, allowReadFromHDFS);
if(val == BucketRegion.REQUIRES_ENTRY_LOCK) {
Assert.assertTrue(!lockEntry);
@@ -269,12 +272,14 @@ public final class GetMessage extends PartitionMessageWithDirectReply
@Override
protected short computeCompressedShort(short s) {
s = super.computeCompressedShort(s);
+ if (this.allowReadFromHDFS) s |= READ_FROM_HDFS;
return s;
}
@Override
protected void setBooleans(short s, DataInput in) throws ClassNotFoundException, IOException {
super.setBooleans(s, in);
+ if ((s & READ_FROM_HDFS) != 0) this.allowReadFromHDFS = true;
}
public void setKey(Object key)
@@ -298,18 +303,15 @@ public final class GetMessage extends PartitionMessageWithDirectReply
* @throws ForceReattemptException if the peer is no longer available
*/
public static GetResponse send(InternalDistributedMember recipient,
- PartitionedRegion r,
- final Object key,
- final Object aCallbackArgument,
- ClientProxyMembershipID requestingClient,
- boolean returnTombstones)
+ PartitionedRegion r, final Object key, final Object aCallbackArgument,
+ ClientProxyMembershipID requestingClient, boolean returnTombstones, boolean allowReadFromHDFS)
throws ForceReattemptException
{
Assert.assertTrue(recipient != null,
"PRDistribuedGetReplyMessage NULL reply message");
GetResponse p = new GetResponse(r.getSystem(), Collections.singleton(recipient), key);
GetMessage m = new GetMessage(recipient, r.getPRId(), p,
- key, aCallbackArgument, requestingClient, returnTombstones);
+ key, aCallbackArgument, requestingClient, returnTombstones, allowReadFromHDFS);
Set failures = r.getDistributionManager().putOutgoing(m);
if (failures != null && failures.size() > 0) {
throw new ForceReattemptException(LocalizedStrings.GetMessage_FAILED_SENDING_0.toLocalizedString(m));
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java
index 8aaf587..a88f96f 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java
@@ -101,8 +101,9 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
protected static final short HAS_BRIDGE_CONTEXT = UNRESERVED_FLAGS_START;
protected static final short SKIP_CALLBACKS = (HAS_BRIDGE_CONTEXT << 1);
+ protected static final short FETCH_FROM_HDFS = (SKIP_CALLBACKS << 1);
//using the left most bit for IS_PUT_DML, the last available bit
- protected static final short IS_PUT_DML = (short) (SKIP_CALLBACKS << 1);
+ protected static final short IS_PUT_DML = (short) (FETCH_FROM_HDFS << 1);
private transient InternalDistributedSystem internalDs;
@@ -117,6 +118,9 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
transient VersionedObjectList versions = null;
+ /** whether this operation should fetch oldValue from HDFS */
+ private boolean fetchFromHDFS;
+
private boolean isPutDML;
/**
* Empty constructor to satisfy {@link DataSerializer}requirements
@@ -125,7 +129,7 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
}
public PutAllPRMessage(int bucketId, int size, boolean notificationOnly,
- boolean posDup, boolean skipCallbacks, Object callbackArg, boolean isPutDML) {
+ boolean posDup, boolean skipCallbacks, Object callbackArg, boolean fetchFromHDFS, boolean isPutDML) {
this.bucketId = Integer.valueOf(bucketId);
putAllPRData = new PutAllEntryData[size];
this.notificationOnly = notificationOnly;
@@ -133,7 +137,8 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
this.skipCallbacks = skipCallbacks;
this.callbackArg = callbackArg;
initTxMemberId();
- this.isPutDML = isPutDML;
+ this.fetchFromHDFS = fetchFromHDFS;
+ this.isPutDML = isPutDML;
}
public void addEntry(PutAllEntryData entry) {
@@ -302,6 +307,7 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
s = super.computeCompressedShort(s);
if (this.bridgeContext != null) s |= HAS_BRIDGE_CONTEXT;
if (this.skipCallbacks) s |= SKIP_CALLBACKS;
+ if (this.fetchFromHDFS) s |= FETCH_FROM_HDFS;
if (this.isPutDML) s |= IS_PUT_DML;
return s;
}
@@ -311,6 +317,7 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
ClassNotFoundException {
super.setBooleans(s, in);
this.skipCallbacks = ((s & SKIP_CALLBACKS) != 0);
+ this.fetchFromHDFS = ((s & FETCH_FROM_HDFS) != 0);
this.isPutDML = ((s & IS_PUT_DML) != 0);
}
@@ -488,6 +495,9 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
ev.setPutAllOperation(dpao);
+ // set the fetchFromHDFS flag
+ ev.setFetchFromHDFS(this.fetchFromHDFS);
+
// make sure a local update inserts a cache de-serializable
ev.makeSerializedNewValue();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java
index a6a39dc..d5abaa1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java
@@ -182,6 +182,9 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
private VersionTag versionTag;
+ /** whether this operation should fetch oldValue from HDFS*/
+ private transient boolean fetchFromHDFS;
+
private transient boolean isPutDML;
// additional bitmask flags used for serialization/deserialization
@@ -205,6 +208,7 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
// masks there are taken
// also switching the masks will impact backwards compatibility. Need to
// verify if it is ok to break backwards compatibility
+ protected static final int FETCH_FROM_HDFS = getNextByteMask(HAS_CALLBACKARG);
/*
private byte[] oldValBytes;
@@ -604,6 +608,9 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
this.originalSender = (InternalDistributedMember)DataSerializer
.readObject(in);
}
+ if ((extraFlags & FETCH_FROM_HDFS) != 0) {
+ this.fetchFromHDFS = true;
+ }
this.eventId = new EventID();
InternalDataSerializer.invokeFromData(this.eventId, in);
@@ -690,6 +697,7 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
extraFlags |= HAS_DELTA_WITH_FULL_VALUE;
}
if (this.originalSender != null) extraFlags |= HAS_ORIGINAL_SENDER;
+ if (this.event.isFetchFromHDFS()) extraFlags |= FETCH_FROM_HDFS;
out.writeByte(extraFlags);
DataSerializer.writeObject(getKey(), out);
@@ -814,6 +822,7 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
ev.setCausedByMessage(this);
ev.setInvokePRCallbacks(!notificationOnly);
ev.setPossibleDuplicate(this.posDup);
+ ev.setFetchFromHDFS(this.fetchFromHDFS);
ev.setPutDML(this.isPutDML);
/*if (this.hasOldValue) {
if (this.oldValueIsSerialized) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ByteComparator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ByteComparator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ByteComparator.java
new file mode 100644
index 0000000..5c199ae
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ByteComparator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.persistence.soplog;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
+
+/**
+ * Compares objects byte-by-byte. This is fast and sufficient for cases when
+ * lexicographic ordering is not important or the serialization is order-
+ * preserving.
+ *
+ */
+public class ByteComparator implements SerializedComparator {
+ @Override
+ public int compare(byte[] rhs, byte[] lhs) {
+ return compare(rhs, 0, rhs.length, lhs, 0, lhs.length);
+ }
+
+ @Override
+ public int compare(byte[] r, int rOff, int rLen, byte[] l, int lOff, int lLen) {
+ return compareBytes(r, rOff, rLen, l, lOff, lLen);
+ }
+
+ /**
+ * Compares two byte arrays element-by-element.
+ *
+ * @param r the right array
+ * @param rOff the offset of r
+ * @param rLen the length of r to compare
+ * @param l the left array
+ * @param lOff the offset of l
+ * @param lLen the length of l to compare
+ * @return a negative value if r < l; 0 if r == l; a positive value if r > l
+ */
+
+ public static int compareBytes(byte[] r, int rOff, int rLen, byte[] l, int lOff, int lLen) {
+ return Bytes.compareTo(r, rOff, rLen, l, lOff, lLen);
+ }
+}