You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:50:00 UTC

[14/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index c477466..db14e57 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -126,6 +126,16 @@ import com.gemstone.gemfire.cache.client.internal.ClientMetadataService;
 import com.gemstone.gemfire.cache.client.internal.ClientRegionFactoryImpl;
 import com.gemstone.gemfire.cache.client.internal.PoolImpl;
 import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSIntegrationUtil;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreCreation;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSFlushQueueFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSLastCompactionTimeFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector;
 import com.gemstone.gemfire.cache.query.QueryService;
 import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
 import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
@@ -922,6 +932,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
         HARegionQueue.setMessageSyncInterval(HARegionQueue.DEFAULT_MESSAGE_SYNC_INTERVAL);
       }
       FunctionService.registerFunction(new PRContainsValueFunction());
+      FunctionService.registerFunction(new HDFSLastCompactionTimeFunction());
+      FunctionService.registerFunction(new HDFSForceCompactionFunction());
+      FunctionService.registerFunction(new HDFSFlushQueueFunction());
       this.expirationScheduler = new ExpirationScheduler(this.system);
 
       // uncomment following line when debugging CacheExistsException
@@ -2172,6 +2185,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
           closeDiskStores();
           diskMonitor.close();
           
+          closeHDFSStores();
+          
           // Close the CqService Handle.
           try {
             if (isDebugEnabled) {
@@ -2257,6 +2272,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
         } catch (CancelException e) {
           // make sure the disk stores get closed
           closeDiskStores();
+          closeHDFSStores();
           // NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
 
           // okay, we're taking too long to do this stuff, so let's
@@ -3103,6 +3119,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
             future = (Future) this.reinitializingRegions.get(fullPath);
           }
           if (future == null) {
+            HDFSIntegrationUtil.createAndAddAsyncQueue(regionPath, attrs, this);
+            attrs = setEvictionAttributesForLargeRegion(attrs);
             if (internalRegionArgs.getInternalMetaRegion() != null) {
               rgn = internalRegionArgs.getInternalMetaRegion();
             } else if (isPartitionedRegion) {
@@ -3227,6 +3245,54 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     }
   }
 
+  /**
+   * turn on eviction by default for HDFS regions
+   */
+  @SuppressWarnings("deprecation")
+  public <K, V> RegionAttributes<K, V> setEvictionAttributesForLargeRegion(
+      RegionAttributes<K, V> attrs) {
+    RegionAttributes<K, V> ra = attrs;
+    if (DISABLE_AUTO_EVICTION) {
+      return ra;
+    }
+    if (attrs.getDataPolicy().withHDFS()
+        || attrs.getHDFSStoreName() != null) {
+      // make the region overflow by default
+      EvictionAttributes evictionAttributes = attrs.getEvictionAttributes();
+      boolean hasNoEvictionAttrs = evictionAttributes == null
+          || evictionAttributes.getAlgorithm().isNone();
+      AttributesFactory<K, V> af = new AttributesFactory<K, V>(attrs);
+      String diskStoreName = attrs.getDiskStoreName();
+      // set the local persistent directory to be the same as that for
+      // HDFS store
+      if (attrs.getHDFSStoreName() != null) {
+        HDFSStoreImpl hdfsStore = findHDFSStore(attrs.getHDFSStoreName());
+        if (attrs.getPartitionAttributes().getLocalMaxMemory() != 0 && hdfsStore == null) {
+          // HDFS store expected to be found at this point
+          throw new IllegalStateException(
+              LocalizedStrings.HOPLOG_HDFS_STORE_NOT_FOUND
+                  .toLocalizedString(attrs.getHDFSStoreName()));
+        }
+        // if there is no disk store, use the one configured for hdfs queue
+        if (attrs.getPartitionAttributes().getLocalMaxMemory() != 0 && diskStoreName == null) {
+          diskStoreName = hdfsStore.getDiskStoreName();
+        }
+      }
+      // set LRU heap eviction with overflow to disk for HDFS stores with
+      // local Oplog persistence
+      // set eviction attributes only if not set
+      if (hasNoEvictionAttrs) {
+        if (diskStoreName != null) {
+          af.setDiskStoreName(diskStoreName);
+        }
+        af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(
+            ObjectSizer.DEFAULT, EvictionAction.OVERFLOW_TO_DISK));
+      }
+      ra = af.create();
+    }
+    return ra;
+  }
+
   public final Region getRegion(String path) {
     return getRegion(path, false);
   }
@@ -4944,6 +5010,48 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
         c.setRegionAttributes(pra.toString(), af.create());
         break;
       }
+      case PARTITION_HDFS: {
+    	  AttributesFactory af = new AttributesFactory();
+          af.setDataPolicy(DataPolicy.HDFS_PARTITION);
+          PartitionAttributesFactory paf = new PartitionAttributesFactory();
+          af.setPartitionAttributes(paf.create());
+          af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+          af.setHDFSWriteOnly(false);
+          c.setRegionAttributes(pra.toString(), af.create());
+          break;
+        }
+      case PARTITION_REDUNDANT_HDFS: {
+    	  AttributesFactory af = new AttributesFactory();
+          af.setDataPolicy(DataPolicy.HDFS_PARTITION);
+          PartitionAttributesFactory paf = new PartitionAttributesFactory();
+          paf.setRedundantCopies(1);
+          af.setPartitionAttributes(paf.create());
+          af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+          af.setHDFSWriteOnly(false);
+          c.setRegionAttributes(pra.toString(), af.create());
+          break;
+        }
+      case PARTITION_WRITEONLY_HDFS_STORE: {
+        AttributesFactory af = new AttributesFactory();
+          af.setDataPolicy(DataPolicy.HDFS_PARTITION);
+          PartitionAttributesFactory paf = new PartitionAttributesFactory();
+          af.setPartitionAttributes(paf.create());
+          af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+          af.setHDFSWriteOnly(true);
+          c.setRegionAttributes(pra.toString(), af.create());
+          break;
+        }
+      case PARTITION_REDUNDANT_WRITEONLY_HDFS_STORE: {
+        AttributesFactory af = new AttributesFactory();
+          af.setDataPolicy(DataPolicy.HDFS_PARTITION);
+          PartitionAttributesFactory paf = new PartitionAttributesFactory();
+          paf.setRedundantCopies(1);
+          af.setPartitionAttributes(paf.create());
+          af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+          af.setHDFSWriteOnly(true);
+          c.setRegionAttributes(pra.toString(), af.create());
+          break;
+        }
       default:
         throw new IllegalStateException("unhandled enum " + pra);
       }
@@ -5337,6 +5445,45 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     }
   }
   
