You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/05/04 22:57:15 UTC

[15/63] [abbrv] incubator-geode git commit: GEODE-1072: Removing HDFS related code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java
deleted file mode 100644
index 9127e4d..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java
+++ /dev/null
@@ -1,1232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.gemstone.gemfire.InternalGemFireError;
-import com.gemstone.gemfire.cache.CacheWriterException;
-import com.gemstone.gemfire.cache.EntryNotFoundException;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.cache.TimeoutException;
-import com.gemstone.gemfire.cache.hdfs.internal.FlushObserver.AsyncFlushResult;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue.SortedEventBuffer.BufferIterator;
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.internal.Assert;
-import com.gemstone.gemfire.internal.cache.AbstractBucketRegionQueue;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.RegionEventImpl;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.ByteComparator;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.CursorIterator;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-import org.apache.hadoop.hbase.util.Bytes;
-
-
-/**
- * This class holds the sorted list required for HDFS. 
- * 
- * 
- */
-public class HDFSBucketRegionQueue extends AbstractBucketRegionQueue {
-     private static final boolean VERBOSE = Boolean.getBoolean("hdfsBucketRegionQueue.VERBOSE");
-     private final int batchSize;
-     volatile HDFSEventQueue hdfsEventQueue = null;
-     
-     // set before releasing the primary lock. 
-     private final AtomicBoolean releasingPrimaryLock = new AtomicBoolean(true);
-     
-     // This is used to keep track of the current size of the queue in bytes. 
-     final AtomicLong queueSizeInBytes =  new AtomicLong(0);
-     public boolean isBucketSorted = true;
-     /**
-     * @param regionName
-     * @param attrs
-     * @param parentRegion
-     * @param cache
-     * @param internalRegionArgs
-     */
-    public HDFSBucketRegionQueue(String regionName, RegionAttributes attrs,
-        LocalRegion parentRegion, GemFireCacheImpl cache,
-        InternalRegionArguments internalRegionArgs) {
-      super(regionName, attrs, parentRegion, cache, internalRegionArgs);
-      
-      this.isBucketSorted = internalRegionArgs.getPartitionedRegion().getParallelGatewaySender().getBucketSorted();
-      if (isBucketSorted)
-        hdfsEventQueue = new MultiRegionSortedQueue();
-      else
-        hdfsEventQueue = new EventQueue();
-      
-      batchSize = internalRegionArgs.getPartitionedRegion().
-          getParallelGatewaySender().getBatchSize() * 1024 *1024;
-      this.keySet();
-    }
-    @Override
-    protected void initialize(InputStream snapshotInputStream,
-        InternalDistributedMember imageTarget,
-        InternalRegionArguments internalRegionArgs) throws TimeoutException,
-        IOException, ClassNotFoundException {
-
-      super.initialize(snapshotInputStream, imageTarget, internalRegionArgs);
-
-      loadEventsFromTempQueue();
-      
-      this.initialized = true;
-      notifyEventProcessor();
-    }
-
-    private TreeSet<Long> createSkipListFromMap(Set keySet) {
-      TreeSet<Long> sortedKeys = null;
-      if (!hdfsEventQueue.isEmpty())
-        return sortedKeys;
-      
-      if (!keySet.isEmpty()) {
-        sortedKeys = new TreeSet<Long>(keySet);
-        if (!sortedKeys.isEmpty())
-        {
-          for (Long key : sortedKeys) {
-            if (this.isBucketSorted) {
-              Object hdfsevent = getNoLRU(key, true, false, false);
-              if (hdfsevent == null) { // this can happen when tombstones are recovered. 
-                if (logger.isDebugEnabled() || VERBOSE) {
-                  logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Discarding key " + key + ", no event recovered"));
-                }
-              } else {
-                int eventSize = ((HDFSGatewayEventImpl)hdfsevent).
-                    getSizeOnHDFSInBytes(!this.isBucketSorted);
-                hdfsEventQueue.put(key,(HDFSGatewayEventImpl)hdfsevent, eventSize );
-                queueSizeInBytes.getAndAdd(eventSize);
-              }
-            }
-            else {
-              Object hdfsevent = getNoLRU(key, true, false, false);
-              if (hdfsevent != null) { // hdfs event can be null when tombstones are recovered.
-                queueSizeInBytes.getAndAdd(((HDFSGatewayEventImpl)hdfsevent).
-                    getSizeOnHDFSInBytes(!this.isBucketSorted));
-              }
-              ((EventQueue)hdfsEventQueue).put(key);
-            }
-              
-          }
-          getEventSeqNum().setIfGreater(sortedKeys.last());
-        }
-      
-      }
-      if (logger.isDebugEnabled() || VERBOSE) {
-        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG,
-            "For bucket " + getId() + ", total keys recovered are : " + keySet.size()
-                + " and the seqNo is " + getEventSeqNum()));
-      }
-      return sortedKeys;
-    }
-    
-    @Override
-    protected void basicClear(RegionEventImpl ev) {
-      super.basicClear(ev);
-      queueSizeInBytes.set(0);
-      if ( this.getBucketAdvisor().isPrimary()) {
-        this.hdfsEventQueue.clear();
-      }
-    }
-    
-    protected void clearQueues(){
-      queueSizeInBytes.set(0);
-      if ( this.getBucketAdvisor().isPrimary()) {
-        this.hdfsEventQueue.clear();
-      }
-    }
-   
-    @Override
-    protected void basicDestroy(final EntryEventImpl event,
-        final boolean cacheWrite, Object expectedOldValue)
-        throws EntryNotFoundException, CacheWriterException, TimeoutException {
-      super.basicDestroy(event, cacheWrite, expectedOldValue);
-    }
-    
-    ArrayList peekABatch() {
-      ArrayList result = new ArrayList();
-      hdfsEventQueue.peek(result);
-      return result;
-    }
-    
-    @Override
-    protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event, int sizeOfHDFSEvent) {
-      if (didPut &&  this.getBucketAdvisor().isPrimary()) {
-        HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)event.getValue();
-        if (sizeOfHDFSEvent == -1) { 
-          try {
-            // the size is calculated only on primary before event is inserted in the bucket. 
-            // If this node became primary after size was calculated, sizeOfHDFSEvent will be -1. 
-            // Try to get the size. #50016
-            sizeOfHDFSEvent = hdfsEvent.getSizeOnHDFSInBytes(!((HDFSBucketRegionQueue)this).isBucketSorted);
-          } catch (Throwable e) {
-           //   Ignore any exception while fetching the size.
-            sizeOfHDFSEvent = 0;
-          }
-        }
-        queueSizeInBytes.getAndAdd(sizeOfHDFSEvent);
-        if (this.initialized) {
-          Long longKey = (Long)key;
-          this.hdfsEventQueue.put(longKey, hdfsEvent, sizeOfHDFSEvent);
-        }
-        if (logger.isDebugEnabled()) {
-          logger.debug("Put successfully in the queue : " + hdfsEvent + " . Queue initialized: " 
-              + this.initialized);
-        }
-      }
-    }
-    
-    /**
-     * It removes the first key from the queue.
-     * 
-     * @return Returns the key for which value was destroyed.
-     * @throws ForceReattemptException
-     */
-    public Long remove() throws ForceReattemptException {
-      throw new UnsupportedOperationException("Individual entries cannot be removed in a HDFSBucketRegionQueue");
-    }
-    
-    /**
-     * It removes the first key from the queue.
-     * 
-     * @return Returns the value.
-     * @throws InterruptedException
-     * @throws ForceReattemptException
-     */
-    public Object take() throws InterruptedException, ForceReattemptException {
-      throw new UnsupportedOperationException("take() cannot be called for individual entries in a HDFSBucketRegionQueue");
-    }
-    
-    public void destroyKeys(ArrayList<HDFSGatewayEventImpl>  listToDestroy) {
-      
-      HashSet<Long> removedSeqNums = new HashSet<Long>();
-      
-      for (int index =0; index < listToDestroy.size(); index++) {
-        HDFSGatewayEventImpl entry = null;
-        if (this.isBucketSorted) {
-          // Remove the events in reverse order so that the events with higher sequence number
-          // are removed last to ensure consistency.
-          entry = listToDestroy.get(listToDestroy.size() - index -1);
-        } else {
-          entry = listToDestroy.get(index);
-        }
-       
-        try {
-          if (this.logger.isDebugEnabled())
-            logger.debug("destroying primary key " + entry.getShadowKey() + " bucket id: " + this.getId());
-          // removed from peeked list
-          boolean deleted = this.hdfsEventQueue.remove(entry);
-          if (deleted) {
-            // this is an onheap event so a call to size should be ok. 
-            long entrySize = entry.getSizeOnHDFSInBytes(!this.isBucketSorted);
-            destroyKey(entry.getShadowKey());
-            long queueSize = queueSizeInBytes.getAndAdd(-1*entrySize);
-            if (queueSize < 0) {
-              // In HA scenarios, queueSizeInBytes can go awry.
-              queueSizeInBytes.compareAndSet(queueSize, 0);
-            }
-            removedSeqNums.add(entry.getShadowKey());
-          }
-        }catch (ForceReattemptException e) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("ParallelGatewaySenderQueue#remove->HDFSBucketRegionQueue#destroyKeys: " + "Got ForceReattemptException for " + this
-            + " for bucket = " + this.getId());
-          }
-        }
-        catch(EntryNotFoundException e) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("ParallelGatewaySenderQueue#remove->HDFSBucketRegionQueue#destroyKeys: " + "Got EntryNotFoundException for " + this
-              + " for bucket = " + this.getId() + " and key " + entry.getShadowKey());
-          }
-        } finally {
-          entry.release();
-        }
-      }
-      
-      if (this.getBucketAdvisor().isPrimary()) {
-        hdfsEventQueue.handleRemainingElements(removedSeqNums);
-      }
-    }
-
-    
-    public boolean isReadyForPeek() {
-      return !this.isEmpty() && !this.hdfsEventQueue.isEmpty() && getBucketAdvisor().isPrimary();
-    }
-
-    public long getLastPeekTimeInMillis() {
-      return hdfsEventQueue.getLastPeekTimeInMillis();
-    }
-    
-    public long getQueueSizeInBytes() {
-      return queueSizeInBytes.get();
-    }
-    /*
-     * This function is called when the bucket takes as the role of primary.
-     */
-    @Override
-    public void beforeAcquiringPrimaryState() {
-      
-      queueSizeInBytes.set(0);
-      if (logger.isDebugEnabled() || VERBOSE) {
-        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG,
-            "This node has become primary for bucket " + this.getId()  +". " +
-            		"Creating sorted data structure for the async queue."));
-      }
-      releasingPrimaryLock.set(false);
-      
-      // clear the hdfs queue in case it has already elements left if it was a primary
-      // in the past
-      hdfsEventQueue.clear();
-      if (isBucketSorted)
-        hdfsEventQueue = new MultiRegionSortedQueue();
-      else
-        hdfsEventQueue = new EventQueue();
-      
-      TreeSet<Long> sortedKeys = createSkipListFromMap(this.keySet());
-      
-      if (sortedKeys != null && sortedKeys.size() > 0) {    
-        // Mark the events equal to batch size as duplicate. 
-        // calculate the batch size based on the number of events currently in the queue
-        // This is an approximation. 
-        long batchSizeMB =  this.getPartitionedRegion().getParallelGatewaySender().getBatchSize();
-        long batchSizeInBytes = batchSizeMB*1024*1024;
-        long totalBucketSize = queueSizeInBytes.get();
-        totalBucketSize = totalBucketSize >  0 ? totalBucketSize: 1;
-        long totalEntriesInBucket = this.entryCount();
-        totalEntriesInBucket =  totalEntriesInBucket > 0 ? totalEntriesInBucket: 1;
-        
-        long perEntryApproxSize = totalBucketSize/totalEntriesInBucket;
-        perEntryApproxSize = perEntryApproxSize >  0 ? perEntryApproxSize: 1;
-        
-        int batchSize  = (int)(batchSizeInBytes/perEntryApproxSize);
-        
-        if (logger.isDebugEnabled() || VERBOSE) {
-          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG,
-              "Calculating batch size " +  " batchSizeMB: " + batchSizeMB + " batchSizeInBytes: " + batchSizeInBytes + 
-              " totalBucketSize: " + totalBucketSize + " totalEntriesInBucket: " + totalEntriesInBucket + 
-              " perEntryApproxSize: " + perEntryApproxSize + " batchSize: " + batchSize ));
-        }
-        
-        markEventsAsDuplicate(batchSize, sortedKeys.iterator());
-      }
-    }
-    
-    @Override
-    public void beforeReleasingPrimaryLockDuringDemotion() {
-      queueSizeInBytes.set(0);
-      releasingPrimaryLock.set(true);
-      // release memory in case of a clean transition
-      hdfsEventQueue.clear();
-    }
-
-    /**
-     * This function searches the skip list and the peeked skip list for a given region key
-     * @param region 
-     * 
-     */
-    public HDFSGatewayEventImpl getObjectForRegionKey(Region region, byte[] regionKey) {
-      // get can only be called for a sorted queue.
-      // Calling get with Long.MIN_VALUE seq number ensures that 
-      // the list will return the key which has highest seq number. 
-      return hdfsEventQueue.get(region, regionKey, Long.MIN_VALUE);
-    }
-
-    /**
-     * Get an iterator on the queue, passing in the partitioned region
-     * we want to iterate over the events from.
-     */
-    public SortedEventQueueIterator iterator(Region region) {
-      return hdfsEventQueue.iterator(region);
-    }
-
-    public long totalEntries() {
-      return entryCount();
-    }
-    
-    /**
-     * Ideally this function should be called from a thread periodically to 
-     * rollover the skip list when it is above a certain size. 
-     * 
-     */
-    public void rolloverSkipList() {
-      // rollover can only be called for a sorted queue.
-      hdfsEventQueue.rollover();
-    }
-    
-    public boolean shouldDrainImmediately() {
-      return hdfsEventQueue.getFlushObserver().shouldDrainImmediately();
-    }
-
-    public AsyncFlushResult flush() {
-      if (logger.isDebugEnabled() || VERBOSE) {
-        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flush requested"));
-      }
-      return hdfsEventQueue.getFlushObserver().flush();
-    }
-    
-    /**
-     * This class keeps the regionkey and seqNum. The objects of this class are 
-     * kept in a concurrent skip list. The order of elements is decided based on the 
-     * comparison of regionKey + seqNum. This kind of comparison allows us to keep 
-     * multiple updates on a single key (becaus it has different seq Num)
-     */
-    static class KeyToSeqNumObject implements Comparable<KeyToSeqNumObject>
-    {
-      private byte[] regionkey; 
-      private Long seqNum;
-      
-      KeyToSeqNumObject(byte[] regionkey, Long seqNum){
-        this.regionkey = regionkey;
-        this.seqNum = seqNum;
-      }
-      
-      /**
-       * This function compares the key first. If the keys are same then seq num is compared.
-       * This function is a key function because it ensures that the skiplists hold the elements 
-       * in an order we want it to and for multiple updates on key fetches the most recent one 
-       * Currently we are comparing seq numbers but we will have to change it to version stamps. 
-       * * List can have elements in following sequence 
-       * K1 Value1 version : 1 
-       * K2 Value2a version : 2
-       * K2 Value2 version : 1
-       * K3 Value3 version : 1
-       * For a get on K2, it should retunr K2 Value 2a.  
-       */
-      @Override
-      public int compareTo(KeyToSeqNumObject o) {
-        int compareOutput = ByteComparator.compareBytes(
-            this.getRegionkey(), 0, this.getRegionkey().length, o.getRegionkey(), 0, o.getRegionkey().length);
-        if (compareOutput != 0 )
-          return compareOutput;
-        
-        // If the keys are same and this is an object with dummy seq number, 
-        // return -1. This will ensure that ceiling function on a skip list will enumerate 
-        // all the entries and return the last one.   
-        if (this.getSeqNum() == Long.MIN_VALUE) 
-          return -1;
-        
-        // this is to just maintain consistency with the above statement. 
-        if (o.getSeqNum() == Long.MIN_VALUE) 
-          return 1;
-       
-        // minus operator pushes entries with lower seq number in the end so that 
-        // the order as mentioned above is maintained. And the entries with 
-        // higher version are fetched on a get. 
-        return this.getSeqNum().compareTo(o.getSeqNum()) * -1;  
-      }
-      
-      @Override
-      public boolean equals (Object o) {
-    	KeyToSeqNumObject obj = null;
-      	if (o == null)
-    		return false; 
-    	
-    	if (o instanceof KeyToSeqNumObject) 
-    		obj = (KeyToSeqNumObject)o;
-    	else
-    		return false;
-    	
-    	if (this.compareTo(obj) != 0)
-          return false;
-        else
-          return true;
-      }
-      
-      public int hashCode() {
-    	assert false : "hashCode not designed";
-    	return -1;
-      }
-      
-      byte[] getRegionkey() {
-        return regionkey;
-      }
-
-      public Long getSeqNum() {
-        return seqNum;
-      }
-
-      public void setSeqNum(Long seqNum) {
-        this.seqNum = seqNum;
-      }
-      
-      @Override
-      public String toString() {
-        return EntryEventImpl.deserialize(regionkey) + " {" + seqNum + "}";
-      }
-    }
-    
-    public interface HDFSEventQueue {
-      FlushObserver getFlushObserver();
-      
-      /** puts an event in the queue. */ 
-      public void put (long key, HDFSGatewayEventImpl event, int size);
-      
-      public SortedEventQueueIterator iterator(Region region);
-
-      public void rollover();
-
-      /** Get a value from the queue
-       * @throws IllegalStateException if this queue doesn't support get  
-       **/
-      public HDFSGatewayEventImpl get(Region region, byte[] regionKey,
-          long minValue);
-
-      // Peeks a batch of size specified by batchSize
-      // And add the results to the array list
-      public void peek(ArrayList result);
-      
-      // Checks if there are elements to bee peeked 
-      public boolean isEmpty();
-      
-      // removes the event if it has already been peeked. 
-      public boolean remove(HDFSGatewayEventImpl event);
-      
-      // take care of the elements that were peeked 
-      // but were not removed after a batch dispatch 
-      // due to concurrency effects. 
-      public void handleRemainingElements(HashSet<Long> listToBeremoved);
-      
-      // clears the list. 
-      public void clear();
-      
-      // get the time when the last peek was done. 
-      public long getLastPeekTimeInMillis();
-    }
-    
-    class MultiRegionSortedQueue implements HDFSEventQueue {
-      ConcurrentMap<String, SortedEventQueue> regionToEventQueue = new ConcurrentHashMap<String, SortedEventQueue>();
-      volatile Set<SortedEventQueue> peekedQueues = Collections.EMPTY_SET;
-      private final AtomicBoolean peeking = new AtomicBoolean(false);
-      long lastPeekTimeInMillis = System.currentTimeMillis();
-      
-      private final FlushObserver flush = new FlushObserver() {
-        @Override
-        public AsyncFlushResult flush() {
-          final Set<AsyncFlushResult> flushes = new HashSet<AsyncFlushResult>();
-          for (SortedEventQueue queue : regionToEventQueue.values()) {
-            flushes.add(queue.getFlushObserver().flush());
-          }
-          
-          return new AsyncFlushResult() {
-            @Override
-            public boolean waitForFlush(long timeout, TimeUnit unit) throws InterruptedException {
-              long start = System.nanoTime();
-              long remaining = unit.toNanos(timeout);
-              for (AsyncFlushResult afr : flushes) {
-                if (!afr.waitForFlush(remaining, TimeUnit.NANOSECONDS)) {
-                  return false;
-                }
-                remaining -= (System.nanoTime() - start);
-              }
-              return true;
-            }
-          };
-        }
-        
-        @Override
-        public boolean shouldDrainImmediately() {
-          for (SortedEventQueue queue : regionToEventQueue.values()) {
-            if (queue.getFlushObserver().shouldDrainImmediately()) {
-              return true;
-            }
-          }
-          return false;
-        }
-      };
-      
-      @Override
-      public FlushObserver getFlushObserver() {
-        return flush;
-      }
-
-      @Override
-      public void put(long key, HDFSGatewayEventImpl event, int size) {
-        
-        String region = event.getRegionPath();
-        SortedEventQueue regionQueue = regionToEventQueue.get(region);
-        if(regionQueue == null) {
-          regionToEventQueue.putIfAbsent(region, new SortedEventQueue());
-          regionQueue = regionToEventQueue.get(region);
-        }
-        regionQueue.put(key, event, size);
-      }
-
-      @Override
-      public void peek(ArrayList result) {
-        // The elements that were peeked last time, have not been persisted to HDFS 
-        // yet. You cannot take out next batch until that is done.
-        if (!peeking.compareAndSet(false, true)) {
-          if (logger.isTraceEnabled() || VERBOSE) {
-            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Peek already in progress, aborting"));
-          }
-          return;
-        }
-        //Maintain a separate set of peeked queues.
-        //All of these queues are statefull, and expect to be
-        //handleRemainingElements and clear to be called on
-        //them iff peek was called on them. However, new queues
-        //may be created in that time.
-        peekedQueues = Collections.newSetFromMap(new ConcurrentHashMap<SortedEventQueue, Boolean>(regionToEventQueue.size()));
-        
-        //Peek from all of the existing queues
-        for(SortedEventQueue queue : regionToEventQueue.values()) {
-          if(!queue.isEmpty()) {
-            queue.peek(result);
-            peekedQueues.add(queue);
-          }
-        }
-        if (result.isEmpty()) 
-          peeking.set(false);
-        
-        
-        this.lastPeekTimeInMillis = System.currentTimeMillis();
-      }
-
-      @Override
-      public boolean isEmpty() {
-        for(SortedEventQueue queue : regionToEventQueue.values()) {
-          if(!queue.isEmpty()) {
-            return false;
-          }
-        }
-        return true;
-      }
-
-      @Override
-      public boolean remove(HDFSGatewayEventImpl event) {
-        String region = event.getRegionPath();
-        SortedEventQueue regionQueue = regionToEventQueue.get(region);
-        return regionQueue.remove(event);
-      }
-
-      @Override
-      public void handleRemainingElements(HashSet<Long> removedSeqNums){
-        for(SortedEventQueue queue : peekedQueues) {
-          queue.handleRemainingElements(removedSeqNums);
-        }
-        peekedQueues.clear();
-        peeking.set(false);
-      }
-
-      @Override
-      public void clear() {
-        for(SortedEventQueue queue : regionToEventQueue.values()) {
-          queue.clear();
-        }
-        peekedQueues.clear();
-        peeking.set(false);
-      }
-
-      @Override
-      public long getLastPeekTimeInMillis() {
-        return this.lastPeekTimeInMillis;
-      }
-
-      @Override
-      public HDFSGatewayEventImpl get(Region region, byte[] regionKey,
-          long minValue) {
-        SortedEventQueue queue = regionToEventQueue.get(region.getFullPath());
-        if(queue == null) {
-          return null;
-        }
-        return queue.get(region, regionKey, minValue);
-      }
-
-      @Override
-      public SortedEventQueueIterator iterator(Region region) {
-        SortedEventQueue queue = regionToEventQueue.get(region.getFullPath());
-        if(queue == null) {
-          return new SortedEventQueueIterator(new LinkedBlockingDeque<SortedEventBuffer>());
-        }
-        return queue.iterator(region);
-      }
-
-      @Override
-      public void rollover() {
-        for(SortedEventQueue queue : regionToEventQueue.values()) {
-          queue.rollover();
-        }
-      }
-    }
-    
-    class EventQueue implements HDFSEventQueue {
-      private final SignalledFlushObserver flush = new SignalledFlushObserver();
-      private final BlockingQueue<Long> eventSeqNumQueue = new LinkedBlockingQueue<Long>();
-      private final BlockingQueue<Long> peekedEvents = new LinkedBlockingQueue<Long>();
-      private long lastPeekTimeInMillis = System.currentTimeMillis(); 
-      
-      public EventQueue() {
-        
-      }
-      
-      @Override
-      public FlushObserver getFlushObserver() {
-        return flush;
-      }
-
-      @Override
-      public void put(long key, HDFSGatewayEventImpl event, int size) {
-        put(key);
-      }
-      public void put (long key) {
-        eventSeqNumQueue.add(key);
-        flush.push();
-        incQueueSize();
-      }
-      
-      
-      @Override
-      public HDFSGatewayEventImpl get(Region region, byte[] regionKey,
-          long minValue) {
-        throw new InternalGemFireError("Get not supported on unsorted queue");
-      }
-      
-      @Override
-      public void peek(ArrayList peekedEntries) {
-        if (peekedEvents.size() != 0) {
-          return;
-        }
-        
-        for(int size=0; size < batchSize; ) {
-          Long seqNum = eventSeqNumQueue.peek();
-          if (seqNum == null) {
-            // queue is now empty, return
-            break;
-          }
-          Object object = getNoLRU(seqNum, true, false, false);
-          if (object != null) {
-            peekedEvents.add(seqNum);
-            size += ((HDFSGatewayEventImpl)object).getSizeOnHDFSInBytes(!isBucketSorted);
-            peekedEntries.add(object);
-
-          } else {
-            logger.debug("The entry corresponding to the sequence number " + 
-               seqNum +  " is missing. This can happen when an entry is already" +
-               "dispatched before a bucket moved.");
-            // event is being ignored. Decrease the queue size
-            decQueueSize();
-            flush.pop(1);
-           
-          }
-          eventSeqNumQueue.poll();
-          
-        }
-        this.lastPeekTimeInMillis  = System.currentTimeMillis();
-      }
-
-      @Override
-      public boolean isEmpty() {
-        return eventSeqNumQueue.isEmpty();
-      }
-
-      
-      @Override
-      public boolean remove(HDFSGatewayEventImpl event) {
-        boolean deleted = peekedEvents.remove(event.getShadowKey());
-        if (deleted)
-         decQueueSize();
-        return deleted;
-      }
-
-      @Override
-      // It looks like that there is no need for this function 
-      // in EventQueue.
-      public void handleRemainingElements(HashSet<Long> removedSeqNums) {
-        flush.pop(removedSeqNums.size());
-        eventSeqNumQueue.addAll(peekedEvents);
-        peekedEvents.clear();
-      }
-
-      @Override
-      public void clear() {
-        flush.clear();
-        decQueueSize(eventSeqNumQueue.size());
-        eventSeqNumQueue.clear();
-        decQueueSize(peekedEvents.size());
-        peekedEvents.clear();
-      }
-
-      @Override
-      public long getLastPeekTimeInMillis() {
-        return this.lastPeekTimeInMillis;
-      }
-      @Override
-      public SortedEventQueueIterator iterator(Region region) {
-        throw new InternalGemFireError("not supported on unsorted queue");
-      }
-      @Override
-      public void rollover() {
-        throw new InternalGemFireError("not supported on unsorted queue");
-      }
-    }
-    
-    /**
-     * Event queue whose entries are kept in a deque of {@link SortedEventBuffer}s.
-     * New entries are written to {@link #currentSkipList}, which is the buffer most
-     * recently appended to {@link #queueOfLists}; {@link #peek(ArrayList)} reads from
-     * the buffer at the head of the deque.
-     */
-    class SortedEventQueue implements HDFSEventQueue {
-      private final SignalledFlushObserver flush = new SignalledFlushObserver();
-
-      // List of all the skip lists that hold the data
-      final Deque<SortedEventBuffer> queueOfLists = 
-          new LinkedBlockingDeque<SortedEventBuffer>();
-      
-      // This points to the tail of the queue
-      volatile SortedEventBuffer currentSkipList = new SortedEventBuffer();
-      
-      // Set by peek() when a batch is handed out; reset by handleRemainingElements(),
-      // by clear(), or by peek() itself when the batch turned out to be empty.
-      private final AtomicBoolean peeking = new AtomicBoolean(false);
-      
-      // Wall-clock time of the last peek() that got past the "already peeking" guard.
-      private long lastPeekTimeInMillis = System.currentTimeMillis(); 
-      
-      /** Creates the queue with the initial buffer already enqueued. */
-      public SortedEventQueue() {
-        queueOfLists.add(currentSkipList);
-      }
-      
-      @Override
-      public FlushObserver getFlushObserver() {
-        return flush;
-      }
-
-      /**
-       * Hands the event to the head buffer's {@code copyToBuffer}.
-       *
-       * @return the result of {@code copyToBuffer}, or {@code true} when no buffer
-       *         is queued at all
-       */
-      public boolean remove(HDFSGatewayEventImpl event) {
-        SortedEventBuffer eventBuffer = queueOfLists.peek();
-        if (eventBuffer != null) {
-          return eventBuffer.copyToBuffer(event);
-        }
-        else {
-          // This can happen when the queue is cleared because of bucket movement 
-          // before the remove is called. 
-          return true;
-        }
-      } 
-
-      /**
-       * Empties every buffer (reducing the queue size by each buffer's entry
-       * count), drops all buffers, installs a fresh one via {@code rollList} and
-       * resets the peeking flag.
-       */
-      public void clear() {
-        flush.clear();
-        for (SortedEventBuffer buf : queueOfLists) {
-          decQueueSize(buf.size());
-          buf.clear();
-        }
-        
-        queueOfLists.clear();
-        rollList(false);
-
-        peeking.set(false);
-      }
-
-      /**
-       * @return {@code true} only when exactly one buffer is queued and that
-       *         buffer holds no entries; {@code false} for any other buffer count
-       */
-      public boolean isEmpty() {
-        if (queueOfLists.size() == 1)
-          return queueOfLists.peek().isEmpty();
-        return false;
-      }
-
-      /**
-       * Records the event's serialized key together with its sequence number
-       * ({@code key}) in the current buffer.
-       *
-       * @param key sequence number stored alongside the serialized region key
-       * @param event event whose serialized key is recorded
-       * @param eventSize size added to the buffer's byte counter
-       */
-      public void put(long key, HDFSGatewayEventImpl event, int eventSize) {
-        if (logger.isTraceEnabled() || VERBOSE) {
-          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Inserting key " + event + " into list " + System.identityHashCode(currentSkipList)));
-        }
-        putInList(new KeyToSeqNumObject(((HDFSGatewayEventImpl)event).getSerializedKey(), key), 
-            eventSize);
-      }
-
-      /**
-       * Adds the entry to the current buffer; the flush observer and the queue
-       * size are only bumped when the buffer had no previous mapping for the key.
-       */
-      private void putInList(KeyToSeqNumObject entry, int sizeInBytes) {
-        // It was observed during testing that peek can start peeking 
-        // elements from a list to which a put is happening. This happens 
-        // when the peek changes the value of currentSkipList to a new list 
-        // but the put continues to write to an older list. 
-        // So there is a possibility that an element is added to the list 
-        // that has already been peeked. To handle this case, in the
-        // handleRemainingElements function we re-add the elements that were
-        // not peeked. 
-        if (currentSkipList.add(entry, sizeInBytes) == null) {
-          flush.push();
-          incQueueSize();
-        }
-      }
-
-      /**
-       * Rolls to a new buffer when the current buffer's byte counter has reached
-       * {@code batchSize} or when {@code forceRollover} is set.
-       */
-      public void rollover(boolean forceRollover) {
-        if (currentSkipList.bufferSize() >= batchSize || forceRollover) {
-          rollList(forceRollover);
-        }
-      }
-      
-      /**
-       * Ideally this function should be called from a thread periodically to 
-       * rollover the skip list when it is above a certain size. 
-       * 
-       */
-      public void rollover() {
-        rollover(false);
-      }
-
-      /**
-       * Appends every event reachable from the head buffer to
-       * {@code peekedEntries}. Returns without adding anything when a previous
-       * peek has not been completed yet. If the list is still empty after the
-       * scan, the head buffer is discarded and the peeking flag is released.
-       *
-       * @param peekedEntries list that receives the peeked events
-       */
-      public void peek(ArrayList peekedEntries) {
-        // The elements that were peeked last time, have not been persisted to HDFS 
-        // yet. You cannot take out next batch until that is done.
-        if (!peeking.compareAndSet(false, true)) {
-          if (logger.isTraceEnabled() || VERBOSE) {
-            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Peek already in progress, aborting"));
-          }
-          return;
-        }
-
-        // make sure the head buffer is no longer the one receiving puts
-        if (queueOfLists.size() == 1) {
-          rollList(false);
-        }
-        
-        Assert.assertTrue(queueOfLists.size() > 1, "Cannot peek from head of queue");
-        BufferIterator itr = queueOfLists.peek().iterator();
-        while (itr.hasNext()) {
-          KeyToSeqNumObject entry = itr.next();
-          if (logger.isTraceEnabled() || VERBOSE) {
-            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Peeking key " + entry + " from list " + System.identityHashCode(queueOfLists.peek())));
-          }
-
-          HDFSGatewayEventImpl ev = itr.value();
-          ev.copyOffHeapValue();
-          peekedEntries.add(ev);
-        }
-        
-        // discard an empty batch as it is not processed and will plug up the
-        // queue
-        if (peekedEntries.isEmpty()) {
-          SortedEventBuffer empty = queueOfLists.remove();
-          if (logger.isTraceEnabled() || VERBOSE) {
-            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Discarding empty batch " + empty));
-          }
-          peeking.set(false);
-        }
-        this.lastPeekTimeInMillis = System.currentTimeMillis();
-      }
-
-      /**
-       * Looks the key up in every buffer, starting with the most recently added
-       * one, and returns the first event found.
-       *
-       * @return the matching event, or {@code null} when no buffer yields one
-       */
-      public HDFSGatewayEventImpl get(Region region, byte[] regionKey, long key) {
-        KeyToSeqNumObject event = new KeyToSeqNumObject(regionKey, key);
-        Iterator<SortedEventBuffer> queueIterator = queueOfLists.descendingIterator();
-        while (queueIterator.hasNext()) {
-          HDFSGatewayEventImpl evt = queueIterator.next().getFromQueueOrBuffer(event);
-          if (evt != null) {
-            return evt;
-          }
-        }
-        return null;
-      }
-      
-      /**
-       * Completes a peek: removes the head buffer, re-inserts into the new head
-       * any of its entries whose sequence number is not in
-       * {@code removedSeqNums} and that can still be read through
-       * {@code getNoLRU}, then adjusts the queue size and flush observer by the
-       * removed buffer's entry count and releases the peeking flag. Does nothing
-       * when no peek is outstanding.
-       *
-       * @param removedSeqNums sequence numbers that were dispatched
-       */
-      public void handleRemainingElements(HashSet<Long> removedSeqNums) {
-        if (!peeking.get()) {
-          if (logger.isTraceEnabled() || VERBOSE) {
-            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Not peeked, just cleaning up empty batch; current list is " + currentSkipList));
-          }
-          return;
-        }
-
-        Assert.assertTrue(queueOfLists.size() > 1, "Cannot remove only event list");
-
-        // all done with the peeked elements, okay to throw away now
-        SortedEventBuffer buf = queueOfLists.remove();
-        SortedEventBuffer.BufferIterator bufIter = buf.iterator();
-        // Check if the removed buffer has any extra events. If yes, check if these extra 
-        // events are part of region. If yes, reinsert these as they were probably inserted 
-        // into this list while it was being peeked. 
-        while (bufIter.hasNext()) {
-          KeyToSeqNumObject key = bufIter.next();
-          if (!removedSeqNums.contains(key.getSeqNum())) {
-            HDFSGatewayEventImpl evt = (HDFSGatewayEventImpl) getNoLRU(key.getSeqNum(), true, false, false);
-            if (evt != null) {
-              flush.push();
-              incQueueSize();
-              queueOfLists.getFirst().add(key, evt.getSizeOnHDFSInBytes(!isBucketSorted));
-            }
-          }
-        }
-
-        // the re-inserted entries were counted again above, so subtracting the
-        // full size of the removed buffer leaves them counted exactly once
-        decQueueSize(buf.size());
-        flush.pop(buf.size());
-        peeking.set(false);
-      }
-      
-      public long getLastPeekTimeInMillis(){
-        return this.lastPeekTimeInMillis;
-      }
-      
-      /** @return the key set of the head buffer; asserts that a peek is outstanding */
-      NavigableSet<KeyToSeqNumObject> getPeeked() {
-        assert peeking.get();
-        return queueOfLists.peek().keySet();
-      }
-      
-      /**
-       * Appends a fresh buffer to the deque and makes it the current one. The
-       * call is a no-op when the current buffer is below {@code batchSize}, more
-       * than one buffer is already queued and the roll is not forced.
-       */
-      private synchronized void rollList(boolean forceRollover) {
-        if (currentSkipList.bufferSize() < batchSize && queueOfLists.size() > 1 && !forceRollover)
-          return;
-        SortedEventBuffer tmp = new SortedEventBuffer();
-        queueOfLists.add(tmp);
-        if (logger.isTraceEnabled() || VERBOSE) {
-          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Rolling over list from " + currentSkipList + " to list " + tmp));
-        }
-        currentSkipList = tmp;
-      }
-
-      /** @return a merging iterator over all buffers currently queued */
-      @Override
-      public SortedEventQueueIterator iterator(Region region) {
-        return new SortedEventQueueIterator(queueOfLists);
-      }
-    }
-    
-    public class SortedEventBuffer {
-      private final HDFSGatewayEventImpl NULL = new HDFSGatewayEventImpl();
-  
-      private final ConcurrentSkipListMap<KeyToSeqNumObject, HDFSGatewayEventImpl> events = new ConcurrentSkipListMap<KeyToSeqNumObject, HDFSGatewayEventImpl>();
-      
-      private int bufferSize = 0;
-      
-      public boolean copyToBuffer(HDFSGatewayEventImpl event) {
-        KeyToSeqNumObject key = new KeyToSeqNumObject(event.getSerializedKey(), event.getShadowKey());
-        if (events.containsKey(key)) {
-          // After an event has been delivered in a batch, we copy it into the
-          // buffer so that it can be returned by an already in progress iterator.
-          // If we do not do this it is possible to miss events since the hoplog
-          // iterator uses a fixed set of files that are determined when the
-          // iterator is created.  The events will be GC'd once the buffer is no
-          // longer strongly referenced.
-          HDFSGatewayEventImpl oldVal = events.put(key, event);
-          assert oldVal == NULL;
-  
-          return true;
-        }
-        // If the primary lock is being relinquished, the events is cleared and probaly that is
-        // why we are here. return true if the primary lock is being relinquished
-        if (releasingPrimaryLock.get())
-          return true;
-        else 
-          return false;
-      }
-  
-      public HDFSGatewayEventImpl getFromQueueOrBuffer(KeyToSeqNumObject key) {
-        KeyToSeqNumObject result = events.ceilingKey(key);
-        if (result != null && Bytes.compareTo(key.getRegionkey(), result.getRegionkey()) == 0) {
-          
-          // first try to fetch the buffered event to make it fast. 
-          HDFSGatewayEventImpl evt = events.get(result);
-          if (evt != NULL) {
-            return evt;
-          }
-          // now try to fetch the event from the queue region
-          evt = (HDFSGatewayEventImpl) getNoLRU(result.getSeqNum(), true, false, false);
-          if (evt != null) {
-            return evt;
-          }
-          
-          // try to fetch again from the buffered events to avoid a race between 
-          // item deletion and the above two statements. 
-          evt = events.get(result);
-          if (evt != NULL) {
-            return evt;
-          }
-          
-        }
-        return null;
-      }
-  
-      public HDFSGatewayEventImpl add(KeyToSeqNumObject key, int sizeInBytes) {
-        bufferSize += sizeInBytes;
-        return events.put(key, NULL);
-      }
-  
-      public void clear() {
-        events.clear();
-      }
-  
-      public boolean isEmpty() {
-        return events.isEmpty();
-      }
-  
-      public int bufferSize() {
-        return bufferSize;
-      }
-      public int size() {
-        return events.size();
-      }
-      public NavigableSet<KeyToSeqNumObject> keySet() {
-        return events.keySet();
-      }
-  
-      public BufferIterator iterator() {
-        return new BufferIterator(events.keySet().iterator());
-      }
-  
-      public class BufferIterator implements Iterator<KeyToSeqNumObject> {
-        private final Iterator<KeyToSeqNumObject> src;
-
-        private KeyToSeqNumObject currentKey;
-        private HDFSGatewayEventImpl currentVal;
-
-        private KeyToSeqNumObject nextKey;
-        private HDFSGatewayEventImpl nextVal;
-        
-        public BufferIterator(Iterator<KeyToSeqNumObject> src) {
-          this.src = src;
-          moveNext();
-        }
-  
-        @Override
-        public void remove() {
-          throw new UnsupportedOperationException();
-        }
-        
-        @Override
-        public boolean hasNext() {
-          return nextVal != null;
-        }
-        
-        @Override
-        public KeyToSeqNumObject next() {
-          if (!hasNext()) {
-            throw new NoSuchElementException();
-          }
-          
-          currentKey = nextKey;
-          currentVal = nextVal;
-          
-          moveNext();
-          
-          return currentKey;
-        }
-  
-        public KeyToSeqNumObject key() {
-          assert currentKey != null;
-          return currentKey;
-        }
-        
-        public HDFSGatewayEventImpl value() {
-          assert currentVal != null;
-          return currentVal;
-        }
-        
-        private void moveNext() {
-          while (src.hasNext()) {
-            nextKey = src.next();
-            nextVal = getFromQueueOrBuffer(nextKey);
-            if (nextVal != null) {
-              return;
-            } else if (logger.isDebugEnabled() || VERBOSE) {
-              logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "The entry corresponding to"
-                  + " the sequence number " + nextKey.getSeqNum() 
-                  + " is missing. This can happen when an entry is already" 
-                  + " dispatched before a bucket moved."));
-            }
-          }
-          nextKey = null;
-          nextVal = null;
-        }
-      }
-    }
-  
-    /**
-     * Merges the per-buffer iterators into one stream ordered by region key.
-     * Buffers are visited from the tail of the deque to its head, and when several
-     * buffers hold the same region key only the event from the first buffer
-     * visited is returned.
-     */
-    public final class SortedEventQueueIterator implements CursorIterator<HDFSGatewayEventImpl> {
-      /** the iterators to merge; each one is already positioned on an element */
-      private final List<SortedEventBuffer.BufferIterator> iters;
-  
-      /** the current iteration value */
-      private HDFSGatewayEventImpl value;
-  
-      /**
-       * @param queueOfLists buffers to merge; buffers without any resolvable
-       *        entry are left out
-       */
-      public SortedEventQueueIterator(Deque<SortedEventBuffer> queueOfLists) {
-        iters = new ArrayList<SortedEventBuffer.BufferIterator>();
-        for (Iterator<SortedEventBuffer> iter = queueOfLists.descendingIterator(); iter.hasNext();) {
-          SortedEventBuffer.BufferIterator buf = iter.next().iterator();
-          if (buf.hasNext()) {
-            // position the cursor on its first element so key()/value() are valid
-            buf.next();
-            iters.add(buf);
-          }
-        }
-      }
-      
-      /** Drops the current value and all remaining cursors. */
-      public void close() {
-        value = null;
-        iters.clear();
-      }
-
-      @Override
-      public boolean hasNext() {
-        return !iters.isEmpty();
-      }
-      
-      /**
-       * Returns the event with the smallest region key among all cursors and
-       * moves every cursor sitting on that key past it.
-       *
-       * @throws NoSuchElementException when the iterator is exhausted
-       */
-      @Override
-      public HDFSGatewayEventImpl next() {
-        if (!hasNext()) {
-          // the Iterator contract asks for NoSuchElementException here
-          throw new NoSuchElementException();
-        }
-        
-        int diff = 0;
-        KeyToSeqNumObject min = null;
-        SortedEventBuffer.BufferIterator cursor = null;
-        
-        for (Iterator<SortedEventBuffer.BufferIterator> merge = iters.iterator(); merge.hasNext(); ) {
-          SortedEventBuffer.BufferIterator buf = merge.next();
-          KeyToSeqNumObject tmp = buf.key();
-          if (min == null || (diff = Bytes.compareTo(tmp.regionkey, min.regionkey)) < 0) {
-            min = tmp;
-            cursor = buf;
-            
-          } else if (diff == 0 && !advance(buf, min)) {
-            // same key as the current minimum: skip it, and drop the cursor
-            // altogether when it has nothing beyond that key
-            merge.remove();
-          }
-        }
-        
-        value = cursor.value();
-        assert value != null;
-
-        if (!advance(cursor, min)) {
-          iters.remove(cursor);
-        }
-        return current();
-      }
-      
-      /** @return the event returned by the last call to {@link #next()} */
-      @Override
-      public final HDFSGatewayEventImpl current() {
-        return value;
-      }
-
-      @Override 
-      public void remove() {
-        throw new UnsupportedOperationException();
-      }
-      
-      /**
-       * Moves {@code iter} forward until it sits on a region key greater than
-       * that of {@code key}.
-       *
-       * @return {@code false} when the iterator ran out before reaching one
-       */
-      private boolean advance(SortedEventBuffer.BufferIterator iter, KeyToSeqNumObject key) {
-        while (iter.hasNext()) {
-          if (Bytes.compareTo(iter.next().regionkey, key.regionkey) > 0) {
-            return true;
-          }
-        }
-        return false;
-      }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java
deleted file mode 100644
index c8b7b28..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.IOException;
-import java.lang.ref.ReferenceQueue;
-import java.lang.ref.WeakReference;
-import java.util.AbstractSet;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue.SortedEventQueueIterator;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.internal.cache.BucketRegion;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl;
-import com.gemstone.gemfire.internal.cache.HDFSRegionMap;
-import com.gemstone.gemfire.internal.cache.KeyWithRegionContext;
-import com.gemstone.gemfire.internal.cache.LocalRegion.IteratorType;
-import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import org.apache.hadoop.hbase.util.Bytes;
-
-@SuppressWarnings("rawtypes")
-/**
- * Set view over the entries of one bucket, produced by merging the bucket's
- * in-memory event queue with the data read through its {@link HoplogOrganizer}.
- */
-public class HDFSEntriesSet extends AbstractSet {
-  private final IteratorType type;
-
-  private final HoplogOrganizer hoplogs;
-  private final HDFSBucketRegionQueue brq;
-  
-  private final BucketRegion region; 
-  private final ReferenceQueue<HDFSIterator> refs;
-  
-  public HDFSEntriesSet(BucketRegion region, HDFSBucketRegionQueue brq, 
-      HoplogOrganizer hoplogs, IteratorType type, ReferenceQueue<HDFSIterator> refs) {
-    this.region = region;
-    this.brq = brq;
-    this.hoplogs = hoplogs;
-    this.type = type;
-    this.refs = refs;
-  }
-  
-  /**
-   * Creates a deserializing iterator of this set's {@link IteratorType} and,
-   * when a reference queue was supplied, registers it with that queue.
-   */
-  @Override
-  public HDFSIterator iterator() {
-    HDFSIterator iter = new HDFSIterator(type, region.getPartitionedRegion(), true);
-    if (refs != null) {
-      // we can't rely on an explicit close but we need to free resources
-      //
-      // This approach has the potential to cause excessive memory load and/or
-      // GC problems if an app holds an iterator ref too long. A lease-based
-      // approach where iterators are automatically closed after X secs of
-      // inactivity is a potential alternative (but may require tuning for
-      // certain applications)
-      //
-      // NOTE(review): the WeakReference is not retained anywhere. A reference
-      // object that itself becomes unreachable is never enqueued, so this
-      // registration may have no effect - confirm with the owner of the queue.
-      new WeakReference<HDFSEntriesSet.HDFSIterator>(iter, refs);
-    }
-    return iter;
-  }
-
-  /**
-   * Counts the entries by iterating over all of them, skipping destroy events.
-   */
-  @Override
-  public int size() {
-    // TODO this is the tortoise version, need a fast version for estimation
-    // note: more than 2^31-1 records will cause this counter to wrap
-    int size = 0;
-    HDFSIterator iter = new HDFSIterator(null, region.getPartitionedRegion(), false);
-    try {
-      while (iter.hasNext()) {
-        if (includeEntry(iter.next())) {
-          size++;
-        }
-      }
-    } finally {
-      iter.close();
-    }
-    return size;
-  }
-
-  /**
-   * @return {@code false} as soon as one entry that is not a destroy event is
-   *         found, {@code true} otherwise
-   */
-  @Override
-  public boolean isEmpty() {
-    HDFSIterator iter = new HDFSIterator(null, region.getPartitionedRegion(), false);
-    try {
-      while (iter.hasNext()) {
-        if (includeEntry(iter.next())) {
-          return false;
-        }
-      }
-    } finally {
-      iter.close();
-    }
-    return true;
-  }
-
-  /**
-   * @return {@code false} for queue or persisted events whose operation is a
-   *         destroy, {@code true} for everything else
-   */
-  private boolean includeEntry(Object val) {
-    if (val instanceof HDFSGatewayEventImpl) {
-      HDFSGatewayEventImpl evt = (HDFSGatewayEventImpl) val;
-      if (evt.getOperation().isDestroy()) {
-        return false;
-      }
-    } else if (val instanceof PersistedEventImpl) {
-      PersistedEventImpl evt = (PersistedEventImpl) val;
-      if (evt.getOperation().isDestroy()) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Iterator that merges the queue iterator and the HDFS iterator by
-   * serialized key. When both sources are positioned on the same key the
-   * queue's element is returned and the HDFS element is skipped.
-   */
-  public class HDFSIterator implements Iterator {
-    private final IteratorType type;
-    private final boolean deserialize;
-    
-    private final SortedEventQueueIterator queue;
-    private final HoplogIterator<byte[], SortedHoplogPersistedEvent> hdfs;
-    // only assigned by the disabled setTXState below, so currently always null
-    private Iterator txCreatedEntryIterator;
-    
-    // true while the respective source is positioned on an unread element
-    private boolean queueNext;
-    private boolean hdfsNext;
-    private boolean forUpdate;
-    private boolean hasTxEntry;
-
-    private byte[] currentHdfsKey;
-
-    /**
-     * @param type what to return from {@link #next()}; {@code null} returns
-     *        the raw events
-     * @param region region passed to the queue when creating its iterator
-     * @param deserialize whether keys are deserialized for
-     *        {@code IteratorType.KEYS}
-     * @throws PrimaryBucketException when the queue is not primary
-     * @throws HDFSIOException when the hoplog scan cannot be opened
-     */
-    public HDFSIterator(IteratorType type, Region region, boolean deserialize) {
-      this.type = type;
-      this.deserialize = deserialize;
-
-      // Check whether the queue has become primary here.
-      // There could be some time between bucket becoming a primary 
-      // and underlying queue becoming a primary, so isPrimaryWithWait() 
-      // waits for some time for the queue to become a primary on this member
-      if (!brq.getBucketAdvisor().isPrimaryWithWait()) {
-        InternalDistributedMember primaryHolder = brq.getBucketAdvisor()
-            .basicGetPrimaryMember();
-        throw new PrimaryBucketException("Bucket " + brq.getName()
-            + " is not primary. Current primary holder is " + primaryHolder);
-      }
-      // We are deliberately NOT sync'ing while creating the iterators.  If done
-      // in the correct order, we may get duplicates (due to an in-progress
-      // flush) but we won't miss any entries.  The dupes will be eliminated
-      // during iteration.
-      queue = brq.iterator(region);
-      advanceQueue();
-      
-      HoplogIterator<byte[], SortedHoplogPersistedEvent> tmp = null;
-      try {
-        tmp = hoplogs.scan();
-      } catch (IOException e) {
-        HDFSEntriesSet.this.region.checkForPrimary();
-        throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(e.getMessage()), e);
-      }
-      
-      hdfs = tmp;
-      if (hdfs != null) {
-        advanceHdfs();
-      }
-    }
-    
-    @Override
-    public boolean hasNext() {
-      boolean nonTxHasNext = hdfsNext || queueNext;
-      if (!nonTxHasNext && this.txCreatedEntryIterator != null) {
-        this.hasTxEntry = this.txCreatedEntryIterator.hasNext();
-        return this.hasTxEntry;
-      }
-      return nonTxHasNext;
-    }
-    
-    @Override
-    public Object next() {
-      if (!hasNext()) {
-        throw new NoSuchElementException();
-      }
-      if (hasTxEntry) {
-        hasTxEntry = false;
-        return this.txCreatedEntryIterator.next();
-      }
-
-      Object val;
-      if (!queueNext) {
-        val = getFromHdfs();
-        advanceHdfs();
-        
-      } else if (!hdfsNext) {
-        val = getFromQueue();
-        advanceQueue();
-        
-      } else {
-        // both sources have an element: return the one with the smaller key
-        byte[] qKey = queue.current().getSerializedKey();
-        byte[] hKey = this.currentHdfsKey;
-        
-        int diff = Bytes.compareTo(qKey, hKey);
-        if (diff < 0) {
-          val = getFromQueue();
-          advanceQueue();
-          
-        } else if (diff == 0) {
-          val = getFromQueue();
-          advanceQueue();
-
-          // ignore the duplicate
-          advanceHdfs();
-
-        } else {
-          val = getFromHdfs();
-          advanceHdfs();
-        }
-      }
-      return val;
-    }
-    
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException();
-    }
-    
-    /**
-     * Closes whichever source has not been exhausted yet (exhausted sources
-     * are closed when the end is reached). The flags are cleared so that
-     * calling this method again does not close a source twice and
-     * {@link #hasNext()} no longer reports queue or HDFS elements.
-     */
-    public void close() {
-      if (queueNext) {
-        queueNext = false;
-        queue.close();
-      }
-
-      if (hdfsNext) {
-        hdfsNext = false;
-        hdfs.close();
-      }
-    }
-
-    /** Converts the queue's current event according to the iterator type. */
-    private Object getFromQueue() {
-      HDFSGatewayEventImpl evt = queue.current();
-      if (type == null) {
-        return evt;
-      }
-      
-      switch (type) {
-      case KEYS:
-        byte[] key = evt.getSerializedKey();
-        return deserialize ? EntryEventImpl.deserialize(key) : key;
-        
-      case VALUES:
-        return evt.getValue();
-        
-      default:
-        Object keyObj = EntryEventImpl.deserialize(evt.getSerializedKey());
-        if(keyObj instanceof KeyWithRegionContext) {
-          ((KeyWithRegionContext)keyObj).setRegionContext(region.getPartitionedRegion());
-        }
-        return ((HDFSRegionMap) region.getRegionMap()).getDelegate().getEntryFromEvent(keyObj, evt, true, forUpdate);
-      }
-    }
-
-    /** Converts the HDFS iterator's current element according to the iterator type. */
-    private Object getFromHdfs() {
-      if (type == null) {
-        return hdfs.getValue();
-      }
-      
-      switch (type) {
-      case KEYS:
-        byte[] key = this.currentHdfsKey;
-        return deserialize ? EntryEventImpl.deserialize(key) : key;
-        
-      case VALUES:
-        PersistedEventImpl evt = hdfs.getValue();
-        return evt.getValue();
-        
-      default:
-        Object keyObj = EntryEventImpl.deserialize(this.currentHdfsKey);
-        if(keyObj instanceof KeyWithRegionContext) {
-          ((KeyWithRegionContext)keyObj).setRegionContext(region.getPartitionedRegion());
-        }
-        return ((HDFSRegionMap) region.getRegionMap()).getDelegate().getEntryFromEvent(keyObj, hdfs.getValue(), true, forUpdate);
-      }
-    }
-    
-    /**
-     * Moves the HDFS iterator to its next key, or closes it when it has no
-     * more elements.
-     */
-    private void advanceHdfs() {
-      if (hdfsNext = hdfs.hasNext()) {
-        try {
-          this.currentHdfsKey = hdfs.next();
-        } catch (IOException e) {
-          region.checkForPrimary();
-          throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(e.getMessage()), e);
-        }
-      } else {
-        this.currentHdfsKey = null;
-        hdfs.close();
-      }
-    }
-    
-    /**
-     * Moves the queue iterator to its next element, or closes it after a
-     * primary check when it has no more elements.
-     */
-    private void advanceQueue() {
-      if (queueNext = queue.hasNext()) {
-        queue.next();
-      } else {
-        brq.checkForPrimary();
-        queue.close();
-      }
-    }
-    
-    public void setForUpdate(){
-      this.forUpdate = true;
-    }
-    
-    /**MergeGemXDHDFSToGFE not sure of this function is required */ 
-    /*public void setTXState(TXState txState) {
-      TXRegionState txr = txState.getTXRegionState(region);
-      if (txr != null) {
-        txr.lock();
-        try {
-          this.txCreatedEntryIterator = txr.getCreatedEntryKeys().iterator();
-        }
-        finally{
-          txr.unlock();
-        }
-      }
-    }*/
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java
deleted file mode 100644
index 607650f..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import com.gemstone.gemfire.cache.CacheClosedException;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.i18n.StringId;
-import com.gemstone.gemfire.internal.cache.BucketRegion;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
-import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-
-
-/**
- * Listener that persists events to HDFS
- *
- */
-public class HDFSEventListener implements AsyncEventListener {
-  private final LogWriterI18n logger;
-  // set by close(); checked before every event so an in-flight batch is abandoned
-  private volatile boolean senderStopped = false;
-
-  private final FailureTracker failureTracker = new FailureTracker(10L, 60 * 1000L, 1.5f);
-
-  public HDFSEventListener(LogWriterI18n logger) {
-    this.logger = logger;
-  }
-  
-  @Override
-  public void close() {
-    senderStopped = true;
-  }
-  
-  /**
-   * Groups the events by (region, bucket) in arrival order and flushes each
-   * group through the bucket's {@link HoplogOrganizer}.
-   *
-   * @param events events to persist
-   * @return {@code true} when every group was flushed (or writes are disabled
-   *         through {@code Hoplog.NOP_WRITE}); {@code false} when the listener
-   *         was closed or any step failed
-   */
-  @Override
-  public boolean processEvents(List<AsyncEvent> events) {
-    if (Hoplog.NOP_WRITE) {
-      return true;
-    }
-    
-    // The list of events that async queue receives are sorted at the
-    // bucket level. Events for multiple regions are concatenated together.
-    // Events for multiple buckets are sent which are concatenated
-    // one after the other for e.g.
-    //
-    // <Region1, Key1, bucket1>, <Region1, Key19, bucket1>, 
-    // <Region1, Key4, bucket2>, <Region1, Key6, bucket2>
-    // <Region2, Key1, bucket1>, <Region2, Key4, bucket1>
-    // ..
-    
-    Region previousRegion = null;
-    int prevBucketId = -1;
-    ArrayList<QueuedPersistentEvent> list = null;
-    // Becomes true only after the last group has been flushed, so a failure
-    // in a later group is never reported to the failure tracker as a success.
-    boolean success = false;
-    try {
-      //Back off if we are experiencing failures
-      failureTracker.sleepIfRetry();
-      
-      HoplogOrganizer bucketOrganizer = null; 
-      for (AsyncEvent asyncEvent : events) {
-        if (senderStopped){
-          failureTracker.failure();
-          if (logger.fineEnabled()) {
-            logger.fine("HDFSEventListener.processEvents: Cache is closing down. Ignoring the batch of data.");
-          }
-          return false;
-        }
-        HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)asyncEvent;
-        Region region = hdfsEvent.getRegion();
-        
-        // a change of bucket or region ends the current group
-        if (prevBucketId != hdfsEvent.getBucketId() || region != previousRegion){
-          if (prevBucketId != -1) {
-            bucketOrganizer.flush(list.iterator(), list.size());
-            if (logger.fineEnabled()) {
-              logger.fine("Batch written to HDFS of size " + list.size() + " for region " + previousRegion);
-            }
-          }
-          bucketOrganizer = getOrganizer((PartitionedRegion) region, hdfsEvent.getBucketId());
-          // Bucket organizer can be null only when the bucket has moved. throw an exception so that the 
-          // batch is discarded. 
-          if (bucketOrganizer == null)
-            throw new BucketMovedException("Bucket moved. BucketId: " + hdfsEvent.getBucketId() +  " HDFSRegion: " + region.getName());
-          list = new  ArrayList<QueuedPersistentEvent>();
-        }
-        try {
-          //TODO:HDFS check if there is any serialization overhead
-          list.add(new SortedHDFSQueuePersistedEvent(hdfsEvent));
-        } catch (ClassNotFoundException e) {
-          //TODO:HDFS add localized string
-          logger.warning(new StringId(0, "Error while converting HDFSGatewayEvent to PersistedEventImpl."), e);
-          return false;
-        }
-        prevBucketId = hdfsEvent.getBucketId();
-        previousRegion = region;
-        
-      }
-      // flush the last group
-      if (bucketOrganizer != null) {
-        bucketOrganizer.flush(list.iterator(), list.size());
-        success = true;
-        
-        if (logger.fineEnabled()) {
-          logger.fine("Batch written to HDFS of size " + list.size() + " for region " + previousRegion);
-        }
-      }
-    } catch (IOException e) {
-      logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e);
-      return false;
-    }
-    catch (ForceReattemptException e) {
-      if (logger.fineEnabled())
-        logger.fine(e);
-      return false;
-    }
-    catch(PrimaryBucketException e) {
-      //do nothing, the bucket is no longer primary so we shouldn't get the same
-      //batch next time.
-      if (logger.fineEnabled())
-        logger.fine(e);
-      return false;
-    }
-    catch(BucketMovedException e) {
-      //do nothing, the bucket is no longer primary so we shouldn't get the same
-      //batch next time.
-      if (logger.fineEnabled())
-        logger.fine(e);
-      return false;
-    }
-    catch (CacheClosedException e) {
-      if (logger.fineEnabled())
-        logger.fine(e);
-      // exit silently
-      return false;
-    } catch (InterruptedException e1) {
-      // restore the interrupt status so the calling thread can still see it
-      Thread.currentThread().interrupt();
-      if (logger.fineEnabled())
-        logger.fine(e1);
-      return false;
-    } finally {
-      failureTracker.record(success);
-    }
-
-    return true;
-  }
-  
-  /**
-   * Looks up the hoplog organizer of a locally hosted bucket.
-   *
-   * @throws PrimaryBucketException when the bucket is not hosted locally
-   */
-  private HoplogOrganizer getOrganizer(PartitionedRegion region, int bucketId) {
-    BucketRegion br = region.getDataStore().getLocalBucketById(bucketId);
-    if (br == null) {
-      // got rebalanced or something
-      throw new PrimaryBucketException("Bucket region is no longer available " + bucketId + region);
-    }
-
-    return br.getHoplogOrganizer();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java
deleted file mode 100644
index 0860e75..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import com.gemstone.gemfire.cache.Operation;
-import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
-import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-
-/**
- * Gateway event filter used for HDFS event queues.
- * <p>
- * Current use of this class is limited to ignoring the bulk DML operations.
- * That check is GemFireXD specific and is disabled, so at present every event
- * is accepted by every callback.
- */
-public class HDFSEventQueueFilter implements GatewayEventFilter {
-  /** Logger handed in by the creator; only the disabled bulk DML check refers to it. */
-  private final LogWriterI18n logger;
-
-  /**
-   * Creates the filter.
-   *
-   * @param logger logger to keep for diagnostic output
-   */
-  public HDFSEventQueueFilter(LogWriterI18n logger) {
-    this.logger = logger;
-  }
-
-  /** No op: the filter holds nothing that needs to be released. */
-  @Override
-  public void close() {
-  }
-
-  /**
-   * Decides whether an event may be put in the queue.
-   *
-   * @param event the event about to be enqueued
-   * @return always {@code true} while the bulk DML check below stays disabled
-   */
-  @Override
-  public boolean beforeEnqueue(GatewayQueueEvent event) {
-    /* MergeGemXDHDFSToGFE - Disabled as it is gemxd specific
-    Operation op = event.getOperation();
-    if (op == Operation.BULK_DML_OP) {
-     // On accessors there are no parallel queues, so with the
-     // current logic, the isSerialWanEnabled function in LocalRegion
-     // always returns true on an accessor. So when a bulk DML
-     // op is fired on an accessor, this behavior results in distribution
-     // of the bulk DML operation to other members. This filter was added
-     // to avoid putting such a bulk DML in the parallel queues. It is
-     // not the efficient way, as filters run just before insertion
-     // in the queue. The bulk DMLs should be blocked before they are distributed.
-     if (logger.fineEnabled())
-       logger.fine( "HDFSEventQueueFilter:beforeEnqueue: Disallowing insertion of a bulk DML in HDFS queue.");
-      return false;
-    }*/
-
-    return true;
-  }
-
-  /**
-   * No op.
-   *
-   * @param event the event about to be transmitted
-   * @return always {@code true}
-   */
-  @Override
-  public boolean beforeTransmit(GatewayQueueEvent event) {
-    return true;
-  }
-
-  /**
-   * No op.
-   *
-   * @param event the event that was acknowledged
-   */
-  @Override
-  public void afterAcknowledgement(GatewayQueueEvent event) {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java
deleted file mode 100644
index db99e7e..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.cache.EntryEvent;
-import com.gemstone.gemfire.internal.InternalDataSerializer;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl;
-import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.lru.Sizeable;
-import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
-import com.gemstone.gemfire.internal.cache.versions.VersionTag;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
-import com.gemstone.gemfire.internal.offheap.StoredObject;
-import com.gemstone.gemfire.internal.offheap.annotations.Retained;
-import com.gemstone.gemfire.internal.util.BlobHelper;
-
-
-/**
- * Gateway sender event extended for HDFS functionality: besides the state of
- * {@link GatewaySenderEventImpl} it keeps the key in serialized form and the
- * version tag of the entry event it was created from.
- */
-public class HDFSGatewayEventImpl extends GatewaySenderEventImpl {
-
-  private static final long serialVersionUID = 4642852957292192406L;
-
-  /** Set once {@link #serializedKey} has been filled in. */
-  protected transient boolean keyIsSerialized = false;
-
-  /** The event key as a byte array. */
-  protected byte[] serializedKey = null;
-
-  /** Version tag taken from the originating entry event; may be null. */
-  protected VersionTag versionTag;
-
-  /** No-argument constructor (presumably required for deserialization - confirm). */
-  public HDFSGatewayEventImpl() {
-  }
-
-  /**
-   * Builds the event through the matching superclass constructor and then
-   * captures the serialized key and version tag.
-   *
-   * @throws IOException if serializing the key fails
-   */
-  @Retained
-  public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
-      Object substituteValue)
-      throws IOException {
-    super(operation, event, substituteValue);
-    initializeHDFSGatewayEventObject(event);
-  }
-
-  /**
-   * Builds the event through the matching superclass constructor and then
-   * captures the serialized key and version tag.
-   *
-   * @throws IOException if serializing the key fails
-   */
-  @Retained
-  public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
-      Object substituteValue, boolean initialize, int bucketId) throws IOException {
-    super(operation, event, substituteValue, initialize, bucketId);
-    initializeHDFSGatewayEventObject(event);
-  }
-
-  /**
-   * Builds the event through the matching superclass constructor and then
-   * captures the serialized key and version tag.
-   *
-   * @throws IOException if serializing the key fails
-   */
-  @Retained
-  public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
-      Object substituteValue, boolean initialize) throws IOException {
-    super(operation, event, substituteValue, initialize);
-    initializeHDFSGatewayEventObject(event);
-  }
-
-  /**
-   * Copy constructor used by {@link #makeCopy()}; the key bytes and version
-   * tag are shared with the source event, not cloned.
-   */
-  protected HDFSGatewayEventImpl(HDFSGatewayEventImpl offHeapEvent) {
-    super(offHeapEvent);
-    this.versionTag = offHeapEvent.versionTag;
-    this.serializedKey = offHeapEvent.serializedKey;
-    this.keyIsSerialized = offHeapEvent.keyIsSerialized;
-  }
-
-  @Override
-  protected GatewaySenderEventImpl makeCopy() {
-    return new HDFSGatewayEventImpl(this);
-  }
-
-  /**
-   * Serializes the key (when possible) and takes over the version tag of the
-   * entry event, filling in the tag's member id if it has none.
-   */
-  private void initializeHDFSGatewayEventObject(EntryEvent event)
-      throws IOException {
-    serializeKey();
-
-    this.versionTag = ((EntryEventImpl) event).getVersionTag();
-    if (this.versionTag == null || this.versionTag.getMemberID() != null) {
-      return;
-    }
-    // The tag carries no member id yet: use the version member of the region.
-    this.versionTag.setMemberID(((LocalRegion) getRegion()).getVersionMember());
-  }
-
-  /**
-   * Serializes the key once; does nothing if that already happened or the
-   * event is not initialized.
-   */
-  private void serializeKey() throws IOException {
-    if (keyIsSerialized || !isInitialized()) {
-      return;
-    }
-    this.serializedKey = CacheServerHelper.serialize(this.key);
-    this.keyIsSerialized = true;
-  }
-
-  /* MergeGemXDHDFSToGFE: this function needs to be enabled if similar
-   * functionality is added to GatewaySenderEventImpl.
-  @Override
-  protected StoredObject obtainOffHeapValueBasedOnOp(EntryEventImpl event,
-      boolean hasNonWanDispatcher) {
-    return  event.getOffHeapNewValue();
-  }*/
-
-  /* MergeGemXDHDFSToGFE: this function needs to be enabled if similar
-   * functionality is added to GatewaySenderEventImpl.
-  @Override
-  protected Object obtainHeapValueBasedOnOp(EntryEventImpl event,
-      boolean hasNonWanDispatcher) {
-    return   event.getRawNewValue(shouldApplyDelta());
-  }*/
-
-  @Override
-  protected boolean shouldApplyDelta() {
-    return true;
-  }
-
-  /** Writes the superclass state followed by the version tag. */
-  @Override
-  public void toData(DataOutput out) throws IOException {
-    super.toData(out);
-    DataSerializer.writeObject(this.versionTag, out);
-  }
-
-  /** Writes the key as the already serialized byte array. */
-  @Override
-  protected void serializeKey(DataOutput out) throws IOException {
-    DataSerializer.writeByteArray(this.serializedKey, out);
-  }
-
-  /** Reads the superclass state followed by the version tag. */
-  @Override
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    super.fromData(in);
-    this.versionTag = (VersionTag) DataSerializer.readObject(in);
-  }
-
-  /** Reads the key bytes, keeps them and rebuilds the key object from them. */
-  @Override
-  protected void deserializeKey(DataInput in) throws IOException,
-    ClassNotFoundException {
-    final byte[] keyBytes = DataSerializer.readByteArray(in);
-    this.serializedKey = keyBytes;
-    this.key = BlobHelper.deserializeBlob(keyBytes,
-        InternalDataSerializer.getVersionForDataStreamOrNull(in), null);
-    this.keyIsSerialized = true;
-  }
-
-  @Override
-  public int getDSFID() {
-    return HDFS_GATEWAY_EVENT_IMPL;
-  }
-
-  /** @return the key as a byte array */
-  public byte[] getSerializedKey() {
-    return this.serializedKey;
-  }
-
-  /** @return the version tag held by this event */
-  public VersionTag getVersionTag() {
-    return this.versionTag;
-  }
-
-  /**
-   * Returns the size on HDFS of this event.
-   *
-   * @param writeOnly {@code true} to use the size calculation of
-   *        {@link UnsortedHDFSQueuePersistedEvent}, {@code false} to use the
-   *        one of {@link SortedHDFSQueuePersistedEvent}
-   */
-  public int getSizeOnHDFSInBytes(boolean writeOnly) {
-    return writeOnly
-        ? UnsortedHDFSQueuePersistedEvent.getSizeInBytes(this.serializedKey.length,
-            getSerializedValueSize(), this.versionTag)
-        : SortedHDFSQueuePersistedEvent.getSizeInBytes(this.serializedKey.length,
-            getSerializedValueSize(), this.versionTag);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/46535f28/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java
deleted file mode 100644
index 740a607..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionAttributes;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-import com.gemstone.gemfire.internal.Assert;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-
-/**
- * Static helpers that create the async event queue used to feed an HDFS
- * region.
- */
-public class HDFSIntegrationUtil {
-
-  /**
-   * Creates the HDFS async event queue for a region without an explicit store
-   * configuration; the configuration view of a newly created
-   * {@link HDFSStoreFactoryImpl} is used instead.
-   *
-   * @param cache cache that owns the queue
-   * @param writeOnly {@code true} for a write-only queue (not bucket sorted)
-   * @param regionPath path of the region the queue name is derived from
-   * @return the created queue
-   */
-  public static <K, V> AsyncEventQueue createDefaultAsyncQueueForHDFS(Cache cache, boolean writeOnly, String regionPath) {
-    return createAsyncQueueForHDFS(cache, regionPath, writeOnly, null);
-  }
-
-  /**
-   * Creates a parallel async event queue configured from an HDFS store.
-   *
-   * @param cache cache that owns the queue
-   * @param regionPath path of the region the queue name is derived from
-   * @param writeOnly {@code true} to attach the write-only listener and leave
-   *        the queue unsorted, {@code false} for the bucket sorted variant
-   * @param configView store configuration to copy queue settings from; when
-   *        {@code null} the view of a newly created factory is used
-   * @return the created queue
-   */
-  private static AsyncEventQueue createAsyncQueueForHDFS(Cache cache, String regionPath, boolean writeOnly,
-      HDFSStore configView) {
-    LogWriterI18n logger = cache.getLoggerI18n();
-    String defaultAsyncQueueName = HDFSStoreFactoryImpl.getEventQueueName(regionPath);
-
-    if (configView == null) {
-      configView = new HDFSStoreFactoryImpl(cache).getConfigView();
-    }
-
-    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
-    factory.setBatchSize(configView.getBatchSize());
-    factory.setPersistent(configView.getBufferPersistent());
-    factory.setDiskStoreName(configView.getDiskStoreName());
-    factory.setMaximumQueueMemory(configView.getMaxMemory());
-    factory.setBatchTimeInterval(configView.getBatchInterval());
-    factory.setDiskSynchronous(configView.getSynchronousDiskWrite());
-    factory.setDispatcherThreads(configView.getDispatcherThreads());
-    factory.setParallel(true);
-    factory.addGatewayEventFilter(new HDFSEventQueueFilter(logger));
-    ((AsyncEventQueueFactoryImpl) factory).setBucketSorted(!writeOnly);
-    ((AsyncEventQueueFactoryImpl) factory).setIsHDFSQueue(true);
-
-    AsyncEventQueue asyncQ;
-    if (writeOnly) {
-      asyncQ = factory.create(defaultAsyncQueueName, new HDFSWriteOnlyStoreEventListener(cache.getLoggerI18n()));
-    } else {
-      asyncQ = factory.create(defaultAsyncQueueName, new HDFSEventListener(cache.getLoggerI18n()));
-    }
-
-    // Build the message only when fine logging is on, as done elsewhere in this code base.
-    if (logger.fineEnabled()) {
-      logger.fine("HDFS: async queue created for HDFS. Id: " + asyncQ.getId() + ". Disk store: "
-          + asyncQ.getDiskStoreName() + ". Batch size: " + asyncQ.getBatchSize() + ". bucket sorted:  " + !writeOnly);
-    }
-    return asyncQ;
-  }
-
-  /**
-   * Creates the HDFS async event queue for a region if it is needed and does
-   * not exist yet. Nothing happens when the data policy is not an HDFS one,
-   * when the queue of the leader region already exists, when no HDFS store
-   * name or partition attributes are set, or when the local max memory is 0.
-   *
-   * @param regionPath path of the region being created
-   * @param regionAttributes attributes of that region
-   * @param cache cache that owns the region
-   * @throws IllegalStateException if the HDFS store named by the attributes
-   *         cannot be found in the cache
-   */
-  public static void createAndAddAsyncQueue(String regionPath, RegionAttributes regionAttributes, Cache cache) {
-    if (!regionAttributes.getDataPolicy().withHDFS()) {
-      return;
-    }
-
-    String leaderRegionPath = getLeaderRegionPath(regionPath, regionAttributes, cache);
-
-    String defaultAsyncQueueName = HDFSStoreFactoryImpl.getEventQueueName(leaderRegionPath);
-    if (cache.getAsyncEventQueue(defaultAsyncQueueName) != null) {
-      // The queue is already there.
-      return;
-    }
-
-    String hdfsStoreName = regionAttributes.getHDFSStoreName();
-    if (hdfsStoreName == null || regionAttributes.getPartitionAttributes() == null
-        || regionAttributes.getPartitionAttributes().getLocalMaxMemory() == 0) {
-      return;
-    }
-
-    HDFSStore store = ((GemFireCacheImpl) cache).findHDFSStore(hdfsStoreName);
-    if (store == null) {
-      throw new IllegalStateException(
-          LocalizedStrings.HOPLOG_HDFS_STORE_NOT_FOUND.toLocalizedString(hdfsStoreName));
-    }
-    createAsyncQueueForHDFS(cache, leaderRegionPath, regionAttributes.getHDFSWriteOnly(), store);
-  }
-
-  /**
-   * Follows the colocated-with chain of a partitioned region up to the region
-   * that is not colocated with any other and returns the path of that region.
-   *
-   * @param regionPath path of the starting region
-   * @param regionAttributes attributes of the starting region
-   * @param cache cache used to look up the parent regions
-   * @return the full path of the last region in the chain, or
-   *         {@code regionPath} itself when the region is not colocated
-   */
-  private static String getLeaderRegionPath(String regionPath, RegionAttributes regionAttributes, Cache cache) {
-    String colocated;
-    while (regionAttributes.getPartitionAttributes() != null
-        && (colocated = regionAttributes.getPartitionAttributes().getColocatedWith()) != null) {
-      // Do not waitOnInitialization() for PR
-      GemFireCacheImpl gfc = (GemFireCacheImpl) cache;
-      Region colocatedRegion = gfc.getPartitionedRegion(colocated, false);
-      if (colocatedRegion == null) {
-        Assert.fail("Could not find parent region " + colocated + " for " + regionPath);
-      }
-      regionAttributes = colocatedRegion.getAttributes();
-      regionPath = colocatedRegion.getFullPath();
-    }
-    return regionPath;
-  }
-
-}