You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:50:01 UTC

[15/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
index c5b5d3a..74efd51 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/wan/GatewaySender.java
@@ -96,6 +96,8 @@ public interface GatewaySender {
 
   public static final int DEFAULT_DISPATCHER_THREADS = 5;
   
+  public static final int DEFAULT_HDFS_DISPATCHER_THREADS = 5;
+  
   public static final OrderPolicy DEFAULT_ORDER_POLICY = OrderPolicy.KEY;
   /**
    * The default maximum amount of memory (MB) to allow in the queue before

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
index bd78f5a..77f24a3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/DSFIDFactory.java
@@ -52,6 +52,7 @@ import com.gemstone.gemfire.cache.client.internal.locator.LocatorStatusRequest;
 import com.gemstone.gemfire.cache.client.internal.locator.LocatorStatusResponse;
 import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionRequest;
 import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionResponse;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
 import com.gemstone.gemfire.cache.query.QueryService;
 import com.gemstone.gemfire.cache.query.internal.CqEntry;
 import com.gemstone.gemfire.cache.query.internal.CumulativeNonDistinctResults;
@@ -1022,6 +1023,8 @@ public final class DSFIDFactory implements DataSerializableFixedID {
         RemoteFetchVersionMessage.FetchVersionReplyMessage.class);
     registerDSFID(RELEASE_CLEAR_LOCK_MESSAGE, ReleaseClearLockMessage.class);
     registerDSFID(PR_TOMBSTONE_MESSAGE, PRTombstoneMessage.class);
+    registerDSFID(HDFS_GATEWAY_EVENT_IMPL, HDFSGatewayEventImpl.class);
+    
     registerDSFID(REQUEST_RVV_MESSAGE, InitialImageOperation.RequestRVVMessage.class);
     registerDSFID(RVV_REPLY_MESSAGE, InitialImageOperation.RVVReplyMessage.class);
     registerDSFID(SNAPPY_COMPRESSED_CACHED_DESERIALIZABLE, SnappyCompressedCachedDeserializable.class);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
index 7427f90..5d52346 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/DataSerializableFixedID.java
@@ -103,6 +103,7 @@ public interface DataSerializableFixedID extends SerializationVersions {
   public static final short JOIN_RESPONSE = -143;
   public static final short JOIN_REQUEST = -142;
 
+  public static final short HDFS_GATEWAY_EVENT_IMPL = -141;
   public static final short SNAPPY_COMPRESSED_CACHED_DESERIALIZABLE = -140;
   
   public static final short GATEWAY_EVENT_IMPL = -136;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java
index f8740db..9b0446f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/admin/remote/RemoteRegionAttributes.java
@@ -33,6 +33,7 @@ import com.gemstone.gemfire.cache.CacheLoader;
 import com.gemstone.gemfire.cache.CacheLoaderException;
 import com.gemstone.gemfire.cache.CacheWriter;
 import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.CustomEvictionAttributes;
 import com.gemstone.gemfire.cache.CustomExpiry;
 import com.gemstone.gemfire.cache.DataPolicy;
 import com.gemstone.gemfire.cache.Declarable;
@@ -49,7 +50,10 @@ import com.gemstone.gemfire.cache.RegionAttributes;
 import com.gemstone.gemfire.cache.RegionEvent;
 import com.gemstone.gemfire.cache.Scope;
 import com.gemstone.gemfire.cache.SubscriptionAttributes;
+import com.gemstone.gemfire.compression.CompressionException;
 import com.gemstone.gemfire.compression.Compressor;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.EvictionAttributesImpl;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 
@@ -104,6 +108,8 @@ public class RemoteRegionAttributes implements RegionAttributes,
   private String[] gatewaySendersDescs;
   private boolean isGatewaySenderEnabled = false;
   private String[] asyncEventQueueDescs;
+  private String hdfsStoreName;
+  private boolean hdfsWriteOnly;
   private String compressorDesc;
   private boolean offHeap;
 
@@ -155,6 +161,8 @@ public class RemoteRegionAttributes implements RegionAttributes,
     this.isDiskSynchronous = attr.isDiskSynchronous();
     this.gatewaySendersDescs = getDescs(attr.getGatewaySenderIds().toArray());
     this.asyncEventQueueDescs = getDescs(attr.getAsyncEventQueueIds().toArray());
+	this.hdfsStoreName = attr.getHDFSStoreName();
+    this.hdfsWriteOnly = attr.getHDFSWriteOnly();
     this.compressorDesc = getDesc(attr.getCompressor());
     this.offHeap = attr.getOffHeap();
   }
@@ -411,6 +419,7 @@ public class RemoteRegionAttributes implements RegionAttributes,
   
     DataSerializer.writeString(this.compressorDesc, out);
     out.writeBoolean(this.offHeap);
+    DataSerializer.writeString(this.hdfsStoreName, out);
   }
   
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
@@ -459,6 +468,7 @@ public class RemoteRegionAttributes implements RegionAttributes,
   
     this.compressorDesc = DataSerializer.readString(in);
     this.offHeap = in.readBoolean();
+    this.hdfsStoreName = DataSerializer.readString(in);
   }
   
   private String[] getDescs(Object[] l) {
@@ -626,6 +636,15 @@ public class RemoteRegionAttributes implements RegionAttributes,
     return this.evictionAttributes;
   }
 
+  /**
+   * {@inheritDoc} This implementation always returns {@code null}.
+   */
+  @Override
+  public CustomEvictionAttributes getCustomEvictionAttributes() {
+    // TODO: HDFS: no support for custom eviction attributes from remote yet
+    return null;
+  }
+
   public boolean getCloningEnabled() {
     // TODO Auto-generated method stub
     return this.cloningEnable;
@@ -634,6 +653,12 @@ public class RemoteRegionAttributes implements RegionAttributes,
   public String getDiskStoreName() {
     return this.diskStoreName;
   }
+  public String getHDFSStoreName() { // HDFS store name held by this attributes object
+    return hdfsStoreName;
+  }
+  public boolean getHDFSWriteOnly() { // NOTE(review): the visible toData/fromData hunks only carry hdfsStoreName -- confirm hdfsWriteOnly is serialized too
+    return this.hdfsWriteOnly;
+  }
   public boolean isDiskSynchronous() {
     return this.isDiskSynchronous;
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
index 92eaa01..1f8da88 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractBucketRegionQueue.java
@@ -34,6 +34,8 @@ import com.gemstone.gemfire.cache.Operation;
 import com.gemstone.gemfire.cache.RegionAttributes;
 import com.gemstone.gemfire.cache.RegionDestroyedException;
 import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
 import com.gemstone.gemfire.internal.cache.lru.LRUStatistics;
 import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
 import com.gemstone.gemfire.internal.cache.versions.VersionSource;
@@ -457,8 +459,17 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
     }
     waitIfQueueFull();
     
+    int sizeOfHdfsEvent = -1;
     try {
-
+      if (this instanceof HDFSBucketRegionQueue) {
+        // need to fetch the size before event is inserted in queue.
+        // fix for #50016
+        if (this.getBucketAdvisor().isPrimary()) {
+          HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)event.getValue();
+          sizeOfHdfsEvent = hdfsEvent.getSizeOnHDFSInBytes(!((HDFSBucketRegionQueue)this).isBucketSorted);
+        }
+      }
+      
       didPut = virtualPut(event, false, false, null, false, startPut, true);
       
       checkReadiness();
@@ -481,7 +492,7 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
       destroyKey(key);
       didPut = false;
     } else {
-      addToEventQueue(key, didPut, event);
+      addToEventQueue(key, didPut, event, sizeOfHdfsEvent);
     }
     return didPut;
   }
@@ -511,7 +522,8 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
   }
   
   protected abstract void clearQueues();
-  protected abstract void addToEventQueue(Object key, boolean didPut, EntryEventImpl event);
+  protected abstract void addToEventQueue(Object key, boolean didPut, EntryEventImpl event, 
+      int sizeOfHdfsEvent); // sizeOfHdfsEvent is -1 unless the caller computed the HDFS event size before the put
   
   @Override
   public void afterAcquiringPrimaryState() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
index d37f025..10644cb 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegion.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.logging.log4j.Logger;
@@ -45,6 +46,7 @@ import com.gemstone.gemfire.cache.CacheLoaderException;
 import com.gemstone.gemfire.cache.CacheStatistics;
 import com.gemstone.gemfire.cache.CacheWriter;
 import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.CustomEvictionAttributes;
 import com.gemstone.gemfire.cache.CustomExpiry;
 import com.gemstone.gemfire.cache.DataPolicy;
 import com.gemstone.gemfire.cache.DiskWriteAttributes;
@@ -52,6 +54,7 @@ import com.gemstone.gemfire.cache.EntryExistsException;
 import com.gemstone.gemfire.cache.EntryNotFoundException;
 import com.gemstone.gemfire.cache.EvictionAttributes;
 import com.gemstone.gemfire.cache.EvictionAttributesMutator;
+import com.gemstone.gemfire.cache.EvictionCriteria;
 import com.gemstone.gemfire.cache.ExpirationAction;
 import com.gemstone.gemfire.cache.ExpirationAttributes;
 import com.gemstone.gemfire.cache.MembershipAttributes;
@@ -97,6 +100,7 @@ import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.util.ArrayUtils;
 import com.gemstone.gemfire.pdx.internal.PeerTypeRegistration;
