You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2016/05/03 23:45:18 UTC
[04/60] [abbrv] incubator-geode git commit: GEODE-1072: Removing HDFS
related code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
index c75286e..328c196 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
@@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
@@ -93,22 +92,12 @@ import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
import com.gemstone.gemfire.cache.TransactionDataRebalancedException;
import com.gemstone.gemfire.cache.TransactionException;
import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
import com.gemstone.gemfire.cache.execute.EmtpyRegionFunctionException;
import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.cache.execute.ResultCollector;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.CompactionStatus;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSFlushQueueFunction;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionArgs;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionFunction;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionResultCollector;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSLastCompactionTimeFunction;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
import com.gemstone.gemfire.cache.partition.PartitionListener;
import com.gemstone.gemfire.cache.partition.PartitionNotAvailableException;
import com.gemstone.gemfire.cache.query.FunctionDomainException;
@@ -224,7 +213,6 @@ import com.gemstone.gemfire.internal.cache.partitioned.PutAllPRMessage;
import com.gemstone.gemfire.internal.cache.partitioned.PutMessage;
import com.gemstone.gemfire.internal.cache.partitioned.PutMessage.PutResult;
import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor;
-import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.BucketVisitor;
import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.PartitionProfile;
import com.gemstone.gemfire.internal.cache.partitioned.RemoveAllPRMessage;
import com.gemstone.gemfire.internal.cache.partitioned.RemoveIndexesMessage;
@@ -256,7 +244,6 @@ import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import com.gemstone.gemfire.internal.sequencelog.RegionLogger;
import com.gemstone.gemfire.internal.util.TransformUtils;
-import com.gemstone.gemfire.internal.util.concurrent.FutureResult;
import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch;
import com.gemstone.gemfire.i18n.StringId;
@@ -708,17 +695,9 @@ public class PartitionedRegion extends LocalRegion implements
private final PartitionListener[] partitionListeners;
private boolean isShadowPR = false;
- private boolean isShadowPRForHDFS = false;
-
+
private AbstractGatewaySender parallelGatewaySender = null;
- private final ThreadLocal<Boolean> queryHDFS = new ThreadLocal<Boolean>() {
- @Override
- protected Boolean initialValue() {
- return false;
- }
- };
-
public PartitionedRegion(String regionname, RegionAttributes ra,
LocalRegion parentRegion, GemFireCacheImpl cache,
InternalRegionArguments internalRegionArgs) {
@@ -738,12 +717,6 @@ public class PartitionedRegion extends LocalRegion implements
// (which prevents pridmap cleanup).
cache.getDistributedSystem().addDisconnectListener(dsPRIdCleanUpListener);
- // add an async queue for the region if the store name is not null.
- if (this.getHDFSStoreName() != null) {
- String eventQueueName = getHDFSEventQueueName();
- super.addAsyncEventQueueId(eventQueueName);
- }
-
// this.userScope = ra.getScope();
this.partitionAttributes = ra.getPartitionAttributes();
this.localMaxMemory = this.partitionAttributes.getLocalMaxMemory();
@@ -822,8 +795,6 @@ public class PartitionedRegion extends LocalRegion implements
if (internalRegionArgs.isUsedForParallelGatewaySenderQueue()) {
this.isShadowPR = true;
this.parallelGatewaySender = internalRegionArgs.getParallelGatewaySender();
- if (internalRegionArgs.isUsedForHDFSParallelGatewaySenderQueue())
- this.isShadowPRForHDFS = true;
}
@@ -867,38 +838,10 @@ public class PartitionedRegion extends LocalRegion implements
});
}
- @Override
- public final boolean isHDFSRegion() {
- return this.getHDFSStoreName() != null;
- }
-
- @Override
- public final boolean isHDFSReadWriteRegion() {
- return isHDFSRegion() && !getHDFSWriteOnly();
- }
-
- @Override
- protected final boolean isHDFSWriteOnly() {
- return isHDFSRegion() && getHDFSWriteOnly();
- }
-
- public final void setQueryHDFS(boolean includeHDFS) {
- queryHDFS.set(includeHDFS);
- }
-
- @Override
- public final boolean includeHDFSResults() {
- return queryHDFS.get();
- }
-
public final boolean isShadowPR() {
return isShadowPR;
}
- public final boolean isShadowPRForHDFS() {
- return isShadowPRForHDFS;
- }
-
public AbstractGatewaySender getParallelGatewaySender() {
return parallelGatewaySender;
}
@@ -1664,7 +1607,7 @@ public class PartitionedRegion extends LocalRegion implements
try {
final boolean loc = (this.localMaxMemory > 0) && retryNode.equals(getMyId());
if (loc) {
- ret = this.dataStore.getEntryLocally(bucketId, key, access, allowTombstones, true);
+ ret = this.dataStore.getEntryLocally(bucketId, key, access, allowTombstones);
} else {
ret = getEntryRemotely(retryNode, bucketIdInt, key, access, allowTombstones);
// TODO:Suranjan&Yogesh : there should be better way than this one
@@ -2123,8 +2066,7 @@ public class PartitionedRegion extends LocalRegion implements
bucketStorageAssigned=false;
// if this is a Delta update, then throw exception since the key doesn't
// exist if there is no bucket for it yet
- // For HDFS region, we will recover key, so allow bucket creation
- if (!this.dataPolicy.withHDFS() && event.hasDelta()) {
+ if (event.hasDelta()) {
throw new EntryNotFoundException(LocalizedStrings.
PartitionedRegion_CANNOT_APPLY_A_DELTA_WITHOUT_EXISTING_ENTRY
.toLocalizedString());
@@ -3319,9 +3261,9 @@ public class PartitionedRegion extends LocalRegion implements
*/
@Override
public Object get(Object key, Object aCallbackArgument,
- boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
- ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws TimeoutException, CacheLoaderException
+ boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent, boolean returnTombstones) throws TimeoutException, CacheLoaderException
{
validateKey(key);
validateCallbackArg(aCallbackArgument);
@@ -3335,7 +3277,7 @@ public class PartitionedRegion extends LocalRegion implements
// if scope is local and there is no loader, then
// don't go further to try and get value
Object value = getDataView().findObject(getKeyInfo(key, aCallbackArgument), this, true/*isCreate*/, generateCallbacks,
- null /*no local value*/, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
+ null /*no local value*/, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
if (value != null && !Token.isInvalid(value)) {
miss = false;
}
@@ -3381,7 +3323,7 @@ public class PartitionedRegion extends LocalRegion implements
if (primary == null) {
return null;
}
- if (isTX() || this.hdfsStoreName != null) {
+ if (isTX()) {
return getNodeForBucketWrite(bucketId, null);
}
InternalDistributedMember result = getRegionAdvisor().getPreferredNode(bucketId);
@@ -3395,7 +3337,7 @@ public class PartitionedRegion extends LocalRegion implements
*/
private InternalDistributedMember getNodeForBucketReadOrLoad(int bucketId) {
InternalDistributedMember targetNode;
- if (!this.haveCacheLoader && (this.hdfsStoreName == null)) {
+ if (!this.haveCacheLoader) {
targetNode = getNodeForBucketRead(bucketId);
}
else {
@@ -3528,9 +3470,16 @@ public class PartitionedRegion extends LocalRegion implements
}
@Override
- protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
- TXStateInterface tx, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
+ protected Object findObjectInSystem(KeyInfo keyInfo,
+ boolean isCreate,
+ TXStateInterface tx,
+ boolean generateCallbacks,
+ Object localValue,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones)
throws CacheLoaderException, TimeoutException
{
Object obj = null;
@@ -3566,7 +3515,7 @@ public class PartitionedRegion extends LocalRegion implements
return null;
}
- obj = getFromBucket(targetNode, bucketId, key, aCallbackArgument, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowRetry, allowReadFromHDFS);
+ obj = getFromBucket(targetNode, bucketId, key, aCallbackArgument, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowRetry);
}
finally {
this.prStats.endGet(startTime);
@@ -4149,15 +4098,22 @@ public class PartitionedRegion extends LocalRegion implements
/**
* no docs
- * @param preferCD
+ * @param preferCD
* @param requestingClient the client requesting the object, or null if not from a client
* @param clientEvent TODO
* @param returnTombstones TODO
* @param allowRetry if false then do not retry
*/
private Object getFromBucket(final InternalDistributedMember targetNode,
- int bucketId, final Object key, final Object aCallbackArgument,
- boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowRetry, boolean allowReadFromHDFS) {
+ int bucketId,
+ final Object key,
+ final Object aCallbackArgument,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones,
+ boolean allowRetry) {
final boolean isDebugEnabled = logger.isDebugEnabled();
final int retryAttempts = calcRetry();
@@ -4187,7 +4143,7 @@ public class PartitionedRegion extends LocalRegion implements
try {
if (isLocal) {
obj = this.dataStore.getLocally(bucketId, key, aCallbackArgument,
- disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, false, allowReadFromHDFS);
+ disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, false);
}
else {
if (localCacheEnabled && null != (obj = localCacheGet(key))) { // OFFHEAP: copy into heap cd; TODO optimize for preferCD case
@@ -4196,14 +4152,14 @@ public class PartitionedRegion extends LocalRegion implements
}
return obj;
}
- else if (this.haveCacheLoader || this.hdfsStoreName != null) {
+ else if (this.haveCacheLoader) {
// If the region has a cache loader,
// the target node is the primary server of the bucket. But, if the
// value can be found in a local bucket, we should first try there.
/* MergeGemXDHDFSToGFE -readoing from local bucket was disabled in GemXD*/
if (null != ( obj = getFromLocalBucket(bucketId, key, aCallbackArgument,
- disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS))) {
+ disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones))) {
return obj;
}
}
@@ -4211,7 +4167,7 @@ public class PartitionedRegion extends LocalRegion implements
// Test hook
if (((LocalRegion)this).isTest())
((LocalRegion)this).incCountNotFoundInLocal();
- obj = getRemotely(retryNode, bucketId, key, aCallbackArgument, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
+ obj = getRemotely(retryNode, bucketId, key, aCallbackArgument, preferCD, requestingClient, clientEvent, returnTombstones);
// TODO:Suranjan&Yogesh : there should be better way than this one
String name = Thread.currentThread().getName();
@@ -4309,9 +4265,9 @@ public class PartitionedRegion extends LocalRegion implements
*
*/
public Object getFromLocalBucket(int bucketId, final Object key,
- final Object aCallbackArgument, boolean disableCopyOnRead,
- boolean preferCD, ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
+ final Object aCallbackArgument, boolean disableCopyOnRead,
+ boolean preferCD, ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent, boolean returnTombstones)
throws ForceReattemptException, PRLocallyDestroyedException {
Object obj;
// try reading locally.
@@ -4320,7 +4276,7 @@ public class PartitionedRegion extends LocalRegion implements
return null; // fixes 51657
}
if (readNode.equals(getMyId()) && null != ( obj = this.dataStore.getLocally(bucketId, key, aCallbackArgument,
- disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, true, allowReadFromHDFS))) {
+ disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, true))) {
if (logger.isTraceEnabled()) {
logger.trace("getFromBucket: Getting key {} ({}) locally - success", key, key.hashCode());
}
@@ -5116,7 +5072,13 @@ public class PartitionedRegion extends LocalRegion implements
* if the peer is no longer available
*/
public Object getRemotely(InternalDistributedMember targetNode,
- int bucketId, final Object key, final Object aCallbackArgument, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws PrimaryBucketException,
+ int bucketId,
+ final Object key,
+ final Object aCallbackArgument,
+ boolean preferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones) throws PrimaryBucketException,
ForceReattemptException {
Object value;
if (logger.isDebugEnabled()) {
@@ -5124,7 +5086,7 @@ public class PartitionedRegion extends LocalRegion implements
getPRId(), BUCKET_ID_SEPARATOR, bucketId, key);
}
GetResponse response = GetMessage.send(targetNode, this, key,
- aCallbackArgument, requestingClient, returnTombstones, allowReadFromHDFS);
+ aCallbackArgument, requestingClient, returnTombstones);
this.prStats.incPartitionMessagesSent();
value = response.waitForResponse(preferCD);
if (clientEvent != null) {
@@ -7078,9 +7040,6 @@ public class PartitionedRegion extends LocalRegion implements
public int entryCount(Set<Integer> buckets,
boolean estimate) {
Map<Integer, SizeEntry> bucketSizes = null;
- if (isHDFSReadWriteRegion() && (includeHDFSResults() || estimate)) {
- bucketSizes = getSizeForHDFS( buckets, estimate);
- } else {
if (buckets != null) {
if (this.dataStore != null) {
List<Integer> list = new ArrayList<Integer>();
@@ -7112,7 +7071,6 @@ public class PartitionedRegion extends LocalRegion implements
}
}
}
- }
int size = 0;
if (bucketSizes != null) {
@@ -7135,81 +7093,7 @@ public class PartitionedRegion extends LocalRegion implements
return 0;
}
}
- private Map<Integer, SizeEntry> getSizeForHDFS(final Set<Integer> buckets, boolean estimate) {
- // figure out which buckets to include
- Map<Integer, SizeEntry> bucketSizes = new HashMap<Integer, SizeEntry>();
- getRegionAdvisor().accept(new BucketVisitor<Map<Integer, SizeEntry>>() {
- @Override
- public boolean visit(RegionAdvisor advisor, ProxyBucketRegion pbr,
- Map<Integer, SizeEntry> map) {
- if (buckets == null || buckets.contains(pbr.getBucketId())) {
- map.put(pbr.getBucketId(), null);
- // ensure that the bucket has been created
- pbr.getPartitionedRegion().getOrCreateNodeForBucketWrite(pbr.getBucketId(), null);
- }
- return true;
- }
- }, bucketSizes);
- RetryTimeKeeper retry = new RetryTimeKeeper(retryTimeout);
-
- while (true) {
- // get the size from local buckets
- if (dataStore != null) {
- Map<Integer, SizeEntry> localSizes;
- if (estimate) {
- localSizes = dataStore.getSizeEstimateForLocalPrimaryBuckets();
- } else {
- localSizes = dataStore.getSizeForLocalPrimaryBuckets();
- }
- for (Map.Entry<Integer, SizeEntry> me : localSizes.entrySet()) {
- if (bucketSizes.containsKey(me.getKey())) {
- bucketSizes.put(me.getKey(), me.getValue());
- }
- }
- }
- // all done
- int count = 0;
- Iterator it = bucketSizes.values().iterator();
- while (it.hasNext()) {
- if (it.next() != null) count++;
- }
- if (bucketSizes.size() == count) {
- return bucketSizes;
- }
-
- Set<InternalDistributedMember> remotes = getRegionAdvisor().adviseDataStore(true);
- remotes.remove(getMyId());
-
- // collect remote sizes
- if (!remotes.isEmpty()) {
- Map<Integer, SizeEntry> remoteSizes = new HashMap<Integer, PartitionedRegion.SizeEntry>();
- try {
- remoteSizes = getSizeRemotely(remotes, estimate);
- } catch (ReplyException e) {
- // Remote member will never throw ForceReattemptException or
- // PrimaryBucketException, so any exception on the remote member
- // should be re-thrown
- e.handleAsUnexpected();
- }
- for (Map.Entry<Integer, SizeEntry> me : remoteSizes.entrySet()) {
- Integer k = me.getKey();
- if (bucketSizes.containsKey(k) && me.getValue().isPrimary()) {
- bucketSizes.put(k, me.getValue());
- }
- }
- }
-
- if (retry.overMaximum()) {
- checkReadiness();
- PRHARedundancyProvider.timedOut(this, null, null, "calculate size", retry.getRetryTime());
- }
-
- // throttle subsequent attempts
- retry.waitForBucketsRecovery();
- }
- }
-
/**
* This method gets a PartitionServerSocketConnection to targetNode and sends
* size request to the node. It returns size of all the buckets "primarily"
@@ -7607,9 +7491,7 @@ public class PartitionedRegion extends LocalRegion implements
.append("; isClosed=").append(this.isClosed)
.append("; retryTimeout=").append(this.retryTimeout)
.append("; serialNumber=").append(getSerialNumber())
- .append("; hdfsStoreName=").append(getHDFSStoreName())
- .append("; hdfsWriteOnly=").append(getHDFSWriteOnly())
-
+
.append("; partition attributes=").append(getPartitionAttributes().toString())
.append("; on VM ").append(getMyId())
.append("]")
@@ -7752,18 +7634,6 @@ public class PartitionedRegion extends LocalRegion implements
@Override
public void destroyRegion(Object aCallbackArgument)
throws CacheWriterException, TimeoutException {
- //For HDFS regions, we need a data store
- //to do the global destroy so that it can delete
- //the data from HDFS as well.
- if(!isDataStore() && this.dataPolicy.withHDFS()) {
- if(destroyOnDataStore(aCallbackArgument)) {
- //If we were able to find a data store to do the destroy,
- //stop here.
- //otherwise go ahead and destroy the region from this member
- return;
- }
- }
-
checkForColocatedChildren();
getDataView().checkSupportsRegionDestroy();
checkForLimitedOrNoAccess();
@@ -7811,7 +7681,6 @@ public class PartitionedRegion extends LocalRegion implements
boolean keepWaiting = true;
- AsyncEventQueueImpl hdfsQueue = getHDFSEventQueue();
while(true) {
List<String> pausedSenders = new ArrayList<String>();
List<ConcurrentParallelGatewaySenderQueue> parallelQueues = new ArrayList<ConcurrentParallelGatewaySenderQueue>();
@@ -7929,11 +7798,6 @@ public class PartitionedRegion extends LocalRegion implements
}
}
}
-
- if(hdfsQueue != null) {
- hdfsQueue.destroy();
- cache.removeAsyncEventQueue(hdfsQueue);
- }
}
@Override
@@ -8114,9 +7978,6 @@ public class PartitionedRegion extends LocalRegion implements
final boolean isClose = event.getOperation().isClose();
destroyPartitionedRegionLocally(!isClose);
destroyCleanUp(event, serials);
- if(!isClose) {
- destroyHDFSData();
- }
return true;
}
@@ -8409,8 +8270,6 @@ public class PartitionedRegion extends LocalRegion implements
}
}
- HDFSRegionDirector.getInstance().clear(getFullPath());
-
RegionLogger.logDestroy(getName(), cache.getMyId(), null, op.isClose());
}
@@ -11055,11 +10914,6 @@ public class PartitionedRegion extends LocalRegion implements
}
}
- //hoplogs - pause HDFS dispatcher while we
- //clear the buckets to avoid missing some files
- //during the clear
- pauseHDFSDispatcher();
-
try {
// now clear the bucket regions; we go through the primary bucket
// regions so there is distribution for every bucket but that
@@ -11075,7 +10929,6 @@ public class PartitionedRegion extends LocalRegion implements
}
}
} finally {
- resumeHDFSDispatcher();
// release the bucket locks
for (BucketRegion br : lockedRegions) {
try {
@@ -11091,247 +10944,6 @@ public class PartitionedRegion extends LocalRegion implements
}
}
-
- /**Destroy all data in HDFS, if this region is using HDFS persistence.*/
- private void destroyHDFSData() {
- if(getHDFSStoreName() == null) {
- return;
- }
-
- try {
- hdfsManager.destroyData();
- } catch (IOException e) {
- logger.warn(LocalizedStrings.HOPLOG_UNABLE_TO_DELETE_HDFS_DATA, e);
- }
- }
-
- private void pauseHDFSDispatcher() {
- if(!isHDFSRegion()) {
- return;
- }
- AbstractGatewaySenderEventProcessor eventProcessor = getHDFSEventProcessor();
- if (eventProcessor == null) return;
- eventProcessor.pauseDispatching();
- eventProcessor.waitForDispatcherToPause();
- }
-
- /**
- * Get the statistics for the HDFS event queue associated with this region,
- * if any
- */
- public AsyncEventQueueStats getHDFSEventQueueStats() {
- AsyncEventQueueImpl asyncQ = getHDFSEventQueue();
- if(asyncQ == null) {
- return null;
- }
- return asyncQ.getStatistics();
- }
-
- protected AbstractGatewaySenderEventProcessor getHDFSEventProcessor() {
- final AsyncEventQueueImpl asyncQ = getHDFSEventQueue();
- final AbstractGatewaySender gatewaySender = (AbstractGatewaySender)asyncQ.getSender();
- AbstractGatewaySenderEventProcessor eventProcessor = gatewaySender.getEventProcessor();
- return eventProcessor;
- }
-
- public AsyncEventQueueImpl getHDFSEventQueue() {
- String asyncQId = getHDFSEventQueueName();
- if(asyncQId == null) {
- return null;
- }
- final AsyncEventQueueImpl asyncQ = (AsyncEventQueueImpl)this.getCache().getAsyncEventQueue(asyncQId);
- return asyncQ;
- }
-
- private void resumeHDFSDispatcher() {
- if(!isHDFSRegion()) {
- return;
- }
- AbstractGatewaySenderEventProcessor eventProcessor = getHDFSEventProcessor();
- if (eventProcessor == null) return;
- eventProcessor.resumeDispatching();
- }
-
- protected String getHDFSEventQueueName() {
- if (!this.getDataPolicy().withHDFS()) return null;
- String colocatedWith = this.getPartitionAttributes().getColocatedWith();
- String eventQueueName;
- if (colocatedWith != null) {
- PartitionedRegion leader = ColocationHelper.getLeaderRegionName(this);
- eventQueueName = HDFSStoreFactoryImpl.getEventQueueName(leader
- .getFullPath());
- }
- else {
- eventQueueName = HDFSStoreFactoryImpl.getEventQueueName(getFullPath());
- }
- return eventQueueName;
- }
-
- /**
- * schedules compaction on all members where this region is hosted.
- *
- * @param isMajor
- * true for major compaction
- * @param maxWaitTime
- * time to wait for the operation to complete, 0 will wait forever
- */
- @Override
- public void forceHDFSCompaction(boolean isMajor, Integer maxWaitTime) {
- if (!this.isHDFSReadWriteRegion()) {
- if (this.isHDFSRegion()) {
- throw new UnsupportedOperationException(
- LocalizedStrings.HOPLOG_CONFIGURED_AS_WRITEONLY
- .toLocalizedString(getName()));
- }
- throw new UnsupportedOperationException(
- LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE
- .toLocalizedString(getName()));
- }
- // send request to remote data stores
- long start = System.currentTimeMillis();
- int waitTime = maxWaitTime * 1000;
- HDFSForceCompactionArgs args = new HDFSForceCompactionArgs(getRegionAdvisor().getBucketSet(), isMajor, waitTime);
- HDFSForceCompactionResultCollector rc = new HDFSForceCompactionResultCollector();
- AbstractExecution execution = (AbstractExecution) FunctionService.onRegion(this).withArgs(args).withCollector(rc);
- execution.setWaitOnExceptionFlag(true); // wait for all exceptions
- if (logger.isDebugEnabled()) {
- logger.debug("HDFS: ForceCompat invoking function with arguments "+args);
- }
- execution.execute(HDFSForceCompactionFunction.ID);
- List<CompactionStatus> result = rc.getResult();
- Set<Integer> successfulBuckets = rc.getSuccessfulBucketIds();
- if (rc.shouldRetry()) {
- int retries = 0;
- while (retries < HDFSForceCompactionFunction.FORCE_COMPACTION_MAX_RETRIES) {
- waitTime -= System.currentTimeMillis() - start;
- if (maxWaitTime > 0 && waitTime < 0) {
- break;
- }
- start = System.currentTimeMillis();
- retries++;
- Set<Integer> retryBuckets = new HashSet<Integer>(getRegionAdvisor().getBucketSet());
- retryBuckets.removeAll(successfulBuckets);
-
- for (int bucketId : retryBuckets) {
- getNodeForBucketWrite(bucketId, new PartitionedRegion.RetryTimeKeeper(waitTime));
- long now = System.currentTimeMillis();
- waitTime -= now - start;
- start = now;
- }
-
- args = new HDFSForceCompactionArgs(retryBuckets, isMajor, waitTime);
- rc = new HDFSForceCompactionResultCollector();
- execution = (AbstractExecution) FunctionService.onRegion(this).withArgs(args).withCollector(rc);
- execution.setWaitOnExceptionFlag(true); // wait for all exceptions
- if (logger.isDebugEnabled()) {
- logger.debug("HDFS: ForceCompat re-invoking function with arguments "+args+" filter:"+retryBuckets);
- }
- execution.execute(HDFSForceCompactionFunction.ID);
- result = rc.getResult();
- successfulBuckets.addAll(rc.getSuccessfulBucketIds());
- }
- }
- if (successfulBuckets.size() != getRegionAdvisor().getBucketSet().size()) {
- checkReadiness();
- Set<Integer> uncessfulBuckets = new HashSet<Integer>(getRegionAdvisor().getBucketSet());
- uncessfulBuckets.removeAll(successfulBuckets);
- throw new FunctionException("Could not run compaction on following buckets:"+uncessfulBuckets);
- }
- }
-
- /**
- * Schedules compaction on local buckets
- * @param buckets the set of buckets to compact
- * @param isMajor true for major compaction
- * @param time TODO use this
- * @return a list of futures for the scheduled compaction tasks
- */
- public List<Future<CompactionStatus>> forceLocalHDFSCompaction(Set<Integer> buckets, boolean isMajor, long time) {
- List<Future<CompactionStatus>> futures = new ArrayList<Future<CompactionStatus>>();
- if (!isDataStore() || hdfsManager == null || buckets == null || buckets.isEmpty()) {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "HDFS: did not schedule local " + (isMajor ? "Major" : "Minor") + " compaction");
- }
- // nothing to do
- return futures;
- }
- if (logger.isDebugEnabled()) {
- logger.debug(
- "HDFS: scheduling local " + (isMajor ? "Major" : "Minor") + " compaction for buckets:"+buckets);
- }
- Collection<HoplogOrganizer> organizers = hdfsManager.getBucketOrganizers(buckets);
-
- for (HoplogOrganizer hoplogOrganizer : organizers) {
- Future<CompactionStatus> f = hoplogOrganizer.forceCompaction(isMajor);
- futures.add(f);
- }
- return futures;
- }
-
- @Override
- public void flushHDFSQueue(int maxWaitTime) {
- if (!this.isHDFSRegion()) {
- throw new UnsupportedOperationException(
- LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE
- .toLocalizedString(getName()));
- }
- HDFSFlushQueueFunction.flushQueue(this, maxWaitTime);
- }
-
- @Override
- public long lastMajorHDFSCompaction() {
- if (!this.isHDFSReadWriteRegion()) {
- if (this.isHDFSRegion()) {
- throw new UnsupportedOperationException(
- LocalizedStrings.HOPLOG_CONFIGURED_AS_WRITEONLY
- .toLocalizedString(getName()));
- }
- throw new UnsupportedOperationException(
- LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE
- .toLocalizedString(getName()));
- }
- List<Long> result = (List<Long>) FunctionService.onRegion(this)
- .execute(HDFSLastCompactionTimeFunction.ID)
- .getResult();
- if (logger.isDebugEnabled()) {
- logger.debug("HDFS: Result of LastCompactionTimeFunction "+result);
- }
- long min = Long.MAX_VALUE;
- for (long ts : result) {
- if (ts !=0 && ts < min) {
- min = ts;
- }
- }
- min = min == Long.MAX_VALUE ? 0 : min;
- return min;
- }
-
- public long lastLocalMajorHDFSCompaction() {
- if (!isDataStore() || hdfsManager == null) {
- // nothing to do
- return 0;
- }
- if (logger.isDebugEnabled()) {
- logger.debug(
- "HDFS: getting local Major compaction time");
- }
- Collection<HoplogOrganizer> organizers = hdfsManager.getBucketOrganizers();
- long minTS = Long.MAX_VALUE;
- for (HoplogOrganizer hoplogOrganizer : organizers) {
- long ts = hoplogOrganizer.getLastMajorCompactionTimestamp();
- if (ts !=0 && ts < minTS) {
- minTS = ts;
- }
- }
- minTS = minTS == Long.MAX_VALUE ? 0 : minTS;
- if (logger.isDebugEnabled()) {
- logger.debug(
- "HDFS: local Major compaction time: "+minTS);
- }
- return minTS;
- }
-
public void shadowPRWaitForBucketRecovery() {
assert this.isShadowPR();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
index 57b1e71..bda68e3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
@@ -64,7 +64,6 @@ import com.gemstone.gemfire.cache.execute.Function;
import com.gemstone.gemfire.cache.execute.FunctionException;
import com.gemstone.gemfire.cache.execute.ResultSender;
import com.gemstone.gemfire.cache.query.QueryInvalidException;
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
import com.gemstone.gemfire.cache.query.internal.IndexUpdater;
import com.gemstone.gemfire.cache.query.internal.QCompiler;
import com.gemstone.gemfire.cache.query.internal.index.IndexCreationData;
@@ -2059,13 +2058,13 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
ForceReattemptException, PRLocallyDestroyedException
{
return getLocally(bucketId, key,aCallbackArgument, disableCopyOnRead, preferCD, requestingClient,
- clientEvent, returnTombstones, false, false);
+ clientEvent, returnTombstones, false);
}
/**
* Returns value corresponding to this key.
* @param key
* the key to look for
- * @param preferCD
+ * @param preferCD
* @param requestingClient the client making the request, or null
* @param clientEvent client's event (for returning version tag)
* @param returnTombstones whether tombstones should be returned
@@ -2076,21 +2075,28 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
* @throws PrimaryBucketException if the locally managed bucket is not primary
* @throws PRLocallyDestroyedException if the PartitionRegion is locally destroyed
*/
- public Object getLocally(int bucketId, final Object key,
- final Object aCallbackArgument, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
- boolean returnTombstones, boolean opScopeIsLocal, boolean allowReadFromHDFS) throws PrimaryBucketException,
+ public Object getLocally(int bucketId,
+ final Object key,
+ final Object aCallbackArgument,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones,
+ boolean opScopeIsLocal) throws PrimaryBucketException,
ForceReattemptException, PRLocallyDestroyedException
{
final BucketRegion bucketRegion = getInitializedBucketForId(key, Integer.valueOf(bucketId));
// check for primary (when a loader is present) done deeper in the BucketRegion
Object ret=null;
if (logger.isDebugEnabled()) {
- logger.debug("getLocally: key {}) bucketId={}{}{} region {} returnTombstones {} allowReadFromHDFS {}", key,
- this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, bucketRegion.getName(), returnTombstones, allowReadFromHDFS);
+ logger.debug("getLocally: key {}) bucketId={}{}{} region {} returnTombstones {} ", key,
+ this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, bucketRegion.getName(), returnTombstones);
}
invokeBucketReadHook();
try {
- ret = bucketRegion.get(key, aCallbackArgument, true, disableCopyOnRead , preferCD, requestingClient, clientEvent, returnTombstones, opScopeIsLocal, allowReadFromHDFS, false);
+ ret = bucketRegion.get(key, aCallbackArgument, true, disableCopyOnRead , preferCD, requestingClient, clientEvent, returnTombstones, opScopeIsLocal,
+ false);
checkIfBucketMoved(bucketRegion);
}
catch (RegionDestroyedException rde) {
@@ -2122,7 +2128,11 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
* @throws PrimaryBucketException if the locally managed bucket is not primary
* @see #getLocally(int, Object, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean)
*/
- public RawValue getSerializedLocally(KeyInfo keyInfo, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws PrimaryBucketException,
+ public RawValue getSerializedLocally(KeyInfo keyInfo,
+ boolean doNotLockEntry,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones) throws PrimaryBucketException,
ForceReattemptException {
final BucketRegion bucketRegion = getInitializedBucketForId(keyInfo.getKey(), keyInfo.getBucketId());
// check for primary (when loader is present) done deeper in the BucketRegion
@@ -2133,7 +2143,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
invokeBucketReadHook();
try {
- RawValue result = bucketRegion.getSerialized(keyInfo, true, doNotLockEntry, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
+ RawValue result = bucketRegion.getSerialized(keyInfo, true, doNotLockEntry, requestingClient, clientEvent, returnTombstones);
checkIfBucketMoved(bucketRegion);
return result;
} catch (RegionDestroyedException rde) {
@@ -2157,7 +2167,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
* @param access
* true if caller wants last accessed time updated
* @param allowTombstones whether a tombstoned entry can be returned
- *
+ *
* @throws ForceReattemptException
* if bucket region is not present in this process
* @return a RegionEntry for the given key, which will be null if the key is
@@ -2168,7 +2178,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
* if the PartitionRegion is locally destroyed
*/
public EntrySnapshot getEntryLocally(int bucketId, final Object key,
- boolean access, boolean allowTombstones, boolean allowReadFromHDFS)
+ boolean access, boolean allowTombstones)
throws EntryNotFoundException, PrimaryBucketException,
ForceReattemptException, PRLocallyDestroyedException
{
@@ -2181,12 +2191,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
EntrySnapshot res = null;
RegionEntry ent = null;
try {
- if (allowReadFromHDFS) {
- ent = bucketRegion.entries.getEntry(key);
- }
- else {
- ent = bucketRegion.entries.getOperationalEntryInVM(key);
- }
+ ent = bucketRegion.entries.getEntry(key);
if (ent == null) {
this.getPartitionedRegion().checkReadiness();
@@ -2296,14 +2301,8 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
try{
if (r != null) {
Set keys = r.keySet(allowTombstones);
- if (getPartitionedRegion().isHDFSReadWriteRegion()) {
- // hdfs regions can't copy all keys into memory
- ret = keys;
-
- } else {
// A copy is made so that the bucket is free to move
ret = new HashSet(r.keySet(allowTombstones));
- }
checkIfBucketMoved(r);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java
index f083268..de1f7d8 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java
@@ -65,12 +65,19 @@ public class PartitionedRegionDataView extends LocalRegionDataView {
}
@Override
- public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate,
- boolean generateCallbacks, Object value, boolean disableCopyOnRead,
- boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
+ public Object findObject(KeyInfo key,
+ LocalRegion r,
+ boolean isCreate,
+ boolean generateCallbacks,
+ Object value,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones) {
TXStateProxy tx = r.cache.getTXMgr().internalSuspend();
try {
- return r.findObjectInSystem(key, isCreate, tx, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
+ return r.findObjectInSystem(key, isCreate, tx, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
} finally {
r.cache.getTXMgr().resume(tx);
}
@@ -82,10 +89,14 @@ public class PartitionedRegionDataView extends LocalRegionDataView {
return pr.nonTXContainsKey(keyInfo);
}
@Override
- public Object getSerializedValue(LocalRegion localRegion, KeyInfo keyInfo, boolean doNotLockEntry, ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException {
+ public Object getSerializedValue(LocalRegion localRegion,
+ KeyInfo keyInfo,
+ boolean doNotLockEntry,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones) throws DataLocationException {
PartitionedRegion pr = (PartitionedRegion)localRegion;
- return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
+ return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, requestingClient, clientEvent, returnTombstones);
}
@Override
public boolean putEntryOnRemote(EntryEventImpl event, boolean ifNew,
@@ -118,7 +129,7 @@ public class PartitionedRegionDataView extends LocalRegionDataView {
boolean allowTombstones) throws DataLocationException {
PartitionedRegion pr = (PartitionedRegion)localRegion;
return pr.getDataStore().getEntryLocally(keyInfo.getBucketId(),
- keyInfo.getKey(), false, allowTombstones, true);
+ keyInfo.getKey(), false, allowTombstones);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
index f0a6543..74c134b 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
@@ -626,27 +626,6 @@ final class ProxyRegionMap implements RegionMap {
}
@Override
- public boolean isMarkedForEviction() {
- throw new UnsupportedOperationException(LocalizedStrings
- .ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0
- .toLocalizedString(DataPolicy.EMPTY));
- }
-
- @Override
- public void setMarkedForEviction() {
- throw new UnsupportedOperationException(LocalizedStrings
- .ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0
- .toLocalizedString(DataPolicy.EMPTY));
- }
-
- @Override
- public void clearMarkedForEviction() {
- throw new UnsupportedOperationException(LocalizedStrings
- .ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0
- .toLocalizedString(DataPolicy.EMPTY));
- }
-
- @Override
public boolean isValueNull() {
throw new UnsupportedOperationException(LocalizedStrings.ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0.toLocalizedString(DataPolicy.EMPTY));
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
index 5838ead..bedbf81 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
@@ -35,7 +35,6 @@ import com.gemstone.gemfire.internal.offheap.StoredObject;
import com.gemstone.gemfire.internal.offheap.annotations.Released;
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
-import com.gemstone.gemfire.cache.EvictionCriteria;
/**
* Internal interface for a region entry.
@@ -415,25 +414,6 @@ public interface RegionEntry {
public void setUpdateInProgress(final boolean underUpdate);
/**
- * Returns true if this entry has been marked for eviction for custom eviction
- * via {@link EvictionCriteria}.
- */
- public boolean isMarkedForEviction();
-
- /**
- * Marks this entry for eviction by custom eviction via
- * {@link EvictionCriteria}.
- */
- public void setMarkedForEviction();
-
- /**
- * Clears this entry as for eviction by custom eviction via
- * {@link EvictionCriteria} or when an update is done after it was marked for
- * eviction.
- */
- public void clearMarkedForEviction();
-
- /**
* Event containing this RegionEntry is being passed through
* dispatchListenerEvent for CacheListeners under RegionEntry lock. This is
* used during deserialization for a VMCacheSerializable value contained by
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java
index 2a7f0c4..7a97408 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java
@@ -39,12 +39,6 @@ class RegionMapFactory {
//.getDataPolicy().withPartitioning());
if (owner.isProxy() /*|| owner instanceof PartitionedRegion*/) { // TODO enabling this causes eviction tests to fail
return new ProxyRegionMap(owner, attrs, internalRegionArgs);
- } else if (internalRegionArgs.isReadWriteHDFSRegion()) {
- if (owner.getEvictionController() == null) {
- return new HDFSRegionMapImpl(owner, attrs, internalRegionArgs);
- }
- return new HDFSLRURegionMap(owner, attrs, internalRegionArgs);
- //else if (owner.getEvictionController() != null && isNotPartitionedRegion) {
} else if (owner.getEvictionController() != null ) {
return new VMLRURegionMap(owner, attrs,internalRegionArgs);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java
index c754339..b565a2c 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java
@@ -122,7 +122,7 @@ public final class RemoteGetMessage extends RemoteOperationMessageWithDirectRepl
((KeyWithRegionContext)this.key).setRegionContext(r);
}
KeyInfo keyInfo = r.getKeyInfo(key, cbArg);
- val = r.getDataView().getSerializedValue(r, keyInfo, false, this.context, null, false, false/*for replicate regions*/);
+ val = r.getDataView().getSerializedValue(r, keyInfo, false, this.context, null, false /*for replicate regions*/);
valueBytes = val instanceof RawValue ? (RawValue)val : new RawValue(val);
if (logger.isTraceEnabled(LogMarker.DM)) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java
index 983f928..2906ff6 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java
@@ -113,7 +113,8 @@ public class TXEntry implements Region.Entry
{
checkTX();
// Object value = this.localRegion.getDeserialized(this.key, false, this.myTX, this.rememberReads);
- @Unretained Object value = this.myTX.getDeserializedValue(keyInfo, this.localRegion, false, false, false, null, false, false, false);
+ @Unretained Object value = this.myTX.getDeserializedValue(keyInfo, this.localRegion, false, false, false, null, false,
+ false);
if (value == null) {
throw new EntryDestroyedException(this.keyInfo.getKey().toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
index a67d3cc..617873c 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
@@ -1407,7 +1407,14 @@ public class TXState implements TXStateInterface {
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
*/
- public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
+ public Object getDeserializedValue(KeyInfo keyInfo,
+ LocalRegion localRegion,
+ boolean updateStats,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones,
+ boolean retainResult) {
TXEntryState tx = txReadEntry(keyInfo, localRegion, true, true/*create txEntry is absent*/);
if (tx != null) {
Object v = tx.getValue(keyInfo, localRegion, preferCD);
@@ -1416,7 +1423,8 @@ public class TXState implements TXStateInterface {
}
return v;
} else {
- return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadFromHDFS, retainResult);
+ return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones,
+ retainResult);
}
}
@@ -1425,15 +1433,19 @@ public class TXState implements TXStateInterface {
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object)
*/
@Retained
- public Object getSerializedValue(LocalRegion localRegion, KeyInfo keyInfo, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
- boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException {
+ public Object getSerializedValue(LocalRegion localRegion,
+ KeyInfo keyInfo,
+ boolean doNotLockEntry,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones) throws DataLocationException {
final Object key = keyInfo.getKey();
TXEntryState tx = txReadEntry(keyInfo, localRegion, true,true/*create txEntry is absent*/);
if (tx != null) {
Object val = tx.getPendingValue();
if(val==null || Token.isInvalidOrRemoved(val)) {
val = findObject(keyInfo,localRegion, val!=Token.INVALID,
- true, val, false, false, requestingClient, clientEvent, false, allowReadFromHDFS);
+ true, val, false, false, requestingClient, clientEvent, false);
}
return val;
} else {
@@ -1441,7 +1453,7 @@ public class TXState implements TXStateInterface {
// so we should never come here
assert localRegion instanceof PartitionedRegion;
PartitionedRegion pr = (PartitionedRegion)localRegion;
- return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, null, null, returnTombstones, allowReadFromHDFS);
+ return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, null, null, returnTombstones);
}
}
@@ -1519,9 +1531,17 @@ public class TXState implements TXStateInterface {
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
*/
- public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate,
- boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
- return r.findObjectInSystem(key, isCreate, this, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
+ public Object findObject(KeyInfo key,
+ LocalRegion r,
+ boolean isCreate,
+ boolean generateCallbacks,
+ Object value,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones) {
+ return r.findObjectInSystem(key, isCreate, this, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
}
private boolean readEntryAndCheckIfDestroyed(KeyInfo keyInfo, LocalRegion localRegion,
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java
index 5da20d8..3fa9351 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java
@@ -123,8 +123,14 @@ public interface TXStateInterface extends Synchronization, InternalDataView {
* @param localRegion
* @param updateStats TODO
*/
- public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
- boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadsFromHDFS, boolean retainResult);
+ public Object getDeserializedValue(KeyInfo keyInfo,
+ LocalRegion localRegion,
+ boolean updateStats,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones,
+ boolean retainResult);
public TXEvent getEvent();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
index e66302e..0939ab0 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
@@ -341,9 +341,16 @@ public class TXStateProxyImpl implements TXStateProxy {
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
*/
- public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
- boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
- Object val = getRealDeal(keyInfo, localRegion).getDeserializedValue(keyInfo, localRegion, updateStats, disableCopyOnRead, preferCD, null, false, allowReadFromHDFS, retainResult);
+ public Object getDeserializedValue(KeyInfo keyInfo,
+ LocalRegion localRegion,
+ boolean updateStats,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones,
+ boolean retainResult) {
+ Object val = getRealDeal(keyInfo, localRegion).getDeserializedValue(keyInfo, localRegion, updateStats, disableCopyOnRead, preferCD, null, false,
+ retainResult);
if (val != null) {
// fixes bug 51057: TXStateStub on client always returns null, so do not increment
// the operation count it will be incremented in findObject()
@@ -599,13 +606,13 @@ public class TXStateProxyImpl implements TXStateProxy {
* @see com.gemstone.gemfire.internal.cache.InternalDataView#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
*/
public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate,
- boolean generateCallbacks, Object value, boolean disableCopyOnRead,
- boolean preferCD, ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
+ boolean generateCallbacks, Object value, boolean disableCopyOnRead,
+ boolean preferCD, ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent, boolean returnTombstones) {
try {
this.operationCount++;
Object retVal = getRealDeal(key, r).findObject(key, r, isCreate, generateCallbacks,
- value, disableCopyOnRead, preferCD, requestingClient, clientEvent, false, allowReadFromHDFS);
+ value, disableCopyOnRead, preferCD, requestingClient, clientEvent, false);
trackBucketForTx(key);
return retVal;
} catch (TransactionDataRebalancedException | PrimaryBucketException re) {
@@ -720,9 +727,14 @@ public class TXStateProxyImpl implements TXStateProxy {
* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object)
*/
- public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException {
+ public Object getSerializedValue(LocalRegion localRegion,
+ KeyInfo key,
+ boolean doNotLockEntry,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones) throws DataLocationException {
this.operationCount++;
- return getRealDeal(key, localRegion).getSerializedValue(localRegion, key, doNotLockEntry, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
+ return getRealDeal(key, localRegion).getSerializedValue(localRegion, key, doNotLockEntry, requestingClient, clientEvent, returnTombstones);
}
/* (non-Javadoc)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java
index ac35425..0b226e0 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java
@@ -184,8 +184,14 @@ public abstract class TXStateStub implements TXStateInterface {
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
*/
- public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
- boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
+ public Object getDeserializedValue(KeyInfo keyInfo,
+ LocalRegion localRegion,
+ boolean updateStats,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones,
+ boolean retainResult) {
// We never have a local value if we are a stub...
return null;
}
@@ -373,10 +379,17 @@ public abstract class TXStateStub implements TXStateInterface {
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
*/
- public Object findObject(KeyInfo keyInfo, LocalRegion r, boolean isCreate,
- boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
- EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
- return getTXRegionStub(r).findObject(keyInfo,isCreate,generateCallbacks,value, preferCD, requestingClient, clientEvent, allowReadFromHDFS);
+ public Object findObject(KeyInfo keyInfo,
+ LocalRegion r,
+ boolean isCreate,
+ boolean generateCallbacks,
+ Object value,
+ boolean disableCopyOnRead,
+ boolean preferCD,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones) {
+ return getTXRegionStub(r).findObject(keyInfo,isCreate,generateCallbacks,value, preferCD, requestingClient, clientEvent);
}
/* (non-Javadoc)
@@ -432,7 +445,12 @@ public abstract class TXStateStub implements TXStateInterface {
* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object)
*/
- public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
+ public Object getSerializedValue(LocalRegion localRegion,
+ KeyInfo key,
+ boolean doNotLockEntry,
+ ClientProxyMembershipID requestingClient,
+ EntryEventImpl clientEvent,
+ boolean returnTombstones) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java
index a17650c..269f891 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java
@@ -114,10 +114,6 @@ public abstract class UserSpecifiedRegionAttributes<K,V> implements RegionAttrib
*/
private boolean hasCloningEnabled = false;
- private boolean hasHDFSStoreName = false;
-
- private boolean hasHDFSWriteOnly = false;
-
/**
* Whether this region has entry value compression.
*
@@ -526,7 +522,7 @@ public abstract class UserSpecifiedRegionAttributes<K,V> implements RegionAttrib
{
this.hasDiskSynchronous = val;
}
- private static final int HAS_COUNT = 43;
+ private static final int HAS_COUNT = 41;
public void initHasFields(UserSpecifiedRegionAttributes<K,V> other)
{
@@ -602,22 +598,4 @@ public abstract class UserSpecifiedRegionAttributes<K,V> implements RegionAttrib
public List getIndexes() {
return this.indexes;
}
-
- public boolean hasHDFSStoreName()
- {
- return this.hasHDFSStoreName;
- }
- public void setHasHDFSStoreName(boolean val)
- {
- this.hasHDFSStoreName = val;
- }
-
- public void setHasHDFSWriteOnly(boolean val)
- {
- this.hasHDFSWriteOnly = val;
- }
- public boolean hasHDFSWriteOnly()
- {
- return this.hasHDFSWriteOnly;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java
index f587e39..54133cc 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java
@@ -408,19 +408,6 @@ public class ValidatingDiskRegion extends DiskRegion implements DiskRecoveryStor
// TODO Auto-generated method stub
}
@Override
- public boolean isMarkedForEviction() {
- // TODO Auto-generated method stub
- return false;
- }
- @Override
- public void setMarkedForEviction() {
- // TODO Auto-generated method stub
- }
- @Override
- public void clearMarkedForEviction() {
- // TODO Auto-generated method stub
- }
- @Override
public boolean isInvalid() {
// TODO Auto-generated method stub
return false;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java
index d3078a9..ea47e91 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java
@@ -299,7 +299,7 @@ public final class FetchBulkEntriesMessage extends PartitionMessage
Object key = it.next();
VersionTagHolder clientEvent = new VersionTagHolder();
Object value = map.get(key, null, true, true, true, null,
- clientEvent, allowTombstones, false);
+ clientEvent, allowTombstones);
if (needToWriteBucketInfo) {
DataSerializer.writePrimitiveInt(map.getId(), mos);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java
index d7e50f1..3fef790 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java
@@ -93,11 +93,9 @@ public final class GetMessage extends PartitionMessageWithDirectReply
private boolean returnTombstones;
- private boolean allowReadFromHDFS;
// reuse some flags
protected static final int HAS_LOADER = NOTIFICATION_ONLY;
protected static final int CAN_START_TX = IF_NEW;
- protected static final int READ_FROM_HDFS = IF_OLD;
/**
* Empty constructor to satisfy {@link DataSerializer} requirements
@@ -106,15 +104,14 @@ public final class GetMessage extends PartitionMessageWithDirectReply
}
private GetMessage(InternalDistributedMember recipient, int regionId,
- DirectReplyProcessor processor,
- final Object key, final Object aCallbackArgument, ClientProxyMembershipID context,
- boolean returnTombstones, boolean allowReadFromHDFS) {
+ DirectReplyProcessor processor,
+ final Object key, final Object aCallbackArgument, ClientProxyMembershipID context,
+ boolean returnTombstones) {
super(recipient, regionId, processor);
this.key = key;
this.cbArg = aCallbackArgument;
this.context = context;
this.returnTombstones = returnTombstones;
- this.allowReadFromHDFS = allowReadFromHDFS;
}
private static final boolean ORDER_PR_GETS = Boolean.getBoolean("gemfire.order-pr-gets");
@@ -191,7 +188,7 @@ public final class GetMessage extends PartitionMessageWithDirectReply
KeyInfo keyInfo = r.getKeyInfo(key, cbArg);
boolean lockEntry = forceUseOfPRExecutor || isDirectAck();
- val = r.getDataView().getSerializedValue(r, keyInfo, !lockEntry, this.context, event, returnTombstones, allowReadFromHDFS);
+ val = r.getDataView().getSerializedValue(r, keyInfo, !lockEntry, this.context, event, returnTombstones);
if(val == BucketRegion.REQUIRES_ENTRY_LOCK) {
Assert.assertTrue(!lockEntry);
@@ -272,14 +269,12 @@ public final class GetMessage extends PartitionMessageWithDirectReply
@Override
protected short computeCompressedShort(short s) {
s = super.computeCompressedShort(s);
- if (this.allowReadFromHDFS) s |= READ_FROM_HDFS;
return s;
}
@Override
protected void setBooleans(short s, DataInput in) throws ClassNotFoundException, IOException {
super.setBooleans(s, in);
- if ((s & READ_FROM_HDFS) != 0) this.allowReadFromHDFS = true;
}
public void setKey(Object key)
@@ -303,15 +298,18 @@ public final class GetMessage extends PartitionMessageWithDirectReply
* @throws ForceReattemptException if the peer is no longer available
*/
public static GetResponse send(InternalDistributedMember recipient,
- PartitionedRegion r, final Object key, final Object aCallbackArgument,
- ClientProxyMembershipID requestingClient, boolean returnTombstones, boolean allowReadFromHDFS)
+ PartitionedRegion r,
+ final Object key,
+ final Object aCallbackArgument,
+ ClientProxyMembershipID requestingClient,
+ boolean returnTombstones)
throws ForceReattemptException
{
Assert.assertTrue(recipient != null,
"PRDistribuedGetReplyMessage NULL reply message");
GetResponse p = new GetResponse(r.getSystem(), Collections.singleton(recipient), key);
GetMessage m = new GetMessage(recipient, r.getPRId(), p,
- key, aCallbackArgument, requestingClient, returnTombstones, allowReadFromHDFS);
+ key, aCallbackArgument, requestingClient, returnTombstones);
Set failures = r.getDistributionManager().putOutgoing(m);
if (failures != null && failures.size() > 0) {
throw new ForceReattemptException(LocalizedStrings.GetMessage_FAILED_SENDING_0.toLocalizedString(m));
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java
index a88f96f..8aaf587 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java
@@ -101,9 +101,8 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
protected static final short HAS_BRIDGE_CONTEXT = UNRESERVED_FLAGS_START;
protected static final short SKIP_CALLBACKS = (HAS_BRIDGE_CONTEXT << 1);
- protected static final short FETCH_FROM_HDFS = (SKIP_CALLBACKS << 1);
//using the left most bit for IS_PUT_DML, the last available bit
- protected static final short IS_PUT_DML = (short) (FETCH_FROM_HDFS << 1);
+ protected static final short IS_PUT_DML = (short) (SKIP_CALLBACKS << 1);
private transient InternalDistributedSystem internalDs;
@@ -118,9 +117,6 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
transient VersionedObjectList versions = null;
- /** whether this operation should fetch oldValue from HDFS */
- private boolean fetchFromHDFS;
-
private boolean isPutDML;
/**
* Empty constructor to satisfy {@link DataSerializer}requirements
@@ -129,7 +125,7 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
}
public PutAllPRMessage(int bucketId, int size, boolean notificationOnly,
- boolean posDup, boolean skipCallbacks, Object callbackArg, boolean fetchFromHDFS, boolean isPutDML) {
+ boolean posDup, boolean skipCallbacks, Object callbackArg, boolean isPutDML) {
this.bucketId = Integer.valueOf(bucketId);
putAllPRData = new PutAllEntryData[size];
this.notificationOnly = notificationOnly;
@@ -137,8 +133,7 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
this.skipCallbacks = skipCallbacks;
this.callbackArg = callbackArg;
initTxMemberId();
- this.fetchFromHDFS = fetchFromHDFS;
- this.isPutDML = isPutDML;
+ this.isPutDML = isPutDML;
}
public void addEntry(PutAllEntryData entry) {
@@ -307,7 +302,6 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
s = super.computeCompressedShort(s);
if (this.bridgeContext != null) s |= HAS_BRIDGE_CONTEXT;
if (this.skipCallbacks) s |= SKIP_CALLBACKS;
- if (this.fetchFromHDFS) s |= FETCH_FROM_HDFS;
if (this.isPutDML) s |= IS_PUT_DML;
return s;
}
@@ -317,7 +311,6 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
ClassNotFoundException {
super.setBooleans(s, in);
this.skipCallbacks = ((s & SKIP_CALLBACKS) != 0);
- this.fetchFromHDFS = ((s & FETCH_FROM_HDFS) != 0);
this.isPutDML = ((s & IS_PUT_DML) != 0);
}
@@ -495,9 +488,6 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
ev.setPutAllOperation(dpao);
- // set the fetchFromHDFS flag
- ev.setFetchFromHDFS(this.fetchFromHDFS);
-
// make sure a local update inserts a cache de-serializable
ev.makeSerializedNewValue();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java
index d5abaa1..a6a39dc 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java
@@ -182,9 +182,6 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
private VersionTag versionTag;
- /** whether this operation should fetch oldValue from HDFS*/
- private transient boolean fetchFromHDFS;
-
private transient boolean isPutDML;
// additional bitmask flags used for serialization/deserialization
@@ -208,7 +205,6 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
// masks there are taken
// also switching the masks will impact backwards compatibility. Need to
// verify if it is ok to break backwards compatibility
- protected static final int FETCH_FROM_HDFS = getNextByteMask(HAS_CALLBACKARG);
/*
private byte[] oldValBytes;
@@ -608,9 +604,6 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
this.originalSender = (InternalDistributedMember)DataSerializer
.readObject(in);
}
- if ((extraFlags & FETCH_FROM_HDFS) != 0) {
- this.fetchFromHDFS = true;
- }
this.eventId = new EventID();
InternalDataSerializer.invokeFromData(this.eventId, in);
@@ -697,7 +690,6 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
extraFlags |= HAS_DELTA_WITH_FULL_VALUE;
}
if (this.originalSender != null) extraFlags |= HAS_ORIGINAL_SENDER;
- if (this.event.isFetchFromHDFS()) extraFlags |= FETCH_FROM_HDFS;
out.writeByte(extraFlags);
DataSerializer.writeObject(getKey(), out);
@@ -822,7 +814,6 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
ev.setCausedByMessage(this);
ev.setInvokePRCallbacks(!notificationOnly);
ev.setPossibleDuplicate(this.posDup);
- ev.setFetchFromHDFS(this.fetchFromHDFS);
ev.setPutDML(this.isPutDML);
/*if (this.hasOldValue) {
if (this.oldValueIsSerialized) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ByteComparator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ByteComparator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ByteComparator.java
deleted file mode 100644
index 5c199ae..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ByteComparator.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import org.apache.hadoop.hbase.util.Bytes;
-
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
-
-/**
- * Compares objects byte-by-byte. This is fast and sufficient for cases when
- * lexicographic ordering is not important or the serialization is order-
- * preserving.
- *
- */
-public class ByteComparator implements SerializedComparator {
- @Override
- public int compare(byte[] rhs, byte[] lhs) {
- return compare(rhs, 0, rhs.length, lhs, 0, lhs.length);
- }
-
- @Override
- public int compare(byte[] r, int rOff, int rLen, byte[] l, int lOff, int lLen) {
- return compareBytes(r, rOff, rLen, l, lOff, lLen);
- }
-
- /**
- * Compares two byte arrays element-by-element.
- *
- * @param r the right array
- * @param rOff the offset of r
- * @param rLen the length of r to compare
- * @param l the left array
- * @param lOff the offset of l
- * @param lLen the length of l to compare
- * @return -1 if r < l; 0 if r == l; 1 if r > 1
- */
-
- public static int compareBytes(byte[] r, int rOff, int rLen, byte[] l, int lOff, int lLen) {
- return Bytes.compareTo(r, rOff, rLen, l, lOff, lLen);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CursorIterator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CursorIterator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CursorIterator.java
deleted file mode 100644
index dacc208..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CursorIterator.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.persistence.soplog;
-
-import java.util.Iterator;
-
-/**
- * Provides an {@link Iterator} that allows access to the current iteration
- * element. The implementor must provide access to the current element
- * as well as a means to move to the next element.
- *
- *
- * @param <E> the element type
- */
-public interface CursorIterator<E> extends Iterator<E> {
- /**
- * Returns the element at the current position.
- * @return the current element
- */
- E current();
-
- /**
- * Provides an iteration cursor by wrapping an {@link Iterator}.
- *
- * @param <E> the element type
- */
- public static class WrappedIterator<E> implements CursorIterator<E> {
- /** the underlying iterator */
- private final Iterator<E> src;
-
- /** the current iteration element */
- private E current;
-
- public WrappedIterator(Iterator<E> src) {
- this.src = src;
- }
-
- @Override
- public boolean hasNext() {
- return src.hasNext();
- }
-
- @Override
- public E next() {
- current = src.next();
- return current;
- }
-
- @Override
- public E current() {
- return current;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Returns the unwrapped interator.
- * @return the iterator
- */
- public Iterator<E> unwrap() {
- return src;
- }
- }
-}