Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:50:10 UTC

[24/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/FlushObserver.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/FlushObserver.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/FlushObserver.java
new file mode 100644
index 0000000..f69b3dc
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/FlushObserver.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Observes and reacts to flush events.
+ * 
+ */
+public interface FlushObserver {
+  public interface AsyncFlushResult {
+    /**
+     * Waits for the most recently enqueued batch to completely flush.
+     * 
+     * @param time the time to wait
+     * @param unit the time unit
+     * @return true if flushed before the timeout
+     * @throws InterruptedException interrupted while waiting
+     */
+    public boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException;
+  }
+
+  /**
+   * Returns true when the queued events should be drained from the queue
+   * immediately.
+   * 
+   * @return true if draining
+   */
+  boolean shouldDrainImmediately();
+  
+  /**
+   * Begins flushing the queued events.
+   * 
+   * @return the async result
+   */
+  public AsyncFlushResult flush();
+}
+
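
For orientation, the contract above amounts to: flush() kicks off an asynchronous drain of whatever has been enqueued and returns a handle that can be awaited with a bounded timeout, while shouldDrainImmediately() lets the queue ask for an eager drain. Below is a minimal latch-backed sketch; the class name LatchFlushObserver, the flushed() hook, and the choice to report shouldDrainImmediately() as "a flush is outstanding" are illustrative assumptions, not part of this commit (the commit's own implementations are SignalledFlushObserver and the aggregating observer inside HDFSBucketRegionQueue below).

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import com.gemstone.gemfire.cache.hdfs.internal.FlushObserver;

// Illustrative sketch only: a latch-backed FlushObserver.
public class LatchFlushObserver implements FlushObserver {
  // One latch per requested flush; counted down by whatever drains the queue.
  private volatile CountDownLatch latch = new CountDownLatch(0);

  @Override
  public boolean shouldDrainImmediately() {
    // Assumption for this sketch: drain eagerly while a flush request is outstanding.
    return latch.getCount() > 0;
  }

  @Override
  public AsyncFlushResult flush() {
    final CountDownLatch current = new CountDownLatch(1);
    latch = current;
    return new AsyncFlushResult() {
      @Override
      public boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException {
        return current.await(time, unit);
      }
    };
  }

  /** Hypothetical hook: called by the drain thread once the enqueued batch is written out. */
  public void flushed() {
    latch.countDown();
  }
}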

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java
new file mode 100644
index 0000000..9127e4d
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java
@@ -0,0 +1,1232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.cache.hdfs.internal.FlushObserver.AsyncFlushResult;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue.SortedEventBuffer.BufferIterator;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.AbstractBucketRegionQueue;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.RegionEventImpl;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.ByteComparator;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.CursorIterator;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+/**
+ * This class holds the sorted list required for HDFS. 
+ * 
+ * 
+ */
+public class HDFSBucketRegionQueue extends AbstractBucketRegionQueue {
+     private static final boolean VERBOSE = Boolean.getBoolean("hdfsBucketRegionQueue.VERBOSE");
+     private final int batchSize;
+     volatile HDFSEventQueue hdfsEventQueue = null;
+     
+     // set before releasing the primary lock. 
+     private final AtomicBoolean releasingPrimaryLock = new AtomicBoolean(true);
+     
+     // This is used to keep track of the current size of the queue in bytes. 
+     final AtomicLong queueSizeInBytes =  new AtomicLong(0);
+     public boolean isBucketSorted = true;
+     /**
+     * @param regionName
+     * @param attrs
+     * @param parentRegion
+     * @param cache
+     * @param internalRegionArgs
+     */
+    public HDFSBucketRegionQueue(String regionName, RegionAttributes attrs,
+        LocalRegion parentRegion, GemFireCacheImpl cache,
+        InternalRegionArguments internalRegionArgs) {
+      super(regionName, attrs, parentRegion, cache, internalRegionArgs);
+      
+      this.isBucketSorted = internalRegionArgs.getPartitionedRegion().getParallelGatewaySender().getBucketSorted();
+      if (isBucketSorted)
+        hdfsEventQueue = new MultiRegionSortedQueue();
+      else
+        hdfsEventQueue = new EventQueue();
+      
+      batchSize = internalRegionArgs.getPartitionedRegion().
+          getParallelGatewaySender().getBatchSize() * 1024 *1024;
+      this.keySet();
+    }
+    @Override
+    protected void initialize(InputStream snapshotInputStream,
+        InternalDistributedMember imageTarget,
+        InternalRegionArguments internalRegionArgs) throws TimeoutException,
+        IOException, ClassNotFoundException {
+
+      super.initialize(snapshotInputStream, imageTarget, internalRegionArgs);
+
+      loadEventsFromTempQueue();
+      
+      this.initialized = true;
+      notifyEventProcessor();
+    }
+
+    private TreeSet<Long> createSkipListFromMap(Set keySet) {
+      TreeSet<Long> sortedKeys = null;
+      if (!hdfsEventQueue.isEmpty())
+        return sortedKeys;
+      
+      if (!keySet.isEmpty()) {
+        sortedKeys = new TreeSet<Long>(keySet);
+        if (!sortedKeys.isEmpty())
+        {
+          for (Long key : sortedKeys) {
+            if (this.isBucketSorted) {
+              Object hdfsevent = getNoLRU(key, true, false, false);
+              if (hdfsevent == null) { // this can happen when tombstones are recovered. 
+                if (logger.isDebugEnabled() || VERBOSE) {
+                  logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Discarding key " + key + ", no event recovered"));
+                }
+              } else {
+                int eventSize = ((HDFSGatewayEventImpl)hdfsevent).
+                    getSizeOnHDFSInBytes(!this.isBucketSorted);
+                hdfsEventQueue.put(key,(HDFSGatewayEventImpl)hdfsevent, eventSize );
+                queueSizeInBytes.getAndAdd(eventSize);
+              }
+            }
+            else {
+              Object hdfsevent = getNoLRU(key, true, false, false);
+              if (hdfsevent != null) { // hdfs event can be null when tombstones are recovered.
+                queueSizeInBytes.getAndAdd(((HDFSGatewayEventImpl)hdfsevent).
+                    getSizeOnHDFSInBytes(!this.isBucketSorted));
+              }
+              ((EventQueue)hdfsEventQueue).put(key);
+            }
+              
+          }
+          getEventSeqNum().setIfGreater(sortedKeys.last());
+        }
+      
+      }
+      if (logger.isDebugEnabled() || VERBOSE) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG,
+            "For bucket " + getId() + ", total keys recovered are : " + keySet.size()
+                + " and the seqNo is " + getEventSeqNum()));
+      }
+      return sortedKeys;
+    }
+    
+    @Override
+    protected void basicClear(RegionEventImpl ev) {
+      super.basicClear(ev);
+      queueSizeInBytes.set(0);
+      if ( this.getBucketAdvisor().isPrimary()) {
+        this.hdfsEventQueue.clear();
+      }
+    }
+    
+    protected void clearQueues(){
+      queueSizeInBytes.set(0);
+      if ( this.getBucketAdvisor().isPrimary()) {
+        this.hdfsEventQueue.clear();
+      }
+    }
+   
+    @Override
+    protected void basicDestroy(final EntryEventImpl event,
+        final boolean cacheWrite, Object expectedOldValue)
+        throws EntryNotFoundException, CacheWriterException, TimeoutException {
+      super.basicDestroy(event, cacheWrite, expectedOldValue);
+    }
+    
+    ArrayList peekABatch() {
+      ArrayList result = new ArrayList();
+      hdfsEventQueue.peek(result);
+      return result;
+    }
+    
+    @Override
+    protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event, int sizeOfHDFSEvent) {
+      if (didPut &&  this.getBucketAdvisor().isPrimary()) {
+        HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)event.getValue();
+        if (sizeOfHDFSEvent == -1) { 
+          try {
+            // the size is calculated only on primary before event is inserted in the bucket. 
+            // If this node became primary after size was calculated, sizeOfHDFSEvent will be -1. 
+            // Try to get the size. #50016
+            sizeOfHDFSEvent = hdfsEvent.getSizeOnHDFSInBytes(!((HDFSBucketRegionQueue)this).isBucketSorted);
+          } catch (Throwable e) {
+           //   Ignore any exception while fetching the size.
+            sizeOfHDFSEvent = 0;
+          }
+        }
+        queueSizeInBytes.getAndAdd(sizeOfHDFSEvent);
+        if (this.initialized) {
+          Long longKey = (Long)key;
+          this.hdfsEventQueue.put(longKey, hdfsEvent, sizeOfHDFSEvent);
+        }
+        if (logger.isDebugEnabled()) {
+          logger.debug("Put successfully in the queue : " + hdfsEvent + " . Queue initialized: " 
+              + this.initialized);
+        }
+      }
+    }
+    
+    /**
+     * It removes the first key from the queue.
+     * 
+     * @return Returns the key for which value was destroyed.
+     * @throws ForceReattemptException
+     */
+    public Long remove() throws ForceReattemptException {
+      throw new UnsupportedOperationException("Individual entries cannot be removed in a HDFSBucketRegionQueue");
+    }
+    
+    /**
+     * It removes the first key from the queue.
+     * 
+     * @return Returns the value.
+     * @throws InterruptedException
+     * @throws ForceReattemptException
+     */
+    public Object take() throws InterruptedException, ForceReattemptException {
+      throw new UnsupportedOperationException("take() cannot be called for individual entries in a HDFSBucketRegionQueue");
+    }
+    
+    public void destroyKeys(ArrayList<HDFSGatewayEventImpl>  listToDestroy) {
+      
+      HashSet<Long> removedSeqNums = new HashSet<Long>();
+      
+      for (int index =0; index < listToDestroy.size(); index++) {
+        HDFSGatewayEventImpl entry = null;
+        if (this.isBucketSorted) {
+          // Remove the events in reverse order so that the events with higher sequence number
+          // are removed last to ensure consistency.
+          entry = listToDestroy.get(listToDestroy.size() - index -1);
+        } else {
+          entry = listToDestroy.get(index);
+        }
+       
+        try {
+          if (this.logger.isDebugEnabled())
+            logger.debug("destroying primary key " + entry.getShadowKey() + " bucket id: " + this.getId());
+          // removed from peeked list
+          boolean deleted = this.hdfsEventQueue.remove(entry);
+          if (deleted) {
+            // this is an onheap event so a call to size should be ok. 
+            long entrySize = entry.getSizeOnHDFSInBytes(!this.isBucketSorted);
+            destroyKey(entry.getShadowKey());
+            long queueSize = queueSizeInBytes.getAndAdd(-1*entrySize);
+            if (queueSize < 0) {
+              // In HA scenarios, queueSizeInBytes can go awry.
+              queueSizeInBytes.compareAndSet(queueSize, 0);
+            }
+            removedSeqNums.add(entry.getShadowKey());
+          }
+        }catch (ForceReattemptException e) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("ParallelGatewaySenderQueue#remove->HDFSBucketRegionQueue#destroyKeys: " + "Got ForceReattemptException for " + this
+            + " for bucket = " + this.getId());
+          }
+        }
+        catch(EntryNotFoundException e) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("ParallelGatewaySenderQueue#remove->HDFSBucketRegionQueue#destroyKeys: " + "Got EntryNotFoundException for " + this
+              + " for bucket = " + this.getId() + " and key " + entry.getShadowKey());
+          }
+        } finally {
+          entry.release();
+        }
+      }
+      
+      if (this.getBucketAdvisor().isPrimary()) {
+        hdfsEventQueue.handleRemainingElements(removedSeqNums);
+      }
+    }
+
+    
+    public boolean isReadyForPeek() {
+      return !this.isEmpty() && !this.hdfsEventQueue.isEmpty() && getBucketAdvisor().isPrimary();
+    }
+
+    public long getLastPeekTimeInMillis() {
+      return hdfsEventQueue.getLastPeekTimeInMillis();
+    }
+    
+    public long getQueueSizeInBytes() {
+      return queueSizeInBytes.get();
+    }
+    /*
+     * This function is called when the bucket takes on the role of primary.
+     */
+    @Override
+    public void beforeAcquiringPrimaryState() {
+      
+      queueSizeInBytes.set(0);
+      if (logger.isDebugEnabled() || VERBOSE) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG,
+            "This node has become primary for bucket " + this.getId()  +". " +
+            		"Creating sorted data structure for the async queue."));
+      }
+      releasingPrimaryLock.set(false);
+      
+      // clear the hdfs queue in case it still has elements left over from when it
+      // was primary in the past
+      hdfsEventQueue.clear();
+      if (isBucketSorted)
+        hdfsEventQueue = new MultiRegionSortedQueue();
+      else
+        hdfsEventQueue = new EventQueue();
+      
+      TreeSet<Long> sortedKeys = createSkipListFromMap(this.keySet());
+      
+      if (sortedKeys != null && sortedKeys.size() > 0) {    
+        // Mark the first batch-size worth of events as possible duplicates.
+        // The batch size is calculated from the number of events currently in the queue,
+        // so this is an approximation.
+        long batchSizeMB =  this.getPartitionedRegion().getParallelGatewaySender().getBatchSize();
+        long batchSizeInBytes = batchSizeMB*1024*1024;
+        long totalBucketSize = queueSizeInBytes.get();
+        totalBucketSize = totalBucketSize >  0 ? totalBucketSize: 1;
+        long totalEntriesInBucket = this.entryCount();
+        totalEntriesInBucket =  totalEntriesInBucket > 0 ? totalEntriesInBucket: 1;
+        
+        long perEntryApproxSize = totalBucketSize/totalEntriesInBucket;
+        perEntryApproxSize = perEntryApproxSize >  0 ? perEntryApproxSize: 1;
+        
+        int batchSize  = (int)(batchSizeInBytes/perEntryApproxSize);
+        
+        if (logger.isDebugEnabled() || VERBOSE) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG,
+              "Calculating batch size " +  " batchSizeMB: " + batchSizeMB + " batchSizeInBytes: " + batchSizeInBytes + 
+              " totalBucketSize: " + totalBucketSize + " totalEntriesInBucket: " + totalEntriesInBucket + 
+              " perEntryApproxSize: " + perEntryApproxSize + " batchSize: " + batchSize ));
+        }
+        
+        markEventsAsDuplicate(batchSize, sortedKeys.iterator());
+      }
+    }
+    
+    @Override
+    public void beforeReleasingPrimaryLockDuringDemotion() {
+      queueSizeInBytes.set(0);
+      releasingPrimaryLock.set(true);
+      // release memory in case of a clean transition
+      hdfsEventQueue.clear();
+    }
+
+    /**
+     * This function searches the skip list and the peeked skip list for a given region key
+     * @param region 
+     * 
+     */
+    public HDFSGatewayEventImpl getObjectForRegionKey(Region region, byte[] regionKey) {
+      // get can only be called for a sorted queue.
+      // Calling get with a Long.MIN_VALUE seq number ensures that
+      // the list returns the entry with the highest seq number for the key.
+      return hdfsEventQueue.get(region, regionKey, Long.MIN_VALUE);
+    }
+
+    /**
+     * Get an iterator on the queue, passing in the partitioned region
+     * we want to iterate over the events from.
+     */
+    public SortedEventQueueIterator iterator(Region region) {
+      return hdfsEventQueue.iterator(region);
+    }
+
+    public long totalEntries() {
+      return entryCount();
+    }
+    
+    /**
+     * Ideally this function should be called from a thread periodically to 
+     * rollover the skip list when it is above a certain size. 
+     * 
+     */
+    public void rolloverSkipList() {
+      // rollover can only be called for a sorted queue.
+      hdfsEventQueue.rollover();
+    }
+    
+    public boolean shouldDrainImmediately() {
+      return hdfsEventQueue.getFlushObserver().shouldDrainImmediately();
+    }
+
+    public AsyncFlushResult flush() {
+      if (logger.isDebugEnabled() || VERBOSE) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flush requested"));
+      }
+      return hdfsEventQueue.getFlushObserver().flush();
+    }
+    
+    /**
+     * This class keeps the regionkey and seqNum. The objects of this class are 
+     * kept in a concurrent skip list. The order of elements is decided based on the 
+     * comparison of regionKey + seqNum. This kind of comparison allows us to keep 
+     * multiple updates on a single key (because each update has a different seqNum).
+     */
+    static class KeyToSeqNumObject implements Comparable<KeyToSeqNumObject>
+    {
+      private byte[] regionkey; 
+      private Long seqNum;
+      
+      KeyToSeqNumObject(byte[] regionkey, Long seqNum){
+        this.regionkey = regionkey;
+        this.seqNum = seqNum;
+      }
+      
+      /**
+       * This function compares the region key first. If the keys are the same, the seq num is compared.
+       * It is a key function because it ensures that the skip lists hold the elements
+       * in the order we want and that, for multiple updates on a key, a get fetches the most recent one.
+       * Currently we compare seq numbers, but this will have to change to version stamps.
+       * The list can hold elements in the following sequence:
+       * K1 Value1 version : 1
+       * K2 Value2a version : 2
+       * K2 Value2 version : 1
+       * K3 Value3 version : 1
+       * For a get on K2, it should return K2 Value2a.
+       */
+      @Override
+      public int compareTo(KeyToSeqNumObject o) {
+        int compareOutput = ByteComparator.compareBytes(
+            this.getRegionkey(), 0, this.getRegionkey().length, o.getRegionkey(), 0, o.getRegionkey().length);
+        if (compareOutput != 0 )
+          return compareOutput;
+        
+        // If the keys are the same and this is an object with a dummy seq number,
+        // return -1 so that the dummy sorts before every real entry for this key; a
+        // ceiling lookup on the skip list then lands on the entry with the highest seq number.
+        if (this.getSeqNum() == Long.MIN_VALUE) 
+          return -1;
+        
+        // this just maintains symmetry with the case above.
+        if (o.getSeqNum() == Long.MIN_VALUE) 
+          return 1;
+       
+        // The minus operator pushes entries with lower seq numbers to the end so that
+        // the order described above is maintained and the entries with the
+        // higher version are fetched on a get.
+        return this.getSeqNum().compareTo(o.getSeqNum()) * -1;  
+      }
+      
+      @Override
+      public boolean equals (Object o) {
+    	KeyToSeqNumObject obj = null;
+      	if (o == null)
+    		return false; 
+    	
+    	if (o instanceof KeyToSeqNumObject) 
+    		obj = (KeyToSeqNumObject)o;
+    	else
+    		return false;
+    	
+    	if (this.compareTo(obj) != 0)
+          return false;
+        else
+          return true;
+      }
+      
+      public int hashCode() {
+    	assert false : "hashCode not designed";
+    	return -1;
+      }
+      
+      byte[] getRegionkey() {
+        return regionkey;
+      }
+
+      public Long getSeqNum() {
+        return seqNum;
+      }
+
+      public void setSeqNum(Long seqNum) {
+        this.seqNum = seqNum;
+      }
+      
+      @Override
+      public String toString() {
+        return EntryEventImpl.deserialize(regionkey) + " {" + seqNum + "}";
+      }
+    }
+    
+    public interface HDFSEventQueue {
+      FlushObserver getFlushObserver();
+      
+      /** puts an event in the queue. */ 
+      public void put (long key, HDFSGatewayEventImpl event, int size);
+      
+      public SortedEventQueueIterator iterator(Region region);
+
+      public void rollover();
+
+      /** Get a value from the queue
+       * @throws IllegalStateException if this queue doesn't support get  
+       **/
+      public HDFSGatewayEventImpl get(Region region, byte[] regionKey,
+          long minValue);
+
+      // Peeks a batch of the size specified by batchSize
+      // and adds the results to the array list.
+      public void peek(ArrayList result);
+      
+      // Checks if there are elements to be peeked.
+      public boolean isEmpty();
+      
+      // removes the event if it has already been peeked. 
+      public boolean remove(HDFSGatewayEventImpl event);
+      
+      // take care of the elements that were peeked 
+      // but were not removed after a batch dispatch 
+      // due to concurrency effects. 
+      public void handleRemainingElements(HashSet<Long> listToBeremoved);
+      
+      // clears the list. 
+      public void clear();
+      
+      // get the time when the last peek was done. 
+      public long getLastPeekTimeInMillis();
+    }
+    
+    class MultiRegionSortedQueue implements HDFSEventQueue {
+      ConcurrentMap<String, SortedEventQueue> regionToEventQueue = new ConcurrentHashMap<String, SortedEventQueue>();
+      volatile Set<SortedEventQueue> peekedQueues = Collections.EMPTY_SET;
+      private final AtomicBoolean peeking = new AtomicBoolean(false);
+      long lastPeekTimeInMillis = System.currentTimeMillis();
+      
+      private final FlushObserver flush = new FlushObserver() {
+        @Override
+        public AsyncFlushResult flush() {
+          final Set<AsyncFlushResult> flushes = new HashSet<AsyncFlushResult>();
+          for (SortedEventQueue queue : regionToEventQueue.values()) {
+            flushes.add(queue.getFlushObserver().flush());
+          }
+          
+          return new AsyncFlushResult() {
+            @Override
+            public boolean waitForFlush(long timeout, TimeUnit unit) throws InterruptedException {
+              long start = System.nanoTime();
+              long remaining = unit.toNanos(timeout);
+              for (AsyncFlushResult afr : flushes) {
+                if (!afr.waitForFlush(remaining, TimeUnit.NANOSECONDS)) {
+                  return false;
+                }
+                remaining -= (System.nanoTime() - start);
+              }
+              return true;
+            }
+          };
+        }
+        
+        @Override
+        public boolean shouldDrainImmediately() {
+          for (SortedEventQueue queue : regionToEventQueue.values()) {
+            if (queue.getFlushObserver().shouldDrainImmediately()) {
+              return true;
+            }
+          }
+          return false;
+        }
+      };
+      
+      @Override
+      public FlushObserver getFlushObserver() {
+        return flush;
+      }
+
+      @Override
+      public void put(long key, HDFSGatewayEventImpl event, int size) {
+        
+        String region = event.getRegionPath();
+        SortedEventQueue regionQueue = regionToEventQueue.get(region);
+        if(regionQueue == null) {
+          regionToEventQueue.putIfAbsent(region, new SortedEventQueue());
+          regionQueue = regionToEventQueue.get(region);
+        }
+        regionQueue.put(key, event, size);
+      }
+
+      @Override
+      public void peek(ArrayList result) {
+        // The elements that were peeked last time have not been persisted to HDFS
+        // yet. You cannot take out the next batch until that is done.
+        if (!peeking.compareAndSet(false, true)) {
+          if (logger.isTraceEnabled() || VERBOSE) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Peek already in progress, aborting"));
+          }
+          return;
+        }
+        // Maintain a separate set of peeked queues.
+        // All of these queues are stateful and expect
+        // handleRemainingElements and clear to be called on
+        // them iff peek was called on them. However, new queues
+        // may be created in that time.
+        peekedQueues = Collections.newSetFromMap(new ConcurrentHashMap<SortedEventQueue, Boolean>(regionToEventQueue.size()));
+        
+        //Peek from all of the existing queues
+        for(SortedEventQueue queue : regionToEventQueue.values()) {
+          if(!queue.isEmpty()) {
+            queue.peek(result);
+            peekedQueues.add(queue);
+          }
+        }
+        if (result.isEmpty()) 
+          peeking.set(false);
+        
+        
+        this.lastPeekTimeInMillis = System.currentTimeMillis();
+      }
+
+      @Override
+      public boolean isEmpty() {
+        for(SortedEventQueue queue : regionToEventQueue.values()) {
+          if(!queue.isEmpty()) {
+            return false;
+          }
+        }
+        return true;
+      }
+
+      @Override
+      public boolean remove(HDFSGatewayEventImpl event) {
+        String region = event.getRegionPath();
+        SortedEventQueue regionQueue = regionToEventQueue.get(region);
+        return regionQueue.remove(event);
+      }
+
+      @Override
+      public void handleRemainingElements(HashSet<Long> removedSeqNums){
+        for(SortedEventQueue queue : peekedQueues) {
+          queue.handleRemainingElements(removedSeqNums);
+        }
+        peekedQueues.clear();
+        peeking.set(false);
+      }
+
+      @Override
+      public void clear() {
+        for(SortedEventQueue queue : regionToEventQueue.values()) {
+          queue.clear();
+        }
+        peekedQueues.clear();
+        peeking.set(false);
+      }
+
+      @Override
+      public long getLastPeekTimeInMillis() {
+        return this.lastPeekTimeInMillis;
+      }
+
+      @Override
+      public HDFSGatewayEventImpl get(Region region, byte[] regionKey,
+          long minValue) {
+        SortedEventQueue queue = regionToEventQueue.get(region.getFullPath());
+        if(queue == null) {
+          return null;
+        }
+        return queue.get(region, regionKey, minValue);
+      }
+
+      @Override
+      public SortedEventQueueIterator iterator(Region region) {
+        SortedEventQueue queue = regionToEventQueue.get(region.getFullPath());
+        if(queue == null) {
+          return new SortedEventQueueIterator(new LinkedBlockingDeque<SortedEventBuffer>());
+        }
+        return queue.iterator(region);
+      }
+
+      @Override
+      public void rollover() {
+        for(SortedEventQueue queue : regionToEventQueue.values()) {
+          queue.rollover();
+        }
+      }
+    }
+    
+    class EventQueue implements HDFSEventQueue {
+      private final SignalledFlushObserver flush = new SignalledFlushObserver();
+      private final BlockingQueue<Long> eventSeqNumQueue = new LinkedBlockingQueue<Long>();
+      private final BlockingQueue<Long> peekedEvents = new LinkedBlockingQueue<Long>();
+      private long lastPeekTimeInMillis = System.currentTimeMillis(); 
+      
+      public EventQueue() {
+        
+      }
+      
+      @Override
+      public FlushObserver getFlushObserver() {
+        return flush;
+      }
+
+      @Override
+      public void put(long key, HDFSGatewayEventImpl event, int size) {
+        put(key);
+      }
+      public void put (long key) {
+        eventSeqNumQueue.add(key);
+        flush.push();
+        incQueueSize();
+      }
+      
+      
+      @Override
+      public HDFSGatewayEventImpl get(Region region, byte[] regionKey,
+          long minValue) {
+        throw new InternalGemFireError("Get not supported on unsorted queue");
+      }
+      
+      @Override
+      public void peek(ArrayList peekedEntries) {
+        if (peekedEvents.size() != 0) {
+          return;
+        }
+        
+        for(int size=0; size < batchSize; ) {
+          Long seqNum = eventSeqNumQueue.peek();
+          if (seqNum == null) {
+            // queue is now empty, return
+            break;
+          }
+          Object object = getNoLRU(seqNum, true, false, false);
+          if (object != null) {
+            peekedEvents.add(seqNum);
+            size += ((HDFSGatewayEventImpl)object).getSizeOnHDFSInBytes(!isBucketSorted);
+            peekedEntries.add(object);
+
+          } else {
+            logger.debug("The entry corresponding to the sequence number " + 
+               seqNum +  " is missing. This can happen when an entry is already " +
+               "dispatched before a bucket moved.");
+            // event is being ignored. Decrease the queue size
+            decQueueSize();
+            flush.pop(1);
+           
+          }
+          eventSeqNumQueue.poll();
+          
+        }
+        this.lastPeekTimeInMillis  = System.currentTimeMillis();
+      }
+
+      @Override
+      public boolean isEmpty() {
+        return eventSeqNumQueue.isEmpty();
+      }
+
+      
+      @Override
+      public boolean remove(HDFSGatewayEventImpl event) {
+        boolean deleted = peekedEvents.remove(event.getShadowKey());
+        if (deleted)
+         decQueueSize();
+        return deleted;
+      }
+
+      @Override
+      // It looks like there is no need for this function
+      // in EventQueue.
+      public void handleRemainingElements(HashSet<Long> removedSeqNums) {
+        flush.pop(removedSeqNums.size());
+        eventSeqNumQueue.addAll(peekedEvents);
+        peekedEvents.clear();
+      }
+
+      @Override
+      public void clear() {
+        flush.clear();
+        decQueueSize(eventSeqNumQueue.size());
+        eventSeqNumQueue.clear();
+        decQueueSize(peekedEvents.size());
+        peekedEvents.clear();
+      }
+
+      @Override
+      public long getLastPeekTimeInMillis() {
+        return this.lastPeekTimeInMillis;
+      }
+      @Override
+      public SortedEventQueueIterator iterator(Region region) {
+        throw new InternalGemFireError("not supported on unsorted queue");
+      }
+      @Override
+      public void rollover() {
+        throw new InternalGemFireError("not supported on unsorted queue");
+      }
+    }
+    
+    class SortedEventQueue implements HDFSEventQueue {
+      private final SignalledFlushObserver flush = new SignalledFlushObserver();
+
+      // List of all the skip lists that hold the data
+      final Deque<SortedEventBuffer> queueOfLists = 
+          new LinkedBlockingDeque<SortedEventBuffer>();
+      
+      // This points to the tail of the queue
+      volatile SortedEventBuffer currentSkipList = new SortedEventBuffer();
+      
+      private final AtomicBoolean peeking = new AtomicBoolean(false);
+      
+      private long lastPeekTimeInMillis = System.currentTimeMillis(); 
+      
+      public SortedEventQueue() {
+        queueOfLists.add(currentSkipList);
+      }
+      
+      @Override
+      public FlushObserver getFlushObserver() {
+        return flush;
+      }
+
+      public boolean remove(HDFSGatewayEventImpl event) {
+        SortedEventBuffer eventBuffer = queueOfLists.peek();
+        if (eventBuffer != null) {
+          return eventBuffer.copyToBuffer(event);
+        }
+        else {
+          // This can happen when the queue is cleared because of bucket movement 
+          // before the remove is called. 
+          return true;
+        }
+      } 
+
+      public void clear() {
+        flush.clear();
+        for (SortedEventBuffer buf : queueOfLists) {
+          decQueueSize(buf.size());
+          buf.clear();
+        }
+        
+        queueOfLists.clear();
+        rollList(false);
+
+        peeking.set(false);
+      }
+
+      public boolean isEmpty() {
+        if (queueOfLists.size() == 1)
+          return queueOfLists.peek().isEmpty();
+        return false;
+      }
+
+      public void put(long key, HDFSGatewayEventImpl event, int eventSize) {
+        if (logger.isTraceEnabled() || VERBOSE) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Inserting key " + event + " into list " + System.identityHashCode(currentSkipList)));
+        }
+        putInList(new KeyToSeqNumObject(((HDFSGatewayEventImpl)event).getSerializedKey(), key), 
+            eventSize);
+      }
+
+      private void putInList(KeyToSeqNumObject entry, int sizeInBytes) {
+        // It was observed during testing that peek can start peeking 
+        // elements from a list to which a put is happening. This happens 
+        // when the peek changes the value of currentSkiplist to a new list 
+        // but the put continues to write to an older list. 
+        // So there is a possibility that an element is added to the list 
+        // that has already been peeked. To handle this case, in handleRemainingElements
+        // function we re-add the elements that were not peeked. 
+        if (currentSkipList.add(entry, sizeInBytes) == null) {
+          flush.push();
+          incQueueSize();
+        }
+      }
+
+      public void rollover(boolean forceRollover) {
+        if (currentSkipList.bufferSize() >= batchSize || forceRollover) {
+          rollList(forceRollover);
+        }
+      }
+      
+      /**
+       * Ideally this function should be called from a thread periodically to 
+       * rollover the skip list when it is above a certain size. 
+       * 
+       */
+      public void rollover() {
+        rollover(false);
+      }
+
+      public void peek(ArrayList peekedEntries) {
+        // The elements that were peeked last time have not been persisted to HDFS
+        // yet. You cannot take out the next batch until that is done.
+        if (!peeking.compareAndSet(false, true)) {
+          if (logger.isTraceEnabled() || VERBOSE) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Peek already in progress, aborting"));
+          }
+          return;
+        }
+
+        if (queueOfLists.size() == 1) {
+          rollList(false);
+        }
+        
+        Assert.assertTrue(queueOfLists.size() > 1, "Cannot peek from head of queue");
+        BufferIterator itr = queueOfLists.peek().iterator();
+        while (itr.hasNext()) {
+          KeyToSeqNumObject entry = itr.next();
+          if (logger.isTraceEnabled() || VERBOSE) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Peeking key " + entry + " from list " + System.identityHashCode(queueOfLists.peek())));
+          }
+
+          HDFSGatewayEventImpl ev = itr.value();
+          ev.copyOffHeapValue();
+          peekedEntries.add(ev);
+        }
+        
+        // discard an empty batch as it is not processed and will plug up the
+        // queue
+        if (peekedEntries.isEmpty()) {
+          SortedEventBuffer empty = queueOfLists.remove();
+          if (logger.isTraceEnabled() || VERBOSE) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Discarding empty batch " + empty));
+          }
+          peeking.set(false);
+        }
+        this.lastPeekTimeInMillis = System.currentTimeMillis();
+      }
+
+      public HDFSGatewayEventImpl get(Region region, byte[] regionKey, long key) {
+        KeyToSeqNumObject event = new KeyToSeqNumObject(regionKey, key);
+        Iterator<SortedEventBuffer> queueIterator = queueOfLists.descendingIterator();
+        while (queueIterator.hasNext()) {
+          HDFSGatewayEventImpl evt = queueIterator.next().getFromQueueOrBuffer(event);
+          if (evt != null) {
+            return evt;
+          }
+        }
+        return null;
+      }
+      
+      public void handleRemainingElements(HashSet<Long> removedSeqNums) {
+        if (!peeking.get()) {
+          if (logger.isTraceEnabled() || VERBOSE) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Not peeked, just cleaning up empty batch; current list is " + currentSkipList));
+          }
+          return;
+        }
+
+        Assert.assertTrue(queueOfLists.size() > 1, "Cannot remove only event list");
+
+        // all done with the peeked elements, okay to throw away now
+        SortedEventBuffer buf = queueOfLists.remove();
+        SortedEventBuffer.BufferIterator bufIter = buf.iterator();
+        // Check if the removed buffer has any extra events. If yes, check if these extra
+        // events are part of the region. If yes, reinsert them, as they were probably inserted
+        // into this list while it was being peeked.
+        while (bufIter.hasNext()) {
+          KeyToSeqNumObject key = bufIter.next();
+          if (!removedSeqNums.contains(key.getSeqNum())) {
+            HDFSGatewayEventImpl evt = (HDFSGatewayEventImpl) getNoLRU(key.getSeqNum(), true, false, false);
+            if (evt != null) {
+              flush.push();
+              incQueueSize();
+              queueOfLists.getFirst().add(key, evt.getSizeOnHDFSInBytes(!isBucketSorted));
+            }
+          }
+        }
+
+        decQueueSize(buf.size());
+        flush.pop(buf.size());
+        peeking.set(false);
+      }
+      
+      public long getLastPeekTimeInMillis(){
+        return this.lastPeekTimeInMillis;
+      }
+      
+      NavigableSet<KeyToSeqNumObject> getPeeked() {
+        assert peeking.get();
+        return queueOfLists.peek().keySet();
+      }
+      
+      private synchronized void rollList(boolean forceRollover) {
+        if (currentSkipList.bufferSize() < batchSize && queueOfLists.size() > 1 && !forceRollover)
+          return;
+        SortedEventBuffer tmp = new SortedEventBuffer();
+        queueOfLists.add(tmp);
+        if (logger.isTraceEnabled() || VERBOSE) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Rolling over list from " + currentSkipList + " to list " + tmp));
+        }
+        currentSkipList = tmp;
+      }
+
+      @Override
+      public SortedEventQueueIterator iterator(Region region) {
+        return new SortedEventQueueIterator(queueOfLists);
+      }
+    }
+    
+    public class SortedEventBuffer {
+      private final HDFSGatewayEventImpl NULL = new HDFSGatewayEventImpl();
+  
+      private final ConcurrentSkipListMap<KeyToSeqNumObject, HDFSGatewayEventImpl> events = new ConcurrentSkipListMap<KeyToSeqNumObject, HDFSGatewayEventImpl>();
+      
+      private int bufferSize = 0;
+      
+      public boolean copyToBuffer(HDFSGatewayEventImpl event) {
+        KeyToSeqNumObject key = new KeyToSeqNumObject(event.getSerializedKey(), event.getShadowKey());
+        if (events.containsKey(key)) {
+          // After an event has been delivered in a batch, we copy it into the
+          // buffer so that it can be returned by an already in progress iterator.
+          // If we do not do this it is possible to miss events since the hoplog
+          // iterator uses a fixed set of files that are determined when the
+          // iterator is created.  The events will be GC'd once the buffer is no
+          // longer strongly referenced.
+          HDFSGatewayEventImpl oldVal = events.put(key, event);
+          assert oldVal == NULL;
+  
+          return true;
+        }
+        // If the primary lock is being relinquished, the events map is cleared and that is
+        // probably why we are here. Return true if the primary lock is being relinquished.
+        if (releasingPrimaryLock.get())
+          return true;
+        else 
+          return false;
+      }
+  
+      public HDFSGatewayEventImpl getFromQueueOrBuffer(KeyToSeqNumObject key) {
+        KeyToSeqNumObject result = events.ceilingKey(key);
+        if (result != null && Bytes.compareTo(key.getRegionkey(), result.getRegionkey()) == 0) {
+          
+          // first try to fetch the buffered event to make it fast. 
+          HDFSGatewayEventImpl evt = events.get(result);
+          if (evt != NULL) {
+            return evt;
+          }
+          // now try to fetch the event from the queue region
+          evt = (HDFSGatewayEventImpl) getNoLRU(result.getSeqNum(), true, false, false);
+          if (evt != null) {
+            return evt;
+          }
+          
+          // try to fetch again from the buffered events to avoid a race between 
+          // item deletion and the above two statements. 
+          evt = events.get(result);
+          if (evt != NULL) {
+            return evt;
+          }
+          
+        }
+        return null;
+      }
+  
+      public HDFSGatewayEventImpl add(KeyToSeqNumObject key, int sizeInBytes) {
+        bufferSize += sizeInBytes;
+        return events.put(key, NULL);
+      }
+  
+      public void clear() {
+        events.clear();
+      }
+  
+      public boolean isEmpty() {
+        return events.isEmpty();
+      }
+  
+      public int bufferSize() {
+        return bufferSize;
+      }
+      public int size() {
+        return events.size();
+      }
+      public NavigableSet<KeyToSeqNumObject> keySet() {
+        return events.keySet();
+      }
+  
+      public BufferIterator iterator() {
+        return new BufferIterator(events.keySet().iterator());
+      }
+  
+      public class BufferIterator implements Iterator<KeyToSeqNumObject> {
+        private final Iterator<KeyToSeqNumObject> src;
+
+        private KeyToSeqNumObject currentKey;
+        private HDFSGatewayEventImpl currentVal;
+
+        private KeyToSeqNumObject nextKey;
+        private HDFSGatewayEventImpl nextVal;
+        
+        public BufferIterator(Iterator<KeyToSeqNumObject> src) {
+          this.src = src;
+          moveNext();
+        }
+  
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+        
+        @Override
+        public boolean hasNext() {
+          return nextVal != null;
+        }
+        
+        @Override
+        public KeyToSeqNumObject next() {
+          if (!hasNext()) {
+            throw new NoSuchElementException();
+          }
+          
+          currentKey = nextKey;
+          currentVal = nextVal;
+          
+          moveNext();
+          
+          return currentKey;
+        }
+  
+        public KeyToSeqNumObject key() {
+          assert currentKey != null;
+          return currentKey;
+        }
+        
+        public HDFSGatewayEventImpl value() {
+          assert currentVal != null;
+          return currentVal;
+        }
+        
+        private void moveNext() {
+          while (src.hasNext()) {
+            nextKey = src.next();
+            nextVal = getFromQueueOrBuffer(nextKey);
+            if (nextVal != null) {
+              return;
+            } else if (logger.isDebugEnabled() || VERBOSE) {
+              logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "The entry corresponding to"
+                  + " the sequence number " + nextKey.getSeqNum() 
+                  + " is missing. This can happen when an entry is already" 
+                  + " dispatched before a bucket moved."));
+            }
+          }
+          nextKey = null;
+          nextVal = null;
+        }
+      }
+    }
+  
+    public final class SortedEventQueueIterator implements CursorIterator<HDFSGatewayEventImpl> {
+      /** the iterators to merge */
+      private final List<SortedEventBuffer.BufferIterator> iters;
+  
+      /** the current iteration value */
+      private HDFSGatewayEventImpl value;
+  
+      public SortedEventQueueIterator(Deque<SortedEventBuffer> queueOfLists) {
+        iters = new ArrayList<SortedEventBuffer.BufferIterator>();
+        for (Iterator<SortedEventBuffer> iter = queueOfLists.descendingIterator(); iter.hasNext();) {
+          SortedEventBuffer.BufferIterator buf = iter.next().iterator();
+          if (buf.hasNext()) {
+            buf.next();
+            iters.add(buf);
+          }
+        }
+      }
+      
+      public void close() {
+        value = null;
+        iters.clear();
+      }
+
+      @Override
+      public boolean hasNext() {
+        return !iters.isEmpty();
+      }
+      
+      @Override
+      public HDFSGatewayEventImpl next() {
+        if (!hasNext()) {
+          throw new UnsupportedOperationException();
+        }
+        
+        int diff = 0;
+        KeyToSeqNumObject min = null;
+        SortedEventBuffer.BufferIterator cursor = null;
+        
+        for (Iterator<SortedEventBuffer.BufferIterator> merge = iters.iterator(); merge.hasNext(); ) {
+          SortedEventBuffer.BufferIterator buf = merge.next();
+          KeyToSeqNumObject tmp = buf.key();
+          if (min == null || (diff = Bytes.compareTo(tmp.regionkey, min.regionkey)) < 0) {
+            min = tmp;
+            cursor = buf;
+            
+          } else if (diff == 0 && !advance(buf, min)) {
+            merge.remove();
+          }
+        }
+        
+        value = cursor.value();
+        assert value != null;
+
+        if (!advance(cursor, min)) {
+          iters.remove(cursor);
+        }
+        return current();
+      }
+      
+      @Override
+      public final HDFSGatewayEventImpl current() {
+        return value;
+      }
+
+      @Override 
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+      
+      private boolean advance(SortedEventBuffer.BufferIterator iter, KeyToSeqNumObject key) {
+        while (iter.hasNext()) {
+          if (Bytes.compareTo(iter.next().regionkey, key.regionkey) > 0) {
+            return true;
+          }
+        }
+        return false;
+      }
+    }
+}
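
The ordering that KeyToSeqNumObject.compareTo establishes (region key ascending, then sequence number descending, with Long.MIN_VALUE acting as a lowest-possible probe) is what lets SortedEventBuffer.getFromQueueOrBuffer find the most recent update for a key with a single ceilingKey call. The standalone sketch below reproduces just that ordering rule on a plain TreeMap so the javadoc example (K1/K2/K3) can be run; the Probe class and the String keys are invented for illustration, Arrays.compare (Java 9+) stands in for the commit's ByteComparator.compareBytes.

import java.util.Arrays;
import java.util.Comparator;
import java.util.TreeMap;

// Illustrative only: mirrors the ordering used by KeyToSeqNumObject so the
// ceilingKey() trick in SortedEventBuffer.getFromQueueOrBuffer is easy to see.
public class SeqOrderingDemo {
  static final class Probe {
    final byte[] key;
    final long seq;
    Probe(byte[] key, long seq) { this.key = key; this.seq = seq; }
    @Override public String toString() { return new String(key) + " {" + seq + "}"; }
  }

  // Region key ascending; for equal keys a Long.MIN_VALUE probe sorts first,
  // otherwise higher sequence numbers sort first (descending).
  static final Comparator<Probe> ORDER = (a, b) -> {
    int byKey = Arrays.compare(a.key, b.key);
    if (byKey != 0) return byKey;
    if (a.seq == Long.MIN_VALUE) return -1;
    if (b.seq == Long.MIN_VALUE) return 1;
    return Long.compare(b.seq, a.seq);
  };

  public static void main(String[] args) {
    TreeMap<Probe, String> events = new TreeMap<>(ORDER);
    events.put(new Probe("K1".getBytes(), 1), "Value1");
    events.put(new Probe("K2".getBytes(), 2), "Value2a");
    events.put(new Probe("K2".getBytes(), 1), "Value2");
    events.put(new Probe("K3".getBytes(), 1), "Value3");

    // A Long.MIN_VALUE probe lands on the entry with the highest sequence
    // number for that key, i.e. the most recent update: prints "K2 {2}".
    System.out.println(events.ceilingKey(new Probe("K2".getBytes(), Long.MIN_VALUE)));
  }
}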
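
To make the batch-size approximation in beforeAcquiringPrimaryState above concrete, here is the same arithmetic with made-up numbers: a 32 MB configured batch over a bucket that recovered 200,000 entries totalling 100 MB. All figures are illustrative; only the formula mirrors the code.

// Illustrative numbers only; mirrors the arithmetic in beforeAcquiringPrimaryState.
public class BatchSizeMath {
  public static void main(String[] args) {
    long batchSizeMB = 32;                                  // gateway sender batch size
    long batchSizeInBytes = batchSizeMB * 1024 * 1024;      // 33,554,432 bytes
    long totalBucketSize = 100L * 1024 * 1024;              // bytes queued in this bucket
    long totalEntriesInBucket = 200_000;                    // recovered entries
    long perEntryApproxSize = totalBucketSize / totalEntriesInBucket; // ~524 bytes/entry
    int batchSize = (int) (batchSizeInBytes / perEntryApproxSize);    // ~64,035 entries
    // Roughly the first 64,000 recovered events get marked as possible duplicates.
    System.out.println("events marked as duplicates: " + batchSize);
  }
}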
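
MultiRegionSortedQueue.flush() above spreads one caller-supplied timeout across one AsyncFlushResult per region by converting it to nanoseconds and shrinking the remaining budget as each wait returns. The sketch below shows a deadline-based variant of the same budget-splitting idea over plain CountDownLatches; the class, the latches, and the awaitAll helper are invented for illustration and are not how the commit implements it.

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative only: wait for several independent signals within one overall budget,
// in the spirit of the aggregated AsyncFlushResult in MultiRegionSortedQueue.flush().
public class BudgetedWaitDemo {
  static boolean awaitAll(List<CountDownLatch> latches, long timeout, TimeUnit unit)
      throws InterruptedException {
    long deadline = System.nanoTime() + unit.toNanos(timeout);
    for (CountDownLatch latch : latches) {
      long remaining = deadline - System.nanoTime();   // budget left for this wait
      if (remaining <= 0 || !latch.await(remaining, TimeUnit.NANOSECONDS)) {
        return false;                                  // ran out of time before all flushed
      }
    }
    return true;
  }

  public static void main(String[] args) throws InterruptedException {
    CountDownLatch done = new CountDownLatch(0);       // already "flushed"
    CountDownLatch pending = new CountDownLatch(1);    // never flushed in this demo
    System.out.println(awaitAll(List.of(done, pending), 100, TimeUnit.MILLISECONDS)); // false
  }
}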

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java
new file mode 100644
index 0000000..c8b7b28
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.IOException;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.AbstractSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue.SortedEventQueueIterator;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.HDFSRegionMap;
+import com.gemstone.gemfire.internal.cache.KeyWithRegionContext;
+import com.gemstone.gemfire.internal.cache.LocalRegion.IteratorType;
+import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import org.apache.hadoop.hbase.util.Bytes;
+
+@SuppressWarnings("rawtypes")
+public class HDFSEntriesSet extends AbstractSet {
+  private final IteratorType type;
+
+  private final HoplogOrganizer hoplogs;
+  private final HDFSBucketRegionQueue brq;
+  
+  private final BucketRegion region; 
+  private final ReferenceQueue<HDFSIterator> refs;
+  
+  public HDFSEntriesSet(BucketRegion region, HDFSBucketRegionQueue brq, 
+      HoplogOrganizer hoplogs, IteratorType type, ReferenceQueue<HDFSIterator> refs) {
+    this.region = region;
+    this.brq = brq;
+    this.hoplogs = hoplogs;
+    this.type = type;
+    this.refs = refs;
+  }
+  
+  @Override
+  public HDFSIterator iterator() {
+    HDFSIterator iter = new HDFSIterator(type, region.getPartitionedRegion(), true);
+    if (refs != null) {
+      // we can't rely on an explicit close but we need to free resources
+      //
+      // This approach has the potential to cause excessive memory load and/or
+      // GC problems if an app holds an iterator ref too long. A lease-based
+      // approach where iterators are automatically closed after X secs of inactivity is
+      // a potential alternative (but may require tuning for certain
+      // applications)
+      new WeakReference<HDFSEntriesSet.HDFSIterator>(iter, refs);
+    }
+    return iter;
+  }
+
+  @Override
+  public int size() {
+    // TODO this is the tortoise version, need a fast version for estimation
+    // note: more than 2^31-1 records will cause this counter to wrap
+    int size = 0;
+    HDFSIterator iter = new HDFSIterator(null, region.getPartitionedRegion(), false);
+    try {
+      while (iter.hasNext()) {
+        if (includeEntry(iter.next())) {
+          size++;
+        }
+      }
+    } finally {
+      iter.close();
+    }
+    return size;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    HDFSIterator iter = new HDFSIterator(null, region.getPartitionedRegion(), false);
+    try {
+      while (iter.hasNext()) {
+        if (includeEntry(iter.next())) {
+          return false;
+        }
+      }
+    } finally {
+      iter.close();
+    }
+    return true;
+  }
+
+  private boolean includeEntry(Object val) {
+    if (val instanceof HDFSGatewayEventImpl) {
+      HDFSGatewayEventImpl evt = (HDFSGatewayEventImpl) val;
+      if (evt.getOperation().isDestroy()) {
+        return false;
+      }
+    } else if (val instanceof PersistedEventImpl) {
+      PersistedEventImpl evt = (PersistedEventImpl) val;
+      if (evt.getOperation().isDestroy()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public class HDFSIterator implements Iterator {
+    private final IteratorType type;
+    private final boolean deserialize;
+    
+    private final SortedEventQueueIterator queue;
+    private final HoplogIterator<byte[], SortedHoplogPersistedEvent> hdfs;
+    private Iterator txCreatedEntryIterator;
+    
+    private boolean queueNext;
+    private boolean hdfsNext;
+    private boolean forUpdate;
+    private boolean hasTxEntry;
+
+    private byte[] currentHdfsKey;
+
+    public HDFSIterator(IteratorType type, Region region, boolean deserialize) {
+      this.type = type;
+      this.deserialize = deserialize;
+
+      // Check whether the queue has become primary here.
+      // There could be some time between bucket becoming a primary 
+      // and underlying queue becoming a primary, so isPrimaryWithWait() 
+      // waits for some time for the queue to become a primary on this member
+      if (!brq.getBucketAdvisor().isPrimaryWithWait()) {
+        InternalDistributedMember primaryHolder = brq.getBucketAdvisor()
+            .basicGetPrimaryMember();
+        throw new PrimaryBucketException("Bucket " + brq.getName()
+            + " is not primary. Current primary holder is " + primaryHolder);
+      }
+      // We are deliberately NOT sync'ing while creating the iterators.  If done
+      // in the correct order, we may get duplicates (due to an in-progress
+      // flush) but we won't miss any entries.  The dupes will be eliminated
+      // during iteration.
+      queue = brq.iterator(region);
+      advanceQueue();
+      
+      HoplogIterator<byte[], SortedHoplogPersistedEvent> tmp = null;
+      try {
+        tmp = hoplogs.scan();
+      } catch (IOException e) {
+        HDFSEntriesSet.this.region.checkForPrimary();
+        throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(e.getMessage()), e);
+      }
+      
+      hdfs = tmp;
+      if (hdfs != null) {
+        advanceHdfs();
+      }
+    }
+    
+    @Override
+    public boolean hasNext() {
+      boolean nonTxHasNext = hdfsNext || queueNext;
+      if (!nonTxHasNext && this.txCreatedEntryIterator != null) {
+        this.hasTxEntry = this.txCreatedEntryIterator.hasNext();
+        return this.hasTxEntry;
+      }
+      return nonTxHasNext;
+    }
+    
+    @Override
+    public Object next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      if (hasTxEntry) {
+        hasTxEntry = false;
+        return this.txCreatedEntryIterator.next();
+      }
+
+      Object val;
+      if (!queueNext) {
+        val = getFromHdfs();
+        advanceHdfs();
+        
+      } else if (!hdfsNext) {
+        val = getFromQueue();
+        advanceQueue();
+        
+      } else {
+        byte[] qKey = queue.current().getSerializedKey();
+        byte[] hKey = this.currentHdfsKey;
+        
+        int diff = Bytes.compareTo(qKey, hKey);
+        if (diff < 0) {
+          val = getFromQueue();
+          advanceQueue();
+          
+        } else if (diff == 0) {
+          val = getFromQueue();
+          advanceQueue();
+
+          // ignore the duplicate
+          advanceHdfs();
+
+        } else {
+          val = getFromHdfs();
+          advanceHdfs();
+        }
+      }
+      return val;
+    }
+    
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+    
+    public void close() {
+      if (queueNext) {
+        queue.close();
+      }
+
+      if (hdfsNext) {
+        hdfs.close();
+      }
+    }
+
+    private Object getFromQueue() {
+      HDFSGatewayEventImpl evt = queue.current();
+      if (type == null) {
+        return evt;
+      }
+      
+      switch (type) {
+      case KEYS:
+        byte[] key = evt.getSerializedKey();
+        return deserialize ? EntryEventImpl.deserialize(key) : key;
+        
+      case VALUES:
+        return evt.getValue();
+        
+      default:
+        Object keyObj = EntryEventImpl.deserialize(evt.getSerializedKey());
+        if(keyObj instanceof KeyWithRegionContext) {
+          ((KeyWithRegionContext)keyObj).setRegionContext(region.getPartitionedRegion());
+        }
+        return ((HDFSRegionMap) region.getRegionMap()).getDelegate().getEntryFromEvent(keyObj, evt, true, forUpdate);
+      }
+    }
+
+    private Object getFromHdfs() {
+      if (type == null) {
+        return hdfs.getValue();
+      }
+      
+      switch (type) {
+      case KEYS:
+        byte[] key = this.currentHdfsKey;
+        return deserialize ? EntryEventImpl.deserialize(key) : key;
+        
+      case VALUES:
+        PersistedEventImpl evt = hdfs.getValue();
+        return evt.getValue();
+        
+      default:
+        Object keyObj = EntryEventImpl.deserialize(this.currentHdfsKey);
+        if(keyObj instanceof KeyWithRegionContext) {
+          ((KeyWithRegionContext)keyObj).setRegionContext(region.getPartitionedRegion());
+        }
+        return ((HDFSRegionMap) region.getRegionMap()).getDelegate().getEntryFromEvent(keyObj, hdfs.getValue(), true, forUpdate);
+      }
+    }
+    
+    private void advanceHdfs() {
+      if (hdfsNext = hdfs.hasNext()) {
+        try {
+          this.currentHdfsKey = hdfs.next();
+        } catch (IOException e) {
+          region.checkForPrimary();
+          throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(e.getMessage()), e);
+        }
+      } else {
+        this.currentHdfsKey = null;
+        hdfs.close();
+      }
+    }
+    
+    private void advanceQueue() {
+      if (queueNext = queue.hasNext()) {
+        queue.next();
+      } else {
+        brq.checkForPrimary();
+        queue.close();
+      }
+    }
+    
+    public void setForUpdate(){
+      this.forUpdate = true;
+    }
+    
+    /** MergeGemXDHDFSToGFE: not sure whether this function is required */ 
+    /*public void setTXState(TXState txState) {
+      TXRegionState txr = txState.getTXRegionState(region);
+      if (txr != null) {
+        txr.lock();
+        try {
+          this.txCreatedEntryIterator = txr.getCreatedEntryKeys().iterator();
+        }
+        finally{
+          txr.unlock();
+        }
+      }
+    }*/
+  }
+}
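
For reference, HDFSIterator.next() above performs a key-ordered merge of the
in-memory queue iterator and the HDFS hoplog scan: whichever side holds the
smaller serialized key is returned next, and when both sides hold the same key
the queue entry is returned and the flushed HDFS copy is skipped. Below is a
standalone sketch of that merge policy (plain JDK types and Comparable keys
stand in for byte[] keys compared with Bytes.compareTo); it is an
illustration, not code from this patch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class SortedMergeSketch {
  // Merge two key-sorted iterators; on equal keys the in-memory (queue) entry
  // wins and the flushed copy from HDFS is skipped, mirroring the diff == 0
  // branch in HDFSIterator.next().
  static <K extends Comparable<K>> List<K> merge(Iterator<K> queue, Iterator<K> hdfs) {
    List<K> out = new ArrayList<K>();
    K q = queue.hasNext() ? queue.next() : null;
    K h = hdfs.hasNext() ? hdfs.next() : null;
    while (q != null || h != null) {
      if (h == null || (q != null && q.compareTo(h) <= 0)) {
        out.add(q);                                  // queue entry wins
        if (h != null && q.compareTo(h) == 0) {
          h = hdfs.hasNext() ? hdfs.next() : null;   // drop the duplicate flushed copy
        }
        q = queue.hasNext() ? queue.next() : null;
      } else {
        out.add(h);                                  // only HDFS has this key
        h = hdfs.hasNext() ? hdfs.next() : null;
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> queued = Arrays.asList("a", "c", "d");
    List<String> flushed = Arrays.asList("b", "c", "e");
    System.out.println(merge(queued.iterator(), flushed.iterator())); // [a, b, c, d, e]
  }
}

On a tie the two sides refer to the same event (one copy has already been
written by an in-progress flush), so returning the queue copy and advancing
both iterators is what eliminates the duplicates mentioned in the constructor
comment.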

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java
new file mode 100644
index 0000000..607650f
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.i18n.StringId;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
+import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+
+/**
+ * Listener that persists events to HDFS
+ *
+ */
+public class HDFSEventListener implements AsyncEventListener {
+  private final LogWriterI18n logger;
+  private volatile boolean senderStopped = false;
+
+  private final FailureTracker failureTracker = new FailureTracker(10L, 60 * 1000L, 1.5f);
+
+  public HDFSEventListener(LogWriterI18n logger) {
+    this.logger = logger;
+  }
+  
+  @Override
+  public void close() {
+    senderStopped = true;
+  }
+  
+  @Override
+  public boolean processEvents(List<AsyncEvent> events) {
+    if (Hoplog.NOP_WRITE) {
+      return true;
+    }
+    
+    // The list of events that the async queue receives is sorted at the
+    // bucket level. Events for multiple regions are concatenated together,
+    // and events for multiple buckets are concatenated one after the
+    // other, e.g.
+    //
+    // <Region1, Key1, bucket1>, <Region1, Key19, bucket1>, 
+    // <Region1, Key4, bucket2>, <Region1, Key6, bucket2>
+    // <Region2, Key1, bucket1>, <Region2, Key4, bucket1>
+    // ..
+    
+    Region previousRegion = null;
+    int prevBucketId = -1;
+    ArrayList<QueuedPersistentEvent> list = null;
+    boolean success = false;
+    try {
+      //Back off if we are experiencing failures
+      failureTracker.sleepIfRetry();
+      
+      HoplogOrganizer bucketOrganizer = null; 
+      for (AsyncEvent asyncEvent : events) {
+        if (senderStopped){
+          failureTracker.failure();
+          if (logger.fineEnabled()) {
+            logger.fine("HDFSEventListener.processEvents: Cache is closing down. Ignoring the batch of data.");
+          }
+          return false;
+        }
+        HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)asyncEvent;
+        Region region = hdfsEvent.getRegion();
+        
+        if (prevBucketId != hdfsEvent.getBucketId() || region != previousRegion){
+          if (prevBucketId != -1) {
+            bucketOrganizer.flush(list.iterator(), list.size());
+            success=true;
+            if (logger.fineEnabled()) {
+              logger.fine("Batch written to HDFS of size " + list.size() + " for region " + previousRegion);
+            }
+          }
+          bucketOrganizer = getOrganizer((PartitionedRegion) region, hdfsEvent.getBucketId());
+          // The bucket organizer can be null only when the bucket has moved. Throw an
+          // exception so that the batch is discarded.
+          if (bucketOrganizer == null)
+            throw new BucketMovedException("Bucket moved. BucketId: " + hdfsEvent.getBucketId() +  " HDFSRegion: " + region.getName());
+          list = new  ArrayList<QueuedPersistentEvent>();
+        }
+        try {
+          //TODO:HDFS check if there is any serialization overhead
+          list.add(new SortedHDFSQueuePersistedEvent(hdfsEvent));
+        } catch (ClassNotFoundException e) {
+          //TODO:HDFS add localized string
+          logger.warning(new StringId(0, "Error while converting HDFSGatewayEvent to PersistedEventImpl."), e);
+          return false;
+        }
+        prevBucketId = hdfsEvent.getBucketId();
+        previousRegion = region;
+        
+      }
+      if (bucketOrganizer != null) {
+        bucketOrganizer.flush(list.iterator(), list.size());
+        success = true;
+        
+        if (logger.fineEnabled()) {
+          logger.fine("Batch written to HDFS of size " + list.size() + " for region " + previousRegion);
+        }
+      }
+    } catch (IOException e) {
+      logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e);
+      return false;
+    }
+    catch (ForceReattemptException e) {
+      if (logger.fineEnabled())
+        logger.fine(e);
+      return false;
+    }
+    catch(PrimaryBucketException e) {
+      //do nothing, the bucket is no longer primary so we shouldn't get the same
+      //batch next time.
+      if (logger.fineEnabled())
+        logger.fine(e);
+      return false;
+    }
+    catch(BucketMovedException e) {
+      //do nothing, the bucket is no longer primary so we shouldn't get the same
+      //batch next time.
+      if (logger.fineEnabled())
+        logger.fine(e);
+      return false;
+    }
+    catch (CacheClosedException e) {
+      if (logger.fineEnabled())
+        logger.fine(e);
+      // exit silently
+      return false;
+    } catch (InterruptedException e1) {
+      if (logger.fineEnabled())
+        logger.fine(e1);
+      return false;
+    } finally {
+      failureTracker.record(success);
+    }
+
+    return true;
+  }
+  
+  private HoplogOrganizer getOrganizer(PartitionedRegion region, int bucketId) {
+    BucketRegion br = region.getDataStore().getLocalBucketById(bucketId);
+    if (br == null) {
+      // got rebalanced or something
+      throw new PrimaryBucketException("Bucket region is no longer available " + bucketId + region);
+    }
+
+    return br.getHoplogOrganizer();
+  }
+
+}
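
For reference, the batching in processEvents() relies on the incoming list
being bucket-sorted: events are accumulated until the bucket id (or region)
changes, the finished batch is flushed through that bucket's HoplogOrganizer,
and a final flush writes the last batch. Below is a standalone sketch of that
change-of-bucket pattern; the flush() method here is only a stand-in for
HoplogOrganizer.flush(iterator, size) and is not part of this patch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BucketBatchSketch {
  static class Event {
    final int bucketId;
    final String key;
    Event(int bucketId, String key) { this.bucketId = bucketId; this.key = key; }
  }

  // Stand-in for HoplogOrganizer.flush(iterator, size).
  static void flush(int bucketId, List<Event> batch) {
    System.out.println("flushing bucket " + bucketId + " with " + batch.size() + " events");
  }

  public static void main(String[] args) {
    List<Event> events = Arrays.asList(
        new Event(1, "Key1"), new Event(1, "Key19"),
        new Event(2, "Key4"), new Event(2, "Key6"));

    int prevBucketId = -1;
    List<Event> batch = new ArrayList<Event>();
    for (Event e : events) {
      if (e.bucketId != prevBucketId && prevBucketId != -1) {
        flush(prevBucketId, batch);        // bucket changed: write the finished batch
        batch = new ArrayList<Event>();
      }
      batch.add(e);
      prevBucketId = e.bucketId;
    }
    if (!batch.isEmpty()) {
      flush(prevBucketId, batch);          // the last batch has no bucket change to trigger it
    }
  }
}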

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java
new file mode 100644
index 0000000..0860e75
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+
+/**
+ * Current use of this class is limited to ignoring bulk DML operations.
+ * 
+ *
+ */
+public class HDFSEventQueueFilter implements GatewayEventFilter{
+  private LogWriterI18n logger;
+  
+  public HDFSEventQueueFilter(LogWriterI18n logger) {
+    this.logger = logger; 
+  }
+  @Override
+  public void close() {
+    
+  }
+
+  @Override
+  public boolean beforeEnqueue(GatewayQueueEvent event) {
+    Operation op = event.getOperation();
+    
+    
+    /* MergeGemXDHDFSToGFE - Disabled as it is gemxd specific 
+    if (op == Operation.BULK_DML_OP) {
+     // On accessors there are no parallel queues, so with the 
+     // current logic, isSerialWanEnabled function in LocalRegion 
+     // always returns true on an accessor. So when a bulk dml 
+     // op is fired on accessor, this behavior results in distribution 
+     // of the bulk dml operation to other members. To avoid putting 
+     // of this bulk dml in parallel queues, added this filter. This 
+     // is not the efficient way as the filters are used before inserting 
+     // in the queue. The bulk dmls should be blocked before they are distributed.
+     if (logger.fineEnabled())
+       logger.fine( "HDFSEventQueueFilter:beforeEnqueue: Disallowing insertion of a bulk DML in HDFS queue.");
+      return false;
+    }*/
+    
+    return true;
+  }
+
+  @Override
+  public boolean beforeTransmit(GatewayQueueEvent event) {
+   // No op
+   return true;
+  }
+
+  @Override
+  public void afterAcknowledgement(GatewayQueueEvent event) {
+    // No op
+  }
+}
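
For reference, beforeEnqueue() is the hook the commented-out bulk-DML check
would use: returning false keeps an event out of the HDFS queue entirely,
while beforeTransmit() and afterAcknowledgement() remain no-ops. Below is a
hypothetical filter, not part of this patch, that uses the same hook to drop
destroy operations, purely to show the gating mechanics.

import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;

class DropDestroysFilter implements GatewayEventFilter {
  @Override
  public boolean beforeEnqueue(GatewayQueueEvent event) {
    // Returning false here prevents the event from being enqueued at all.
    return !event.getOperation().isDestroy();
  }

  @Override
  public boolean beforeTransmit(GatewayQueueEvent event) {
    return true;   // no-op, like HDFSEventQueueFilter
  }

  @Override
  public void afterAcknowledgement(GatewayQueueEvent event) {
    // no-op
  }

  @Override
  public void close() {
    // nothing to release
  }
}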

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java
new file mode 100644
index 0000000..db99e7e
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.lru.Sizeable;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+
+
+/**
+ * Gateway event extended for HDFS functionality 
+ *
+ */
+public class HDFSGatewayEventImpl extends GatewaySenderEventImpl {
+  
+  private static final long serialVersionUID = 4642852957292192406L;
+  protected transient boolean keyIsSerialized = false;
+  protected byte[] serializedKey = null; 
+  protected VersionTag versionTag; 
+  
+  public HDFSGatewayEventImpl(){
+  }
+  
+  @Retained
+  public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
+      Object substituteValue)
+      throws IOException {
+    super(operation, event, substituteValue);
+    initializeHDFSGatewayEventObject(event);
+  }
+
+  @Retained
+  public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
+      Object substituteValue, boolean initialize, int bucketId) throws IOException {
+    super(operation, event,substituteValue, initialize, bucketId);
+    initializeHDFSGatewayEventObject(event);
+  }
+
+  @Retained
+  public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
+      Object substituteValue, boolean initialize) throws IOException {
+    super(operation, event, substituteValue, initialize);
+    initializeHDFSGatewayEventObject(event);
+  }
+
+  protected HDFSGatewayEventImpl(HDFSGatewayEventImpl offHeapEvent) {
+    super(offHeapEvent);
+    this.keyIsSerialized = offHeapEvent.keyIsSerialized;
+    this.serializedKey = offHeapEvent.serializedKey;
+    this.versionTag = offHeapEvent.versionTag;
+  }
+  
+  @Override
+  protected GatewaySenderEventImpl makeCopy() {
+    return new HDFSGatewayEventImpl(this);
+  }
+
+  private void initializeHDFSGatewayEventObject(EntryEvent event)
+      throws IOException {
+
+    serializeKey();
+    versionTag = ((EntryEventImpl)event).getVersionTag();
+    if (versionTag != null && versionTag.getMemberID() == null) {
+      versionTag.setMemberID(((LocalRegion)getRegion()).getVersionMember());
+    }
+  }
+
+  private void serializeKey() throws IOException {
+    if (!keyIsSerialized && isInitialized())
+    {
+      this.serializedKey = CacheServerHelper.serialize(this.key);
+      keyIsSerialized = true;
+    } 
+  }
+  /** MergeGemXDHDFSToGFE: This function needs to be enabled if similar functionality is added to GatewaySenderEventImpl. */
+  /*@Override
+  protected StoredObject obtainOffHeapValueBasedOnOp(EntryEventImpl event,
+      boolean hasNonWanDispatcher) {
+    return  event.getOffHeapNewValue();
+  }*/
+  
+  /** MergeGemXDHDFSToGFE: This function needs to be enabled if similar functionality is added to GatewaySenderEventImpl. */
+  /*@Override
+  protected Object obtainHeapValueBasedOnOp(EntryEventImpl event,
+      boolean hasNonWanDispatcher) {
+    return   event.getRawNewValue(shouldApplyDelta());
+  }*/
+  
+  @Override
+  protected boolean shouldApplyDelta() {
+    return true;
+  }
+
+  
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writeObject(this.versionTag, out);
+    
+  }
+  
+  @Override
+  protected void serializeKey(DataOutput out) throws IOException {
+    DataSerializer.writeByteArray((byte[])this.serializedKey, out);
+  }
+  
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.versionTag = (VersionTag)DataSerializer.readObject(in);
+  }
+  
+  @Override
+  protected void deserializeKey(DataInput in) throws IOException,
+    ClassNotFoundException {
+    this.serializedKey = DataSerializer.readByteArray(in);
+    this.key = BlobHelper.deserializeBlob(this.serializedKey,
+        InternalDataSerializer.getVersionForDataStreamOrNull(in), null);
+    keyIsSerialized = true;
+  }
+
+  @Override
+  public int getDSFID() {
+    
+    return HDFS_GATEWAY_EVENT_IMPL;
+  }
+  public byte[] getSerializedKey() {
+    
+    return this.serializedKey;
+  }
+  
+  public VersionTag getVersionTag() {
+    
+    return this.versionTag;
+  }
+  
+  /**
+   * Returns the size in bytes that this event occupies on HDFS.
+   * @param writeOnly true if the owning region is HDFS write-only
+   */
+  public int getSizeOnHDFSInBytes(boolean writeOnly) {
+  
+    if (writeOnly)
+      return UnsortedHDFSQueuePersistedEvent.getSizeInBytes(this.serializedKey.length,  
+          getSerializedValueSize(), this.versionTag);
+    else
+      return SortedHDFSQueuePersistedEvent.getSizeInBytes(this.serializedKey.length,  
+          getSerializedValueSize(), this.versionTag);
+  
+  }
+}
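
For reference, serializeKey(), toData()/serializeKey(DataOutput) and
fromData()/deserializeKey() above follow a serialize-the-key-once pattern: the
key is turned into a byte[] when the event is initialized, those bytes are
what gets written on the wire and measured by getSizeOnHDFSInBytes(), and the
receiving side restores the object form from the same bytes. Below is a
standalone sketch of that round trip using plain java.io streams in place of
GemFire's DataSerializer; it is an illustration, not code from this patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

class KeyBytesSketch {
  public static void main(String[] args) throws Exception {
    String key = "customer-42";

    // Serialize the key once; the same bytes serve the wire format and the size estimate.
    ByteArrayOutputStream keyBuffer = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(keyBuffer);
    oos.writeObject(key);
    oos.flush();
    byte[] serializedKey = keyBuffer.toByteArray();

    // "toData": write the pre-serialized key as a length-prefixed byte array.
    ByteArrayOutputStream wire = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(wire);
    out.writeInt(serializedKey.length);
    out.write(serializedKey);
    out.flush();

    // "fromData": read the byte array back, then deserialize the key object.
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
    byte[] keyBytes = new byte[in.readInt()];
    in.readFully(keyBytes);
    Object restored = new ObjectInputStream(new ByteArrayInputStream(keyBytes)).readObject();
    System.out.println(restored);   // prints customer-42
  }
}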

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java
new file mode 100644
index 0000000..740a607
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * Contains utility functions for HDFS integration, such as creating the
+ * HDFS async event queue for a region.
+ *
+ */
+public class HDFSIntegrationUtil {
+
+  public static <K, V> AsyncEventQueue createDefaultAsyncQueueForHDFS(Cache cache, boolean writeOnly, String regionPath) {
+    return createAsyncQueueForHDFS(cache, regionPath, writeOnly, null);
+  }
+
+  private static AsyncEventQueue createAsyncQueueForHDFS(Cache cache, String regionPath, boolean writeOnly,
+      HDFSStore configView) {
+    LogWriterI18n logger = cache.getLoggerI18n();
+    String defaultAsyncQueueName = HDFSStoreFactoryImpl.getEventQueueName(regionPath);
+
+    if (configView == null) {
+      configView = new HDFSStoreFactoryImpl(cache).getConfigView();
+    }
+    
+
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(configView.getBatchSize());
+    factory.setPersistent(configView.getBufferPersistent());
+    factory.setDiskStoreName(configView.getDiskStoreName());
+    factory.setMaximumQueueMemory(configView.getMaxMemory());
+    factory.setBatchTimeInterval(configView.getBatchInterval());
+    factory.setDiskSynchronous(configView.getSynchronousDiskWrite());
+    factory.setDispatcherThreads(configView.getDispatcherThreads());
+    factory.setParallel(true);
+    factory.addGatewayEventFilter(new HDFSEventQueueFilter(logger));
+    ((AsyncEventQueueFactoryImpl) factory).setBucketSorted(!writeOnly);
+    ((AsyncEventQueueFactoryImpl) factory).setIsHDFSQueue(true);
+
+    AsyncEventQueue asyncQ = null;
+
+    if (!writeOnly)
+      asyncQ = factory.create(defaultAsyncQueueName, new HDFSEventListener(cache.getLoggerI18n()));
+    else
+      asyncQ = factory.create(defaultAsyncQueueName, new HDFSWriteOnlyStoreEventListener(cache.getLoggerI18n()));
+
+    logger.fine("HDFS: async queue created for HDFS. Id: " + asyncQ.getId() + ". Disk store: "
+        + asyncQ.getDiskStoreName() + ". Batch size: " + asyncQ.getBatchSize() + ". bucket sorted:  " + !writeOnly);
+    return asyncQ;
+
+  }
+
+  public static void createAndAddAsyncQueue(String regionPath, RegionAttributes regionAttributes, Cache cache) {
+    if (!regionAttributes.getDataPolicy().withHDFS()) {
+      return;
+    }
+
+    String leaderRegionPath = getLeaderRegionPath(regionPath, regionAttributes, cache);
+
+    String defaultAsyncQueueName = HDFSStoreFactoryImpl.getEventQueueName(leaderRegionPath);
+    if (cache.getAsyncEventQueue(defaultAsyncQueueName) == null) {
+      if (regionAttributes.getHDFSStoreName() != null && regionAttributes.getPartitionAttributes() != null
+          && !(regionAttributes.getPartitionAttributes().getLocalMaxMemory() == 0)) {
+        HDFSStore store = ((GemFireCacheImpl) cache).findHDFSStore(regionAttributes.getHDFSStoreName());
+        if (store == null) {
+          throw new IllegalStateException(
+              LocalizedStrings.HOPLOG_HDFS_STORE_NOT_FOUND.toLocalizedString(regionAttributes.getHDFSStoreName()));
+        }
+        HDFSIntegrationUtil
+            .createAsyncQueueForHDFS(cache, leaderRegionPath, regionAttributes.getHDFSWriteOnly(), store);
+      }
+    }
+  }
+
+  private static String getLeaderRegionPath(String regionPath, RegionAttributes regionAttributes, Cache cache) {
+    String colocated;
+    while (regionAttributes.getPartitionAttributes() != null
+        && (colocated = regionAttributes.getPartitionAttributes().getColocatedWith()) != null) {
+      // Do not waitOnInitialization() for PR
+      GemFireCacheImpl gfc = (GemFireCacheImpl) cache;
+      Region colocatedRegion = gfc.getPartitionedRegion(colocated, false);
+      if (colocatedRegion == null) {
+        Assert.fail("Could not find parent region " + colocated + " for " + regionPath);
+      }
+      regionAttributes = colocatedRegion.getAttributes();
+      regionPath = colocatedRegion.getFullPath();
+    }
+    return regionPath;
+  }
+
+}
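
For reference, getLeaderRegionPath() above resolves which region the HDFS
queue is named after by walking the colocated-with chain until it reaches a
region with no colocation parent. Below is a standalone sketch of that walk;
the Map is only a stand-in for PartitionAttributes.getColocatedWith() and the
region names are made up for illustration.

import java.util.HashMap;
import java.util.Map;

class LeaderRegionSketch {
  public static void main(String[] args) {
    // child region -> region it is colocated with
    Map<String, String> colocatedWith = new HashMap<String, String>();
    colocatedWith.put("/orders", "/customers");
    colocatedWith.put("/customers", "/accounts");   // /accounts has no parent

    String regionPath = "/orders";
    String parent;
    while ((parent = colocatedWith.get(regionPath)) != null) {
      regionPath = parent;                          // hop to the colocation parent
    }
    System.out.println("leader region: " + regionPath);   // prints /accounts
  }
}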