+  @Override
+  public HDFSStoreFactory createHDFSStoreFactory() {
+    // TODO Auto-generated method stub
+    return new HDFSStoreFactoryImpl(this);
+  }
+  
+  public HDFSStoreFactory createHDFSStoreFactory(HDFSStoreCreation creation) {
+    return new HDFSStoreFactoryImpl(this, creation);
+  }
+  public void addHDFSStore(HDFSStoreImpl hsi) {
+    HDFSStoreDirector.getInstance().addHDFSStore(hsi);
+    //TODO:HDFS Add a resource event for hdfs store creation as well 
+    // like the following disk store event
+    //system.handleResourceEvent(ResourceEvent.DISKSTORE_CREATE, dsi);
+  }
+
+  public void removeHDFSStore(HDFSStoreImpl hsi) {
+    //hsi.destroy();
+    HDFSStoreDirector.getInstance().removeHDFSStore(hsi.getName());
+    //TODO:HDFS Add a resource event for hdfs store as well 
+    // like the following disk store event
+    //system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, dsi);
+  }
+
+  public void closeHDFSStores() {
+    HDFSRegionDirector.reset();
+    HDFSStoreDirector.getInstance().closeHDFSStores();
+  }
+
+  
+  public HDFSStoreImpl findHDFSStore(String name) {
+    return HDFSStoreDirector.getInstance().getHDFSStore(name);
+  }
+  
+  public Collection<HDFSStoreImpl> getHDFSStores() {
+    return HDFSStoreDirector.getInstance().getAllHDFSStores();
+  }
+  
+  
   public TemporaryResultSetFactory getResultSetFactory() {
     return this.resultSetFactory;
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
index c924be5..3896800 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
@@ -373,20 +373,13 @@ public final class HARegion extends DistributedRegion
   
   /**
    * @return the deserialized value
-   * @see LocalRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean)
+   * @see DistributedRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean, boolean)
    *      
    */
   @Override
-  protected Object findObjectInSystem(KeyInfo keyInfo,
-                                      boolean isCreate,
-                                      TXStateInterface txState,
-                                      boolean generateCallbacks,
-                                      Object localValue,
-                                      boolean disableCopyOnRead,
-                                      boolean preferCD,
-                                      ClientProxyMembershipID requestingClient,
-                                      EntryEventImpl clientEvent,
-                                      boolean returnTombstones)
+  protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
+      TXStateInterface txState, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead,
+      boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
     throws CacheLoaderException, TimeoutException  {
 
     Object value = null;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java
new file mode 100644
index 0000000..f6c6aa7
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.Collection;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.cache.lru.EnableLRU;
+import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
+import com.gemstone.gemfire.internal.cache.lru.NewLRUClockHand;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Implementation of RegionMap that reads data from HDFS and adds LRU behavior
+ * 
+ */
+public class HDFSLRURegionMap extends AbstractLRURegionMap implements HDFSRegionMap {
+
+  private static final Logger logger = LogService.getLogger();
+
+  private final HDFSRegionMapDelegate delegate;
+
+  /**
+   *  A tool from the eviction controller for sizing entries and
+   *  expressing limits.
+   */
+  private EnableLRU ccHelper;
+
+  /**  The list of nodes in LRU order */
+  private NewLRUClockHand lruList;
+
+  private static final boolean DEBUG = Boolean.getBoolean("hdfsRegionMap.DEBUG");
+
+  public HDFSLRURegionMap(LocalRegion owner, Attributes attrs,
+      InternalRegionArguments internalRegionArgs) {
+    super(internalRegionArgs);
+    assert owner instanceof BucketRegion;
+    initialize(owner, attrs, internalRegionArgs);
+    this.delegate = new HDFSRegionMapDelegate(owner, attrs, internalRegionArgs, this);
+  }
+
+  @Override
+  public RegionEntry getEntry(Object key) {
+    return delegate.getEntry(key, null);
+  }
+
+  @Override
+  protected RegionEntry getEntry(EntryEventImpl event) {
+    return delegate.getEntry(event);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Collection<RegionEntry> regionEntries() {
+    return delegate.regionEntries();
+  }
+    
+  @Override
+  public int size() {
+    return delegate.size();
+  }
+    
+  @Override
+  public boolean isEmpty() {
+    return delegate.isEmpty();
+  }
+
+  @Override
+  protected void _setCCHelper(EnableLRU ccHelper) {
+    this.ccHelper = ccHelper;
+  }
+
+  @Override
+  protected EnableLRU _getCCHelper() {
+    return this.ccHelper;
+  }
+
+  @Override
+  protected void _setLruList(NewLRUClockHand lruList) {
+    this.lruList = lruList;
+  }
+
+  @Override
+  protected NewLRUClockHand _getLruList() {
+    return this.lruList;
+  }
+
+  @Override
+  public HDFSRegionMapDelegate getDelegate() {
+    return this.delegate;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java
new file mode 100644
index 0000000..2a7baef
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+/**
+ * Interface implemented by RegionMap implementations that
+ * read from HDFS.
+ * 
+ *
+ */
+public interface HDFSRegionMap {
+
+  /**
+   * @return the {@link HDFSRegionMapDelegate} that does
+   * all the work
+   */
+  public HDFSRegionMapDelegate getDelegate();
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java
new file mode 100644
index 0000000..a2ef653
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.CustomEvictionAttributes;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSEntriesSet;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSEntriesSet.HDFSIterator;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.cache.LocalRegion.IteratorType;
+import com.gemstone.gemfire.internal.cache.RegionMap.Attributes;
+import com.gemstone.gemfire.internal.cache.lru.EnableLRU;
+import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
+import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
+import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.util.concurrent.FutureResult;
+
+/**
+ * This class encapsulates all the functionality of HDFSRegionMap, so
+ * that it can be provided to HDFSLRURegionMap. 
+ * 
+ */
+public class HDFSRegionMapDelegate {
+
+  private static final Logger logger = LogService.getLogger();
+
+  private final BucketRegion owner;
+
+  private ConcurrentParallelGatewaySenderQueue hdfsQueue;
+
+  private final RegionMap backingRM;
+
+  /** queue of dead iterators */
+  private final ReferenceQueue<HDFSIterator> refs;
+  
+  private static final boolean DEBUG = Boolean.getBoolean("hdfsRegionMap.DEBUG");
+  
+  /**
+   * used for serializing fetches from HDFS
+   */
+  private ConcurrentMap<Object, FutureResult> futures = new ConcurrentHashMap<Object, FutureResult>();
+
+  public HDFSRegionMapDelegate(LocalRegion owner, Attributes attrs,
+      InternalRegionArguments internalRegionArgs, RegionMap backingRM) {
+    assert owner instanceof BucketRegion;
+    this.owner = (BucketRegion) owner;
+    this.backingRM = backingRM;
+    refs = new ReferenceQueue<HDFSEntriesSet.HDFSIterator>();
+  }
+
+  public RegionEntry getEntry(Object key, EntryEventImpl event) {
+    
+    RegionEntry re = getEntry(key, event, true);
+    // get from tx should put the entry back in map
+    // it should be evicted once tx completes
+    /**MergeGemXDHDFSToGFE txstate does not apply for this*/
+    /* if (re != null && getTXState(event) != null) {
+    if (re != null) {
+      // put the region entry in backing CHM of AbstractRegionMap so that
+      // it can be locked in basicPut/destroy
+      RegionEntry oldRe = backingRM.putEntryIfAbsent(key, re);
+      if (oldRe != null) {
+        if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) {
+          ((OffHeapRegionEntry)re).release();
+        }
+        return oldRe;
+      }
+      re.setMarkedForEviction();
+      owner.updateSizeOnCreate(key,
+          owner.calculateRegionEntryValueSize(re));
+      ((AbstractRegionMap)backingRM).incEntryCount(1);
+      ((AbstractRegionMap)backingRM).lruEntryCreate(re);
+    }*/
+    return re;
+  }
+
+  /*
+  private TXStateInterface getTXState(EntryEventImpl event) {
+    return event != null ? event.getTXState(this.owner) : this.owner
+        .getTXState();
+  }*/
+
+  /**
+   * 
+   * @param key
+   * @param event
+   * @param forceOnHeap if true will return heap version of off-heap region entries
+   */
+  private RegionEntry getEntry(Object key, EntryEventImpl event, boolean forceOnHeap) {
+    closeDeadIterators();
+    
+    RegionEntry re = backingRM.getEntryInVM(key);
+    if (logger.isTraceEnabled() || DEBUG) {
+      logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: Found the key in CHM: " + key
+          + " ,value=" + (re == null? "null" : "[" + re._getValue() + " or (" + re.getValueAsToken() + ")]")));
+    }
+    if ((re == null || (re.isRemoved() && !re.isTombstone()))
+        && owner.getBucketAdvisor().isPrimary() && allowReadFromHDFS()) {
+      if (logger.isTraceEnabled() || DEBUG) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: fetching from hdfs key:" + key));
+      }
+      try {
+        this.owner.getPartitionedRegion().hdfsCalled(key);
+        re = getEntryFromFuture(key);
+        if (re != null) {
+          return re;
+        }
+
+        assert this.owner.getPartitionedRegion().getDataPolicy()
+            .withHDFS();
+        byte[] k = EntryEventImpl.serialize(key);
+      
+        // for destroy ops we will retain the entry in the region map so
+        // tombstones can be tracked
+        //final boolean forceOnHeap = (event==null || !event.getOperation().isDestroy());
+        
+        // get from queue
+        re = getFromHDFSQueue(key, k, forceOnHeap);
+        if (re == null) {
+          // get from HDFS
+          re = getFromHDFS(key, k, forceOnHeap);
+        }
+        if (re != null && re.isTombstone()) {
+          RegionVersionVector vector = this.owner.getVersionVector();
+//          if (vector == null) {
+//            this.owner.getLogWriterI18n().info(LocalizedStrings.DEBUG,
+//            "found a tombstone in a region w/o a version vector: " + re + "; region: " + this.owner);
+//          }
+          if (vector == null
+              || vector.isTombstoneTooOld(re.getVersionStamp().getMemberID(),
+                                    re.getVersionStamp().getRegionVersion())) {
+            re = null;
+          }
+        }
+        if (logger.isTraceEnabled() || DEBUG) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: returning from hdfs re:" + re));
+        }
+      } catch (ForceReattemptException e) {
+        throw new PrimaryBucketException(e.getLocalizedMessage(), e);
+      } catch (IOException e) {
+        throw new HDFSIOException("Error reading from HDFS", e);
+      } finally {
+        notifyFuture(key, re);
+        // If we mark it here, the table scan may miss it causing updates/delete using table
+        // scan to fail.
+//        if (re != null) {
+//          re.setMarkedForEviction();
+//        }
+        if(re != null && event != null && !re.isTombstone()) {
+          if (logger.isTraceEnabled() || DEBUG) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: loaded from hdfs re:" + re));
+          }
+          BucketRegion br = (BucketRegion)owner;
+          //CustomEvictionAttributes csAttr = br.getCustomEvictionAttributes();
+          //if(csAttr!=null)
+          event.setLoadedFromHDFS(true);
+        }
+      }
+    }
+    if(re!=null && re.isMarkedForEviction() && !re.isTombstone()) {
+      if(event!=null) {
+        event.setLoadedFromHDFS(true);
+      }
+    }
+
+    return re;
+  }
+
+  /**
+   * This method returns true if the RegionEntry should be read from HDFS.
+   * fixes #49101 by not allowing reads from HDFS for persistent regions
+   * that do not define an eviction criteria.
+   * 
+   * @return true if RegionEntry should be read from HDFS
+   */
+  private boolean allowReadFromHDFS() {
+    if (!owner.getDataPolicy().withPersistence()
+        || owner.getCustomEvictionAttributes() != null
+        || isEvictionActionLocalDestroy()){
+        /**MergeGemXDHDFSToGFE this is used for global index. Hence not required here*/ 
+        //|| owner.isUsedForIndex()) {
+      // when region does not have persistence, we have to read from HDFS (even
+      // though there is no eviction criteria) for constraint checks
+      return true;
+    }
+    return false;
+  }
+
+  private boolean isEvictionActionLocalDestroy() {
+    PartitionedRegion pr = owner.getPartitionedRegion();
+    if (pr.getEvictionAttributes() != null) {
+      return pr.getEvictionAttributes().getAction() == EvictionAction.LOCAL_DESTROY;
+    }
+    return false;
+  }
+
+  protected RegionEntry getEntry(EntryEventImpl event) {
+    RegionEntry re = getEntry(event.getKey(), event, false);
+    if (re != null && event.isLoadedFromHDFS()) {
+      // put the region entry in backing CHM of AbstractRegionMap so that
+      // it can be locked in basicPut/destroy
+      RegionEntry oldRe = backingRM.putEntryIfAbsent(event.getKey(), re);
+      if (oldRe != null) {
+        if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) {
+          ((OffHeapRegionEntry) re).release();
+        }
+        return oldRe;
+      }
+      // since the entry is faulted in from HDFS, it must have
+      // satisfied the eviction criteria in the past, so mark it for eviction
+      re.setMarkedForEviction();
+
+      owner.updateSizeOnCreate(event.getKey(), owner.calculateRegionEntryValueSize(re));
+      ((AbstractRegionMap) backingRM).incEntryCount(1);
+      ((AbstractRegionMap) backingRM).lruEntryCreate(re);
+    }
+    return re;
+  }
+
+  @SuppressWarnings("unchecked")
+  public Collection<RegionEntry> regionEntries() {
+    closeDeadIterators();
+    if (!owner.getPartitionedRegion().includeHDFSResults()) {
+      if (logger.isDebugEnabled() || DEBUG) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Ignoring HDFS results for #regionEntries"));
+      }
+      return backingRM.regionEntriesInVM();
+    }
+
+    try {
+      return createEntriesSet(IteratorType.ENTRIES);
+    } catch (ForceReattemptException e) {
+      throw new PrimaryBucketException(e.getLocalizedMessage(), e);
+    }
+  }
+    
+  public int size() {
+    closeDeadIterators();
+    if (!owner.getPartitionedRegion().includeHDFSResults()) {
+      if (logger.isDebugEnabled() || DEBUG) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Ignoring HDFS results for #size"));
+      }
+      return backingRM.sizeInVM();
+    }
+
+    try {
+      return createEntriesSet(IteratorType.KEYS).size();
+    } catch (ForceReattemptException e) {
+      throw new PrimaryBucketException(e.getLocalizedMessage(), e);
+    }
+  }
+    
+  public boolean isEmpty() {
+    closeDeadIterators();
+    if (!owner.getPartitionedRegion().includeHDFSResults()) {
+      if (logger.isDebugEnabled() || DEBUG) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Ignoring HDFS results for #isEmpty"));
+      }
+      return backingRM.sizeInVM() == 0;
+    }
+
+    try {
+      return createEntriesSet(IteratorType.KEYS).isEmpty();
+    } catch (ForceReattemptException e) {
+      throw new PrimaryBucketException(e.getLocalizedMessage(), e);
+    }
+  }
+  
+  private void notifyFuture(Object key, RegionEntry re) {
+    FutureResult future = this.futures.remove(key);
+    if (future != null) {
+      future.set(re);
+    }
+  }
+
+  private RegionEntry getEntryFromFuture(Object key) {
+    FutureResult future = new FutureResult(this.owner.getCancelCriterion());
+    FutureResult old = this.futures.putIfAbsent(key, future);
+    if (old != null) {
+      if (logger.isTraceEnabled() || DEBUG) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: waiting for concurrent fetch to complete for key:" + key));
+      }
+      try {
+        return (RegionEntry) old.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        this.owner.getCache().getCancelCriterion().checkCancelInProgress(null);
+      }
+    }
+    return null;
+  }
+
+  private RegionEntry getFromHDFS(Object key, byte[] k, boolean forceOnHeap) throws IOException, ForceReattemptException {
+    SortedHoplogPersistedEvent ev;
+    try {
+      ev = (SortedHoplogPersistedEvent) owner.getHoplogOrganizer().read(k);
+    } catch (IOException e) {
+      owner.checkForPrimary();
+      throw e;
+    }
+    if (ev != null) {
+      if (logger.isTraceEnabled() || DEBUG) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: got from hdfs ev:" + ev));
+      }
+      return getEntryFromEvent(key, ev, forceOnHeap, false);
+    }
+    return null;
+  }
+
+  /**
+   * set the versionTag on the newly faulted-in entry
+   */
+  private void setVersionTag(RegionEntry re, VersionTag versionTag) {
+    if (owner.concurrencyChecksEnabled) {
+      versionTag.setMemberID(
+            owner.getVersionVector().getCanonicalId(versionTag.getMemberID()));
+      VersionStamp versionedRe = (VersionStamp) re;
+      versionedRe.setVersions(versionTag);
+    }
+  }
+
+  private RegionEntry getFromHDFSQueue(Object key, byte[] k, boolean forceOnHeap) throws ForceReattemptException {
+    ConcurrentParallelGatewaySenderQueue q = getHDFSQueue();
+    if (q == null) return null;
+    HDFSGatewayEventImpl hdfsGatewayEvent = (HDFSGatewayEventImpl) q.get(owner.getPartitionedRegion(), k, owner.getId());
+    if (hdfsGatewayEvent != null) {
+      if (logger.isTraceEnabled() || DEBUG) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: got from hdfs queue: " + hdfsGatewayEvent));
+      }
+      return getEntryFromEvent(key, hdfsGatewayEvent, forceOnHeap, false);
+    }
+    return null;
+  }
+
+  private ConcurrentParallelGatewaySenderQueue getHDFSQueue()
+      throws ForceReattemptException {
+    if (this.hdfsQueue == null) {
+      String asyncQId = this.owner.getPartitionedRegion().getHDFSEventQueueName();
+      final AsyncEventQueueImpl asyncQ =  (AsyncEventQueueImpl)this.owner.getCache().getAsyncEventQueue(asyncQId);
+      final AbstractGatewaySender gatewaySender = (AbstractGatewaySender)asyncQ.getSender();
+      AbstractGatewaySenderEventProcessor ep = gatewaySender.getEventProcessor();
+      if (ep == null) return null;
+      hdfsQueue = (ConcurrentParallelGatewaySenderQueue)ep.getQueue();
+    }
+    
+    // Check whether the queue has become primary here.
+    // There could be some time between bucket becoming a primary
+    // and underlying queue becoming a primary, so isPrimaryWithWait()
+    // waits for some time for the queue to become a primary on this member
+    final HDFSBucketRegionQueue brq = hdfsQueue.getBucketRegionQueue(
+        this.owner.getPartitionedRegion(), this.owner.getId());
+    if (brq != null) {
+      if (owner.getBucketAdvisor().isPrimary()
+          && !brq.getBucketAdvisor().isPrimaryWithWait()) {
+        InternalDistributedMember primaryHolder = brq.getBucketAdvisor()
+            .basicGetPrimaryMember();
+        throw new PrimaryBucketException("Bucket " + brq.getName()
+            + " is not primary. Current primary holder is " + primaryHolder);
+      }
+    }
+      
+    return hdfsQueue;
+  }
+
+  public RegionEntry getEntryFromEvent(Object key, HDFSGatewayEventImpl event, boolean forceOnHeap, boolean forUpdate) {
+    Object val;
+    if (event.getOperation().isDestroy()) {
+      val = Token.TOMBSTONE;
+    } else if (event.getOperation().isInvalidate()) {
+      val = Token.INVALID;
+    } else {
+      val = event.getValue();
+    }
+    RegionEntry re = null;
+    final TXStateInterface tx = owner.getTXState();
+    if (tx == null) {
+      re = createRegionEntry(key, val, event.getVersionTag(), forceOnHeap);
+      return re;
+    }
+    else
+    if (val != null) {
+      if (((re = this.backingRM.getEntryInVM(key)) == null)
+          || (re.isRemoved() && !re.isTombstone())) {
+        boolean shouldCreateOnHeapEntry = !(owner.getOffHeap() && forUpdate); 
+        re = createRegionEntry(key, val, event.getVersionTag(), shouldCreateOnHeapEntry);
+        if (forUpdate) {
+          if (re != null && tx != null) {
+            // put the region entry in backing CHM of AbstractRegionMap so that
+            // it can be locked in basicPut/destroy
+            RegionEntry oldRe = backingRM.putEntryIfAbsent(key, re);
+            if (oldRe != null) {
+              if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) {
+                ((OffHeapRegionEntry)re).release();
+              }
+              return oldRe;
+            }
+            re.setMarkedForEviction();
+            owner.updateSizeOnCreate(key,
+                owner.calculateRegionEntryValueSize(re));
+            ((AbstractRegionMap)backingRM).incEntryCount(1);
+            ((AbstractRegionMap)backingRM).lruEntryCreate(re);
+          }
+        }
+      }
+    }
+    return re;
+  }
+
+  public RegionEntry getEntryFromEvent(Object key, SortedHoplogPersistedEvent event, boolean forceOnHeap, boolean forUpdate) {
+    Object val = getValueFromEvent(event);
+    RegionEntry re = null;
+    final TXStateInterface tx = owner.getTXState();
+    if (tx == null) {
+      re = createRegionEntry(key, val, event.getVersionTag(), forceOnHeap);
+      return re;
+    }
+    else // FOR TX case, we need to create off heap entry if required
+    if (val != null) {
+      if (((re = this.backingRM.getEntryInVM(key)) == null)
+          || (re.isRemoved() && !re.isTombstone())) {
+        boolean shouldCreateOnHeapEntry = !(owner.getOffHeap() && forUpdate); 
+        re = createRegionEntry(key, val, event.getVersionTag(), shouldCreateOnHeapEntry);
+        if(forUpdate) {
+          if (re != null && tx != null) {
+            // put the region entry in backing CHM of AbstractRegionMap so that
+            // it can be locked in basicPut/destroy
+            RegionEntry oldRe = backingRM.putEntryIfAbsent(key, re);
+            if (oldRe != null) {
+              if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) {
+                ((OffHeapRegionEntry)re).release();
+              }
+              return oldRe;
+            }
+            re.setMarkedForEviction();
+            owner.updateSizeOnCreate(key,
+                owner.calculateRegionEntryValueSize(re));
+            ((AbstractRegionMap)backingRM).incEntryCount(1);
+            ((AbstractRegionMap)backingRM).lruEntryCreate(re);
+          }
+        }
+      }
+    }
+    return re;
+  }
+
+  private RegionEntry createRegionEntry(Object key, Object value, VersionTag tag, boolean forceOnHeap) {
+    RegionEntryFactory ref = backingRM.getEntryFactory();
+    if (forceOnHeap) {
+      ref = ref.makeOnHeap();
+    }
+    value = getValueDuringGII(key, value);
+    RegionEntry re = ref.createEntry(this.owner, key, value);
+    setVersionTag(re, tag);
+    if (re instanceof LRUEntry) {
+      assert backingRM instanceof AbstractLRURegionMap;
+      EnableLRU ccHelper = ((AbstractLRURegionMap)backingRM)._getCCHelper();
+      ((LRUEntry)re).updateEntrySize(ccHelper);
+    }
+    return re;
+  }
+
+  private Object getValueDuringGII(Object key, Object value) {
+    if (owner.getIndexUpdater() != null && !owner.isInitialized()) {
+      return AbstractRegionMap.listOfDeltasCreator.newValue(key, owner, value,
+          null);
+    }
+    return value;
+  }
+
+  private Set createEntriesSet(IteratorType type)
+      throws ForceReattemptException {
+    ConcurrentParallelGatewaySenderQueue q = getHDFSQueue();
+    if (q == null) return Collections.emptySet();
+    HDFSBucketRegionQueue brq = q.getBucketRegionQueue(this.owner.getPartitionedRegion(), owner.getId());
+    return new HDFSEntriesSet(owner, brq, owner.getHoplogOrganizer(), type, refs);
+  }
+
+  private void closeDeadIterators() {
+    Reference<? extends HDFSIterator> weak;
+    while ((weak = refs.poll()) != null) {
+      if (logger.isTraceEnabled() || DEBUG) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Closing weak ref for iterator "
+            + weak.get()));
+      }
+      weak.get().close();
+    }
+  }
+
+  /**
+   * gets the value from event, deserializing if necessary.
+   */
+  private Object getValueFromEvent(PersistedEventImpl ev) {
+    if (ev.getOperation().isDestroy()) {
+      return Token.TOMBSTONE;
+    } else if (ev.getOperation().isInvalidate()) {
+      return Token.INVALID;
+    }
+    return ev.getValue();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java
new file mode 100644
index 0000000..9336ed7
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.Collection;
+
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.internal.size.SingleObjectSizer;
+
+/**
+ * Implementation of RegionMap that reads data from HDFS.
+ * 
+ */
+public class HDFSRegionMapImpl extends AbstractRegionMap implements HDFSRegionMap {
+
+  private final HDFSRegionMapDelegate delegate;
+
+  private static final boolean DEBUG = Boolean.getBoolean("hdfsRegionMap.DEBUG");
+
+  public HDFSRegionMapImpl(LocalRegion owner, Attributes attrs,
+      InternalRegionArguments internalRegionArgs) {
+    super(internalRegionArgs);
+    assert owner instanceof BucketRegion;
+    initialize(owner, attrs, internalRegionArgs, false);
+    this.delegate = new HDFSRegionMapDelegate(owner, attrs, internalRegionArgs, this);
+  }
+
+  @Override
+  public RegionEntry getEntry(Object key) {
+    return delegate.getEntry(key, null);
+  }
+
+  @Override
+  protected RegionEntry getEntry(EntryEventImpl event) {
+    return delegate.getEntry(event);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Collection<RegionEntry> regionEntries() {
+    return delegate.regionEntries();
+  }
+    
+  @Override
+  public int size() {
+    return delegate.size();
+  }
+    
+  @Override
+  public boolean isEmpty() {
+    return delegate.isEmpty();
+  }
+
+  @Override
+  public HDFSRegionMapDelegate getDelegate() {
+    return this.delegate;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java
index bda5a27..36eee80 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalCache.java
@@ -20,6 +20,8 @@ package com.gemstone.gemfire.internal.cache;
 import java.util.Collection;
 
 import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector;
 import com.gemstone.gemfire.cache.query.internal.cq.CqService;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.internal.cache.extension.Extensible;
@@ -43,5 +45,7 @@ public interface InternalCache extends Cache, Extensible<Cache> {
 
   public CqService getCqService();
   
+  public Collection<HDFSStoreImpl> getHDFSStores() ;
+  
   public <T extends CacheService> T getService(Class<T> clazz);
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java
index 0885477..e506f2e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalDataView.java
@@ -39,22 +39,17 @@ public interface InternalDataView {
    * @param keyInfo
    * @param localRegion
    * @param updateStats
-   * @param disableCopyOnRead
-   * @param preferCD
+   * @param disableCopyOnRead 
+   * @param preferCD 
    * @param clientEvent TODO
    * @param returnTombstones TODO
    * @param retainResult if true then the result may be a retained off-heap reference
    * @return the object associated with the key
    */
   @Retained
-  Object getDeserializedValue(KeyInfo keyInfo,
-                              LocalRegion localRegion,
-                              boolean updateStats,
-                              boolean disableCopyOnRead,
-                              boolean preferCD,
-                              EntryEventImpl clientEvent,
-                              boolean returnTombstones,
-                              boolean retainResult);
+  Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
+      boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, 
+      boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult);
 
   /**
    * @param event
@@ -187,8 +182,8 @@ public interface InternalDataView {
    * @return the Object associated with the key
    */
   Object findObject(KeyInfo key, LocalRegion r, boolean isCreate, boolean generateCallbacks,
-                    Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
-                    EntryEventImpl clientEvent, boolean returnTombstones);
+      Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
+      EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS);
 
 
   /**
@@ -229,18 +224,13 @@ public interface InternalDataView {
    * 
    * @param localRegion
    * @param key
-   * @param doNotLockEntry
+   * @param doNotLockEntry 
    * @param requestingClient the client that made the request, or null if not from a client
    * @param clientEvent the client event, if any
    * @param returnTombstones TODO
    * @return the serialized value from the cache
    */
-  Object getSerializedValue(LocalRegion localRegion,
-                            KeyInfo key,
-                            boolean doNotLockEntry,
-                            ClientProxyMembershipID requestingClient,
-                            EntryEventImpl clientEvent,
-                            boolean returnTombstones) throws DataLocationException;
+  Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException;
 
   abstract void checkSupportsRegionDestroy() throws UnsupportedOperationInTransactionException;
   abstract void checkSupportsRegionInvalidate() throws UnsupportedOperationInTransactionException;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java
index f7d46fe..41e763d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/InternalRegionArguments.java
@@ -37,6 +37,7 @@ public final class InternalRegionArguments
   private boolean isUsedForPartitionedRegionAdmin;
   private boolean isUsedForSerialGatewaySenderQueue;
   private boolean isUsedForParallelGatewaySenderQueue;
+  private boolean isUsedForHDFSParallelGatewaySenderQueue = false;
   private int bucketRedundancy;
   private boolean isUsedForPartitionedRegionBucket;
   private RegionAdvisor partitionedRegionAdvisor;
@@ -272,11 +273,26 @@ public final class InternalRegionArguments
     this.isUsedForParallelGatewaySenderQueue = queueFlag;
     return this;
   }
+  public InternalRegionArguments setIsUsedForHDFSParallelGatewaySenderQueue(
+      boolean queueFlag) {
+    this.isUsedForHDFSParallelGatewaySenderQueue = queueFlag;
+    return this;
+  }
 
   public boolean isUsedForParallelGatewaySenderQueue() {
     return this.isUsedForParallelGatewaySenderQueue;
   }
   
+  public boolean isUsedForHDFSParallelGatewaySenderQueue() {
+    return this.isUsedForHDFSParallelGatewaySenderQueue;
+  }
+  
+  public boolean isReadWriteHDFSRegion() {
+    return isUsedForPartitionedRegionBucket()
+        && getPartitionedRegion().getHDFSStoreName() != null
+        && !getPartitionedRegion().getHDFSWriteOnly();
+  }
+
   public InternalRegionArguments setParallelGatewaySender(
       AbstractGatewaySender pgSender) {
     this.parallelGatewaySender = pgSender;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 3ad294c..b3de9b7 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -116,6 +116,11 @@ import com.gemstone.gemfire.cache.client.internal.ServerRegionProxy;
 import com.gemstone.gemfire.cache.control.ResourceManager;
 import com.gemstone.gemfire.cache.execute.Function;
 import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSIntegrationUtil;
+import com.gemstone.gemfire.cache.hdfs.internal.HoplogListenerForRegion;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
 import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;
 import com.gemstone.gemfire.cache.query.FunctionDomainException;
 import com.gemstone.gemfire.cache.query.Index;
@@ -460,6 +465,10 @@ public class LocalRegion extends AbstractRegion
   // Lock for updating PR MetaData on client side 
   public final Lock clientMetaDataLock = new ReentrantLock();
   
+  
+  protected HdfsRegionManager hdfsManager;
+  protected HoplogListenerForRegion hoplogListener;
+
   /**
    * There seem to be cases where a region can be created and yet the
    * distributed system is not yet in place...
@@ -632,6 +641,7 @@ public class LocalRegion extends AbstractRegion
       }
     }
 
+    this.hdfsManager = initHDFSManager();
     this.dsi = findDiskStore(attrs, internalRegionArgs);
     this.diskRegion = createDiskRegion(internalRegionArgs);
     this.entries = createRegionMap(internalRegionArgs);
@@ -686,8 +696,22 @@ public class LocalRegion extends AbstractRegion
     
   }
 
+  private HdfsRegionManager initHDFSManager() {
+    HdfsRegionManager hdfsMgr = null;
+    if (this.getHDFSStoreName() != null) {
+      this.hoplogListener = new HoplogListenerForRegion();
+      HDFSRegionDirector.getInstance().setCache(cache);
+      hdfsMgr = HDFSRegionDirector.getInstance().manageRegion(this, 
+          this.getHDFSStoreName(), hoplogListener);
+    }
+    return hdfsMgr;
+  }
+
   private RegionMap createRegionMap(InternalRegionArguments internalRegionArgs) {
     RegionMap result = null;
+	if ((internalRegionArgs.isReadWriteHDFSRegion()) && this.diskRegion != null) {
+      this.diskRegion.setEntriesMapIncompatible(true);
+    }
     if (this.diskRegion != null) {
       result = this.diskRegion.useExistingRegionMap(this);
     }
@@ -953,6 +977,11 @@ public class LocalRegion extends AbstractRegion
           existing = (LocalRegion)this.subregions.get(subregionName);
 
           if (existing == null) {
+            // create the async queue for HDFS if required. 
+            HDFSIntegrationUtil.createAndAddAsyncQueue(regionPath,
+                regionAttributes, this.cache);
+            regionAttributes = cache.setEvictionAttributesForLargeRegion(
+                regionAttributes);
             if (regionAttributes.getScope().isDistributed()
                 && internalRegionArgs.isUsedForPartitionedRegionBucket()) {
               final PartitionedRegion pr = internalRegionArgs
@@ -962,8 +991,15 @@ public class LocalRegion extends AbstractRegion
               internalRegionArgs.setKeyRequiresRegionContext(pr
                   .keyRequiresRegionContext());
               if (pr.isShadowPR()) {
-                newRegion = new BucketRegionQueue(subregionName, regionAttributes,
-                  this, this.cache, internalRegionArgs);
+                if (!pr.isShadowPRForHDFS()) {
+                    newRegion = new BucketRegionQueue(subregionName, regionAttributes,
+                      this, this.cache, internalRegionArgs);
+                }
+                else {
+                   newRegion = new HDFSBucketRegionQueue(subregionName, regionAttributes,
+                      this, this.cache, internalRegionArgs);
+                }
+                
               } else {
                 newRegion = new BucketRegion(subregionName, regionAttributes,
                     this, this.cache, internalRegionArgs);  
@@ -1098,6 +1134,7 @@ public class LocalRegion extends AbstractRegion
       if (event.getEventId() == null && generateEventID()) {
         event.setNewEventId(cache.getDistributedSystem());
       }
+      assert event.isFetchFromHDFS() : "validatedPut() should have been called";
       // Fix for 42448 - Only make create with null a local invalidate for
       // normal regions. Otherwise, it will become a distributed invalidate.
       if (getDataPolicy() == DataPolicy.NORMAL) {
@@ -1224,20 +1261,18 @@ public class LocalRegion extends AbstractRegion
    * @param retainResult if true then the result may be a retained off-heap reference
    * @return the value for the given key
    */
-  public final Object getDeserializedValue(RegionEntry re,
-                                           final KeyInfo keyInfo,
-                                           final boolean updateStats,
-                                           boolean disableCopyOnRead,
-                                           boolean preferCD,
-                                           EntryEventImpl clientEvent,
-                                           boolean returnTombstones,
-                                           boolean retainResult) {
+  public final Object getDeserializedValue(RegionEntry re, final KeyInfo keyInfo, final boolean updateStats, boolean disableCopyOnRead, 
+  boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS, boolean retainResult) {
     if (this.diskRegion != null) {
       this.diskRegion.setClearCountReference();
     }
     try {
       if (re == null) {
-        re = this.entries.getEntry(keyInfo.getKey());
+        if (allowReadFromHDFS) {
+          re = this.entries.getEntry(keyInfo.getKey());
+        } else {
+          re = this.entries.getOperationalEntryInVM(keyInfo.getKey());
+        }
       }
       //skip updating the stats if the value is null
       // TODO - We need to clean up the callers of the this class so that we can
@@ -1347,7 +1382,7 @@ public class LocalRegion extends AbstractRegion
   public Object get(Object key, Object aCallbackArgument,
       boolean generateCallbacks, EntryEventImpl clientEvent) throws TimeoutException, CacheLoaderException
   {
-    Object result = get(key, aCallbackArgument, generateCallbacks, false, false, null, clientEvent, false);
+    Object result = get(key, aCallbackArgument, generateCallbacks, false, false, null, clientEvent, false, true/*allowReadFromHDFS*/);
     if (Token.isInvalid(result)) {
       result = null;
     }
@@ -1357,16 +1392,11 @@ public class LocalRegion extends AbstractRegion
   /*
    * @see BucketRegion#getSerialized(KeyInfo, boolean, boolean)
    */
-  public Object get(Object key,
-                    Object aCallbackArgument,
-                    boolean generateCallbacks,
-                    boolean disableCopyOnRead,
-                    boolean preferCD,
-                    ClientProxyMembershipID requestingClient,
-                    EntryEventImpl clientEvent,
-                    boolean returnTombstones) throws TimeoutException, CacheLoaderException {
+  public Object get(Object key, Object aCallbackArgument,
+	      boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
+	      ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws TimeoutException, CacheLoaderException {
 	  return get(key, aCallbackArgument,
-		      generateCallbacks, disableCopyOnRead, preferCD,requestingClient, clientEvent, returnTombstones, false, false);
+		      generateCallbacks, disableCopyOnRead, preferCD,requestingClient, clientEvent, returnTombstones, false, allowReadFromHDFS, false);
   }
   
   /**
@@ -1388,17 +1418,16 @@ public class LocalRegion extends AbstractRegion
   public Object getRetained(Object key, Object aCallbackArgument,
       boolean generateCallbacks, boolean disableCopyOnRead,
       ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean opScopeIsLocal) throws TimeoutException, CacheLoaderException {
-    return get(key, aCallbackArgument, generateCallbacks, disableCopyOnRead, true, requestingClient, clientEvent, returnTombstones, opScopeIsLocal,
-      false /* see GEODE-1291*/);
+    return get(key, aCallbackArgument, generateCallbacks, disableCopyOnRead, true, requestingClient, clientEvent, returnTombstones, opScopeIsLocal, true, false/* see GEODE-1291*/);
   }
   /**
    * @param opScopeIsLocal if true then just check local storage for a value; if false then try to find the value if it is not local
    * @param retainResult if true then the result may be a retained off-heap reference.
    */
   public Object get(Object key, Object aCallbackArgument,
-                    boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
-                    ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones,
-                    boolean opScopeIsLocal, boolean retainResult) throws TimeoutException, CacheLoaderException
+      boolean generateCallbacks, boolean disableCopyOnRead, boolean preferCD,
+      ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, 
+	  boolean opScopeIsLocal, boolean allowReadFromHDFS, boolean retainResult) throws TimeoutException, CacheLoaderException
   {
     assert !retainResult || preferCD;
     validateKey(key);
@@ -1411,8 +1440,7 @@ public class LocalRegion extends AbstractRegion
     boolean isMiss = true;
     try {
       KeyInfo keyInfo = getKeyInfo(key, aCallbackArgument);
-      Object value = getDataView().getDeserializedValue(keyInfo, this, true, disableCopyOnRead, preferCD, clientEvent, returnTombstones,
-        retainResult);
+      Object value = getDataView().getDeserializedValue(keyInfo, this, true, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadFromHDFS, retainResult);
       final boolean isCreate = value == null;
       isMiss = value == null || Token.isInvalid(value)
           || (!returnTombstones && value == Token.TOMBSTONE);
@@ -1425,13 +1453,13 @@ public class LocalRegion extends AbstractRegion
         // if scope is local and there is no loader, then
         // don't go further to try and get value
         if (!opScopeIsLocal
-            && ((getScope().isDistributed())
+            && ((getScope().isDistributed() && !isHDFSRegion())
                 || hasServerProxy()
                 || basicGetLoader() != null)) { 
           // serialize search/load threads if not in txn
           value = getDataView().findObject(keyInfo,
               this, isCreate, generateCallbacks, value, disableCopyOnRead,
-              preferCD, requestingClient, clientEvent, returnTombstones);
+              preferCD, requestingClient, clientEvent, returnTombstones, false/*allowReadFromHDFS*/);
           if (!returnTombstones && value == Token.TOMBSTONE) {
             value = null;
           }
@@ -1457,7 +1485,7 @@ public class LocalRegion extends AbstractRegion
    */
   final public void recordMiss(final RegionEntry re, Object key) {
     final RegionEntry e;
-    if (re == null && !isTX()) {
+    if (re == null && !isTX() && !isHDFSRegion()) {
       e = basicGetEntry(key);
     } else {
       e = re;
@@ -1466,30 +1494,60 @@ public class LocalRegion extends AbstractRegion
   }
 
   /**
+   * @return true if this region has been configured for HDFS persistence
+   */
+  public boolean isHDFSRegion() {
+    return false;
+  }
+
+  /**
+   * @return true if this region is configured to read and write data from HDFS
+   */
+  public boolean isHDFSReadWriteRegion() {
+    return false;
+  }
+
+  /**
+   * @return true if this region is configured to only write to HDFS
+   */
+  protected boolean isHDFSWriteOnly() {
+    return false;
+  }
+
+  /**
+   * FOR TESTING ONLY
+   */
+  public HoplogListenerForRegion getHoplogListener() {
+    return hoplogListener;
+  }
+  
+  /**
+   * FOR TESTING ONLY
+   */
+  public HdfsRegionManager getHdfsRegionManager() {
+    return hdfsManager;
+  }
+  
+  /**
    * optimized to only allow one thread to do a search/load, other threads wait
    * on a future
-   *  @param keyInfo
+   *
+   * @param keyInfo
    * @param p_isCreate
    *                true if call found no entry; false if updating an existing
    *                entry
    * @param generateCallbacks
    * @param p_localValue
-*                the value retrieved from the region for this object.
+   *                the value retrieved from the region for this object.
    * @param disableCopyOnRead if true then do not make a copy
    * @param preferCD true if the preferred result form is CachedDeserializable
    * @param clientEvent the client event, if any
    * @param returnTombstones whether to return tombstones
    */
   @Retained
-  Object nonTxnFindObject(KeyInfo keyInfo,
-                          boolean p_isCreate,
-                          boolean generateCallbacks,
-                          Object p_localValue,
-                          boolean disableCopyOnRead,
-                          boolean preferCD,
-                          ClientProxyMembershipID requestingClient,
-                          EntryEventImpl clientEvent,
-                          boolean returnTombstones)
+  Object nonTxnFindObject(KeyInfo keyInfo, boolean p_isCreate,
+      boolean generateCallbacks, Object p_localValue, boolean disableCopyOnRead, boolean preferCD,
+      ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) 
       throws TimeoutException, CacheLoaderException
   {
     final Object key = keyInfo.getKey();
@@ -1548,8 +1606,7 @@ public class LocalRegion extends AbstractRegion
     try {
       boolean partitioned = this.getDataPolicy().withPartitioning();
       if (!partitioned) {
-        localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD, clientEvent, false,
-          false);
+        localValue = getDeserializedValue(null, keyInfo, isCreate, disableCopyOnRead, preferCD, clientEvent, false, false/*allowReadFromHDFS*/, false);
 
         // stats have now been updated
         if (localValue != null && !Token.isInvalid(localValue)) {
@@ -1558,7 +1615,7 @@ public class LocalRegion extends AbstractRegion
         }
         isCreate = localValue == null;
         result = findObjectInSystem(keyInfo, isCreate, null, generateCallbacks,
-            localValue, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+            localValue, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, false/*allowReadFromHDFS*/);
 
       } else {
         
@@ -1566,7 +1623,7 @@ public class LocalRegion extends AbstractRegion
         // For PRs we don't want to deserialize the value and we can't use findObjectInSystem because
         // it can invoke code that is transactional.
         result = getSharedDataView().findObject(keyInfo, this, true/*isCreate*/, generateCallbacks,
-            localValue, disableCopyOnRead, preferCD, null, null, false);
+            localValue, disableCopyOnRead, preferCD, null, null, false, allowReadFromHDFS);
         // TODO why are we not passing the client event or returnTombstones in the above invokation?
       }
 
@@ -1749,6 +1806,7 @@ public class LocalRegion extends AbstractRegion
   public final EntryEventImpl newPutEntryEvent(Object key, Object value,
       Object aCallbackArgument) {
     EntryEventImpl ev = newUpdateEntryEvent(key, value, aCallbackArgument);
+    ev.setFetchFromHDFS(false);
     ev.setPutDML(true);
     return ev;
   }
@@ -1880,11 +1938,23 @@ public class LocalRegion extends AbstractRegion
     }
   }
 
+  protected boolean includeHDFSResults() {
+    return isUsedForPartitionedRegionBucket() 
+        && isHDFSReadWriteRegion() 
+        && getPartitionedRegion().includeHDFSResults();
+  }
+  
+
   /** a fast estimate of total number of entries locally in the region */
   public long getEstimatedLocalSize() {
     RegionMap rm;
     if (!this.isDestroyed) {
       long size;
+      if (isHDFSReadWriteRegion() && this.initialized) {
+        // this size is not used by HDFS region iterators
+        // fixes bug 49239
+        return 0;
+      }
       // if region has not been initialized yet, then get the estimate from
       // disk region's recovery map if available
       if (!this.initialized && this.diskRegion != null
@@ -2196,6 +2266,9 @@ public class LocalRegion extends AbstractRegion
       if (this.imageState.isClient() && !this.concurrencyChecksEnabled) {
         return result - this.imageState.getDestroyedEntriesCount();
       }
+	if (includeHDFSResults()) {
+      return result;
+    }
       return result - this.tombstoneCount.get();
     }
   }
@@ -2931,18 +3004,11 @@ public class LocalRegion extends AbstractRegion
    * @param clientEvent the client's event, if any.  If not null, we set the version tag
    * @param returnTombstones TODO
    * @return the deserialized value
-   * @see LocalRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean)
-   */
-  protected Object findObjectInSystem(KeyInfo keyInfo,
-                                      boolean isCreate,
-                                      TXStateInterface tx,
-                                      boolean generateCallbacks,
-                                      Object localValue,
-                                      boolean disableCopyOnRead,
-                                      boolean preferCD,
-                                      ClientProxyMembershipID requestingClient,
-                                      EntryEventImpl clientEvent,
-                                      boolean returnTombstones)
+   * @see DistributedRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean, boolean )
+   */
+  protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
+      TXStateInterface tx, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient,
+      EntryEventImpl clientEvent, boolean returnTombstones,  boolean allowReadFromHDFS)
       throws CacheLoaderException, TimeoutException
   {
     final Object key = keyInfo.getKey();
@@ -5317,6 +5383,9 @@ public class LocalRegion extends AbstractRegion
     // Notify bridge clients (if this is a BridgeServer)
     event.setEventType(eventType);
     notifyBridgeClients(event);
+  if (this.hdfsStoreName != null) {
+    notifyGatewaySender(eventType, event);
+    }
     if(callDispatchListenerEvent){
       dispatchListenerEvent(eventType, event);
     }
@@ -7202,8 +7271,24 @@ public class LocalRegion extends AbstractRegion
     if (generateEventID()) {
       event.setNewEventId(cache.getDistributedSystem());
     }
+    event.setFetchFromHDFS(false);
+    return event;
+  }
+  
+  @Retained
+  protected EntryEventImpl generateCustomEvictDestroyEvent(final Object key) {
+    @Retained EntryEventImpl event =  EntryEventImpl.create(
+        this, Operation.CUSTOM_EVICT_DESTROY, key, null/* newValue */,
+        null, false, getMyId());
+    
+    // Fix for bug#36963
+    if (generateEventID()) {
+      event.setNewEventId(cache.getDistributedSystem());
+    }
+    event.setFetchFromHDFS(false);
     return event;
   }
+  
   /**
    * @return true if the evict destroy was done; false if it was not needed
    */
@@ -9856,6 +9941,8 @@ public class LocalRegion extends AbstractRegion
       }
     }
     
+    clearHDFSData();
+    
     if (!isProxy()) {
       // Now we need to recreate all the indexes.
       //If the indexManager is null we don't have to worry
@@ -9894,6 +9981,11 @@ public class LocalRegion extends AbstractRegion
     }
   }
 
+  /**Clear HDFS data, if present */
+  protected void clearHDFSData() {
+    //do nothing, clear is implemented for subclasses like BucketRegion.
+  }
+
   @Override
   void basicLocalClear(RegionEventImpl rEvent)
   {
@@ -10670,6 +10762,7 @@ public class LocalRegion extends AbstractRegion
   }
     public final DistributedPutAllOperation newPutAllForPUTDmlOperation(Map<?, ?> map, Object callbackArg) {
     DistributedPutAllOperation dpao = newPutAllOperation(map, callbackArg);
+    dpao.getEvent().setFetchFromHDFS(false);
     dpao.getEvent().setPutDML(true);
     return dpao;
   }
@@ -10725,6 +10818,7 @@ public class LocalRegion extends AbstractRegion
         putallOp, this, Operation.PUTALL_CREATE, key, value);
 
     try {
+	event.setFetchFromHDFS(putallOp.getEvent().isFetchFromHDFS());
     event.setPutDML(putallOp.getEvent().isPutDML());
     
     if (tagHolder != null) {
@@ -12827,6 +12921,22 @@ public class LocalRegion extends AbstractRegion
   public Integer getCountNotFoundInLocal() {
     return countNotFoundInLocal.get();
   }
+  /// End of Variables and methods for test Hook for HDFS ///////
+  public void forceHDFSCompaction(boolean isMajor, Integer maxWaitTime) {
+    throw new UnsupportedOperationException(
+        LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE
+            .toLocalizedString(getName()));
+  }
+
+  public void flushHDFSQueue(int maxWaitTime) {
+    throw new UnsupportedOperationException(
+        LocalizedStrings.HOPLOG_DOES_NOT_USE_HDFSSTORE
+            .toLocalizedString(getName()));
+  }
+  
+  public long lastMajorHDFSCompaction() {
+    throw new UnsupportedOperationException();
+  }
 
   public static void simulateClearForTests(boolean flag) {
     simulateClearForTests = flag;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java
index c26ff10..5193a17 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java
@@ -17,6 +17,7 @@
 package com.gemstone.gemfire.internal.cache;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Set;
 
 import com.gemstone.gemfire.cache.EntryNotFoundException;
@@ -35,16 +36,9 @@ public class LocalRegionDataView implements InternalDataView {
   /* (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.InternalDataView#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
    */
-  public Object getDeserializedValue(KeyInfo keyInfo,
-                                     LocalRegion localRegion,
-                                     boolean updateStats,
-                                     boolean disableCopyOnRead,
-                                     boolean preferCD,
-                                     EntryEventImpl clientEvent,
-                                     boolean returnTombstones,
-                                     boolean retainResult) {
-    return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones,
-      retainResult);
+  public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
+      boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadsFromHDFS, boolean retainResult) {
+    return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadsFromHDFS, retainResult);
   }
 
   /* (non-Javadoc)
@@ -142,17 +136,9 @@ public class LocalRegionDataView implements InternalDataView {
   /* (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.InternalDataView#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
    */
-  public Object findObject(KeyInfo keyInfo,
-                           LocalRegion r,
-                           boolean isCreate,
-                           boolean generateCallbacks,
-                           Object value,
-                           boolean disableCopyOnRead,
-                           boolean preferCD,
-                           ClientProxyMembershipID requestingClient,
-                           EntryEventImpl clientEvent,
-                           boolean returnTombstones) {
-   return r.nonTxnFindObject(keyInfo, isCreate, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones);
+  public Object findObject(KeyInfo keyInfo, LocalRegion r, boolean isCreate,
+      boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
+   return r.nonTxnFindObject(keyInfo, isCreate, generateCallbacks, value, disableCopyOnRead, preferCD, requestingClient, clientEvent, returnTombstones, allowReadFromHDFS);
   }
 
   /* (non-Javadoc)
@@ -194,12 +180,7 @@ public class LocalRegionDataView implements InternalDataView {
    * (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.BucketRegion, java.lang.Object, java.lang.Object)
    */
-  public Object getSerializedValue(LocalRegion localRegion,
-                                   KeyInfo key,
-                                   boolean doNotLockEntry,
-                                   ClientProxyMembershipID requestingClient,
-                                   EntryEventImpl clientEvent,
-                                   boolean returnTombstones) throws DataLocationException {
+  public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException {
     throw new IllegalStateException();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java
index 4c1fa7f..bb83383 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java
@@ -461,6 +461,26 @@ public class NonLocalRegionEntry implements RegionEntry, VersionStamp {
   }
 
   @Override
+  public boolean isMarkedForEviction() {
+    throw new UnsupportedOperationException(LocalizedStrings
+        .PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY
+            .toLocalizedString());
+  }
+  @Override
+  public void setMarkedForEviction() {
+    throw new UnsupportedOperationException(LocalizedStrings
+        .PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY
+            .toLocalizedString());
+  }
+
+  @Override
+  public void clearMarkedForEviction() {
+    throw new UnsupportedOperationException(LocalizedStrings
+        .PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY
+            .toLocalizedString());
+  }
+
+  @Override
   public boolean isValueNull() {
     return (null == getValueAsToken());
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
index 4728594..fe8813e 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
@@ -7384,6 +7384,19 @@ public final class Oplog implements CompactableOplog, Flushable {
       // TODO Auto-generated method stub
     }
     @Override
+    public boolean isMarkedForEviction() {
+      // TODO Auto-generated method stub
+      return false;
+    }
+    @Override
+    public void setMarkedForEviction() {
+      // TODO Auto-generated method stub
+    }
+    @Override
+    public void clearMarkedForEviction() {
+      // TODO Auto-generated method stub
+    }
+    @Override
     public boolean isInvalid() {
       // TODO Auto-generated method stub
       return false;