+import com.google.common.util.concurrent.Service.State;
 
 /**
  * Takes care of RegionAttributes, AttributesMutator, and some no-brainer method
@@ -232,6 +236,8 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
 
   protected EvictionAttributesImpl evictionAttributes = new EvictionAttributesImpl();
 
+  protected CustomEvictionAttributes customEvictionAttributes;
+
   /** The membership attributes defining required roles functionality */
   protected MembershipAttributes membershipAttributes;
 
@@ -254,6 +260,10 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
   
   protected String poolName;
   
+  protected String hdfsStoreName;
+  
+  protected boolean hdfsWriteOnly;
+  
   protected Compressor compressor;
   
   /**
@@ -888,6 +898,16 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
     return this.subscriptionAttributes;
   }
   
+  @Override
+  public final String getHDFSStoreName() { // field is assigned from attrs.getHDFSStoreName() elsewhere in this class
+    return this.hdfsStoreName;
+  }
+  
+  @Override
+  public final boolean getHDFSWriteOnly() { // field is assigned from attrs.getHDFSWriteOnly() elsewhere in this class
+    return this.hdfsWriteOnly;
+  }
+  
   /**
    * Get IndexManger for region
    */
@@ -1708,6 +1728,7 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
       this.setEvictionController(this.evictionAttributes
           .createEvictionController(this, attrs.getOffHeap()));
     }
+    this.customEvictionAttributes = attrs.getCustomEvictionAttributes();
     storeCacheListenersField(attrs.getCacheListeners());
     assignCacheLoader(attrs.getCacheLoader());
     assignCacheWriter(attrs.getCacheWriter());
@@ -1765,6 +1786,8 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
             + "when multiuser-authentication is true.");
       }
     }
+    this.hdfsStoreName = attrs.getHDFSStoreName();
+    this.hdfsWriteOnly = attrs.getHDFSWriteOnly();
 
     this.diskStoreName = attrs.getDiskStoreName();
     this.isDiskSynchronous = attrs.isDiskSynchronous();
@@ -1830,12 +1853,52 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
     return this.evictionAttributes;
   }
 
