Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:49:59 UTC

[13/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
index 328c196..c75286e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -92,12 +93,22 @@ import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
 import com.gemstone.gemfire.cache.TransactionDataRebalancedException;
 import com.gemstone.gemfire.cache.TransactionException;
 import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
 import com.gemstone.gemfire.cache.execute.EmtpyRegionFunctionException;
 import com.gemstone.gemfire.cache.execute.Function;
 import com.gemstone.gemfire.cache.execute.FunctionContext;
 import com.gemstone.gemfire.cache.execute.FunctionException;
 import com.gemstone.gemfire.cache.execute.FunctionService;
 import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.CompactionStatus;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSFlushQueueFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionArgs;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionResultCollector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSLastCompactionTimeFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
 import com.gemstone.gemfire.cache.partition.PartitionListener;
 import com.gemstone.gemfire.cache.partition.PartitionNotAvailableException;
 import com.gemstone.gemfire.cache.query.FunctionDomainException;
@@ -213,6 +224,7 @@ import com.gemstone.gemfire.internal.cache.partitioned.PutAllPRMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.PutMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.PutMessage.PutResult;
 import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor;
+import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.BucketVisitor;
 import com.gemstone.gemfire.internal.cache.partitioned.RegionAdvisor.PartitionProfile;
 import com.gemstone.gemfire.internal.cache.partitioned.RemoveAllPRMessage;
 import com.gemstone.gemfire.internal.cache.partitioned.RemoveIndexesMessage;
@@ -244,6 +256,7 @@ import com.gemstone.gemfire.internal.offheap.annotations.Released;
 import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 import com.gemstone.gemfire.internal.sequencelog.RegionLogger;
 import com.gemstone.gemfire.internal.util.TransformUtils;
+import com.gemstone.gemfire.internal.util.concurrent.FutureResult;
 import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch;
 import com.gemstone.gemfire.i18n.StringId;
 
@@ -695,9 +708,17 @@ public class PartitionedRegion extends LocalRegion implements
   private final PartitionListener[] partitionListeners;
 
   private boolean isShadowPR = false;
-
+  private boolean isShadowPRForHDFS = false;
+  
   private AbstractGatewaySender parallelGatewaySender = null;
   
+  private final ThreadLocal<Boolean> queryHDFS = new ThreadLocal<Boolean>() {
+    @Override
+    protected Boolean initialValue() {
+      return false;
+    }
+  };
+  
   public PartitionedRegion(String regionname, RegionAttributes ra,
       LocalRegion parentRegion, GemFireCacheImpl cache,
       InternalRegionArguments internalRegionArgs) {
@@ -717,6 +738,12 @@ public class PartitionedRegion extends LocalRegion implements
     // (which prevents pridmap cleanup).
     cache.getDistributedSystem().addDisconnectListener(dsPRIdCleanUpListener);
     
+    // add an async queue for the region if the store name is not null. 
+    if (this.getHDFSStoreName() != null) {
+      String eventQueueName = getHDFSEventQueueName();
+      super.addAsyncEventQueueId(eventQueueName);
+    }
+
     // this.userScope = ra.getScope();
     this.partitionAttributes = ra.getPartitionAttributes();
     this.localMaxMemory = this.partitionAttributes.getLocalMaxMemory();
@@ -795,6 +822,8 @@ public class PartitionedRegion extends LocalRegion implements
     if (internalRegionArgs.isUsedForParallelGatewaySenderQueue()) {
       this.isShadowPR = true;
       this.parallelGatewaySender = internalRegionArgs.getParallelGatewaySender();
+      if (internalRegionArgs.isUsedForHDFSParallelGatewaySenderQueue())
+        this.isShadowPRForHDFS = true;
     }
     
     
@@ -838,10 +867,38 @@ public class PartitionedRegion extends LocalRegion implements
     });
   }
 
+  @Override
+  public final boolean isHDFSRegion() {
+    return this.getHDFSStoreName() != null;
+  }
+
+  @Override
+  public final boolean isHDFSReadWriteRegion() {
+    return isHDFSRegion() && !getHDFSWriteOnly();
+  }
+
+  @Override
+  protected final boolean isHDFSWriteOnly() {
+    return isHDFSRegion() && getHDFSWriteOnly();
+  }
+
+  public final void setQueryHDFS(boolean includeHDFS) {
+    queryHDFS.set(includeHDFS);
+  }
+
+  @Override
+  public final boolean includeHDFSResults() {
+    return queryHDFS.get();
+  }
+
   public final boolean isShadowPR() {
     return isShadowPR;
   }
 
+  public final boolean isShadowPRForHDFS() {
+    return isShadowPRForHDFS;
+  }
+  
   public AbstractGatewaySender getParallelGatewaySender() {
     return parallelGatewaySender;
   }
@@ -1607,7 +1664,7 @@ public class PartitionedRegion extends LocalRegion implements
       try {
         final boolean loc = (this.localMaxMemory > 0) && retryNode.equals(getMyId());
         if (loc) {
-          ret = this.dataStore.getEntryLocally(bucketId, key, access, allowTombstones);
+          ret = this.dataStore.getEntryLocally(bucketId, key, access, allowTombstones, true);
         } else {
           ret = getEntryRemotely(retryNode, bucketIdInt, key, access, allowTombstones);
           // TODO:Suranjan&Yogesh : there should be better way than this one
@@ -2066,7 +2123,8 @@ public class PartitionedRegion extends LocalRegion implements
           bucketStorageAssigned=false;
           // if this is a Delta update, then throw exception since the key doesn't
           // exist if there is no bucket for it yet
-          if (event.hasDelta()) {
+          // For HDFS regions, the key may be recovered from HDFS, so allow bucket creation
+          if (!this.dataPolicy.withHDFS() && event.hasDelta()) {
             throw new EntryNotFoundException(LocalizedStrings.
               PartitionedRegion_CANNOT_APPLY_A_DELTA_WITHOUT_EXISTING_ENTRY
                 .toLocalizedString());
@@ -3261,9 +3319,9 @@ public class PartitionedRegion extends LocalRegion implements
    */
    @Override
   public Object get(Object key, Object aCallbackArgument,
-                    boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
-                    ClientProxyMembershipID requestingClient,
-                    EntryEventImpl clientEvent, boolean returnTombstones) throws TimeoutException, CacheLoaderException
+      boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
+      ClientProxyMembershipID requestingClient,
+      EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws TimeoutException, CacheLoaderException
   {
     validateKey(key);
     validateCallbackArg(aCallbackArgument);
@@ -3277,7 +3335,7 @@ public class PartitionedRegion extends LocalRegion implements
       // if scope is local and there is no loader, then
       // don't go further to try and get value
       Object value = getDataView().findObject(getKeyInfo(key, aCallbackArgument), this, true/*isCreate*/, generateCallbacks,
-                                      null /*no local value*/, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+                                      null /*no local value*/, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
       if (value != null && !Token.isInvalid(value)) {
         miss = false;
       }
@@ -3323,7 +3381,7 @@ public class PartitionedRegion extends LocalRegion implements
     if (primary == null) {
       return null;
     }
-    if (isTX()) {
+    if (isTX() || this.hdfsStoreName != null) {
       return getNodeForBucketWrite(bucketId, null);
     }
     InternalDistributedMember result =  getRegionAdvisor().getPreferredNode(bucketId);
@@ -3337,7 +3395,7 @@ public class PartitionedRegion extends LocalRegion implements
    */
   private InternalDistributedMember getNodeForBucketReadOrLoad(int bucketId) {
     InternalDistributedMember targetNode;
-    if (!this.haveCacheLoader) {
+    if (!this.haveCacheLoader && (this.hdfsStoreName == null)) {
       targetNode = getNodeForBucketRead(bucketId);
     }
     else {
@@ -3470,16 +3528,9 @@ public class PartitionedRegion extends LocalRegion implements
   }
 
   @Override
-  protected Object findObjectInSystem(KeyInfo keyInfo,
-                                      boolean isCreate,
-                                      TXStateInterface tx,
-                                      boolean generateCallbacks,
-                                      Object localValue,
-                                      boolean disableCopyOnRead,
-                                      boolean preferCD,
-                                      ClientProxyMembershipID requestingClient,
-                                      EntryEventImpl clientEvent,
-                                      boolean returnTombstones)
+  protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
+      TXStateInterface tx, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
+      EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
       throws CacheLoaderException, TimeoutException
   {
     Object obj = null;
@@ -3515,7 +3566,7 @@ public class PartitionedRegion extends LocalRegion implements
         return null;
       }
       
-      obj = getFromBucket(targetNode, bucketId, key, aCallbackArgument, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowRetry);
+      obj = getFromBucket(targetNode, bucketId, key, aCallbackArgument, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowRetry, allowReadFromHDFS);
     }
     finally {
       this.prStats.endGet(startTime);
@@ -4098,22 +4149,15 @@ public class PartitionedRegion extends LocalRegion implements
 
   /**
    * no docs
-   * @param preferCD
+   * @param preferCD 
    * @param requestingClient the client requesting the object, or null if not from a client
    * @param clientEvent TODO
    * @param returnTombstones TODO
    * @param allowRetry if false then do not retry
    */
   private Object getFromBucket(final InternalDistributedMember targetNode,
-                               int bucketId,
-                               final Object key,
-                               final Object aCallbackArgument,
-                               boolean disableCopyOnRead,
-                               boolean preferCD,
-                               ClientProxyMembershipID requestingClient,
-                               EntryEventImpl clientEvent,
-                               boolean returnTombstones,
-                               boolean allowRetry) {
+      int bucketId, final Object key, final Object aCallbackArgument,
+      boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowRetry, boolean allowReadFromHDFS) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     
     final int retryAttempts = calcRetry();
@@ -4143,7 +4187,7 @@ public class PartitionedRegion extends LocalRegion implements
       try {
         if (isLocal) {
           obj = this.dataStore.getLocally(bucketId, key, aCallbackArgument,
-              disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, false);
+              disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, false, allowReadFromHDFS);
         }
         else {
             if (localCacheEnabled && null != (obj = localCacheGet(key))) { // OFFHEAP: copy into heap cd; TODO optimize for preferCD case
@@ -4152,14 +4196,14 @@ public class PartitionedRegion extends LocalRegion implements
               }
               return obj;
             }
-            else if (this.haveCacheLoader) {
+            else if (this.haveCacheLoader || this.hdfsStoreName != null) {
               // If the region has a cache loader, 
               // the target node is the primary server of the bucket. But, if the 
               // value can be found in a local bucket, we should first try there. 
 
              /* MergeGemXDHDFSToGFE - reading from the local bucket was disabled in GemXD */
 			  if (null != ( obj = getFromLocalBucket(bucketId, key, aCallbackArgument,
-                  disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones))) {
+                  disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS))) {
                 return obj;
               } 
             }
@@ -4167,7 +4211,7 @@ public class PartitionedRegion extends LocalRegion implements
           //  Test hook
           if (((LocalRegion)this).isTest())
             ((LocalRegion)this).incCountNotFoundInLocal();
-          obj = getRemotely(retryNode, bucketId, key, aCallbackArgument, preferCD, requestingClient, clientEvent, returnTombstones);
+          obj = getRemotely(retryNode, bucketId, key, aCallbackArgument, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
  
           // TODO:Suranjan&Yogesh : there should be better way than this one
           String name = Thread.currentThread().getName();
@@ -4265,9 +4309,9 @@ public class PartitionedRegion extends LocalRegion implements
    *   
    */
   public Object getFromLocalBucket(int bucketId, final Object key,
-                                   final Object aCallbackArgument, boolean disableCopyOnRead,
-                                   boolean preferCD, ClientProxyMembershipID requestingClient,
-                                   EntryEventImpl clientEvent, boolean returnTombstones)
+      final Object aCallbackArgument, boolean disableCopyOnRead,
+      boolean preferCD, ClientProxyMembershipID requestingClient,
+      EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
 		throws ForceReattemptException, PRLocallyDestroyedException {
     Object obj;
     // try reading locally. 
@@ -4276,7 +4320,7 @@ public class PartitionedRegion extends LocalRegion implements
       return null; // fixes 51657
     }
     if (readNode.equals(getMyId()) && null != ( obj = this.dataStore.getLocally(bucketId, key, aCallbackArgument,
-      disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, true))) {
+      disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, true, allowReadFromHDFS))) {
 	  if (logger.isTraceEnabled()) {
             logger.trace("getFromBucket: Getting key {} ({}) locally - success", key, key.hashCode());
 	  }
@@ -5072,13 +5116,7 @@ public class PartitionedRegion extends LocalRegion implements
    *                 if the peer is no longer available
    */
   public Object getRemotely(InternalDistributedMember targetNode,
-                            int bucketId,
-                            final Object key,
-                            final Object aCallbackArgument,
-                            boolean preferCD,
-                            ClientProxyMembershipID requestingClient,
-                            EntryEventImpl clientEvent,
-                            boolean returnTombstones) throws PrimaryBucketException,
+      int bucketId, final Object key, final Object aCallbackArgument, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws PrimaryBucketException,
       ForceReattemptException {
     Object value;
     if (logger.isDebugEnabled()) {
@@ -5086,7 +5124,7 @@ public class PartitionedRegion extends LocalRegion implements
           getPRId(), BUCKET_ID_SEPARATOR, bucketId, key);
     }
     GetResponse response = GetMessage.send(targetNode, this, key,
-        aCallbackArgument, requestingClient, returnTombstones);
+        aCallbackArgument, requestingClient, returnTombstones, allowReadFromHDFS);
     this.prStats.incPartitionMessagesSent();
     value = response.waitForResponse(preferCD);
     if (clientEvent != null) {
@@ -7040,6 +7078,9 @@ public class PartitionedRegion extends LocalRegion implements
   public int entryCount(Set<Integer> buckets,
       boolean estimate) {
     Map<Integer, SizeEntry> bucketSizes = null;
+    if (isHDFSReadWriteRegion() && (includeHDFSResults() || estimate)) {
+      bucketSizes = getSizeForHDFS(buckets, estimate);
+    } else {
     if (buckets != null) {
       if (this.dataStore != null) {
         List<Integer> list = new ArrayList<Integer>();	
@@ -7071,6 +7112,7 @@ public class PartitionedRegion extends LocalRegion implements
         }
       }
     }
+    }
 
     int size = 0;
     if (bucketSizes != null) {
@@ -7093,7 +7135,81 @@ public class PartitionedRegion extends LocalRegion implements
       return 0;
     }
   }
+  private Map<Integer, SizeEntry> getSizeForHDFS(final Set<Integer> buckets, boolean estimate) {
+    // figure out which buckets to include
+    Map<Integer, SizeEntry> bucketSizes = new HashMap<Integer, SizeEntry>();
+    getRegionAdvisor().accept(new BucketVisitor<Map<Integer, SizeEntry>>() {
+      @Override
+      public boolean visit(RegionAdvisor advisor, ProxyBucketRegion pbr,
+          Map<Integer, SizeEntry> map) {
+        if (buckets == null || buckets.contains(pbr.getBucketId())) {
+          map.put(pbr.getBucketId(), null);
+          // ensure that the bucket has been created
+          pbr.getPartitionedRegion().getOrCreateNodeForBucketWrite(pbr.getBucketId(), null);
+        }
+        return true;
+      }
+    }, bucketSizes);
 
+    RetryTimeKeeper retry = new RetryTimeKeeper(retryTimeout);
+
+    while (true) {
+      // get the size from local buckets
+      if (dataStore != null) {
+        Map<Integer, SizeEntry> localSizes;
+        if (estimate) {
+          localSizes = dataStore.getSizeEstimateForLocalPrimaryBuckets();
+        } else {
+          localSizes = dataStore.getSizeForLocalPrimaryBuckets();
+        }
+        for (Map.Entry<Integer, SizeEntry> me : localSizes.entrySet()) {
+          if (bucketSizes.containsKey(me.getKey())) {
+            bucketSizes.put(me.getKey(), me.getValue());
+          }
+        }
+      }
+      // all done
+      int count = 0;
+      Iterator it = bucketSizes.values().iterator();
+      while (it.hasNext()) {
+        if (it.next() != null) count++;
+      }
+      if (bucketSizes.size() == count) {
+        return bucketSizes;
+      }
+      
+      Set<InternalDistributedMember> remotes = getRegionAdvisor().adviseDataStore(true);
+      remotes.remove(getMyId());
+      
+      // collect remote sizes
+      if (!remotes.isEmpty()) {
+        Map<Integer, SizeEntry> remoteSizes = new HashMap<Integer, PartitionedRegion.SizeEntry>();
+        try {
+          remoteSizes = getSizeRemotely(remotes, estimate);
+        } catch (ReplyException e) {
+          // Remote member will never throw ForceReattemptException or
+          // PrimaryBucketException, so any exception on the remote member
+          // should be re-thrown
+          e.handleAsUnexpected();
+        }
+        for (Map.Entry<Integer, SizeEntry> me : remoteSizes.entrySet()) {
+          Integer k = me.getKey();
+          if (bucketSizes.containsKey(k) && me.getValue().isPrimary()) {
+            bucketSizes.put(k, me.getValue());
+          }
+        }
+      }
+      
+      if (retry.overMaximum()) {
+        checkReadiness();
+        PRHARedundancyProvider.timedOut(this, null, null, "calculate size", retry.getRetryTime());
+      }
+      
+      // throttle subsequent attempts
+      retry.waitForBucketsRecovery();
+    }
+  }
+  
   /**
    * This method gets a PartitionServerSocketConnection to targetNode and sends
    * size request to the node. It returns size of all the buckets "primarily"
@@ -7491,7 +7607,9 @@ public class PartitionedRegion extends LocalRegion implements
       .append("; isClosed=").append(this.isClosed)
       .append("; retryTimeout=").append(this.retryTimeout)
       .append("; serialNumber=").append(getSerialNumber())
-
+      .append("; hdfsStoreName=").append(getHDFSStoreName())
+      .append("; hdfsWriteOnly=").append(getHDFSWriteOnly())
+      
       .append("; partition attributes=").append(getPartitionAttributes().toString())
       .append("; on VM ").append(getMyId())
       .append("]")
@@ -7634,6 +7752,18 @@ public class PartitionedRegion extends LocalRegion implements
   @Override
   public void destroyRegion(Object aCallbackArgument)
       throws CacheWriterException, TimeoutException {
+    //For HDFS regions, we need a data store
+    //to do the global destroy so that it can delete
+    //the data from HDFS as well.
+    if(!isDataStore() && this.dataPolicy.withHDFS()) {
+      if(destroyOnDataStore(aCallbackArgument)) {
+        //If we were able to find a data store to do the destroy,
+        //stop here.
+        //otherwise go ahead and destroy the region from this member
+        return;
+      }
+    }
+
     checkForColocatedChildren();
     getDataView().checkSupportsRegionDestroy();
     checkForLimitedOrNoAccess();
@@ -7681,6 +7811,7 @@ public class PartitionedRegion extends LocalRegion implements
 
     boolean keepWaiting = true;
 
+    AsyncEventQueueImpl hdfsQueue = getHDFSEventQueue();
     while(true) {
       List<String> pausedSenders = new ArrayList<String>();
       List<ConcurrentParallelGatewaySenderQueue> parallelQueues = new ArrayList<ConcurrentParallelGatewaySenderQueue>();
@@ -7798,6 +7929,11 @@ public class PartitionedRegion extends LocalRegion implements
         }
       }
     }
+    
+    if(hdfsQueue != null) {
+      hdfsQueue.destroy();
+      cache.removeAsyncEventQueue(hdfsQueue);
+    }
   }
         
   @Override
@@ -7978,6 +8114,9 @@ public class PartitionedRegion extends LocalRegion implements
     final boolean isClose = event.getOperation().isClose();
     destroyPartitionedRegionLocally(!isClose);
     destroyCleanUp(event, serials);
+    if (!isClose) {
+      destroyHDFSData();
+    }
     return true;
   }
 
@@ -8270,6 +8409,8 @@ public class PartitionedRegion extends LocalRegion implements
       }
     }
     
+    HDFSRegionDirector.getInstance().clear(getFullPath());
+    
     RegionLogger.logDestroy(getName(), cache.getMyId(), null, op.isClose());
   }
 
@@ -10914,6 +11055,11 @@ public class PartitionedRegion extends LocalRegion implements
         }
       }
       
+      //hoplogs - pause HDFS dispatcher while we 
+      //clear the buckets to avoid missing some files
+      //during the clear
+      pauseHDFSDispatcher();
+
       try {
         // now clear the bucket regions; we go through the primary bucket
         // regions so there is distribution for every bucket but that
@@ -10929,6 +11075,7 @@ public class PartitionedRegion extends LocalRegion implements
           }
         }
       } finally {
+        resumeHDFSDispatcher();
         // release the bucket locks
         for (BucketRegion br : lockedRegions) {
           try {
@@ -10944,6 +11091,247 @@ public class PartitionedRegion extends LocalRegion implements
     }
     
   }
+  
+  /**Destroy all data in HDFS, if this region is using HDFS persistence.*/
+  private void destroyHDFSData() {
+    if(getHDFSStoreName() == null) {
+      return;
+    }
+    
+    try {
+      hdfsManager.destroyData();
+    } catch (IOException e) {
+      logger.warn(LocalizedStrings.HOPLOG_UNABLE_TO_DELETE_HDFS_DATA, e);
+    }
+  }
+
+  private void pauseHDFSDispatcher() {
+    if(!isHDFSRegion()) {
+      return;
+    }
+    AbstractGatewaySenderEventProcessor eventProcessor = getHDFSEventProcessor();
+    if (eventProcessor == null) return;
+    eventProcessor.pauseDispatching();
+    eventProcessor.waitForDispatcherToPause();
+  }
+  
+  /**
+   * Get the statistics for the HDFS event queue associated with this region,
+   * if any
+   */
+  public AsyncEventQueueStats getHDFSEventQueueStats() {
+    AsyncEventQueueImpl asyncQ = getHDFSEventQueue();
+    if(asyncQ == null) {
+      return null;
+    }
+    return asyncQ.getStatistics();
+  }
+  
+  protected AbstractGatewaySenderEventProcessor getHDFSEventProcessor() {
+    final AsyncEventQueueImpl asyncQ = getHDFSEventQueue();
+    final AbstractGatewaySender gatewaySender = (AbstractGatewaySender)asyncQ.getSender();
+    AbstractGatewaySenderEventProcessor eventProcessor = gatewaySender.getEventProcessor();
+    return eventProcessor;
+  }
+
+  public AsyncEventQueueImpl getHDFSEventQueue() {
+    String asyncQId = getHDFSEventQueueName();
+    if(asyncQId == null) {
+      return null;
+    }
+    final AsyncEventQueueImpl asyncQ =  (AsyncEventQueueImpl)this.getCache().getAsyncEventQueue(asyncQId);
+    return asyncQ;
+  }
+  
+  private void resumeHDFSDispatcher() {
+    if(!isHDFSRegion()) {
+      return;
+    }
+    AbstractGatewaySenderEventProcessor eventProcessor = getHDFSEventProcessor();
+    if (eventProcessor == null) return;
+    eventProcessor.resumeDispatching();
+  }
+
+  protected String getHDFSEventQueueName() {
+    if (!this.getDataPolicy().withHDFS()) return null;
+    String colocatedWith = this.getPartitionAttributes().getColocatedWith();
+    String eventQueueName;
+    if (colocatedWith != null) {
+      PartitionedRegion leader = ColocationHelper.getLeaderRegionName(this);
+      eventQueueName = HDFSStoreFactoryImpl.getEventQueueName(leader
+          .getFullPath());
+    }
+    else {
+      eventQueueName = HDFSStoreFactoryImpl.getEventQueueName(getFullPath());
+    }
+    return eventQueueName;
+  }
+
+  /**
+   * schedules compaction on all members where this region is hosted.
+   * 
+   * @param isMajor
+   *          true for major compaction
+   * @param maxWaitTime
+   *          time to wait for the operation to complete, 0 will wait forever
+   */
+  @Override
+  public void forceHDFSCompaction(boolean isMajor, Integer maxWaitTime) {
+    if (!this.isHDFSReadWriteRegion()) {
+      if (this.isHDFSRegion()) {
+        throw new UnsupportedOperationException(
+            LocalizedStrings.HOPLOG_CONFIGURED_AS_WRITEONLY
+                .toLocalizedString(getName()));
+      }
+      throw new UnsupportedOperationException(
+          LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE
+              .toLocalizedString(getName()));
+    }
+    // send request to remote data stores
+    long start = System.currentTimeMillis();
+    int waitTime = maxWaitTime * 1000;
+    HDFSForceCompactionArgs args = new HDFSForceCompactionArgs(getRegionAdvisor().getBucketSet(), isMajor, waitTime);
+    HDFSForceCompactionResultCollector rc = new HDFSForceCompactionResultCollector();
+    AbstractExecution execution = (AbstractExecution) FunctionService.onRegion(this).withArgs(args).withCollector(rc);
+    execution.setWaitOnExceptionFlag(true); // wait for all exceptions
+    if (logger.isDebugEnabled()) {
+      logger.debug("HDFS: ForceCompat invoking function with arguments "+args);
+    }
+    execution.execute(HDFSForceCompactionFunction.ID);
+    List<CompactionStatus> result = rc.getResult();
+    Set<Integer> successfulBuckets = rc.getSuccessfulBucketIds();
+    if (rc.shouldRetry()) {
+      int retries = 0;
+      while (retries < HDFSForceCompactionFunction.FORCE_COMPACTION_MAX_RETRIES) {
+        waitTime -= System.currentTimeMillis() - start;
+        if (maxWaitTime > 0 && waitTime < 0) {
+          break;
+        }
+        start = System.currentTimeMillis();
+        retries++;
+        Set<Integer> retryBuckets = new HashSet<Integer>(getRegionAdvisor().getBucketSet());
+        retryBuckets.removeAll(successfulBuckets);
+        
+        for (int bucketId : retryBuckets) {
+          getNodeForBucketWrite(bucketId, new PartitionedRegion.RetryTimeKeeper(waitTime));
+          long now = System.currentTimeMillis();
+          waitTime -= now - start;
+          start = now;
+        }
+        
+        args = new HDFSForceCompactionArgs(retryBuckets, isMajor, waitTime);
+        rc = new HDFSForceCompactionResultCollector();
+        execution = (AbstractExecution) FunctionService.onRegion(this).withArgs(args).withCollector(rc);
+        execution.setWaitOnExceptionFlag(true); // wait for all exceptions
+        if (logger.isDebugEnabled()) {
+          logger.debug("HDFS: ForceCompat re-invoking function with arguments "+args+" filter:"+retryBuckets);
+        }
+        execution.execute(HDFSForceCompactionFunction.ID);
+        result = rc.getResult();
+        successfulBuckets.addAll(rc.getSuccessfulBucketIds());
+      }
+    }
+    if (successfulBuckets.size() != getRegionAdvisor().getBucketSet().size()) {
+      checkReadiness();
+      Set<Integer> uncessfulBuckets = new HashSet<Integer>(getRegionAdvisor().getBucketSet());
+      uncessfulBuckets.removeAll(successfulBuckets);
+      throw new FunctionException("Could not run compaction on following buckets:"+uncessfulBuckets);
+    }
+  }
+
+  /**
+   * Schedules compaction on local buckets
+   * @param buckets the set of buckets to compact
+   * @param isMajor true for major compaction
+   * @param time TODO use this
+   * @return a list of futures for the scheduled compaction tasks
+   */
+  public List<Future<CompactionStatus>> forceLocalHDFSCompaction(Set<Integer> buckets, boolean isMajor, long time) {
+    List<Future<CompactionStatus>> futures = new ArrayList<Future<CompactionStatus>>();
+    if (!isDataStore() || hdfsManager == null || buckets == null || buckets.isEmpty()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "HDFS: did not schedule local " + (isMajor ? "Major" : "Minor") + " compaction");
+      }
+      // nothing to do
+      return futures;
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "HDFS: scheduling local " + (isMajor ? "Major" : "Minor") + " compaction for buckets:"+buckets);
+    }
+    Collection<HoplogOrganizer> organizers = hdfsManager.getBucketOrganizers(buckets);
+    
+    for (HoplogOrganizer hoplogOrganizer : organizers) {
+      Future<CompactionStatus> f = hoplogOrganizer.forceCompaction(isMajor);
+      futures.add(f);
+    }
+    return futures;
+  }
+  
+  @Override
+  public void flushHDFSQueue(int maxWaitTime) {
+    if (!this.isHDFSRegion()) {
+      throw new UnsupportedOperationException(
+          LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE
+              .toLocalizedString(getName()));
+    }
+    HDFSFlushQueueFunction.flushQueue(this, maxWaitTime);
+  }
+  
+  @Override
+  public long lastMajorHDFSCompaction() {
+    if (!this.isHDFSReadWriteRegion()) {
+      if (this.isHDFSRegion()) {
+        throw new UnsupportedOperationException(
+            LocalizedStrings.HOPLOG_CONFIGURED_AS_WRITEONLY
+                .toLocalizedString(getName()));
+      }
+      throw new UnsupportedOperationException(
+          LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE
+              .toLocalizedString(getName()));
+    }
+    List<Long> result = (List<Long>) FunctionService.onRegion(this)
+        .execute(HDFSLastCompactionTimeFunction.ID)
+        .getResult();
+    if (logger.isDebugEnabled()) {
+      logger.debug("HDFS: Result of LastCompactionTimeFunction "+result);
+    }
+    long min = Long.MAX_VALUE;
+    for (long ts : result) {
+      if (ts !=0 && ts < min) {
+        min = ts;
+      }
+    }
+    min = min == Long.MAX_VALUE ? 0 : min;
+    return min;
+  }
+
+  public long lastLocalMajorHDFSCompaction() {
+    if (!isDataStore() || hdfsManager == null) {
+      // nothing to do
+      return 0;
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "HDFS: getting local Major compaction time");
+    }
+    Collection<HoplogOrganizer> organizers = hdfsManager.getBucketOrganizers();
+    long minTS = Long.MAX_VALUE;
+    for (HoplogOrganizer hoplogOrganizer : organizers) {
+      long ts = hoplogOrganizer.getLastMajorCompactionTimestamp();
+      if (ts !=0 && ts < minTS) {
+        minTS = ts;
+      }
+    }
+    minTS = minTS == Long.MAX_VALUE ? 0 : minTS;
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "HDFS: local Major compaction time: "+minTS);
+    }
+    return minTS;
+  }
+
 
   public void shadowPRWaitForBucketRecovery() {
     assert this.isShadowPR();

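As a side note on the queryHDFS field added above: it is the standard per-thread flag pattern, where a ThreadLocal with an initialValue() override gives every thread an independent copy of a request-scoped option. A minimal self-contained sketch of that pattern follows; PerThreadReadOptions and its method names are illustrative, not Geode API.

    import java.util.concurrent.Callable;

    // A sketch of the per-thread flag pattern used by queryHDFS above.
    // PerThreadReadOptions is a hypothetical name; only ThreadLocal is real API.
    public class PerThreadReadOptions {

      // Each thread sees its own value; the default is "do not read from HDFS".
      private final ThreadLocal<Boolean> includeHdfs = new ThreadLocal<Boolean>() {
        @Override
        protected Boolean initialValue() {
          return Boolean.FALSE;
        }
      };

      public void setIncludeHdfs(boolean include) {
        includeHdfs.set(include);
      }

      public boolean includeHdfs() {
        return includeHdfs.get();
      }

      // Runs a task with the flag set, restoring the previous value afterwards
      // so the setting cannot leak into unrelated work on a pooled thread.
      public <T> T withHdfs(boolean include, Callable<T> task) throws Exception {
        boolean previous = includeHdfs();
        setIncludeHdfs(include);
        try {
          return task.call();
        } finally {
          setIncludeHdfs(previous);
        }
      }
    }

Restoring the previous value in a finally block, as withHdfs does, matters when threads come from a pool; callers of setQueryHDFS above presumably do the equivalent reset around each query.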
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
index bda68e3..57b1e71 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataStore.java
@@ -64,6 +64,7 @@ import com.gemstone.gemfire.cache.execute.Function;
 import com.gemstone.gemfire.cache.execute.FunctionException;
 import com.gemstone.gemfire.cache.execute.ResultSender;
 import com.gemstone.gemfire.cache.query.QueryInvalidException;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
 import com.gemstone.gemfire.cache.query.internal.IndexUpdater;
 import com.gemstone.gemfire.cache.query.internal.QCompiler;
 import com.gemstone.gemfire.cache.query.internal.index.IndexCreationData;
@@ -2058,13 +2059,13 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
       ForceReattemptException, PRLocallyDestroyedException
   {
 	  return getLocally(bucketId, key,aCallbackArgument, disableCopyOnRead, preferCD, requestingClient, 
-			  clientEvent, returnTombstones, false);
+			  clientEvent, returnTombstones, false, false);
   }
   /**
    * Returns value corresponding to this key.
    * @param key
    *          the key to look for
-   * @param preferCD
+   * @param preferCD 
    * @param requestingClient the client making the request, or null
    * @param clientEvent client's event (for returning version tag)
    * @param returnTombstones whether tombstones should be returned
@@ -2075,28 +2076,21 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
    * @throws PrimaryBucketException if the locally managed bucket is not primary
    * @throws PRLocallyDestroyedException if the PartitionRegion is locally destroyed
    */
-  public Object getLocally(int bucketId,
-                           final Object key,
-                           final Object aCallbackArgument,
-                           boolean disableCopyOnRead,
-                           boolean preferCD,
-                           ClientProxyMembershipID requestingClient,
-                           EntryEventImpl clientEvent,
-                           boolean returnTombstones,
-                           boolean opScopeIsLocal) throws PrimaryBucketException,
+  public Object getLocally(int bucketId, final Object key,
+      final Object aCallbackArgument, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, 
+      boolean returnTombstones, boolean opScopeIsLocal, boolean allowReadFromHDFS) throws PrimaryBucketException,
       ForceReattemptException, PRLocallyDestroyedException
   {
     final BucketRegion bucketRegion = getInitializedBucketForId(key, Integer.valueOf(bucketId));
     //  check for primary (when a loader is present) done deeper in the BucketRegion
     Object ret=null;
     if (logger.isDebugEnabled()) {
-      logger.debug("getLocally:  key {}) bucketId={}{}{} region {} returnTombstones {} ", key,
-          this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, bucketRegion.getName(), returnTombstones);
+      logger.debug("getLocally:  key {}) bucketId={}{}{} region {} returnTombstones {} allowReadFromHDFS {}", key,
+          this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, bucketRegion.getName(), returnTombstones, allowReadFromHDFS);
     }
     invokeBucketReadHook();
     try {
-      ret = bucketRegion.get(key, aCallbackArgument, true, disableCopyOnRead , preferCD, requestingClient, clientEvent, returnTombstones, opScopeIsLocal,
-        false);
+      ret = bucketRegion.get(key, aCallbackArgument, true, disableCopyOnRead , preferCD, requestingClient, clientEvent, returnTombstones, opScopeIsLocal, allowReadFromHDFS, false);
       checkIfBucketMoved(bucketRegion);
     }
     catch (RegionDestroyedException rde) {
@@ -2128,11 +2122,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
    * @throws PrimaryBucketException if the locally managed bucket is not primary
    * @see #getLocally(int, Object, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean)
    */
-  public RawValue getSerializedLocally(KeyInfo keyInfo,
-                                       boolean doNotLockEntry,
-                                       ClientProxyMembershipID requestingClient,
-                                       EntryEventImpl clientEvent,
-                                       boolean returnTombstones) throws PrimaryBucketException,
+  public RawValue getSerializedLocally(KeyInfo keyInfo, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws PrimaryBucketException,
       ForceReattemptException {
     final BucketRegion bucketRegion = getInitializedBucketForId(keyInfo.getKey(), keyInfo.getBucketId());
     //  check for primary (when loader is present) done deeper in the BucketRegion
@@ -2143,7 +2133,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
     invokeBucketReadHook();
 
     try {
-      RawValue result = bucketRegion.getSerialized(keyInfo, true, doNotLockEntry, requestingClient, clientEvent, returnTombstones);
+      RawValue result = bucketRegion.getSerialized(keyInfo, true, doNotLockEntry, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
       checkIfBucketMoved(bucketRegion);
       return result;
     } catch (RegionDestroyedException rde) {
@@ -2167,7 +2157,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
    * @param access
    *          true if caller wants last accessed time updated
    * @param allowTombstones whether a tombstoned entry can be returned
-   *
+   * 
    * @throws ForceReattemptException
    *           if bucket region is not present in this process
    * @return a RegionEntry for the given key, which will be null if the key is
@@ -2178,7 +2168,7 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
    *           if the PartitionRegion is locally destroyed
    */
   public EntrySnapshot getEntryLocally(int bucketId, final Object key,
-                                       boolean access, boolean allowTombstones)
+      boolean access, boolean allowTombstones, boolean allowReadFromHDFS)
       throws EntryNotFoundException, PrimaryBucketException,
       ForceReattemptException, PRLocallyDestroyedException
   {
@@ -2191,7 +2181,12 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
     EntrySnapshot res = null;
     RegionEntry ent = null;
     try {
-      ent = bucketRegion.entries.getEntry(key);
+      if (allowReadFromHDFS) {
+        ent = bucketRegion.entries.getEntry(key);
+      }
+      else {
+        ent = bucketRegion.entries.getOperationalEntryInVM(key);
+      }
 
       if (ent == null) {
         this.getPartitionedRegion().checkReadiness();
@@ -2301,8 +2296,14 @@ public class PartitionedRegionDataStore implements HasCachePerfStats
     try{
       if (r != null) {
         Set keys = r.keySet(allowTombstones);
+        if (getPartitionedRegion().isHDFSReadWriteRegion()) {
+          // hdfs regions can't copy all keys into memory
+          ret = keys;
+
+        } else {
         // A copy is made so that the bucket is free to move
         ret = new HashSet(r.keySet(allowTombstones));
+        }
         checkIfBucketMoved(r);
       }
     }

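The allowReadFromHDFS parameter threaded through getLocally/getEntryLocally above selects between two lookup tiers: the in-memory "operational" entry (getOperationalEntryInVM) and the full view that may fault a value in from HDFS (getEntry). A rough sketch of that dispatch, with TieredMap and OverflowStore as hypothetical stand-ins for the bucket's region map and the hoplog store:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Two-tier lookup: a fast in-memory map plus a slower overflow store.
    interface OverflowStore<K, V> {
      V read(K key); // may be expensive, e.g. an HDFS hoplog lookup
    }

    class TieredMap<K, V> {
      private final Map<K, V> operational = new ConcurrentHashMap<K, V>();
      private final OverflowStore<K, V> overflow;

      TieredMap(OverflowStore<K, V> overflow) {
        this.overflow = overflow;
      }

      void putOperational(K key, V value) {
        operational.put(key, value);
      }

      // allowOverflowRead=false mirrors allowReadFromHDFS=false: only the
      // in-memory entry (possibly null) is consulted, never the slow store.
      V get(K key, boolean allowOverflowRead) {
        V v = operational.get(key);
        if (v != null || !allowOverflowRead) {
          return v;
        }
        return overflow.read(key);
      }
    }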
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java
index de1f7d8..f083268 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDataView.java
@@ -65,19 +65,12 @@ public class PartitionedRegionDataView extends LocalRegionDataView {
   }
 
   @Override
-  public Object findObject(KeyInfo key,
-                           LocalRegion r,
-                           boolean isCreate,
-                           boolean generateCallbacks,
-                           Object value,
-                           boolean disableCopyOnRead,
-                           boolean preferCD,
-                           ClientProxyMembershipID requestingClient,
-                           EntryEventImpl clientEvent,
-                           boolean returnTombstones) {
+  public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate,
+      boolean generateCallbacks, Object value, boolean disableCopyOnRead,
+      boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
     TXStateProxy tx = r.cache.getTXMgr().internalSuspend();
     try {
-      return r.findObjectInSystem(key, isCreate, tx, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+      return r.findObjectInSystem(key, isCreate, tx, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
     } finally {
       r.cache.getTXMgr().resume(tx);
     }
@@ -89,14 +82,10 @@ public class PartitionedRegionDataView extends LocalRegionDataView {
     return pr.nonTXContainsKey(keyInfo);
   }
   @Override
-  public Object getSerializedValue(LocalRegion localRegion,
-                                   KeyInfo keyInfo,
-                                   boolean doNotLockEntry,
-                                   ClientProxyMembershipID requestingClient,
-                                   EntryEventImpl clientEvent,
-                                   boolean returnTombstones) throws DataLocationException {
+  public Object getSerializedValue(LocalRegion localRegion, KeyInfo keyInfo, boolean doNotLockEntry, ClientProxyMembershipID requestingClient,
+  EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException {
     PartitionedRegion pr = (PartitionedRegion)localRegion;
-    return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, requestingClient, clientEvent, returnTombstones);
+    return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
   }
   @Override
   public boolean putEntryOnRemote(EntryEventImpl event, boolean ifNew,
@@ -129,7 +118,7 @@ public class PartitionedRegionDataView extends LocalRegionDataView {
       boolean allowTombstones) throws DataLocationException {
     PartitionedRegion pr = (PartitionedRegion)localRegion;
     return pr.getDataStore().getEntryLocally(keyInfo.getBucketId(),
-        keyInfo.getKey(), false, allowTombstones);
+        keyInfo.getKey(), false, allowTombstones, true);
   }
 
   @Override

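findObject above suspends the current transaction before delegating and resumes it in a finally block, so the read observes non-transactional state and the TX is restored even if the read throws. A small sketch of the same suspend/read/resume shape; TxManager and TxState are hypothetical stand-ins for the cache's transaction manager:

    // Suspend-read-resume: take the TX off the thread, read, always restore.
    final class TxManager {
      private final ThreadLocal<TxState> current = new ThreadLocal<TxState>();

      TxState internalSuspend() {
        TxState tx = current.get();
        current.remove();
        return tx;
      }

      void resume(TxState tx) {
        if (tx != null) {
          current.set(tx);
        }
      }
    }

    final class TxState {}

    final class ReadHelper {
      static <T> T readOutsideTx(TxManager mgr, java.util.concurrent.Callable<T> read)
          throws Exception {
        TxState tx = mgr.internalSuspend();
        try {
          return read.call();
        } finally {
          mgr.resume(tx); // always restore, mirroring the finally in findObject
        }
      }
    }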
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java
index 6ce783a..a3ed32a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java
@@ -122,6 +122,8 @@ public class PartitionedRegionHelper
     Set policies = new HashSet();
     policies.add(DEFAULT_DATA_POLICY);
     policies.add(DataPolicy.PERSISTENT_PARTITION);
+    policies.add(DataPolicy.HDFS_PARTITION);
+    policies.add(DataPolicy.HDFS_PERSISTENT_PARTITION);
 //    policies.add(DataPolicy.NORMAL);
     ALLOWED_DATA_POLICIES = Collections.unmodifiableSet(policies);
   }

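The static initializer above simply widens the whitelist of data policies a partitioned region may use. A sketch of the same build-once, freeze, validate-at-creation idiom; PolicyChecker and its enum are illustrative, not the Geode types:

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    // Build the allowed set once, freeze it, and validate at creation time.
    final class PolicyChecker {
      enum Policy { PARTITION, PERSISTENT_PARTITION, HDFS_PARTITION, HDFS_PERSISTENT_PARTITION, NORMAL }

      private static final Set<Policy> ALLOWED;
      static {
        Set<Policy> p = new HashSet<Policy>();
        p.add(Policy.PARTITION);
        p.add(Policy.PERSISTENT_PARTITION);
        p.add(Policy.HDFS_PARTITION);
        p.add(Policy.HDFS_PERSISTENT_PARTITION);
        ALLOWED = Collections.unmodifiableSet(p);
      }

      static void validate(Policy p) {
        if (!ALLOWED.contains(p)) {
          throw new IllegalArgumentException("Data policy " + p
              + " is not allowed for partitioned regions");
        }
      }
    }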
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
index 74c134b..f0a6543 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
@@ -626,6 +626,27 @@ final class ProxyRegionMap implements RegionMap {
     }
 
     @Override
+    public boolean isMarkedForEviction() {
+      throw new UnsupportedOperationException(LocalizedStrings
+          .ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0
+              .toLocalizedString(DataPolicy.EMPTY));
+    }
+
+    @Override
+    public void setMarkedForEviction() {
+      throw new UnsupportedOperationException(LocalizedStrings
+          .ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0
+              .toLocalizedString(DataPolicy.EMPTY));
+    }
+
+    @Override
+    public void clearMarkedForEviction() {
+      throw new UnsupportedOperationException(LocalizedStrings
+          .ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0
+              .toLocalizedString(DataPolicy.EMPTY));
+    }
+
+    @Override
     public boolean isValueNull() {
       throw new UnsupportedOperationException(LocalizedStrings.ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0.toLocalizedString(DataPolicy.EMPTY));
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
index bedbf81..5838ead 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEntry.java
@@ -35,6 +35,7 @@ import com.gemstone.gemfire.internal.offheap.StoredObject;
 import com.gemstone.gemfire.internal.offheap.annotations.Released;
 import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+import com.gemstone.gemfire.cache.EvictionCriteria;
 
 /**
  * Internal interface for a region entry.
@@ -414,6 +415,25 @@ public interface RegionEntry {
   public void setUpdateInProgress(final boolean underUpdate);
 
   /**
+   * Returns true if this entry has been marked for eviction for custom eviction
+   * via {@link EvictionCriteria}.
+   */
+  public boolean isMarkedForEviction();
+
+  /**
+   * Marks this entry for eviction by custom eviction via
+   * {@link EvictionCriteria}.
+   */
+  public void setMarkedForEviction();
+
+  /**
+   * Clears this entry's marked-for-eviction state, set by custom eviction via
+   * {@link EvictionCriteria}, for example when an update is done after the
+   * entry was marked for eviction.
+   */
+  public void clearMarkedForEviction();
+
+  /**
    * Event containing this RegionEntry is being passed through
    * dispatchListenerEvent for CacheListeners under RegionEntry lock. This is
    * used during deserialization for a VMCacheSerializable value contained by

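The three methods added to RegionEntry above only declare the contract; a concrete entry needs some backing state. One plausible implementation, assuming a spare bit in an atomically updated flag word (real Geode entries pack their state bits differently, so FlaggedEntry is illustrative only):

    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical backing for the new marked-for-eviction methods: one bit
    // in a flag word, updated with CAS so other bits are never lost.
    class FlaggedEntry {
      private static final int MARKED_FOR_EVICTION = 1 << 0;
      private final AtomicInteger flags = new AtomicInteger();

      public boolean isMarkedForEviction() {
        return (flags.get() & MARKED_FOR_EVICTION) != 0;
      }

      public void setMarkedForEviction() {
        int old;
        do {
          old = flags.get();
        } while (!flags.compareAndSet(old, old | MARKED_FOR_EVICTION));
      }

      public void clearMarkedForEviction() {
        int old;
        do {
          old = flags.get();
        } while (!flags.compareAndSet(old, old & ~MARKED_FOR_EVICTION));
      }
    }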
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java
index 7a97408..2a7f0c4 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMapFactory.java
@@ -39,6 +39,12 @@ class RegionMapFactory {
     //.getDataPolicy().withPartitioning());
     if (owner.isProxy() /*|| owner instanceof PartitionedRegion*/) { // TODO enabling this causes eviction tests to fail
       return new ProxyRegionMap(owner, attrs, internalRegionArgs);
+    } else if (internalRegionArgs.isReadWriteHDFSRegion()) {
+      if (owner.getEvictionController() == null) {
+        return new HDFSRegionMapImpl(owner, attrs, internalRegionArgs);
+      }
+      return new HDFSLRURegionMap(owner, attrs, internalRegionArgs);
+    //else if (owner.getEvictionController() != null && isNotPartitionedRegion) {
     } else if (owner.getEvictionController() != null ) {
       return new VMLRURegionMap(owner, attrs,internalRegionArgs);
     } else {

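After this change the factory's decision order is: proxy map first, then the HDFS-aware maps for read/write HDFS regions (the LRU flavor when an eviction controller is present), then the plain LRU map, then the default. A compact sketch of just that ordering, with hypothetical marker types:

    // Mirrors the decision order in RegionMapFactory.createRegionMap above;
    // MapSelector and Kind are illustrative markers, not Geode types.
    final class MapSelector {
      enum Kind { PROXY, HDFS, HDFS_LRU, LRU, DEFAULT }

      static Kind select(boolean isProxy, boolean isReadWriteHdfs, boolean hasEvictionController) {
        if (isProxy) {
          return Kind.PROXY;
        } else if (isReadWriteHdfs) {
          // HDFS read/write regions always get an HDFS-aware map; the
          // eviction controller only decides which flavor.
          return hasEvictionController ? Kind.HDFS_LRU : Kind.HDFS;
        } else if (hasEvictionController) {
          return Kind.LRU;
        } else {
          return Kind.DEFAULT;
        }
      }
    }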
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java
index b565a2c..c754339 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RemoteGetMessage.java
@@ -122,7 +122,7 @@ public final class RemoteGetMessage extends RemoteOperationMessageWithDirectRepl
           ((KeyWithRegionContext)this.key).setRegionContext(r);
         }
         KeyInfo keyInfo = r.getKeyInfo(key, cbArg);
-        val = r.getDataView().getSerializedValue(r, keyInfo, false, this.context, null, false /*for replicate regions*/);
+        val = r.getDataView().getSerializedValue(r, keyInfo, false, this.context, null, false, false/*for replicate regions*/);
         valueBytes = val instanceof RawValue ? (RawValue)val : new RawValue(val);
 
         if (logger.isTraceEnabled(LogMarker.DM)) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java
index 2906ff6..983f928 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEntry.java
@@ -113,8 +113,7 @@ public class TXEntry implements Region.Entry
   {
     checkTX();
 //    Object value = this.localRegion.getDeserialized(this.key, false, this.myTX, this.rememberReads);
-    @Unretained Object value = this.myTX.getDeserializedValue(keyInfo, this.localRegion, false, false, false, null, false,
-      false);
+    @Unretained Object value = this.myTX.getDeserializedValue(keyInfo, this.localRegion, false, false, false, null, false, false, false);
     if (value == null) {
       throw new EntryDestroyedException(this.keyInfo.getKey().toString());
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
index 617873c..a67d3cc 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
@@ -1407,14 +1407,7 @@ public class TXState implements TXStateInterface {
   /* (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
    */
-  public Object getDeserializedValue(KeyInfo keyInfo,
-                                     LocalRegion localRegion,
-                                     boolean updateStats,
-                                     boolean disableCopyOnRead,
-                                     boolean preferCD,
-                                     EntryEventImpl clientEvent,
-                                     boolean returnTombstones,
-                                     boolean retainResult) {
+  public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
    TXEntryState tx = txReadEntry(keyInfo, localRegion, true, true/*create txEntry if absent*/);
     if (tx != null) {
       Object v = tx.getValue(keyInfo, localRegion, preferCD);
@@ -1423,8 +1416,7 @@ public class TXState implements TXStateInterface {
       }
       return v;
     } else {
-      return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones,
-        retainResult);
+      return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadFromHDFS, retainResult);
     }
   }
 
@@ -1433,19 +1425,15 @@ public class TXState implements TXStateInterface {
    * @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object)
    */
   @Retained
-  public Object getSerializedValue(LocalRegion localRegion,
-                                   KeyInfo keyInfo,
-                                   boolean doNotLockEntry,
-                                   ClientProxyMembershipID requestingClient,
-                                   EntryEventImpl clientEvent,
-                                   boolean returnTombstones) throws DataLocationException {
+  public Object getSerializedValue(LocalRegion localRegion, KeyInfo keyInfo, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, 
+      boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException {
     final Object key = keyInfo.getKey();
     TXEntryState tx = txReadEntry(keyInfo, localRegion, true,true/*create txEntry is absent*/);
     if (tx != null) {
       Object val = tx.getPendingValue();
       if(val==null || Token.isInvalidOrRemoved(val)) {
         val = findObject(keyInfo,localRegion, val!=Token.INVALID,
-            true, val, false, false, requestingClient, clientEvent, false);
+            true, val, false, false, requestingClient, clientEvent, false, allowReadFromHDFS);
       }
       return val;
     } else {
@@ -1453,7 +1441,7 @@ public class TXState implements TXStateInterface {
       // so we should never come here
       assert localRegion instanceof PartitionedRegion;
       PartitionedRegion pr = (PartitionedRegion)localRegion;
-      return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, null, null, returnTombstones);
+      return pr.getDataStore().getSerializedLocally(keyInfo, doNotLockEntry, null, null, returnTombstones, allowReadFromHDFS);
     }
   }
 
@@ -1531,17 +1519,9 @@ public class TXState implements TXStateInterface {
   /* (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.TXStateInterface#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
    */
-  public Object findObject(KeyInfo key,
-                           LocalRegion r,
-                           boolean isCreate,
-                           boolean generateCallbacks,
-                           Object value,
-                           boolean disableCopyOnRead,
-                           boolean preferCD,
-                           ClientProxyMembershipID requestingClient,
-                           EntryEventImpl clientEvent,
-                           boolean returnTombstones) {
-    return r.findObjectInSystem(key, isCreate, this, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+  public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate,
+      boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
+    return r.findObjectInSystem(key, isCreate, this, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
   }
 
   private boolean readEntryAndCheckIfDestroyed(KeyInfo keyInfo, LocalRegion localRegion,

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java
index 3fa9351..5da20d8 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateInterface.java
@@ -123,14 +123,8 @@ public interface TXStateInterface extends Synchronization, InternalDataView {
    * @param localRegion
    * @param updateStats TODO
    */
-  public Object getDeserializedValue(KeyInfo keyInfo,
-                                     LocalRegion localRegion,
-                                     boolean updateStats,
-                                     boolean disableCopyOnRead,
-                                     boolean preferCD,
-                                     EntryEventImpl clientEvent,
-                                     boolean returnTombstones,
-                                     boolean retainResult);
+  public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
+      boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult);
 
   public TXEvent getEvent();
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
index 0939ab0..e66302e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateProxyImpl.java
@@ -341,16 +341,9 @@ public class TXStateProxyImpl implements TXStateProxy {
   /* (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
    */
-  public Object getDeserializedValue(KeyInfo keyInfo,
-                                     LocalRegion localRegion,
-                                     boolean updateStats,
-                                     boolean disableCopyOnRead,
-                                     boolean preferCD,
-                                     EntryEventImpl clientEvent,
-                                     boolean returnTombstones,
-                                     boolean retainResult) {
-    Object val = getRealDeal(keyInfo, localRegion).getDeserializedValue(keyInfo, localRegion, updateStats, disableCopyOnRead, preferCD, null, false,
-      retainResult);
+  public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
+      boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
+    Object val = getRealDeal(keyInfo, localRegion).getDeserializedValue(keyInfo, localRegion, updateStats, disableCopyOnRead, preferCD, null, false, allowReadFromHDFS, retainResult);
     if (val != null) {
       // fixes bug 51057: TXStateStub  on client always returns null, so do not increment
       // the operation count it will be incremented in findObject()
@@ -606,13 +599,13 @@ public class TXStateProxyImpl implements TXStateProxy {
    * @see com.gemstone.gemfire.internal.cache.InternalDataView#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
    */
   public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate,
-                           boolean generateCallbacks, Object value, boolean disableCopyOnRead,
-                           boolean preferCD, ClientProxyMembershipID requestingClient,
-                           EntryEventImpl clientEvent, boolean returnTombstones) {
+      boolean generateCallbacks, Object value, boolean disableCopyOnRead,
+      boolean preferCD, ClientProxyMembershipID requestingClient,
+      EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
     try {
       this.operationCount++;
       Object retVal = getRealDeal(key, r).findObject(key, r, isCreate, generateCallbacks,
-          value, disableCopyOnRead, preferCD, requestingClient, clientEvent, false);
+          value, disableCopyOnRead, preferCD, requestingClient, clientEvent, false, allowReadFromHDFS);
       trackBucketForTx(key);
       return retVal;
     } catch (TransactionDataRebalancedException | PrimaryBucketException re) {
@@ -727,14 +720,9 @@ public class TXStateProxyImpl implements TXStateProxy {
    * (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object)
    */
-  public Object getSerializedValue(LocalRegion localRegion,
-                                   KeyInfo key,
-                                   boolean doNotLockEntry,
-                                   ClientProxyMembershipID requestingClient,
-                                   EntryEventImpl clientEvent,
-                                   boolean returnTombstones) throws DataLocationException {
+  public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException {
     this.operationCount++;
-    return getRealDeal(key, localRegion).getSerializedValue(localRegion, key, doNotLockEntry, requestingClient, clientEvent, returnTombstones);
+    return getRealDeal(key, localRegion).getSerializedValue(localRegion, key, doNotLockEntry, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
   }
 
   /* (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java
index 0b226e0..ac35425 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXStateStub.java
@@ -184,14 +184,8 @@ public abstract class TXStateStub implements TXStateInterface {
   /* (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
    */
-  public Object getDeserializedValue(KeyInfo keyInfo,
-                                     LocalRegion localRegion,
-                                     boolean updateStats,
-                                     boolean disableCopyOnRead,
-                                     boolean preferCD,
-                                     EntryEventImpl clientEvent,
-                                     boolean returnTombstones,
-                                     boolean retainResult) {
+  public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
+      boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
     // We never have a local value if we are a stub...
     return null;
   }
@@ -379,17 +373,10 @@ public abstract class TXStateStub implements TXStateInterface {
   /* (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.InternalDataView#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
    */
-  public Object findObject(KeyInfo keyInfo,
-                           LocalRegion r,
-                           boolean isCreate,
-                           boolean generateCallbacks,
-                           Object value,
-                           boolean disableCopyOnRead,
-                           boolean preferCD,
-                           ClientProxyMembershipID requestingClient,
-                           EntryEventImpl clientEvent,
-                           boolean returnTombstones) {
-    return getTXRegionStub(r).findObject(keyInfo,isCreate,generateCallbacks,value, preferCD, requestingClient, clientEvent);
+  public Object findObject(KeyInfo keyInfo, LocalRegion r, boolean isCreate,
+      boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
+      EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
+    return getTXRegionStub(r).findObject(keyInfo, isCreate, generateCallbacks, value, preferCD, requestingClient, clientEvent, allowReadFromHDFS);
   }
 
   /* (non-Javadoc)
@@ -445,12 +432,7 @@ public abstract class TXStateStub implements TXStateInterface {
    * (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object)
    */
-  public Object getSerializedValue(LocalRegion localRegion,
-                                   KeyInfo key,
-                                   boolean doNotLockEntry,
-                                   ClientProxyMembershipID requestingClient,
-                                   EntryEventImpl clientEvent,
-                                   boolean returnTombstones) {
+  public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
     throw new UnsupportedOperationException();
   }
 

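Taken together, the TXState changes above thread one new allowReadFromHDFS boolean through every read on the transactional view: TXStateProxyImpl forwards it to the real TXState, while TXStateStub short-circuits because a stub never holds a local value. A minimal, self-contained sketch of that delegation shape follows; every name in it is an illustrative stand-in, not Geode's actual API.

    // Hedged sketch of the proxy/stub delegation shape, not Geode code.
    interface TxView {
      Object getDeserializedValue(Object key, boolean allowReadFromHDFS);
    }

    // A stub holds no local state, so it always misses (cf. TXStateStub).
    class TxStub implements TxView {
      public Object getDeserializedValue(Object key, boolean allowReadFromHDFS) {
        return null;
      }
    }

    // The proxy forwards the flag unchanged (cf. TXStateProxyImpl).
    class TxProxy implements TxView {
      private final TxView realDeal;
      TxProxy(TxView realDeal) { this.realDeal = realDeal; }
      public Object getDeserializedValue(Object key, boolean allowReadFromHDFS) {
        return realDeal.getDeserializedValue(key, allowReadFromHDFS);
      }
    }

    public class TxDelegationSketch {
      public static void main(String[] args) {
        TxView view = new TxProxy(new TxStub());
        System.out.println(view.getDeserializedValue("k", true)); // prints: null
      }
    }
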
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java
index 269f891..a17650c 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/UserSpecifiedRegionAttributes.java
@@ -114,6 +114,10 @@ public abstract class UserSpecifiedRegionAttributes<K,V> implements RegionAttrib
    */
   private boolean hasCloningEnabled = false;
   
+  private boolean hasHDFSStoreName = false;
+  
+  private boolean hasHDFSWriteOnly = false;
+  
 /**
    * Whether this region has entry value compression.
    * 
@@ -522,7 +526,7 @@ public abstract class UserSpecifiedRegionAttributes<K,V> implements RegionAttrib
   {
     this.hasDiskSynchronous = val;
   }
-  private static final int HAS_COUNT = 41;
+  private static final int HAS_COUNT = 43;
   
   public void initHasFields(UserSpecifiedRegionAttributes<K,V> other)
   {
@@ -598,4 +602,22 @@ public abstract class UserSpecifiedRegionAttributes<K,V> implements RegionAttrib
   public List getIndexes() {
     return this.indexes;
   }
+
+  public boolean hasHDFSStoreName()
+  {
+    return this.hasHDFSStoreName;
+  }
+  public void setHasHDFSStoreName(boolean val)
+  {
+    this.hasHDFSStoreName = val;
+  }
+  
+  public void setHasHDFSWriteOnly(boolean val)
+  {
+    this.hasHDFSWriteOnly = val;
+  }
+  public boolean hasHDFSWriteOnly()
+  {
+    return this.hasHDFSWriteOnly;
+  }
 }

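The HAS_COUNT bump (41 to 43) must track the two new hasHDFS* fields exactly, which is easy to get wrong by hand. Below is a hedged sketch of a reflection-based sanity check for this pattern; it assumes only the naming convention visible above (boolean fields prefixed with "has") and is not part of the Geode code base.

    import java.lang.reflect.Field;
    import java.lang.reflect.Modifier;

    // Hedged sketch: verify that a HAS_COUNT-style constant matches the
    // number of boolean "hasXxx" instance fields on a class.
    public class HasCountCheck {
      static class Demo { // stand-in with three tracked flags
        private boolean hasCloningEnabled;
        private boolean hasHDFSStoreName;
        private boolean hasHDFSWriteOnly;
      }
      static final int HAS_COUNT = 3;

      static int countHasFields(Class<?> c) {
        int n = 0;
        for (Field f : c.getDeclaredFields()) {
          if (!Modifier.isStatic(f.getModifiers())
              && f.getType() == boolean.class
              && f.getName().startsWith("has")) {
            n++;
          }
        }
        return n;
      }

      public static void main(String[] args) {
        // Forgetting to bump HAS_COUNT is exactly what this would catch.
        System.out.println(countHasFields(Demo.class) == HAS_COUNT); // true
      }
    }
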
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java
index 54133cc..f587e39 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ValidatingDiskRegion.java
@@ -408,6 +408,19 @@ public class ValidatingDiskRegion extends DiskRegion implements DiskRecoveryStor
       // TODO Auto-generated method stub
     }
     @Override
+    public boolean isMarkedForEviction() {
+      // TODO Auto-generated method stub
+      return false;
+    }
+    @Override
+    public void setMarkedForEviction() {
+      // TODO Auto-generated method stub
+    }
+    @Override
+    public void clearMarkedForEviction() {
+      // TODO Auto-generated method stub
+    }
+    @Override
     public boolean isInvalid() {
       // TODO Auto-generated method stub
       return false;

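The three stubbed methods follow the usual test/set/clear idiom for an entry-state flag. A minimal sketch of that idiom, assuming a plain int bit field for the state (the real entry classes may pack it differently):

    // Hedged sketch of a test/set/clear flag on a bit field; illustrative only.
    public class EvictionFlagSketch {
      private static final int MARKED_FOR_EVICTION = 0x1;
      private int state;

      public boolean isMarkedForEviction() { return (state & MARKED_FOR_EVICTION) != 0; }
      public void setMarkedForEviction()   { state |= MARKED_FOR_EVICTION; }
      public void clearMarkedForEviction() { state &= ~MARKED_FOR_EVICTION; }

      public static void main(String[] args) {
        EvictionFlagSketch e = new EvictionFlagSketch();
        e.setMarkedForEviction();
        System.out.println(e.isMarkedForEviction()); // true
        e.clearMarkedForEviction();
        System.out.println(e.isMarkedForEviction()); // false
      }
    }
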
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java
index ea47e91..d3078a9 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/FetchBulkEntriesMessage.java
@@ -299,7 +299,7 @@ public final class FetchBulkEntriesMessage extends PartitionMessage
             Object key = it.next();
             VersionTagHolder clientEvent = new VersionTagHolder();
             Object value = map.get(key, null, true, true, true, null,
-                clientEvent, allowTombstones);
+                clientEvent, allowTombstones, false);
 
             if (needToWriteBucketInfo) {
               DataSerializer.writePrimitiveInt(map.getId(), mos);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java
index 3fef790..d7e50f1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/GetMessage.java
@@ -93,9 +93,11 @@ public final class GetMessage extends PartitionMessageWithDirectReply
   
   private boolean returnTombstones;
 
+  private boolean allowReadFromHDFS;
   // reuse some flags
   protected static final int HAS_LOADER = NOTIFICATION_ONLY;
   protected static final int CAN_START_TX = IF_NEW;
+  protected static final int READ_FROM_HDFS = IF_OLD;
 
   /**
    * Empty constructor to satisfy {@link DataSerializer} requirements
@@ -104,14 +106,15 @@ public final class GetMessage extends PartitionMessageWithDirectReply
   }
   
   private GetMessage(InternalDistributedMember recipient, int regionId,
-                     DirectReplyProcessor processor,
-                     final Object key, final Object aCallbackArgument, ClientProxyMembershipID context,
-                     boolean returnTombstones) {
+      DirectReplyProcessor processor,
+      final Object key, final Object aCallbackArgument, ClientProxyMembershipID context,
+      boolean returnTombstones, boolean allowReadFromHDFS) {
     super(recipient, regionId, processor);
     this.key = key;
     this.cbArg = aCallbackArgument;
     this.context = context;
     this.returnTombstones = returnTombstones;
+    this.allowReadFromHDFS = allowReadFromHDFS;
   }
 
   private static final boolean ORDER_PR_GETS = Boolean.getBoolean("gemfire.order-pr-gets");
@@ -188,7 +191,7 @@ public final class GetMessage extends PartitionMessageWithDirectReply
         KeyInfo keyInfo = r.getKeyInfo(key, cbArg);
         boolean lockEntry = forceUseOfPRExecutor || isDirectAck();
         
-        val = r.getDataView().getSerializedValue(r, keyInfo, !lockEntry, this.context, event, returnTombstones);
+        val = r.getDataView().getSerializedValue(r, keyInfo, !lockEntry, this.context, event, returnTombstones, allowReadFromHDFS);
         
         if(val == BucketRegion.REQUIRES_ENTRY_LOCK) {
           Assert.assertTrue(!lockEntry);
@@ -269,12 +272,14 @@ public final class GetMessage extends PartitionMessageWithDirectReply
   @Override
   protected short computeCompressedShort(short s) {
     s = super.computeCompressedShort(s);
+    if (this.allowReadFromHDFS) s |= READ_FROM_HDFS;
     return s;
   }
 
   @Override
   protected void setBooleans(short s, DataInput in) throws ClassNotFoundException, IOException {
     super.setBooleans(s, in);
+    if ((s & READ_FROM_HDFS) != 0) this.allowReadFromHDFS = true;
   }
 
   public void setKey(Object key)
@@ -298,18 +303,15 @@ public final class GetMessage extends PartitionMessageWithDirectReply
    * @throws ForceReattemptException if the peer is no longer available
    */
   public static GetResponse send(InternalDistributedMember recipient,
-                                 PartitionedRegion r,
-                                 final Object key,
-                                 final Object aCallbackArgument,
-                                 ClientProxyMembershipID requestingClient,
-                                 boolean returnTombstones)
+      PartitionedRegion r, final Object key, final Object aCallbackArgument,
+      ClientProxyMembershipID requestingClient, boolean returnTombstones, boolean allowReadFromHDFS)
       throws ForceReattemptException
   {
     Assert.assertTrue(recipient != null,
         "PRDistribuedGetReplyMessage NULL reply message");
     GetResponse p = new GetResponse(r.getSystem(), Collections.singleton(recipient), key);
     GetMessage m = new GetMessage(recipient, r.getPRId(), p,
-        key, aCallbackArgument, requestingClient, returnTombstones);
+        key, aCallbackArgument, requestingClient, returnTombstones, allowReadFromHDFS);
     Set failures = r.getDistributionManager().putOutgoing(m);
     if (failures != null && failures.size() > 0) {
       throw new ForceReattemptException(LocalizedStrings.GetMessage_FAILED_SENDING_0.toLocalizedString(m));

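Since the message header has no spare bits, the patch reuses the IF_OLD position, which a get never carries, as READ_FROM_HDFS: computeCompressedShort sets the bit when sending and setBooleans recovers it when receiving. A self-contained round trip of that technique, with an assumed bit value:

    // Hedged sketch of the short-bitmask round trip used by GetMessage.
    // The bit position is assumed; the real value comes from PartitionMessage.
    public class ShortFlagsSketch {
      static final short IF_OLD = 0x0008;          // assumed position
      static final short READ_FROM_HDFS = IF_OLD;  // reuse: gets never set IF_OLD

      static short encode(boolean allowReadFromHDFS) {   // cf. computeCompressedShort
        short s = 0;
        if (allowReadFromHDFS) s |= READ_FROM_HDFS;
        return s;
      }

      static boolean decode(short s) {                   // cf. setBooleans
        return (s & READ_FROM_HDFS) != 0;
      }

      public static void main(String[] args) {
        System.out.println(decode(encode(true)));   // true
        System.out.println(decode(encode(false)));  // false
      }
    }
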
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java
index 8aaf587..a88f96f 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutAllPRMessage.java
@@ -101,8 +101,9 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
 
   protected static final short HAS_BRIDGE_CONTEXT = UNRESERVED_FLAGS_START;
   protected static final short SKIP_CALLBACKS = (HAS_BRIDGE_CONTEXT << 1);
+  protected static final short FETCH_FROM_HDFS = (SKIP_CALLBACKS << 1);
   //using the left most bit for IS_PUT_DML, the last available bit
-  protected static final short IS_PUT_DML = (short) (SKIP_CALLBACKS << 1);
+  protected static final short IS_PUT_DML = (short) (FETCH_FROM_HDFS << 1);
 
   private transient InternalDistributedSystem internalDs;
 
@@ -117,6 +118,9 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
   
   transient VersionedObjectList versions = null;
 
+  /** whether this operation should fetch oldValue from HDFS */
+  private boolean fetchFromHDFS;
+  
   private boolean isPutDML;
   /**
    * Empty constructor to satisfy {@link DataSerializer}requirements
@@ -125,7 +129,7 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
   }
 
   public PutAllPRMessage(int bucketId, int size, boolean notificationOnly,
-      boolean posDup, boolean skipCallbacks, Object callbackArg, boolean isPutDML) {
+      boolean posDup, boolean skipCallbacks, Object callbackArg, boolean fetchFromHDFS, boolean isPutDML) {
     this.bucketId = Integer.valueOf(bucketId);
     putAllPRData = new PutAllEntryData[size];
     this.notificationOnly = notificationOnly;
@@ -133,7 +137,8 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
     this.skipCallbacks = skipCallbacks;
     this.callbackArg = callbackArg;
     initTxMemberId();
-    this.isPutDML = isPutDML;
+    this.fetchFromHDFS = fetchFromHDFS;
+    this.isPutDML = isPutDML;
   }
 
   public void addEntry(PutAllEntryData entry) {
@@ -302,6 +307,7 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
     s = super.computeCompressedShort(s);
     if (this.bridgeContext != null) s |= HAS_BRIDGE_CONTEXT;
     if (this.skipCallbacks) s |= SKIP_CALLBACKS;
+    if (this.fetchFromHDFS) s |= FETCH_FROM_HDFS;
     if (this.isPutDML) s |= IS_PUT_DML;
     return s;
   }
@@ -311,6 +317,7 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
       ClassNotFoundException {
     super.setBooleans(s, in);
     this.skipCallbacks = ((s & SKIP_CALLBACKS) != 0);
+    this.fetchFromHDFS = ((s & FETCH_FROM_HDFS) != 0);
     this.isPutDML = ((s & IS_PUT_DML) != 0);
   }
 
@@ -488,6 +495,9 @@ public final class PutAllPRMessage extends PartitionMessageWithDirectReply
 
             ev.setPutAllOperation(dpao);
 
+            // set the fetchFromHDFS flag
+            ev.setFetchFromHDFS(this.fetchFromHDFS);
+            
             // make sure a local update inserts a cache de-serializable
             ev.makeSerializedNewValue();
             

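Because the flags are shift-chained, inserting FETCH_FROM_HDFS re-bases IS_PUT_DML onto the leftmost bit of the short. That last shift lands on the sign bit, hence the explicit (short) cast; mask tests keep working even though the constant is negative. A small demonstration with assumed starting values:

    // Hedged sketch of shift-chained short flags ending at the sign bit.
    public class LeftmostBitSketch {
      static final short HAS_BRIDGE_CONTEXT = 0x1000;                 // assumed start
      static final short SKIP_CALLBACKS = (HAS_BRIDGE_CONTEXT << 1);  // 0x2000
      static final short FETCH_FROM_HDFS = (SKIP_CALLBACKS << 1);     // 0x4000
      static final short IS_PUT_DML = (short) (FETCH_FROM_HDFS << 1); // sign bit

      public static void main(String[] args) {
        short s = 0;
        s |= IS_PUT_DML;
        System.out.println(s);                          // -32768: sign bit set
        System.out.println((s & IS_PUT_DML) != 0);      // true: decode still works
        System.out.println((s & FETCH_FROM_HDFS) != 0); // false: others untouched
      }
    }
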
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java
index a6a39dc..d5abaa1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PutMessage.java
@@ -182,6 +182,9 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
 
   private VersionTag versionTag;
 
+  /** whether this operation should fetch oldValue from HDFS*/
+  private transient boolean fetchFromHDFS;
+
   private transient boolean isPutDML;
   
   // additional bitmask flags used for serialization/deserialization
@@ -205,6 +208,7 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
   // masks there are taken
   // also switching the masks will impact backwards compatibility. Need to
   // verify if it is ok to break backwards compatibility
+  protected static final int FETCH_FROM_HDFS = getNextByteMask(HAS_CALLBACKARG);
 
   /*
   private byte[] oldValBytes;
@@ -604,6 +608,9 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
       this.originalSender = (InternalDistributedMember)DataSerializer
         .readObject(in);
     }
+    if ((extraFlags & FETCH_FROM_HDFS) != 0) {
+      this.fetchFromHDFS = true;
+    }
     this.eventId = new EventID();
     InternalDataSerializer.invokeFromData(this.eventId, in);
     
@@ -690,6 +697,7 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
       extraFlags |= HAS_DELTA_WITH_FULL_VALUE;
     }
     if (this.originalSender != null) extraFlags |= HAS_ORIGINAL_SENDER;
+    if (this.event.isFetchFromHDFS()) extraFlags |= FETCH_FROM_HDFS;
     out.writeByte(extraFlags);
 
     DataSerializer.writeObject(getKey(), out);
@@ -814,6 +822,7 @@ public final class PutMessage extends PartitionMessageWithDirectReply implements
     ev.setCausedByMessage(this);
     ev.setInvokePRCallbacks(!notificationOnly);
     ev.setPossibleDuplicate(this.posDup);
+    ev.setFetchFromHDFS(this.fetchFromHDFS);
     ev.setPutDML(this.isPutDML);
     /*if (this.hasOldValue) {
       if (this.oldValueIsSerialized) {

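fetchFromHDFS is transient, so it travels only as one bit of the extraFlags byte: toData sets the bit via FETCH_FROM_HDFS and fromData re-reads it (getNextByteMask presumably hands out the next unused bit; that is an assumption here). A minimal round trip of the pattern over a real DataOutput/DataInput pair:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Hedged sketch: a transient boolean carried as one bit of a flags byte,
    // mirroring PutMessage's toData/fromData handling. The bit value is assumed.
    public class ExtraFlagsSketch {
      static final int FETCH_FROM_HDFS = 0x40; // assumed free bit

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);

        int extraFlags = 0;
        boolean fetchFromHDFS = true;          // the transient field on the sender
        if (fetchFromHDFS) extraFlags |= FETCH_FROM_HDFS;
        out.writeByte(extraFlags);             // cf. toData

        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        int readFlags = in.readUnsignedByte(); // cf. fromData
        System.out.println((readFlags & FETCH_FROM_HDFS) != 0); // true
      }
    }
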
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ByteComparator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ByteComparator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ByteComparator.java
new file mode 100644
index 0000000..5c199ae
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ByteComparator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.persistence.soplog;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
+
+/**
+ * Compares objects byte-by-byte.  This is fast and sufficient for cases when
+ * lexicographic ordering is not important or the serialization is order-
+ * preserving. 
+ * 
+ */
+public class ByteComparator implements SerializedComparator {
+  @Override
+  public int compare(byte[] rhs, byte[] lhs) {
+    return compare(rhs, 0, rhs.length, lhs, 0, lhs.length);
+  }
+
+  @Override
+  public int compare(byte[] r, int rOff, int rLen, byte[] l, int lOff, int lLen) {
+    return compareBytes(r, rOff, rLen, l, lOff, lLen);
+  }
+  
+  /**
+   * Compares two byte arrays element-by-element.
+   * 
+   * @param r the right array
+   * @param rOff the offset of r
+   * @param rLen the length of r to compare
+   * @param l the left array
+   * @param lOff the offset of l
+   * @param lLen the length of l to compare
+   * @return -1 if r < l; 0 if r == l; 1 if r > l
+   */
+  
+  public static int compareBytes(byte[] r, int rOff, int rLen, byte[] l, int lOff, int lLen) {
+    return Bytes.compareTo(r, rOff, rLen, l, lOff, lLen);
+  }
+}
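
Bytes.compareTo from HBase orders arrays lexicographically, treating each byte as unsigned. For readers without the HBase dependency, here is a hedged, dependency-free equivalent; only the sign of the result is meant to match:

    // Hedged sketch: unsigned lexicographic byte comparison, equivalent in
    // sign to what ByteComparator expects from Bytes.compareTo.
    public class UnsignedByteCompare {
      static int compareBytes(byte[] r, int rOff, int rLen, byte[] l, int lOff, int lLen) {
        int n = Math.min(rLen, lLen);
        for (int i = 0; i < n; i++) {
          int a = r[rOff + i] & 0xFF; // treat bytes as unsigned 0..255
          int b = l[lOff + i] & 0xFF;
          if (a != b) {
            return a < b ? -1 : 1;
          }
        }
        return Integer.compare(rLen, lLen); // shorter prefix sorts first
      }

      public static void main(String[] args) {
        byte[] x = {0x01, (byte) 0xFF};
        byte[] y = {0x01, 0x02};
        // 0xFF is 255 unsigned, so x sorts after y despite (byte) 0xFF < 0x02 signed.
        System.out.println(compareBytes(x, 0, x.length, y, 0, y.length)); // 1
      }
    }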