+  /**
+   * {@inheritDoc} May be {@code null}; {@link #setCustomEvictionAttributes(long, long)} rejects that state.
+   */
+  @Override
+  public CustomEvictionAttributes getCustomEvictionAttributes() {
+    return this.customEvictionAttributes;
+  }
+
   public EvictionAttributesMutator getEvictionAttributesMutator()
   {
     return this.evictionAttributes;
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public CustomEvictionAttributes setCustomEvictionAttributes(long newStart,
+      long newInterval) {
+    checkReadiness();
+
+    if (this.customEvictionAttributes == null) {
+      throw new IllegalArgumentException(
+          LocalizedStrings.AbstractRegion_NO_CUSTOM_EVICTION_SET
+              .toLocalizedString(getFullPath()));
+    }
+
+    if (newStart == 0) {
+      newStart = this.customEvictionAttributes.getEvictorStartTime();
+    }
+    this.customEvictionAttributes = new CustomEvictionAttributesImpl(
+        this.customEvictionAttributes.getCriteria(), newStart, newInterval,
+        newStart == 0 && newInterval == 0);
+
+//    if (this.evService == null) {
+//      initilializeCustomEvictor();
+//    } else {// we are changing the earlier one which is already started.
+//      EvictorService service = getEvictorTask();
+//      service.changeEvictionInterval(newInterval);
+//      if (newStart != 0)
+//        service.changeStartTime(newStart);
+//    }
 
+    return this.customEvictionAttributes;
+  }
+  
   public void setEvictionController(LRUAlgorithm evictionController)
   {
     this.evictionController = evictionController;
@@ -1974,6 +2037,7 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
   
   /**
   * @since 8.1
+  *
   */
   @Override
   public ExtensionPoint<Region<?, ?>> getExtensionPoint() {
@@ -1983,4 +2047,87 @@ public abstract class AbstractRegion implements Region, RegionAttributes,
   public boolean getOffHeap() {
     return this.offHeap;
   }
+  /**
+   * Read from system property "DebugHDFSCalls": enables tracking of region operations that reach out to HDFS multiple times
+   */
+  private static final boolean DEBUG_HDFS_CALLS = Boolean.getBoolean("DebugHDFSCalls");
+
+  /**
+   * Read from system property "throwOnMultipleHDFSCalls": throw (rather than log a warning) if a region operation goes out to HDFS multiple times
+   */
+  private static final boolean THROW_ON_MULTIPLE_HDFS_CALLS = Boolean.getBoolean("throwOnMultipleHDFSCalls");
+
+  private ThreadLocal<CallLog> logHDFSCalls = DEBUG_HDFS_CALLS ? new ThreadLocal<CallLog>() : null; // stays null when DEBUG_HDFS_CALLS is false
+
+  public void hdfsCalled(Object key) { // records one HDFS access for the operation tracked on this thread
+    final CallLog callLog = DEBUG_HDFS_CALLS ? logHDFSCalls.get() : null;
+    if (callLog != null) { // null: tracking is off, or no operationStart() ran on this thread (previously an NPE)
+      callLog.addStack(new Throwable());
+      callLog.setKey(key);
+    }
+  }
+  public final void operationStart() { // opens (or nests) a tracking scope for this thread
+    if (DEBUG_HDFS_CALLS) {
+      final CallLog existing = logHDFSCalls.get();
+      if (existing != null) {
+        // Re-entrant call on this thread: only deepen the nesting counter.
+        existing.incNestedCall();
+      } else {
+        // Outermost operation on this thread: begin with a fresh log.
+        logHDFSCalls.set(new CallLog());
+      }
+    }
+  }
+  public final void operationCompleted() { // closes the scope opened by operationStart()
+    final CallLog callLog = DEBUG_HDFS_CALLS ? logHDFSCalls.get() : null;
+    if (callLog == null) {
+      return;
+    }
+    if (callLog.decNestedCall() < 0) { // negative: the outermost operation just finished
+      callLog.assertCalls();
+      logHDFSCalls.set(null);
+    }
+  }
+
+  public static class CallLog { // stack traces, key and nesting depth recorded for one tracked operation
+    private List<Throwable> stackTraces = new ArrayList<Throwable>();
+    private Object key;
+    private int nestedCall = 0;
+    public void incNestedCall() {
+      nestedCall += 1;
+    }
+    public int decNestedCall() { // returns the counter value after the decrement
+      return --nestedCall;
+    }
+    public void addStack(Throwable stack) {
+      stackTraces.add(stack);
+    }
+    public void setKey(Object key) {
+      this.key = key;
+    }
+    public void assertCalls() { // throws or logs a warning when more than one trace was recorded
+      if (stackTraces.size() <= 1) {
+        return;
+      }
+      final Throwable head = new Throwable();
+      Throwable tail = head;
+      for (Throwable recorded : stackTraces) { // chain each recorded trace as the cause of the previous one
+        tail.initCause(recorded);
+        tail = recorded;
+      }
+      if (THROW_ON_MULTIPLE_HDFS_CALLS) {
+        throw new RuntimeException("SWAP:For key:"+key+" HDFS get called more than once: ", head);
+      }
+      InternalDistributedSystem.getLoggerI18n().warning(LocalizedStrings.DEBUG, "SWAP:For key:"+key+" HDFS get called more than once: ", head);
+    }
+  }
+
+  public EvictionCriteria getEvictionCriteria() { // null when no custom eviction attributes exist or they evict incoming entries
+    final boolean noCriteria = this.customEvictionAttributes == null
+        || this.customEvictionAttributes.isEvictIncoming();
+    if (noCriteria) {
+      return null;
+    }
+    return this.customEvictionAttributes.getCriteria();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
index 46a851d..b936e3f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionEntry.java
@@ -870,7 +870,15 @@ public abstract class AbstractRegionEntry implements RegionEntry,
         removeEntry = true;
       }
 
-      if (removeEntry) {
+      // See #47887, we do not insert a tombstone for evicted HDFS
+      // entries since the value is still present in HDFS
+      // Check if we have to evict or just do destroy.
+      boolean forceRemoveEntry = 
+          (event.isEviction() || event.isExpiration()) 
+          && event.getRegion().isUsedForPartitionedRegionBucket()
+          && event.getRegion().getPartitionedRegion().isHDFSRegion();
+
+      if (removeEntry || forceRemoveEntry) {
         boolean isThisTombstone = isTombstone();
         if(inTokenMode && !event.getOperation().isEviction()) {
           setValue(region, Token.DESTROYED);  
@@ -1390,7 +1398,27 @@ public abstract class AbstractRegionEntry implements RegionEntry,
   /**
    * {@inheritDoc}
    */
+  @Override
+  public final boolean isMarkedForEviction() { // delegates to areAnyBitsSet with the MARKED_FOR_EVICTION mask
+    return areAnyBitsSet(MARKED_FOR_EVICTION);
+  }
+
+  /**
+   * {@inheritDoc} Implemented by passing the MARKED_FOR_EVICTION mask to setBits.
+   */
+  @Override
+  public final void setMarkedForEviction() {
+    setBits(MARKED_FOR_EVICTION);
+  }
 
+  /**
+   * {@inheritDoc} Passes the complement of MARKED_FOR_EVICTION to clearBits (presumably an AND-mask -- confirm).
+   */
+  @Override
+  public final void clearMarkedForEviction() {
+    clearBits(~MARKED_FOR_EVICTION);
+  }
+  
   @Override
   public final synchronized void decRefCount(NewLRUClockHand lruList, LocalRegion lr) {
     if (TXManagerImpl.decRefCount(this)) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index 75a1e32..3286373 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -18,6 +18,7 @@
 package com.gemstone.gemfire.internal.cache;
 
 
+import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.Collection;
 import java.util.HashSet;
@@ -35,6 +36,7 @@ import com.gemstone.gemfire.InvalidDeltaException;
 import com.gemstone.gemfire.cache.CacheRuntimeException;
 import com.gemstone.gemfire.cache.CacheWriter;
 import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.CustomEvictionAttributes;
 import com.gemstone.gemfire.cache.DiskAccessException;
 import com.gemstone.gemfire.cache.EntryExistsException;
 import com.gemstone.gemfire.cache.EntryNotFoundException;
@@ -81,6 +83,9 @@ import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
 import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap;
+import com.gemstone.gemfire.pdx.PdxInstance;
+import com.gemstone.gemfire.pdx.PdxSerializationException;
+import com.gemstone.gemfire.pdx.internal.ConvertableToBytes;
 
 /**
  * Abstract implementation of {@link RegionMap}that has all the common
@@ -298,6 +303,10 @@ public abstract class AbstractRegionMap implements RegionMap {
 
   public RegionEntry getEntry(Object key) {
     RegionEntry re = (RegionEntry)_getMap().get(key);
+    if (re != null && re.isMarkedForEviction()) {
+      // entry has been faulted in from HDFS (it is marked for eviction), so report it as absent
+      return null;
+    }
     return re;
   }
 
@@ -328,12 +337,16 @@ public abstract class AbstractRegionMap implements RegionMap {
   @Override
   public final RegionEntry getOperationalEntryInVM(Object key) {
     RegionEntry re = (RegionEntry)_getMap().get(key);
+    if (re != null && re.isMarkedForEviction()) {
+      // entry has been faulted in from HDFS (it is marked for eviction), so report it as absent
+      return null;
+    }
     return re;
   }
  
 
   public final void removeEntry(Object key, RegionEntry re, boolean updateStat) {
-    if (re.isTombstone() && _getMap().get(key) == re){
+    if (re.isTombstone() && _getMap().get(key) == re && !re.isMarkedForEviction()){
       logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace"));
       return; // can't remove tombstones except from the tombstone sweeper
     }
@@ -349,7 +362,7 @@ public abstract class AbstractRegionMap implements RegionMap {
       EntryEventImpl event, final LocalRegion owner,
       final IndexUpdater indexUpdater) {
     boolean success = false;
-    if (re.isTombstone()&& _getMap().get(key) == re) {
+    if (re.isTombstone()&& _getMap().get(key) == re && !re.isMarkedForEviction()) {
       logger.fatal(LocalizedMessage.create(LocalizedStrings.AbstractRegionMap_ATTEMPT_TO_REMOVE_TOMBSTONE), new Exception("stack trace"));
       return; // can't remove tombstones except from the tombstone sweeper
     }
@@ -358,6 +371,18 @@ public abstract class AbstractRegionMap implements RegionMap {
         indexUpdater.onEvent(owner, event, re);
       }
 
+      //This is messy, but custom eviction calls removeEntry rather than
+      //re.destroy, presumably to avoid firing callbacks, etc.
+      //However, removePhase1 still has to be run on the entry
+      //in order to remove it from disk.
+      if(event.isCustomEviction() && !re.isRemoved()) {
+        try {
+          re.removePhase1(owner, false);
+        } catch (RegionClearedException e) {
+          //that's ok, we were only attempting an evict-incoming eviction
+        }
+      }
+      
       if (_getMap().remove(key, re)) {
         re.removePhase2();
         success = true;
@@ -1144,7 +1169,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                         // transaction conflict (caused by eviction) when the entry
                         // is being added to transaction state.
                         if (isEviction) {
-                          if (!confirmEvictionDestroy(oldRe)) {
+                          if (!confirmEvictionDestroy(oldRe) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) {
                             opCompleted = false;
                             return opCompleted;
                           }
@@ -1399,7 +1424,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                   // See comment above about eviction checks
                   if (isEviction) {
                     assert expectedOldValue == null;
-                    if (!confirmEvictionDestroy(re)) {
+                    if (!confirmEvictionDestroy(re) || (owner.getEvictionCriteria() != null && !owner.getEvictionCriteria().doEvict(event))) {
                       opCompleted = false;
                       return opCompleted;
                     }
@@ -1481,6 +1506,12 @@ public abstract class AbstractRegionMap implements RegionMap {
                   }
                 } // !isRemoved
                 else { // already removed
+                  if (owner.isHDFSReadWriteRegion() && re.isRemovedPhase2()) {
+                    // For HDFS region there may be a race with eviction
+                    // so retry the operation. fixes bug 49150
+                    retry = true;
+                    continue;
+                  }
                   if (re.isTombstone() && event.getVersionTag() != null) {
                     // if we're dealing with a tombstone and this is a remote event
                     // (e.g., from cache client update thread) we need to update
@@ -2654,7 +2685,11 @@ public abstract class AbstractRegionMap implements RegionMap {
       boolean onlyExisting, boolean returnTombstone) {
     Object key = event.getKey();
     RegionEntry retVal = null;
-    retVal = getEntry(event);
+    if (event.isFetchFromHDFS()) {
+      retVal = getEntry(event);
+    } else {
+      retVal = getEntryInVM(key);
+    }
     if (onlyExisting) {
       if (!returnTombstone && (retVal != null && retVal.isTombstone())) {
         return null;
@@ -2953,6 +2988,47 @@ public abstract class AbstractRegionMap implements RegionMap {
                   else if (re != null && owner.isUsedForPartitionedRegionBucket()) {
                   BucketRegion br = (BucketRegion)owner;
                   CachePerfStats stats = br.getPartitionedRegion().getCachePerfStats();
+                  long startTime= stats.startCustomEviction();
+                  CustomEvictionAttributes csAttr = br.getCustomEvictionAttributes();
+                  // No need to update indexes if entry was faulted in but operation did not succeed. 
+                  if (csAttr != null && (csAttr.isEvictIncoming() || re.isMarkedForEviction())) {
+                    
+                    if (csAttr.getCriteria().doEvict(event)) {
+                      stats.incEvictionsInProgress();
+                      // set the flag on event saying the entry should be evicted 
+                      // and not indexed
+                      @Released EntryEventImpl destroyEvent = EntryEventImpl.create (owner, Operation.DESTROY, event.getKey(),
+                          null/* newValue */, null, false, owner.getMyId());
+                      try {
+
+                      destroyEvent.setOldValueFromRegion();
+                      destroyEvent.setCustomEviction(true);
+                      destroyEvent.setPossibleDuplicate(event.isPossibleDuplicate());
+                      if(logger.isDebugEnabled()) {
+                        logger.debug("Evicting the entry " + destroyEvent);
+                      }
+                      if(result != null) {
+                        removeEntry(event.getKey(),re, true, destroyEvent,owner, indexUpdater);
+                      }
+                      else{
+                        removeEntry(event.getKey(),re, true, destroyEvent,owner, null);
+                      }
+                      //mark the region entry for this event as evicted 
+                      event.setEvicted();
+                      stats.incEvictions();
+                      if(logger.isDebugEnabled()) {
+                        logger.debug("Evicted the entry " + destroyEvent);
+                      }
+                      //removeEntry(event.getKey(), re);
+                      } finally {
+                        destroyEvent.release();
+                        stats.decEvictionsInProgress();
+                      }
+                    } else {
+                      re.clearMarkedForEviction();
+                    }
+                  }
+                  stats.endCustomEviction(startTime);
                 }
               } // try
             }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
index c241c6b..3038059 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java
@@ -1316,6 +1316,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor  {
             ((BucketRegion)br).processPendingSecondaryExpires();
           }
           if (br instanceof BucketRegionQueue) { // Shouldn't it be AbstractBucketRegionQueue
+            // i.e. this stats is not getting incremented for HDFSBucketRegionQueue!!
             BucketRegionQueue brq = (BucketRegionQueue)br;
             brq.incQueueSize(brq.size());
           }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index f5ae0fb..6e4f426 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 
 import org.apache.logging.log4j.Logger;
@@ -34,6 +35,7 @@ import com.gemstone.gemfire.CancelException;
 import com.gemstone.gemfire.CopyHelper;
 import com.gemstone.gemfire.DataSerializer;
 import com.gemstone.gemfire.DeltaSerializationException;
+import com.gemstone.gemfire.GemFireIOException;
 import com.gemstone.gemfire.InternalGemFireError;
 import com.gemstone.gemfire.InvalidDeltaException;
 import com.gemstone.gemfire.SystemFailure;
@@ -41,16 +43,20 @@ import com.gemstone.gemfire.cache.CacheClosedException;
 import com.gemstone.gemfire.cache.CacheException;
 import com.gemstone.gemfire.cache.CacheWriter;
 import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.CustomEvictionAttributes;
 import com.gemstone.gemfire.cache.DiskAccessException;
 import com.gemstone.gemfire.cache.EntryNotFoundException;
 import com.gemstone.gemfire.cache.EvictionAction;
 import com.gemstone.gemfire.cache.EvictionAlgorithm;
 import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.EvictionCriteria;
 import com.gemstone.gemfire.cache.ExpirationAction;
 import com.gemstone.gemfire.cache.Operation;
 import com.gemstone.gemfire.cache.RegionAttributes;
 import com.gemstone.gemfire.cache.RegionDestroyedException;
 import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
 import com.gemstone.gemfire.cache.partition.PartitionListener;
 import com.gemstone.gemfire.cache.query.internal.IndexUpdater;
 import com.gemstone.gemfire.distributed.DistributedMember;
@@ -84,11 +90,13 @@ import com.gemstone.gemfire.internal.cache.versions.VersionSource;
 import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
 import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
 import com.gemstone.gemfire.internal.concurrent.Atomics;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
 import com.gemstone.gemfire.internal.offheap.annotations.Released;
 import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
@@ -225,6 +233,8 @@ implements Bucket
     return eventSeqNum;
   }
 
+  protected final AtomicReference<HoplogOrganizer> hoplog = new AtomicReference<HoplogOrganizer>();
+  
   public BucketRegion(String regionName, RegionAttributes attrs,
       LocalRegion parentRegion, GemFireCacheImpl cache,
       InternalRegionArguments internalRegionArgs) {
@@ -882,6 +892,12 @@ implements Bucket
 
     beginLocalWrite(event);
     try {
+      // increment the tailKey so that invalidate operations are written to HDFS
+      if (this.partitionedRegion.hdfsStoreName != null) {
+        /* MergeGemXDHDFSToGFE Disabled this while porting. Is this required? */
+        //assert this.partitionedRegion.isLocalParallelWanEnabled();
+        handleWANEvent(event);
+      }
       // which performs the local op.
       // The ARM then calls basicInvalidatePart2 with the entry synchronized.
       if ( !hasSeenEvent(event) ) {
@@ -1136,6 +1152,20 @@ implements Bucket
       if (this.partitionedRegion.isParallelWanEnabled()) {
         handleWANEvent(event);
       }
+      // In GemFire EVICT_DESTROY is not distributed, so in order to remove the entry
+      // from memory, allow the destroy to proceed. fixes #49784
+      if (event.isLoadedFromHDFS() && !getBucketAdvisor().isPrimary()) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Put the destory event in HDFS queue on secondary "
+              + "and return as event is HDFS loaded " + event);
+        }
+        notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event);
+        return;
+      }else{
+        if (logger.isDebugEnabled()) {
+          logger.debug("Going ahead with the destroy on GemFire system");
+        }
+      }
       // This call should invoke AbstractRegionMap (aka ARM) destroy method
       // which calls the CacheWriter, then performs the local op.
       // The ARM then calls basicDestroyPart2 with the entry synchronized.
@@ -1334,7 +1364,39 @@ implements Bucket
   }
 
+  /** Delegates to the owning partitioned region's {@code isHDFSRegion()}. */
   @Override
+  public boolean isHDFSRegion() {
+    return this.partitionedRegion.isHDFSRegion();
+  }
+
+  /** Delegates to the owning partitioned region's {@code isHDFSReadWriteRegion()}. */
+  @Override
+  public boolean isHDFSReadWriteRegion() {
+    return this.partitionedRegion.isHDFSReadWriteRegion();
+  }
+
+  /** Delegates to the owning partitioned region's {@code isHDFSWriteOnly()}. */
+  @Override
+  protected boolean isHDFSWriteOnly() {
+    return this.partitionedRegion.isHDFSWriteOnly();
+  }
+
+  /**
+   * Estimates the number of entries in this bucket. For an HDFS read/write
+   * region the estimate is the size of the bucket's HDFS queue region plus the
+   * hoplog organizer's estimate, saturated at {@link Integer#MAX_VALUE}, or 0
+   * when no HDFS queue is available. For any other region it is
+   * {@link #size()}.
+   *
+   * @return the estimated entry count
+   * @throws PrimaryBucketException if a ForceReattemptException is raised
+   *         while computing the HDFS estimate
+   */
+  @Override
   public int sizeEstimate() {
+    if (isHDFSReadWriteRegion()) {
+      try {
+        checkForPrimary();
+        ConcurrentParallelGatewaySenderQueue q = getHDFSQueue();
+        if (q == null) return 0;
+        int hdfsBucketRegionSize = q.getBucketRegionQueue(
+            partitionedRegion, getId()).size();
+        // Add in long arithmetic and saturate: narrowing the hoplog estimate
+        // with a plain (int) cast, or summing two ints, can wrap negative.
+        long total = (long) hdfsBucketRegionSize + getHoplogOrganizer().sizeEstimate();
+        int estimate = (int) Math.min(total, (long) Integer.MAX_VALUE);
+        if (logger.isDebugEnabled()) {
+          logger.debug("for bucket " + getName() + " estimateSize returning "
+                  + estimate);
+        }
+        return estimate;
+      } catch (ForceReattemptException e) {
+        throw new PrimaryBucketException(e.getLocalizedMessage(), e);
+      }
+    }
     return size();
   }
 
@@ -1391,14 +1453,14 @@ implements Bucket
    *                 if there is a serialization problem
    * see LocalRegion#getDeserializedValue(RegionEntry, KeyInfo, boolean, boolean,  boolean, EntryEventImpl, boolean, boolean, boolean)
    */
-  private RawValue getSerialized(Object key,
-                                 boolean updateStats,
-                                 boolean doNotLockEntry,
-                                 EntryEventImpl clientEvent,
-                                 boolean returnTombstones)
+  private RawValue getSerialized(Object key, boolean updateStats, boolean doNotLockEntry, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) 
       throws EntryNotFoundException, IOException {
     RegionEntry re = null;
-    re = this.entries.getEntry(key);
+    if (allowReadFromHDFS) {
+      re = this.entries.getEntry(key);
+    } else {
+      re = this.entries.getOperationalEntryInVM(key);
+    }
     if (re == null) {
       return NULLVALUE;
     }
@@ -1442,18 +1504,13 @@ implements Bucket
    * 
    * @param keyInfo
    * @param generateCallbacks
-   * @param clientEvent holder for the entry's version information
+   * @param clientEvent holder for the entry's version information 
    * @param returnTombstones TODO
    * @return serialized (byte) form
    * @throws IOException if the result is not serializable
    * @see LocalRegion#get(Object, Object, boolean, EntryEventImpl)
    */
-  public RawValue getSerialized(KeyInfo keyInfo,
-                                boolean generateCallbacks,
-                                boolean doNotLockEntry,
-                                ClientProxyMembershipID requestingClient,
-                                EntryEventImpl clientEvent,
-                                boolean returnTombstones) throws IOException {
+  public RawValue getSerialized(KeyInfo keyInfo, boolean generateCallbacks, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws IOException {
     checkReadiness();
     checkForNoAccess();
     CachePerfStats stats = getCachePerfStats();
@@ -1463,7 +1520,7 @@ implements Bucket
     try {
       RawValue valueBytes = NULLVALUE;
       boolean isCreate = false;
-      RawValue result = getSerialized(keyInfo.getKey(), true, doNotLockEntry, clientEvent, returnTombstones);
+      RawValue result = getSerialized(keyInfo.getKey(), true, doNotLockEntry, clientEvent, returnTombstones, allowReadFromHDFS);
       isCreate = result == NULLVALUE || (result.getRawValue() == Token.TOMBSTONE && !returnTombstones);
       miss = (result == NULLVALUE || Token.isInvalid(result.getRawValue()));
       if (miss) {
@@ -1475,7 +1532,7 @@ implements Bucket
             return REQUIRES_ENTRY_LOCK;
           }
           Object value = nonTxnFindObject(keyInfo, isCreate,
-              generateCallbacks, result.getRawValue(), true, true, requestingClient, clientEvent, false);
+              generateCallbacks, result.getRawValue(), true, true, requestingClient, clientEvent, false, allowReadFromHDFS);
           if (value != null) {
             result = new RawValue(value);
           }
@@ -2414,8 +2471,36 @@ implements Bucket
   }
 
+  /**
+   * Attempts to create the hoplog organizer via {@link #createHoplogOrganizer()}.
+   * An IOException is logged as a warning; any other Throwable is logged as an
+   * error after {@code checkReadiness()} has been called.
+   */
   public void beforeAcquiringPrimaryState() {
+    try {
+      createHoplogOrganizer();
+    } catch (IOException e) {
+      // 48990: when HDFS was down, gemfirexd should still start normally
+      logger.warn(LocalizedStrings.HOPLOG_NOT_STARTED_YET, e);
+    } catch(Throwable e) {
+      /*MergeGemXDHDFSToGFE changed this code to checkReadiness*/
+      // SystemFailure.checkThrowable(e);
+      this.checkReadiness();
+      //49333 - no matter what, we should elect a primary.
+      logger.error(LocalizedStrings.LocalRegion_UNEXPECTED_EXCEPTION, e);
+    }
+  }
+
+  /**
+   * Returns the hoplog organizer for this bucket, asking {@code hdfsManager}
+   * to create one for this bucket id when none is installed.
+   *
+   * @return the organizer, or null when the partitioned region is not an HDFS region
+   * @throws IOException declared for failures while creating the organizer
+   */
+  public HoplogOrganizer<?> createHoplogOrganizer() throws IOException {
+    if (!getPartitionedRegion().isHDFSRegion()) {
+      return null;
+    }
+    final HoplogOrganizer<?> existing = hoplog.get();
+    if (existing != null) {
+      // another thread has already (re)created the organizer; reuse it
+      return existing;
+    }
+
+    // nothing was installed above, so the swap is expected to displace null
+    HoplogOrganizer previous = hoplog.getAndSet(getPartitionedRegion().hdfsManager.create(getId()));
+    assert previous == null;
+    return hoplog.get();
   }
-
+  
   public void afterAcquiringPrimaryState() {
     
   }
@@ -2423,13 +2508,105 @@ implements Bucket
    * Invoked when a primary bucket is demoted.
    */
   public void beforeReleasingPrimaryLockDuringDemotion() {
+    // drop the hoplog organizer held by this bucket as part of the demotion
+    releaseHoplogOrganizer();
   }
 
+  /**
+   * Clears the cached hoplog organizer and, if one was installed, asks
+   * {@code hdfsManager} to close it for this bucket id.
+   */
+  protected void releaseHoplogOrganizer() {
+    if (hoplog.getAndSet(null) != null) {
+      // release resources during a clean transition
+      getPartitionedRegion().hdfsManager.close(getId());
+    }
+  }
+  
+  /**
+   * Returns this bucket's hoplog organizer. When none is installed, one is
+   * created while holding the bucket advisor's monitor, after
+   * {@code checkForPrimary()} has been called.
+   *
+   * @return the organizer, never null
+   * @throws HDFSIOException if creation fails with an IOException or yields
+   *         no organizer
+   */
+  public HoplogOrganizer<?> getHoplogOrganizer() throws HDFSIOException {
+    final HoplogOrganizer<?> cached = hoplog.get();
+    if (cached != null) {
+      return cached;
+    }
+    synchronized (getBucketAdvisor()) {
+      checkForPrimary();
+      final HoplogOrganizer<?> created;
+      try {
+        created = createHoplogOrganizer();
+      } catch (IOException e) {
+        throw new HDFSIOException("Failed to create Hoplog organizer due to ", e);
+      }
+      if (created == null) {
+        throw new HDFSIOException("Hoplog organizer is not available for " + this);
+      }
+      return created;
+    }
+  }
+  
   @Override
   public RegionAttributes getAttributes() {
     return this;
   }
 
+  /** Forwards the call for {@code key} to the owning partitioned region's {@code hdfsCalled}. */
+  @Override
+  public void hdfsCalled(Object key) {
+    this.partitionedRegion.hdfsCalled(key);
+  }
+
+  /**
+   * Clears the HDFS data for this bucket when the partitioned region is an
+   * HDFS read/write region: the HDFS queue is asked to clear this region and
+   * bucket id, then the installed hoplog organizer, if any, is cleared.
+   * Does nothing when there is no HDFS queue.
+   *
+   * @throws GemFireIOException if clearing the organizer fails with an IOException
+   */
+  @Override
+  protected void clearHDFSData() {
+    if (!getPartitionedRegion().isHDFSReadWriteRegion()) {
+      return;
+    }
+    final ConcurrentParallelGatewaySenderQueue hdfsQueue = getHDFSQueue();
+    if (hdfsQueue == null) {
+      return;
+    }
+    // Clear the queue
+    hdfsQueue.clear(getPartitionedRegion(), this.getId());
+    final HoplogOrganizer installed = hoplog.get();
+    if (installed == null) {
+      return;
+    }
+    try {
+      installed.clear();
+    } catch (IOException e) {
+      throw new GemFireIOException(LocalizedStrings.HOPLOG_UNABLE_TO_DELETE_HDFS_DATA.toLocalizedString(), e);
+    }
+  }
+  
+  /** Returns the eviction criteria reported by the owning partitioned region. */
+  public EvictionCriteria getEvictionCriteria() {
+    return this.partitionedRegion.getEvictionCriteria();
+  }
+  
+  /** Returns the custom eviction attributes reported by the owning partitioned region. */
+  public CustomEvictionAttributes getCustomEvictionAttributes() {
+    return this.partitionedRegion.getCustomEvictionAttributes();
+  }
+  
+  /**
+   * Destroys the entry for {@code key} as a custom eviction: a destroy event
+   * is generated, flagged as a custom eviction, and handed to
+   * {@code mapDestroy} with cache-writer invocation disabled. The event is
+   * always released on exit, and the local write is ended if it was begun.
+   *
+   * @param key the key of the entry to evict
+   * @return true if the evict destroy was done; false if it was not needed
+   */
+  public boolean customEvictDestroy(Object key)
+  {
+    checkReadiness();
+    @Released final EntryEventImpl event = 
+          generateCustomEvictDestroyEvent(key);
+    event.setCustomEviction(true);
+    boolean locked = false;
+    try {
+      locked = beginLocalWrite(event);
+      return mapDestroy(event,
+                        false, // cacheWrite
+                        true,  // isEviction
+                        null); // expectedOldValue
+    }
+    // The exceptions below are not expected on the evict-destroy path (see
+    // the message keys), so each is rethrown as an Error with its cause.
+    catch (CacheWriterException error) {
+      throw new Error(LocalizedStrings.LocalRegion_CACHE_WRITER_SHOULD_NOT_HAVE_BEEN_CALLED_FOR_EVICTDESTROY.toLocalizedString(), error);
+    }
+    catch (TimeoutException anotherError) {
+      throw new Error(LocalizedStrings.LocalRegion_NO_DISTRIBUTED_LOCK_SHOULD_HAVE_BEEN_ATTEMPTED_FOR_EVICTDESTROY.toLocalizedString(), anotherError);
+    }
+    catch (EntryNotFoundException yetAnotherError) {
+      throw new Error(LocalizedStrings.LocalRegion_ENTRYNOTFOUNDEXCEPTION_SHOULD_BE_MASKED_FOR_EVICTDESTROY.toLocalizedString(), yetAnotherError);
+    } finally {
+      // only end the local write if beginLocalWrite reported that it started one
+      if (locked) {
+        endLocalWrite(event);
+      }
+      event.release();
+    }
+  }
+
   public boolean areSecondariesPingable() {
     
     Set<InternalDistributedMember> hostingservers = this.partitionedRegion.getRegionAdvisor()

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java
index 0243cde..0facd93 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegionQueue.java
@@ -441,7 +441,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
     }
   }
 
-  protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event) {
+  protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event, int sizeOfHDFSEvent) {
     if (didPut) {
       if (this.initialized) {
         this.eventSeqNumQueue.add(key);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java
index 4a34771..6f673c7 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheDistributionAdvisor.java
@@ -38,6 +38,8 @@ import com.gemstone.gemfire.cache.InterestPolicy;
 import com.gemstone.gemfire.cache.RegionDestroyedException;
 import com.gemstone.gemfire.cache.Scope;
 import com.gemstone.gemfire.cache.SubscriptionAttributes;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
 import com.gemstone.gemfire.distributed.Role;
 import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
 import com.gemstone.gemfire.distributed.internal.DistributionManager;
@@ -1226,16 +1228,30 @@ public class CacheDistributionAdvisor extends DistributionAdvisor  {
       public boolean include(final Profile profile) {
         if (profile instanceof CacheProfile) {
           final CacheProfile cp = (CacheProfile)profile;
-          if (allAsyncEventIds.equals(cp.asyncEventQueueIds)) {
+          /* HDFS queues are created only when a region is created, so comparing
+           * them here is unnecessary. The comparison also causes problems because
+           * no HDFS queue is created on an accessor. HDFS queue ids are therefore
+           * removed from both sides before comparing. */
+          Set<String> allAsyncEventIdsNoHDFS = removeHDFSQueues(allAsyncEventIds);
+          Set<String> profileQueueIdsNoHDFS = removeHDFSQueues(cp.asyncEventQueueIds);
+          if (allAsyncEventIdsNoHDFS.equals(profileQueueIdsNoHDFS)) {
             return true;
           }else{
-            differAsycnQueueIds.add(allAsyncEventIds);
-            differAsycnQueueIds.add(cp.asyncEventQueueIds);
+            // record both filtered id sets so the caller can report the mismatch
+            differAsycnQueueIds.add(allAsyncEventIdsNoHDFS);
+            differAsycnQueueIds.add(profileQueueIdsNoHDFS);
             return false;
           }
         }
         return false;
       }
+      /**
+       * Returns a new set holding every id from {@code queueIds} that does not
+       * start with {@code HDFSStoreFactoryImpl.DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS}.
+       * The input set is not modified.
+       */
+      private Set<String> removeHDFSQueues(Set<String> queueIds){
+        final Set<String> nonHdfsIds = new HashSet<String>();
+        for (final String candidate : queueIds) {
+          final boolean isHdfsQueue =
+              candidate.startsWith(HDFSStoreFactoryImpl.DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS);
+          if (isHdfsQueue) {
+            continue;
+          }
+          nonHdfsIds.add(candidate);
+        }
+        return nonHdfsIds;
+      }
     });
     return differAsycnQueueIds;
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java
index ad84963..382c537 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CachePerfStats.java
@@ -156,6 +156,13 @@ public class CachePerfStats {
   protected static final int compressionPreCompressedBytesId;
   protected static final int compressionPostCompressedBytesId;
   
+  protected static final int evictByCriteria_evictionsId;// total actual evictions (entries evicted)
+  protected static final int evictByCriteria_evictionTimeId;// total eviction time including product + user expr. 
+  protected static final int evictByCriteria_evictionsInProgressId;
+  protected static final int evictByCriteria_evaluationsId;// total eviction attempts
+  protected static final int evictByCriteria_evaluationTimeId;// time taken to evaluate user expression.
+  
+
   /** The Statistics object that we delegate most behavior to */
   protected final Statistics stats;
 
@@ -514,6 +521,12 @@ public class CachePerfStats {
     compressionDecompressionsId = type.nameToId("decompressions");
     compressionPreCompressedBytesId = type.nameToId("preCompressedBytes");
     compressionPostCompressedBytesId = type.nameToId("postCompressedBytes");
+    
+    evictByCriteria_evictionsId = type.nameToId("evictByCriteria_evictions");
+    evictByCriteria_evictionTimeId = type.nameToId("evictByCriteria_evictionTime"); 
+    evictByCriteria_evictionsInProgressId = type.nameToId("evictByCriteria_evictionsInProgress");
+    evictByCriteria_evaluationsId= type.nameToId("evictByCriteria_evaluations");
+    evictByCriteria_evaluationTimeId = type.nameToId("evictByCriteria_evaluationTime");
   }
   
   ////////////////////////  Constructors  ////////////////////////
@@ -1341,4 +1354,66 @@ public class CachePerfStats {
       stats.incLong(exportTimeId, getStatTime() - start);
     }
   }
+  
+  /**
+   * Returns the current {@code NanoTimer} time, for later use as the
+   * {@code start} argument of {@link #endCustomEviction(long)}.
+   * Used for the case of evict on incoming.
+   */
+  public long startCustomEviction() {
+    return NanoTimer.getTime();
+  }
+
+  /**
+   * Adds the time elapsed since {@code start} to the
+   * evictByCriteria_evictionTime statistic. Used for the case of evict on
+   * incoming.
+   *
+   * @param start a time previously obtained from {@link #startCustomEviction()}
+   */
+  public void endCustomEviction(long start) {
+    final long elapsed = NanoTimer.getTime() - start;
+    stats.incLong(evictByCriteria_evictionTimeId, elapsed);
+  }
+
+  /** Increments the evictByCriteria_evictionsInProgress statistic by one. */
+  public void incEvictionsInProgress() {
+    this.stats.incLong(evictByCriteria_evictionsInProgressId, 1);
+  }
+
+  /** Decrements the evictByCriteria_evictionsInProgress statistic by one. */
+  public void decEvictionsInProgress() {
+    this.stats.incLong(evictByCriteria_evictionsInProgressId, -1);
+  }
+
+  /** Increments the evictByCriteria_evictions statistic by one. */
+  public void incEvictions() {
+    this.stats.incLong(evictByCriteria_evictionsId, 1);
+  }
+
+  /** Increments the evictByCriteria_evaluations statistic by one. */
+  public void incEvaluations() {
+    this.stats.incLong(evictByCriteria_evaluationsId, 1);
+  }
+
+  /**
+   * Adds {@code delta} to the evictByCriteria_evaluations statistic.
+   *
+   * @param delta the amount to add
+   */
+  public void incEvaluations(int delta) {
+    this.stats.incLong(evictByCriteria_evaluationsId, delta);
+  }
+  
+  /**
+   * Returns the current {@code NanoTimer} time, for later use as the
+   * {@code start} argument of {@link #endEvaluation(long, long)}.
+   */
+  public long startEvaluation() {
+    return NanoTimer.getTime();
+  }
+
+  /**
+   * Records evaluation time: the time elapsed since {@code start}, minus
+   * {@code notEvaluationTime}, is added to the
+   * evictByCriteria_evaluationTime statistic.
+   *
+   * @param start a time previously obtained from {@link #startEvaluation()}
+   * @param notEvaluationTime the part of the elapsed time to exclude
+   */
+  public void endEvaluation(long start, long notEvaluationTime) {
+    final long elapsed = NanoTimer.getTime() - start;
+    stats.incLong(evictByCriteria_evaluationTimeId, elapsed - notEvaluationTime);
+  }
+
+  /** Returns the current value of the evictByCriteria_evictions statistic. */
+  public long getEvictions() {
+    return stats.getLong(evictByCriteria_evictionsId);
+  }
+
+  /** Returns the current value of the evictByCriteria_evictionsInProgress statistic. */
+  public long getEvictionsInProgress() {
+    return stats.getLong(evictByCriteria_evictionsInProgressId);
+  }
+
+  /** Returns the current value of the evictByCriteria_evictionTime statistic. */
+  public long getEvictionsTime() {
+    return stats.getLong(evictByCriteria_evictionTimeId);
+  }
+
+  /** Returns the current value of the evictByCriteria_evaluations statistic. */
+  public long getEvaluations() {
+    return stats.getLong(evictByCriteria_evaluationsId);
+  }
+
+  /** Returns the current value of the evictByCriteria_evaluationTime statistic. */
+  public long getEvaluationTime() {
+    return stats.getLong(evictByCriteria_evaluationTimeId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java
index 72edc10..1441144 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ColocationHelper.java
@@ -107,6 +107,9 @@ public class ColocationHelper {
   }
     private static PartitionedRegion getColocatedPR(
       final PartitionedRegion partitionedRegion, final String colocatedWith) {
+    logger.info(LocalizedMessage.create(
+        LocalizedStrings.HOPLOG_0_COLOCATE_WITH_REGION_1_NOT_INITIALIZED_YET,
+        new Object[] { partitionedRegion.getFullPath(), colocatedWith }));
     PartitionedRegion colocatedPR = (PartitionedRegion) partitionedRegion
         .getCache().getPartitionedRegion(colocatedWith, false);
     assert colocatedPR != null;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CustomEvictionAttributesImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CustomEvictionAttributesImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CustomEvictionAttributesImpl.java
new file mode 100644
index 0000000..0c82f97
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/CustomEvictionAttributesImpl.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.internal.cache;
+
+import com.gemstone.gemfire.cache.CustomEvictionAttributes;
+import com.gemstone.gemfire.cache.EvictionCriteria;
+
+/**
+ * Concrete instance of {@link CustomEvictionAttributes}. It adds no state or
+ * behavior of its own; it only exposes a public constructor.
+ * 
+ * @since gfxd 1.0
+ */
+public final class CustomEvictionAttributesImpl extends
+    CustomEvictionAttributes {
+
+  /**
+   * Passes all arguments unchanged to the {@link CustomEvictionAttributes}
+   * constructor.
+   *
+   * @param criteria the eviction criteria
+   * @param startTime forwarded to the superclass (units and meaning are defined there)
+   * @param interval forwarded to the superclass (units and meaning are defined there)
+   * @param evictIncoming forwarded to the superclass
+   */
+  public CustomEvictionAttributesImpl(EvictionCriteria<?, ?> criteria,
+      long startTime, long interval, boolean evictIncoming) {
+    super(criteria, startTime, interval, evictIncoming);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
index cafdb80..f8475ae 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistTXState.java
@@ -145,7 +145,7 @@ public class DistTXState extends TXState {
               } 
             } 
           } // end if primary
-        }
+        } // end non-hdfs buckets
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
index 6a7b4f2..a6d2488 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedCacheOperation.java
@@ -863,6 +863,8 @@ public abstract class DistributedCacheOperation {
 
     private final static int INHIBIT_NOTIFICATIONS_MASK = 0x400;
 
+	protected final static short FETCH_FROM_HDFS = 0x200;
+    
     protected final static short IS_PUT_DML = 0x100;
 
     public boolean needsRouting;
@@ -1365,6 +1367,7 @@ public abstract class DistributedCacheOperation {
       if ((extBits & INHIBIT_NOTIFICATIONS_MASK) != 0) {
         this.inhibitAllNotifications = true;
 	  if (this instanceof PutAllMessage) {
+        ((PutAllMessage) this).setFetchFromHDFS((extBits & FETCH_FROM_HDFS) != 0);
         ((PutAllMessage) this).setPutDML((extBits & IS_PUT_DML) != 0);
       }
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
index b6aa1b6..2817fdd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedPutAllOperation.java
@@ -856,6 +856,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
     PutAllMessage msg = new PutAllMessage();
     msg.eventId = event.getEventId();
     msg.context = event.getContext();
+	msg.setFetchFromHDFS(event.isFetchFromHDFS());
     msg.setPutDML(event.isPutDML());
     return msg;
   }
@@ -870,7 +871,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
   public PutAllPRMessage createPRMessagesNotifyOnly(int bucketId) {
     final EntryEventImpl event = getBaseEvent();
     PutAllPRMessage prMsg = new PutAllPRMessage(bucketId, putAllDataSize, true,
-        event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), false /*isPutDML*/);
+        event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), false, false /*isPutDML*/);
     if (event.getContext() != null) {
       prMsg.setBridgeContext(event.getContext());
     }
@@ -899,7 +900,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
       PutAllPRMessage prMsg = (PutAllPRMessage)prMsgMap.get(bucketId);
       if (prMsg == null) {
         prMsg = new PutAllPRMessage(bucketId.intValue(), putAllDataSize, false,
-            event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), event.isPutDML());
+            event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument(), event.isFetchFromHDFS(), event.isPutDML());
         prMsg.setTransactionDistributed(event.getRegion().getCache().getTxManager().isDistributed());
 
         // set dpao's context(original sender) into each PutAllMsg
@@ -1076,6 +1077,9 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
 
     protected EventID eventId = null;
     
+    // By default, fetchFromHDFS == true;
+    private transient boolean fetchFromHDFS = true;
+    
     private transient boolean isPutDML = false;
 
     protected static final short HAS_BRIDGE_CONTEXT = UNRESERVED_FLAGS_START;
@@ -1133,11 +1137,12 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
      *          the region the entry is put in
      */
     public void doEntryPut(PutAllEntryData entry, DistributedRegion rgn,
-        boolean requiresRegionContext, boolean isPutDML) {
+        boolean requiresRegionContext, boolean fetchFromHDFS, boolean isPutDML) {
       @Released EntryEventImpl ev = PutAllMessage.createEntryEvent(entry, getSender(), 
           this.context, rgn,
           requiresRegionContext, this.possibleDuplicate,
           this.needsRouting, this.callbackArg, true, skipCallbacks);
+	  ev.setFetchFromHDFS(fetchFromHDFS);
       ev.setPutDML(isPutDML);
       // we don't need to set old value here, because the msg is from remote. local old value will get from next step
       try {
@@ -1232,7 +1237,7 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
               logger.debug("putAll processing {} with {} sender={}", putAllData[i], putAllData[i].versionTag, sender);
             }
             putAllData[i].setSender(sender);
-            doEntryPut(putAllData[i], rgn, requiresRegionContext,  isPutDML);
+            doEntryPut(putAllData[i], rgn, requiresRegionContext,  fetchFromHDFS, isPutDML);
           }
         }
       }, ev.getEventId());
@@ -1361,6 +1366,10 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
       return Arrays.asList(ops);
     }
     
+    public void setFetchFromHDFS(boolean val) {
+      this.fetchFromHDFS = val;
+    }
+    
     public void setPutDML(boolean val) {
       this.isPutDML = val;
     }
@@ -1368,6 +1377,9 @@ public class DistributedPutAllOperation extends AbstractUpdateOperation
     @Override
     protected short computeCompressedExtBits(short bits) {
       bits = super.computeCompressedExtBits(bits);
+      if (fetchFromHDFS) {
+        bits |= FETCH_FROM_HDFS;
+      }
       if (isPutDML) {
         bits |= IS_PUT_DML;
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
index 226d914..addba8e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
@@ -17,6 +17,8 @@
 
 package com.gemstone.gemfire.internal.cache;
 
+import static com.gemstone.gemfire.internal.offheap.annotations.OffHeapIdentifier.ABSTRACT_REGION_ENTRY_FILL_IN_VALUE;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -111,6 +113,8 @@ import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationE
 import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
 import com.gemstone.gemfire.internal.cache.versions.VersionSource;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
 import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueConfigurationException;
 import com.gemstone.gemfire.internal.cache.wan.GatewaySenderConfigurationException;
 import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
@@ -1260,6 +1264,8 @@ public class DistributedRegion extends LocalRegion implements
   private final Set<DistributedMember> memoryThresholdReachedMembers =
     new CopyOnWriteArraySet<DistributedMember>();
 
+  private ConcurrentParallelGatewaySenderQueue hdfsQueue;
+
   /** Sets and returns giiMissingRequiredRoles */
   private boolean checkInitialImageForReliability(
       InternalDistributedMember imageTarget,
@@ -2418,16 +2424,9 @@ public class DistributedRegion extends LocalRegion implements
   /** @return the deserialized value */
   @Override
   @Retained
-  protected Object findObjectInSystem(KeyInfo keyInfo,
-                                      boolean isCreate,
-                                      TXStateInterface txState,
-                                      boolean generateCallbacks,
-                                      Object localValue,
-                                      boolean disableCopyOnRead,
-                                      boolean preferCD,
-                                      ClientProxyMembershipID requestingClient,
-                                      EntryEventImpl clientEvent,
-                                      boolean returnTombstones)
+  protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
+      TXStateInterface txState, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead,
+        boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
       throws CacheLoaderException, TimeoutException
   {
     checkForLimitedOrNoAccess();
@@ -2546,6 +2545,18 @@ public class DistributedRegion extends LocalRegion implements
     }
   }
   
+  /** Lazily resolves and caches the HDFS event queue; returns null while the async queue or its event processor is unavailable. */
+  protected ConcurrentParallelGatewaySenderQueue getHDFSQueue() {
+    if (this.hdfsQueue == null) {
+      String asyncQId = this.getPartitionedRegion().getHDFSEventQueueName();
+      final AsyncEventQueueImpl asyncQ =  (AsyncEventQueueImpl)this.getCache().getAsyncEventQueue(asyncQId);
+      final AbstractGatewaySenderEventProcessor ep = asyncQ == null ? null : ((AbstractGatewaySender)asyncQ.getSender()).getEventProcessor(); // unknown queue id: no NPE
+      if (ep == null) return null;
+      hdfsQueue = (ConcurrentParallelGatewaySenderQueue)ep.getQueue();
+    }
+    return hdfsQueue;
+  }
+
   /** hook for subclasses to note that a cache load was performed
    * @see BucketRegion#performedLoad
    */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
index e241622..2b826ce 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
@@ -193,8 +193,16 @@ public class EntryEventImpl
   /** version tag for concurrency checks */
   protected VersionTag versionTag;
 
+  /** boolean to indicate whether this operation may fetch from HDFS; set it to false to optimize the operation by not fetching from HDFS */
+  private transient boolean fetchFromHDFS = true;
+  
   private transient boolean isPutDML = false;
 
+  /** boolean to indicate that the RegionEntry for this event was loaded from HDFS*/
+  private transient boolean loadedFromHDFS= false;
+  
+  private transient boolean isCustomEviction = false;
+  
   /** boolean to indicate that the RegionEntry for this event has been evicted*/
   private transient boolean isEvicted = false;
   
@@ -650,6 +658,14 @@ public class EntryEventImpl
     return this.op.isEviction();
   }
 
+  public final boolean isCustomEviction() {
+    return this.isCustomEviction;
+  }
+  
+  public final void setCustomEviction(boolean customEvict) {
+    this.isCustomEviction = customEvict;
+  }
+  
   public final void setEvicted() {
     this.isEvicted = true;
   }
@@ -3031,6 +3047,13 @@ public class EntryEventImpl
   public boolean isOldValueOffHeap() {
     return isOffHeapReference(this.oldValue);
   }
+  public final boolean isFetchFromHDFS() {
+    return fetchFromHDFS;
+  }
+
+  public final void setFetchFromHDFS(boolean fetchFromHDFS) {
+    this.fetchFromHDFS = fetchFromHDFS;
+  }
 
   public final boolean isPutDML() {
     return this.isPutDML;
@@ -3039,4 +3062,12 @@ public class EntryEventImpl
   public final void setPutDML(boolean val) {
     this.isPutDML = val;
   }
+
+  public final boolean isLoadedFromHDFS() {
+    return loadedFromHDFS;
+  }
+
+  public final void setLoadedFromHDFS(boolean loadedFromHDFS) {
+    this.loadedFromHDFS = loadedFromHDFS;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java
new file mode 100644
index 0000000..9054d6d
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.EvictionCriteria;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.gemstone.gemfire.internal.offheap.Releasable;
+/**
+ * Schedules each iteration periodically. EvictorService takes absolute time and
+ * a period as input and schedules Eviction at absolute times by calculating the
+ * interval. For scheduling the next eviction iteration it also takes into
+ * account the time taken to complete one iteration. If an iteration takes more
+ * time than the specified period then another iteration is scheduled
+ * immediately.
+ * 
+ * 
+ */
+
+public class EvictorService extends AbstractScheduledService {
+
+  private final EvictionCriteria<Object, Object> criteria;
+
+  // eviction period (interval between iterations); always held in seconds
+  private long interval;
+
+  private volatile boolean stopScheduling;
+
+  private long nextScheduleTime;
+
+  private GemFireCacheImpl cache;
+
+  private Region region;
+  
+  private volatile ScheduledExecutorService executorService;
+
+  /**
+   * Creates an evictor for {@code r} that runs every {@code evictorInterval} (converted to whole seconds).
+   * The next schedule time is initialised to ten seconds after the current cache time.
+   *
+   * @param criteria supplies the entries to evict on each iteration
+   * @param evictorStartTime requested start time; currently only logged, not used for scheduling (see TODO)
+   * @param evictorInterval period between iterations, expressed in {@code unit}
+   * @param unit time unit of {@code evictorInterval}
+   * @param r region whose entries are evicted
+   * @throws CacheClosedException if GemFireCacheImpl.getExisting() finds no usable cache (formerly swallowed, then NPE)
+   */
+  public EvictorService(EvictionCriteria<Object, Object> criteria,
+      long evictorStartTime, long evictorInterval, TimeUnit unit, Region r) {
+    this.criteria = criteria;
+    this.interval = unit.toSeconds(evictorInterval);
+    this.region = r;
+    // Fail fast: every later statement dereferences the cache, so a closed cache must not be ignored.
+    this.cache = GemFireCacheImpl.getExisting();
+    // TODO: Unless we revisit System.currentTimeMillis or cacheTimeMillis keep the default start (evictorStartTime unused).
+    long now = this.cache.getDistributionManager().cacheTimeMillis() / 1000;
+    this.nextScheduleTime = now + 10;
+    if (this.cache.getLoggerI18n().fineEnabled()) {
+      this.cache.getLoggerI18n().fine(
+          "EvictorService: The startTime(now) is " + now + " evictorStartTime : " + evictorStartTime);
+      this.cache.getLoggerI18n().fine("EvictorService: The startTime is " + this.nextScheduleTime);
+    }
+  }
+
+  /**
+   * Runs one eviction pass. Candidate entries come from the eviction criteria; for a partitioned
+   * region each candidate is destroyed in its locally hosted bucket when that bucket is primary
+   * (other region types are only iterated). The pass ends early once scheduling has been stopped
+   * or the cache reports a cancellation in progress.
+   * <p>
+   * A failure on a single entry is logged as a warning and does not abort the pass. The evictions
+   * statistic is incremented only for entries that were actually destroyed, and the evaluation and
+   * custom-eviction timers are ended in a finally block so they are also closed when iteration throws.
+   *
+   * @throws Exception anything thrown outside the per-entry handling, e.g. by the criteria iterator
+   */
+  @Override
+  protected void runOneIteration() throws Exception {
+    if (this.cache.getLoggerI18n().fineEnabled()) {
+      this.cache.getLoggerI18n()
+          .fine(
+              "EvictorService: Running the iteration at "
+                  + cache.cacheTimeMillis());
+    }
+    if (stopScheduling || checkCancelled(cache)) {
+      stopScheduling(); // if check cancelled
+      if (this.cache.getLoggerI18n().fineEnabled()) {
+        this.cache
+            .getLoggerI18n()
+            .fine(
+                "EvictorService: Abort eviction since stopScheduling OR cancel in progress. Evicted 0 entries ");
+      }
+      return;
+    }
+    CachePerfStats stats = ((LocalRegion)this.region).getCachePerfStats();
+    long startEvictionTime = stats.startCustomEviction();
+    int evicted = 0;
+    long startEvaluationTime = stats.startEvaluation();
+    long totalIterationsTime = 0;
+
+    Iterator<Entry<Object, Object>> keysItr = this.criteria.getKeysToBeEvicted(this.cache
+        .getDistributionManager().cacheTimeMillis(), this.region);
+    try {
+      stats.incEvaluations(this.region.size());
+      // if we have been asked to stop scheduling
+      // or the cache is closing stop in between.
+      while (keysItr.hasNext() && !stopScheduling && !checkCancelled(cache)) {
+        Map.Entry<Object, Object> entry = keysItr.next();
+        long startIterationTime = this.cache
+            .getDistributionManager().cacheTimeMillis();
+        Object routingObj = entry.getValue();
+        if (this.cache.getLoggerI18n().fineEnabled()) {
+          this.cache.getLoggerI18n().fine(
+              "EvictorService: Going to evict the following entry " + entry);
+        }
+        if (this.region instanceof PartitionedRegion) {
+          try {
+            PartitionedRegion pr = (PartitionedRegion)this.region;
+            stats.incEvictionsInProgress();
+            int bucketId = PartitionedRegionHelper.getHashKey(pr, routingObj);
+            BucketRegion br = pr.getDataStore().getLocalBucketById(bucketId);
+            // This has to be called on BucketRegion directly and not on the PR as
+            // PR doesn't allow operation on Secondary buckets.
+            if (br != null) {
+              if (this.cache.getLoggerI18n().fineEnabled()) {
+                this.cache.getLoggerI18n().fine(
+                    "EvictorService: Going to evict the following entry " + entry
+                        + " from bucket " + br);
+              }
+              if (br.getBucketAdvisor().isPrimary()) {
+                boolean succ = false;
+                try {
+                  succ = br.customEvictDestroy(entry.getKey());
+                } catch (PrimaryBucketException e) {
+                  // Logged unconditionally: the old fineEnabled() guard hid this warning at default levels.
+                  this.cache.getLoggerI18n().warning(
+                      LocalizedStrings.EVICTORSERVICE_CAUGHT_EXCEPTION_0, e);
+                }
+                if (succ) {
+                  // Only a successful destroy counts as an eviction.
+                  evicted++;
+                  stats.incEvictions();
+                }
+                if (this.cache.getLoggerI18n().fineEnabled()) {
+                  this.cache.getLoggerI18n().fine(
+                      "EvictorService: Evicted the following entry " + entry
+                          + " from bucket " + br + " successfully " + succ
+                          + " the value in buk is ");
+                }
+              }
+            }
+          } catch (Exception e) {
+            // Best effort: report the failure and continue with the next candidate.
+            this.cache.getLoggerI18n().warning(
+                LocalizedStrings.EVICTORSERVICE_CAUGHT_EXCEPTION_0, e);
+            // TODO: refine the exception handling, e.g. check whether the bucket is still present.
+          } finally {
+            stats.decEvictionsInProgress();
+            long endIterationTime = this.cache
+                .getDistributionManager().cacheTimeMillis();
+            totalIterationsTime += (endIterationTime - startIterationTime);
+          }
+        }
+      }
+    } finally {
+      if (keysItr instanceof Releasable) {
+        ((Releasable)keysItr).release();
+      }
+      // End both timers here so they are closed even when the iterator throws.
+      stats.endEvaluation(startEvaluationTime, totalIterationsTime);
+      stats.endCustomEviction(startEvictionTime);
+    }
+    if (this.cache.getLoggerI18n().fineEnabled()) {
+      this.cache.getLoggerI18n().fine(
+          "EvictorService: Completed an iteration at time "
+              + this.cache.getDistributionManager().cacheTimeMillis() / 1000
+              + ". Evicted " + evicted + " entries.");
+    }
+  }
+
+  /**
+   * Reports whether the given cache's cancel criterion has a cancellation in progress.
+   */
+  private boolean checkCancelled(GemFireCacheImpl cache) {
+    return cache.getCancelCriterion().cancelInProgress() != null;
+  }
+
+  /**
+   * Supplies a scheduler whose delay is the number of seconds remaining until
+   * {@code nextScheduleTime}, or zero when that time has already passed. Each call
+   * advances {@code nextScheduleTime} by {@code interval}.
+   */
+  @Override
+  protected Scheduler scheduler() {
+    return new CustomScheduler() {
+      @Override
+      protected Schedule getNextSchedule() throws Exception {
+        // Current time in seconds from the DM; per the original note it takes
+        // care of clock skew etc in different VMs.
+        final long nowSeconds = cache.getDistributionManager().cacheTimeMillis() / 1000;
+        if (cache.getLoggerI18n().fineEnabled()) {
+          cache.getLoggerI18n().fine("EvictorService: Now is " + nowSeconds);
+        }
+        // Wait out the remainder, or fire immediately when already overdue.
+        final long waitSeconds = nowSeconds < nextScheduleTime ? nextScheduleTime - nowSeconds : 0;
+        nextScheduleTime += interval;
+        if (cache.getLoggerI18n().fineEnabled()) {
+          cache.getLoggerI18n().fine(
+              "EvictorService: Returning the next schedule with delay " + waitSeconds
+                  + " next schedule is at : " + nextScheduleTime);
+        }
+        return new Schedule(waitSeconds, TimeUnit.SECONDS);
+      }
+    };
+  }
+
+  /**
+   * Raises the stop flag checked by runOneIteration (which also calls this when it detects a cancellation).
+   * Region.destroy and Region.close should make sure to call this method.
+   */
+  public void stopScheduling() {
+    this.stopScheduling = true;
+  }
+
+  // Called when the service stops (to stop the service call stop()); releases the executor, region and cache.
+  @Override
+  protected void shutDown() throws Exception {
+    if (this.executorService != null) { // stays null until executor() has been invoked
+      this.executorService.shutdownNow();
+    }
+    this.region = null;
+    this.cache = null;
+  }
+
+  // This will be called when we start the service.
+  // Not sure if we have to do any initialization; currently nothing is done here.
+  @Override
+  protected void startUp() throws Exception {
+
+  }
+
+  /** Replaces the eviction period; {@code newInterval} is divided by 1000 (milliseconds to seconds) before it is stored. */
+  public void changeEvictionInterval(long newInterval) {
+    this.interval = TimeUnit.MILLISECONDS.toSeconds(newInterval);
+    if (cache.getLoggerI18n().fineEnabled()) {
+      cache.getLoggerI18n().fine("EvictorService: New interval is " + this.interval);
+    }
+  }
+
+  public void changeStartTime(long newStart) {
+    this.nextScheduleTime = TimeUnit.MILLISECONDS.toSeconds(newStart); // newStart / 1000, kept in whole seconds
+    if (cache.getLoggerI18n().fineEnabled()) {
+      cache.getLoggerI18n().fine("EvictorService: New start time is " + this.nextScheduleTime);
+    }
+  }
+  
+  @Override
+  protected ScheduledExecutorService executor() {
+    return this.executorService = super.executor(); // remembered so shutDown() can stop it
+  }
+
+}