You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:50:05 UTC

[19/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizer.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizer.java
new file mode 100644
index 0000000..e8abb38
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizer.java
@@ -0,0 +1,2004 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.gemstone.gemfire.internal.hll.CardinalityMergeException;
+import com.gemstone.gemfire.internal.hll.HyperLogLog;
+import com.gemstone.gemfire.internal.hll.ICardinality;
+import com.gemstone.gemfire.internal.hll.MurmurHash;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.ShutdownHookManager;
+
+import com.gemstone.gemfire.InternalGemFireException;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSCompactionManager.CompactionRequest;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogReader;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogReaderActivityListener;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.Meta;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
+import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics.IOOperation;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
+import com.gemstone.gemfire.internal.concurrent.ConcurrentHashSet;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * Manages sorted oplog files for a bucket. An instance per bucket will exist in
+ * each PR
+ * 
+ */
+public class HdfsSortedOplogOrganizer extends AbstractHoplogOrganizer<SortedHoplogPersistedEvent> {
+  public static final int AVG_NUM_KEYS_PER_INDEX_BLOCK = 200;
+  
+  // all valid sorted hoplogs will follow the following name pattern
+  public static final String SORTED_HOPLOG_REGEX = HOPLOG_NAME_REGEX + "("
+      + FLUSH_HOPLOG_EXTENSION + "|" + MINOR_HOPLOG_EXTENSION + "|"
+      + MAJOR_HOPLOG_EXTENSION + ")";
+  public static final Pattern SORTED_HOPLOG_PATTERN = Pattern.compile(SORTED_HOPLOG_REGEX);
+  
+  //Amount of time before deleting old temporary files
+  final long TMP_FILE_EXPIRATION_TIME_MS = Long.getLong(HoplogConfig.TMP_FILE_EXPIRATION, HoplogConfig.TMP_FILE_EXPIRATION_DEFAULT);
+  
+  static float RATIO = HoplogConfig.COMPACTION_FILE_RATIO_DEFAULT;
+
+  // Compacter for this bucket
+  private Compactor compacter;
+    
+  private final HoplogReadersController hoplogReadersController;
+  private AtomicLong previousCleanupTimestamp = new AtomicLong(Long.MIN_VALUE);
+
+  /**
+   * The default HLL constant. gives an accuracy of about 3.25%
+   * public only for testing upgrade from 1.3 to 1.4
+   */
+  public static double HLL_CONSTANT = 0.03;
+  /**
+   * This estimator keeps track of this buckets entry count. This value is
+   * affected by flush and compaction cycles
+   */
+  private volatile ICardinality bucketSize = new HyperLogLog(HLL_CONSTANT);
+  //A set of tmp files that existed when this bucket organizer was originally
+  //created. These may still be open by the old primary, or they may be
+  //abandoned files.
+  private LinkedList<FileStatus> oldTmpFiles;
+
+  private ConcurrentMap<Hoplog, Boolean> tmpFiles = new ConcurrentHashMap<Hoplog, Boolean>();
+
+  protected volatile boolean organizerClosed = false;
+
+  /**
+   * For the 1.4 release we are changing the HLL_CONSTANT which will make the
+   * old persisted HLLs incompatible with the new HLLs. To fix this we will
+   * force a major compaction when the system starts up so that we will only
+   * have new HLLs in the system (see bug 51403)
+   */
+  private boolean startCompactionOnStartup = false;
+
+  /**
+   * @param region
+   *          Region manager instance. Instances of hdfs listener instance,
+   *          stats collector, file system, etc are shared by all buckets of a
+   *          region and provided by region manager instance
+   * @param bucketId bucket id to be managed by this organizer
+   * @throws IOException
+   */
+  public HdfsSortedOplogOrganizer(HdfsRegionManager region, int bucketId) throws IOException{
+    super(region, bucketId);
+    
+    String val = System.getProperty(HoplogConfig.COMPACTION_FILE_RATIO);
+    try {
+      RATIO = Float.parseFloat(val);
+    } catch (Exception e) {
+    }
+
+    hoplogReadersController = new HoplogReadersController();
+    
+    // initialize with all the files in the directory
+    List<Hoplog> hoplogs = identifyAndLoadSortedOplogs(true);
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Initializing bucket with existing hoplogs, count = " + hoplogs.size(), logPrefix);
+    }
+    for (Hoplog hoplog : hoplogs) {
+      addSortedOplog(hoplog, false, true);
+    }
+
+    // initialize sequence to the current maximum
+    sequence = new AtomicInteger(findMaxSequenceNumber(hoplogs));
+    
+    initOldTmpFiles();
+    
+    FileSystem fs = store.getFileSystem();
+    Path cleanUpIntervalPath = new Path(store.getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME); 
+    if (!fs.exists(cleanUpIntervalPath)) {
+      long intervalDurationMillis = store.getPurgeInterval() * 60 * 1000;
+      HoplogUtil.exposeCleanupIntervalMillis(fs, cleanUpIntervalPath, intervalDurationMillis);
+    }
+
+    if (startCompactionOnStartup) {
+      forceCompactionOnVersionUpgrade();
+      if (logger.isInfoEnabled()) {
+        logger.info(LocalizedStrings.HOPLOG_MAJOR_COMPACTION_SCHEDULED_FOR_BETTER_ESTIMATE);
+      }
+    }
+  }
+
+  /**
+   * Iterates on the input buffer and persists it in a new sorted oplog. This operation is
+   * synchronous and blocks the thread.
+   */
+  @Override
+  public void flush(Iterator<? extends QueuedPersistentEvent> iterator, final int count)
+      throws IOException, ForceReattemptException {
+    assert iterator != null;
+
+    if (logger.isDebugEnabled())
+      logger.debug("{}Initializing flush operation", logPrefix);
+    
+    final Hoplog so = getTmpSortedOplog(null, FLUSH_HOPLOG_EXTENSION);
+    HoplogWriter writer = null;
+    ICardinality localHLL = new HyperLogLog(HLL_CONSTANT);
+    
+    // variables for updating stats
+    long start = stats.getFlush().begin();
+    int byteCount = 0;
+    
+    try {
+      /**MergeGemXDHDFSToGFE changed the following statement as the code of HeapDataOutputStream is not merged */
+      //HeapDataOutputStream out = new HeapDataOutputStream();
+      
+      try {
+        writer = this.store.getSingletonWriter().runSerially(new Callable<Hoplog.HoplogWriter>() {
+          @Override
+          public HoplogWriter call() throws Exception {
+            return so.createWriter(count);
+          }
+        });
+      } catch (Exception e) {
+        if (e instanceof IOException) {
+          throw (IOException)e;
+        }
+        throw new IOException(e);
+      }
+
+      while (iterator.hasNext() && !this.organizerClosed) {
+        HeapDataOutputStream out = new HeapDataOutputStream(1024, null);
+        
+        QueuedPersistentEvent item = iterator.next();
+        item.toHoplogEventBytes(out);
+        byte[] valueBytes = out.toByteArray();
+        writer.append(item.getRawKey(), valueBytes);
+        
+        // add key length and value length to stats byte counter
+        byteCount += (item.getRawKey().length + valueBytes.length);
+
+        // increment size only if entry is not deleted
+        if (!isDeletedEntry(valueBytes, 0)) {
+          int hash = MurmurHash.hash(item.getRawKey());
+          localHLL.offerHashed(hash);
+        }
+        /**MergeGemXDHDFSToGFE how to clear for reuse. Leaving it for Darrel to merge this change*/
+        //out.clearForReuse();
+      }
+      if (organizerClosed)
+        throw new BucketMovedException("The current bucket is moved BucketID: "+  
+            this.bucketId + " Region name: " +  this.regionManager.getRegion().getName());
+      
+      // append completed. provide cardinality and close writer
+      writer.close(buildMetaData(localHLL));
+      writer = null;
+    } catch (IOException e) {
+      stats.getFlush().error(start);
+      try {
+        e = handleWriteHdfsIOError(writer, so, e);
+      } finally {
+        //Set the writer to null because handleWriteHDFSIOError has
+        //already closed the writer.
+        writer = null;
+      }
+      throw e;
+    } catch (BucketMovedException e) {
+      stats.getFlush().error(start);
+      deleteTmpFile(writer, so);
+      writer = null;
+      throw e;
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+
+    try{
+      
+      // ping secondaries before making the file a legitimate file to ensure 
+      // that in case of split brain, no other vm has taken up as primary. #50110.  
+      pingSecondaries();
+      
+      // rename file and check if renaming was successful
+      synchronized (changePrimarylockObject) {
+        if (!organizerClosed)
+          makeLegitimate(so);
+        else 
+          throw new BucketMovedException("The current bucket is moved BucketID: "+  
+              this.bucketId + " Region name: " +  this.regionManager.getRegion().getName());
+      }
+      try {
+        so.getSize();
+      } catch (IllegalStateException e) {
+        throw new IOException("Failed to rename hoplog file:" + so.getFileName());
+      }
+      
+      //Disabling this assertion due to bug 49740
+      // check to make sure the sequence number is correct
+//      if (ENABLE_INTEGRITY_CHECKS) {
+//        Assert.assertTrue(getSequenceNumber(so) == findMaxSequenceNumber(identifyAndLoadSortedOplogs(false)), 
+//            "Invalid sequence number detected for " + so.getFileName());
+//      }
+      
+      // record the file for future maintenance and reads
+      addSortedOplog(so, false, true);
+      stats.getFlush().end(byteCount, start);
+      incrementDiskUsage(so.getSize());
+    } catch (BucketMovedException e) {
+      stats.getFlush().error(start);
+      deleteTmpFile(writer, so);
+      writer = null;
+      throw e;
+    } catch (IOException e) {
+      stats.getFlush().error(start);
+      logger.warn(LocalizedStrings.HOPLOG_FLUSH_OPERATION_FAILED, e);
+      throw e;
+    }
+
+    submitCompactionRequests();
+  }
+
+
+  /**
+   * store cardinality information in metadata
+   * @param localHLL the hll estimate for this hoplog only
+   */
+  private EnumMap<Meta, byte[]> buildMetaData(ICardinality localHLL) throws IOException {
+    EnumMap<Meta, byte[]> map = new EnumMap<Hoplog.Meta, byte[]>(Meta.class);
+    map.put(Meta.LOCAL_CARDINALITY_ESTIMATE_V2, localHLL.getBytes());
+    return map;
+  }
+
+  private void submitCompactionRequests() throws IOException {
+    CompactionRequest req;
+    
+    // determine if a major compaction is needed and create a compaction request
+    // with compaction manager
+    if (store.getMajorCompaction()) {
+      if (isMajorCompactionNeeded()) {
+        req = new CompactionRequest(regionFolder, bucketId, getCompactor(), true);
+        HDFSCompactionManager.getInstance(store).submitRequest(req);
+      }
+    }
+    
+    // submit a minor compaction task. It will be ignored if there is no work to
+    // be done.
+    if (store.getMinorCompaction()) {
+      req = new CompactionRequest(regionFolder, bucketId, getCompactor(), false);
+      HDFSCompactionManager.getInstance(store).submitRequest(req);
+    }
+  }
+
+  /**
+   * @return true if the oldest hoplog was created 1 major compaction interval ago
+   */
+  private boolean isMajorCompactionNeeded() throws IOException {
+    // major compaction interval in milliseconds
+    
+    long majorCInterval = ((long)store.getMajorCompactionInterval()) * 60 * 1000;
+
+    Hoplog oplog = hoplogReadersController.getOldestHoplog();
+    if (oplog == null) {
+      return false;
+    }
+    
+    long oldestFileTime = oplog.getModificationTimeStamp();
+    long now = System.currentTimeMillis();
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Checking oldest hop " + oplog.getFileName()
+          + " for majorCompactionInterval=" + majorCInterval
+          + " + now=" + now, logPrefix);
+    }
+    if (oldestFileTime > 0l && oldestFileTime < (now - majorCInterval)) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public SortedHoplogPersistedEvent read(byte[] key) throws IOException {
+    long startTime = stats.getRead().begin();
+    String user = logger.isDebugEnabled() ? "Read" : null;
+    
+    // collect snapshot of hoplogs
+    List<TrackedReference<Hoplog>> hoplogs = null;
+    hoplogs = hoplogReadersController.getTrackedSortedOplogList(user);
+    try {
+      // search for the key in order starting with the youngest oplog
+      for (TrackedReference<Hoplog> hoplog : hoplogs) {
+        HoplogReader reader = hoplog.get().getReader();
+        byte[] val = reader.read(key);
+        if (val != null) {
+          // value found in a younger hoplog. stop iteration
+          SortedHoplogPersistedEvent eventObj = deserializeValue(val);
+          stats.getRead().end(val.length, startTime);
+          return eventObj;
+        }
+      }
+    } catch (IllegalArgumentException e) {
+      if (IOException.class.isAssignableFrom(e.getCause().getClass())) {
+        throw handleIOError((IOException) e.getCause());
+      } else {
+        throw e;
+      }
+    } catch (IOException e) {
+      throw handleIOError(e);
+    } catch (HDFSIOException e) {
+        throw handleIOError(e);
+    } finally {
+      hoplogReadersController.releaseHoplogs(hoplogs, user);
+    }
+    
+    stats.getRead().end(0, startTime);
+    return null;
+  }
+
+  protected IOException handleIOError(IOException e) {
+    // expose the error wrapped inside remote exception
+    if (e instanceof RemoteException) {
+      return ((RemoteException) e).unwrapRemoteException();
+    } 
+    
+    checkForSafeError(e);
+    
+    // it is not a safe error. let the caller handle it
+    return e;
+  }
+  
+  protected HDFSIOException handleIOError(HDFSIOException e) {
+    checkForSafeError(e);
+    return e;
+  }
+
+  protected void checkForSafeError(Exception e) {
+    boolean safeError = ShutdownHookManager.get().isShutdownInProgress();
+    if (safeError) {
+      // IOException because of closed file system. This happens when member is
+      // shutting down
+      if (logger.isDebugEnabled())
+        logger.debug("IO error caused by filesystem shutdown", e);
+      throw new CacheClosedException("IO error caused by filesystem shutdown", e);
+    } 
+
+    if(isClosed()) {
+      //If the hoplog organizer is closed, throw an exception to indicate the 
+      //caller should retry on the new primary.
+      throw new PrimaryBucketException(e);
+    }
+  }
+  
+  protected IOException handleWriteHdfsIOError(HoplogWriter writer, Hoplog so, IOException e)
+      throws IOException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Handle write error:" + so, logPrefix);
+    }
+    
+    closeWriter(writer);
+    // add to the janitor queue
+    tmpFiles.put(so, Boolean.TRUE);
+
+    return handleIOError(e);
+  }
+
+  private void deleteTmpFile(HoplogWriter writer, Hoplog so) {
+    closeWriter(writer);
+    
+    // delete the temporary hoplog
+    try {
+      if (so != null) {
+        so.delete();
+      }
+    } catch (IOException e1) {
+      logger.info(e1);
+    }
+  }
+
+  private void closeWriter(HoplogWriter writer) {
+    if (writer != null) {
+      // close writer before deleting it
+      try {
+        writer.close();
+      } catch (Throwable e1) {
+        // error to close hoplog will happen if no connections to datanode are
+        // available. Try to delete the file on namenode
+        if(!isClosed()) {
+          logger.info(e1);
+        }
+      }
+    }
+  }
+
+  /**
+   * Closes hoplog and suppresses IO during reader close. Suppressing IO errors
+   * when the organizer is closing or an hoplog becomes inactive lets the system
+   * continue freeing other resources. It could potentially lead to socket
+   * leaks though.
+   */
+  private void closeReaderAndSuppressError(Hoplog hoplog, boolean clearCache) {
+    try {
+      hoplog.close();
+    } catch (IOException e) {
+      // expose the error wrapped inside remote exception
+      if (e instanceof RemoteException) {
+        e = ((RemoteException) e).unwrapRemoteException();
+      } 
+      logger.info(e);
+    }
+  }
+
+  @Override
+  public BucketIterator scan() throws IOException {
+    String user = logger.isDebugEnabled() ? "Scan" : null;
+    List<TrackedReference<Hoplog>> hoplogs = null;
+    BucketIterator iter = null;
+    try {
+      hoplogs = hoplogReadersController.getTrackedSortedOplogList(user);
+      iter = new BucketIterator(hoplogs);
+      return iter;
+    }  finally {
+      // Normally the hoplogs will be released when the iterator is closed. The
+      // hoplogs must be released only if creating the iterator has failed.
+      if (iter == null) {
+        hoplogReadersController.releaseHoplogs(hoplogs, user);
+      }
+    }
+  }
+
+  @Override
+  public BucketIterator scan(byte[] from, byte[] to) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public BucketIterator scan(byte[] from, boolean fromInclusive, byte[] to, boolean toInclusive) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public HoplogIterator<byte[], SortedHoplogPersistedEvent> scan(
+      long startOffset, long length) throws IOException {
+    throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    
+    synchronized (changePrimarylockObject) {
+      organizerClosed = true;
+    }
+    //Suspend compaction
+    getCompactor().suspend();
+    
+    //Close the readers controller.
+    hoplogReadersController.close();
+    
+    previousCleanupTimestamp.set(Long.MIN_VALUE);
+    
+  }
+
+  /**
+   * This method call will happen on secondary node. The secondary node needs to update its data
+   * structures
+   */
+  @Override
+  public void hoplogCreated(String region, int bucketId, Hoplog... oplogs)
+      throws IOException {
+    for (Hoplog oplog : oplogs) {
+      addSortedOplog(oplog, false, true);
+    }
+  }
+
+  @Override
+  public long sizeEstimate() {
+    return this.bucketSize.cardinality();
+  }
+
+  private void addSortedOplog(Hoplog so, boolean notify, boolean addsToBucketSize)
+  throws IOException {
+    if (!hoplogReadersController.addSortedOplog(so)) {
+      so.close();
+      throw new InternalGemFireException("Failed to add " + so);
+    }
+
+    String user = logger.isDebugEnabled() ? "Add" : null;
+    if (addsToBucketSize) {
+      TrackedReference<Hoplog> ref = null;
+      try {
+        ref = hoplogReadersController.trackHoplog(so, user);
+        synchronized (bucketSize) {
+          ICardinality localHLL = ref.get().getEntryCountEstimate();
+          if (localHLL != null) {
+            bucketSize = mergeHLL(bucketSize, localHLL);
+          }
+        }
+      } finally {
+        if (ref != null) {
+          hoplogReadersController.releaseHoplog(ref, user);
+        }
+      }
+    }
+
+    if (notify && listener != null) {
+      listener.hoplogCreated(regionFolder, bucketId, so);
+    }
+  }
+
+  private void reEstimateBucketSize() throws IOException {
+    ICardinality global = null;
+    String user = logger.isDebugEnabled() ? "HLL" : null;
+    List<TrackedReference<Hoplog>> hoplogs = null;
+    try {
+      hoplogs = hoplogReadersController.getTrackedSortedOplogList(user);
+      global = new HyperLogLog(HLL_CONSTANT);
+      for (TrackedReference<Hoplog> hop : hoplogs) {
+        global = mergeHLL(global, hop.get().getEntryCountEstimate());
+      }
+    } finally {
+      hoplogReadersController.releaseHoplogs(hoplogs, user);
+    }
+    bucketSize = global;
+  }
+
+  protected ICardinality mergeHLL(ICardinality global, ICardinality local)
+  /*throws IOException*/ {
+    try {
+      return global.merge(local);
+    } catch (CardinalityMergeException e) {
+      // uncomment this after the 1.4 release
+      //throw new InternalGemFireException(e.getLocalizedMessage(), e);
+      startCompactionOnStartup = true;
+      return global;
+    }
+  }
+
+  private void removeSortedOplog(TrackedReference<Hoplog> so, boolean notify) throws IOException {
+    hoplogReadersController.removeSortedOplog(so);
+    
+    // release lock before notifying listeners
+    if (notify && listener != null) {
+      listener.hoplogDeleted(regionFolder, bucketId, so.get());
+    }
+  }
+  
+  private void notifyCompactionListeners(boolean isMajor) {
+    listener.compactionCompleted(regionFolder, bucketId, isMajor);
+  }
+  
+  /**
+   * This method call will happen on secondary node. The secondary node needs to update its data
+   * structures
+   * @throws IOException 
+   */
+  @Override
+  public void hoplogDeleted(String region, int bucketId, Hoplog... oplogs) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public synchronized Compactor getCompactor() {
+    if (compacter == null) {
+      compacter = new HoplogCompactor();
+    }
+    return compacter;
+  }
+
+  @Override
+  protected Hoplog getHoplog(Path hoplogPath) throws IOException {
+    Hoplog so = new HFileSortedOplog(store, hoplogPath, store.getBlockCache(), stats, store.getStats());
+    return so;
+  }
+
+  /**
+   * locks sorted oplogs collection, removes oplog and renames for deletion later
+   * @throws IOException 
+   */
+  void markSortedOplogForDeletion(List<TrackedReference<Hoplog>> targets, boolean notify) throws IOException {
+    for (int i = targets.size(); i > 0; i--) {
+      TrackedReference<Hoplog> so = targets.get(i - 1);
+      removeSortedOplog(so, true);
+      if (!store.getFileSystem().exists(new Path(bucketPath, so.get().getFileName()))) {
+        // the hoplog does not even exist on file system. Skip remaining steps
+        continue;
+      }
+      addExpiryMarkerForAFile(so.get());
+    }
+  }
+  
+  /**
+   * Deletes expired hoplogs and expiry markers from the file system. Calculates
+   * a target timestamp based on cleanup interval. Then gets list of target
+   * hoplogs. It also updates the disk usage state
+   * 
+   * @return number of files deleted
+   */
+   synchronized int initiateCleanup() throws IOException {
+    int conf = store.getPurgeInterval();
+    // minutes to milliseconds
+    long intervalDurationMillis = conf * 60 * 1000;
+    // Any expired hoplog with timestamp less than targetTS is a delete
+    // candidate.
+    long targetTS = System.currentTimeMillis() - intervalDurationMillis;
+    if (logger.isDebugEnabled()) {
+      logger.debug("Target timestamp for expired hoplog deletion " + targetTS, logPrefix);
+    }
+    // avoid too frequent cleanup invocations. Exit cleanup invocation if the
+    // previous cleanup was executed within 10% range of cleanup interval
+    if (previousCleanupTimestamp.get() > targetTS
+        && (previousCleanupTimestamp.get() - targetTS) < (intervalDurationMillis / 10)) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Skip cleanup, previous " + previousCleanupTimestamp.get(), logPrefix);
+      }
+      return 0;
+    }
+
+    List<FileStatus> targets = getOptimizationTargets(targetTS);
+    return deleteExpiredFiles(targets);
+  }
+
+  protected int deleteExpiredFiles(List<FileStatus> targets) throws IOException {
+    if (targets == null) {
+      return 0;
+    }
+
+    for (FileStatus file : targets) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}Deleting file: " + file.getPath(), logPrefix);
+      }
+      store.getFileSystem().delete(file.getPath(), false);
+      
+      if (isClosed()) {
+        if (logger.isDebugEnabled())
+          logger.debug("{}Expiry file cleanup interupted by bucket close", logPrefix);
+        return 0;
+      }
+      incrementDiskUsage(-1 * file.getLen());
+    }
+
+    previousCleanupTimestamp.set(System.currentTimeMillis());
+    return targets.size();
+  }
+
+  /**
+   * @param ts
+   *          target timestamp
+   * @return list of hoplogs, whose expiry markers were created before target
+   *         timestamp, and the expiry marker itself.
+   * @throws IOException
+   */
+  protected List<FileStatus> getOptimizationTargets(long ts) throws IOException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Identifying optimization targets " + ts, logPrefix);
+    }
+
+    List<FileStatus> deleteTargets = new ArrayList<FileStatus>();
+    FileStatus[] markers = getExpiryMarkers();
+    if (markers != null) {
+      for (FileStatus marker : markers) {
+        String name = truncateExpiryExtension(marker.getPath().getName());
+        long timestamp = marker.getModificationTime();
+
+        // expired minor compacted files are not being used anywhere. These can
+        // be removed immediately. All the other expired files should be removed
+        // when the files have aged
+        boolean isTarget = false;
+        
+        if (name.endsWith(MINOR_HOPLOG_EXTENSION)) {
+          isTarget = true;
+        } else if (timestamp < ts && name.endsWith(FLUSH_HOPLOG_EXTENSION)) {
+          isTarget = true;
+        } else if (timestamp < ts && name.endsWith(MAJOR_HOPLOG_EXTENSION)) {
+          long majorCInterval = ((long)store.getMajorCompactionInterval()) * 60 * 1000;
+          if (timestamp < (System.currentTimeMillis() - majorCInterval)) {
+            isTarget = true;
+          }
+        }
+        if (!isTarget) {
+          continue;
+        }
+        
+        // if the file is still being read, do not delete or rename it
+        TrackedReference<Hoplog> used = hoplogReadersController.getInactiveHoplog(name);
+        if (used != null) {
+          if (used.inUse() && logger.isDebugEnabled()) {
+            logger.debug("{}Optimizer: found active expired hoplog:" + name, logPrefix);
+          } else if (logger.isDebugEnabled()) {
+            logger.debug("{}Optimizer: found open expired hoplog:" + name, logPrefix);
+          }
+          continue;
+        }
+        
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}Delete target identified " + marker.getPath(), logPrefix);
+        }
+        
+        deleteTargets.add(marker);
+        Path hoplogPath = new Path(bucketPath, name);
+        if (store.getFileSystem().exists(hoplogPath)) {
+          FileStatus hoplog = store.getFileSystem().getFileStatus(hoplogPath);
+          deleteTargets.add(hoplog);
+        }
+      }
+    }
+    return deleteTargets;
+  }
+
+  /**
+   * Returns a list of of hoplogs present in the bucket's directory, expected to be called during
+   * hoplog set initialization
+   */
+  List<Hoplog> identifyAndLoadSortedOplogs(boolean countSize) throws IOException {
+    FileSystem fs = store.getFileSystem();
+    if (! fs.exists(bucketPath)) {
+      return new ArrayList<Hoplog>();
+    }
+    
+    FileStatus allFiles[] = fs.listStatus(bucketPath);
+    ArrayList<FileStatus> validFiles = new ArrayList<FileStatus>();
+    for (FileStatus file : allFiles) {
+      // All hoplog files contribute to disk usage
+      Matcher matcher = HOPLOG_NAME_PATTERN.matcher(file.getPath().getName());
+      if (! matcher.matches()) {
+        // not a hoplog
+        continue;
+      }
+      
+      // account for the disk used by this file
+      if (countSize) {
+        incrementDiskUsage(file.getLen());
+      }
+      
+      // All valid hoplog files must match the regex
+      matcher = SORTED_HOPLOG_PATTERN.matcher(file.getPath().getName());
+      if (matcher.matches()) {
+        validFiles.add(file);
+      }
+    }
+    
+    FileStatus[] markers = getExpiryMarkers();
+    FileStatus[] validHoplogs = filterValidHoplogs(
+        validFiles.toArray(new FileStatus[validFiles.size()]), markers);
+
+    ArrayList<Hoplog> results = new ArrayList<Hoplog>();
+    if (validHoplogs == null || validHoplogs.length == 0) {
+      return results;
+    }
+
+    for (int i = 0; i < validHoplogs.length; i++) {
+      // Skip directories
+      if (validHoplogs[i].isDirectory()) {
+        continue;
+      }
+
+      final Path p = validHoplogs[i].getPath();
+      // skip empty file
+      if (fs.getFileStatus(p).getLen() <= 0) {
+        continue;
+      }
+
+      Hoplog hoplog = new HFileSortedOplog(store, p, store.getBlockCache(), stats, store.getStats());
+      results.add(hoplog);
+    }
+
+    return results;
+  }
+
+  private static int findMaxSequenceNumber(List<Hoplog> hoplogs) throws IOException {
+    int maxSeq = 0;
+    for (Hoplog hoplog : hoplogs) {
+      maxSeq = Math.max(maxSeq, getSequenceNumber(hoplog));
+    }
+    return maxSeq;
+  }
+
+  /**
+   * @return the sequence number associate with a hoplog file
+   */
+  static int getSequenceNumber(Hoplog hoplog) {
+    Matcher matcher = SORTED_HOPLOG_PATTERN.matcher(hoplog.getFileName());
+    boolean matched = matcher.find();
+    assert matched;
+    return Integer.valueOf(matcher.group(3));
+  }
+
+  protected FileStatus[] getExpiryMarkers() throws IOException {
+    FileSystem fs = store.getFileSystem();
+    if (hoplogReadersController.hoplogs == null
+        || hoplogReadersController.hoplogs.size() == 0) {
+      // there are no hoplogs in the system. May be the bucket is not existing
+      // at all.
+      if (!fs.exists(bucketPath)) {
+        if (logger.isDebugEnabled())
+          logger.debug("{}This bucket is unused, skipping expired hoplog check", logPrefix);
+        return null;
+      }
+    }
+    
+    FileStatus files[] = FSUtils.listStatus(fs, bucketPath, new PathFilter() {
+      @Override
+      public boolean accept(Path file) {
+        // All expired hoplog end with expire extension and must match the valid file regex
+        String fileName = file.getName();
+        if (! fileName.endsWith(EXPIRED_HOPLOG_EXTENSION)) {
+          return false;
+        }
+        fileName = truncateExpiryExtension(fileName);
+        Matcher matcher = SORTED_HOPLOG_PATTERN.matcher(fileName);
+        return matcher.find();
+      }
+
+    });
+    return files;
+  }
+  
+  @Override
+  public void clear() throws IOException {
+    //Suspend compaction while we are doing the clear. This
+    //aborts the currently in progress compaction.
+    getCompactor().suspend();
+    
+    // while compaction is suspended, clear method marks hoplogs for deletion
+    // only. Files will be removed by cleanup thread after active gets and
+    // iterations are completed
+    String user = logger.isDebugEnabled() ? "clear" : null;
+    List<TrackedReference<Hoplog>> oplogs = null;
+    try {
+      oplogs = hoplogReadersController.getTrackedSortedOplogList(user);
+      markSortedOplogForDeletion(oplogs, true);
+    } finally {
+      if (oplogs != null) {
+        hoplogReadersController.releaseHoplogs(oplogs, user);
+      }
+      //Resume compaction
+      getCompactor().resume();
+    }
+  }
+
+  /**
+   * Performs the following activities
+   * <UL>
+   * <LI>Submits compaction requests as needed
+   * <LI>Deletes tmp files which the system failed to removed earlier
+   */
+  @Override
+  public void performMaintenance() throws IOException {
+    long startTime = System.currentTimeMillis();
+    
+    if (logger.isDebugEnabled())
+      logger.debug("{}Executing bucket maintenance", logPrefix);
+
+    submitCompactionRequests();
+    hoplogReadersController.closeInactiveHoplogs();
+    initiateCleanup();
+    
+    cleanupTmpFiles();
+    
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Time spent in bucket maintenance (in ms): "
+          + (System.currentTimeMillis() - startTime), logPrefix);
+    }
+  }
+
+  @Override
+  public Future<CompactionStatus> forceCompaction(boolean isMajor) {
+    CompactionRequest request = new CompactionRequest(regionFolder, bucketId,
+        getCompactor(), isMajor, true/*force*/);
+    return HDFSCompactionManager.getInstance(store).submitRequest(request);
+  }
+
+  private Future<CompactionStatus> forceCompactionOnVersionUpgrade() {
+    CompactionRequest request = new CompactionRequest(regionFolder, bucketId, getCompactor(), true, true, true);
+    return HDFSCompactionManager.getInstance(store).submitRequest(request);
+  }
+
+  @Override
+  public long getLastMajorCompactionTimestamp() {
+    long ts = 0;
+    String user = logger.isDebugEnabled() ? "StoredProc" : null;
+    List<TrackedReference<Hoplog>> hoplogs = hoplogReadersController.getTrackedSortedOplogList(user);
+    try {
+      for (TrackedReference<Hoplog> hoplog : hoplogs) {
+        String fileName = hoplog.get().getFileName();
+        Matcher file = HOPLOG_NAME_PATTERN.matcher(fileName);
+        if (file.matches() && fileName.endsWith(MAJOR_HOPLOG_EXTENSION)) {
+          ts = getHoplogTimestamp(file);
+          break;
+        }
+      }
+    } finally {
+      hoplogReadersController.releaseHoplogs(hoplogs, user);
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}HDFS: for bucket:"+getRegionBucketStr()+" returning last major compaction timestamp "+ts, logPrefix);
+    }
+    return ts;
+  }
+
+  private void initOldTmpFiles() throws IOException {
+    FileSystem fs = store.getFileSystem();
+    if (! fs.exists(bucketPath)) {
+      return;
+    }
+    
+    oldTmpFiles = new LinkedList<FileStatus>(Arrays.asList(fs.listStatus(bucketPath, new TmpFilePathFilter())));
+  }
+  
+  private void cleanupTmpFiles() throws IOException {
+    if(oldTmpFiles == null && tmpFiles == null) {
+      return;
+    }
+    
+    if (oldTmpFiles != null) {
+      FileSystem fs = store.getFileSystem();
+      long now = System.currentTimeMillis();
+      for (Iterator<FileStatus> itr = oldTmpFiles.iterator(); itr.hasNext();) {
+        FileStatus file = itr.next();
+        if(file.getModificationTime() + TMP_FILE_EXPIRATION_TIME_MS > now) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("{}Deleting temporary file:" + file.getPath(), logPrefix);
+          }
+          fs.delete(file.getPath(), false);
+          itr.remove();
+        }
+      }
+    }
+    if (tmpFiles != null) {
+      for (Hoplog so : tmpFiles.keySet()) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}Deleting temporary file:" + so.getFileName(), logPrefix);
+        }
+        deleteTmpFile(null, so);
+      }
+    }
+  }
+  
+  /**
+   * Executes tiered compaction of hoplog files. One instance of compacter per bucket will exist
+   */
+  protected class HoplogCompactor implements Compactor {
+    private volatile boolean suspend = false;
+    
+    // the following boolean will be used to synchronize minor compaction
+    private AtomicBoolean isMinorCompactionActive = new AtomicBoolean(false);
+    // the following boolean will be used to synchronize major compaction
+    private AtomicBoolean isMajorCompactionActive = new AtomicBoolean(false);
+    // the following integer tracks the max sequence number amongst the
+    // target files being major compacted. This value will be used to prevent
+    // concurrent MajorC and minorC. MinorC is preempted in case of an
+    // overlap. This object is also used as a lock. The lock is acquired before
+    // identifying compaction targets and before marking targets for expiry
+    final AtomicInteger maxMajorCSeqNum = new AtomicInteger(-1);
+
+    @Override
+    public void suspend() {
+      long wait = Long.getLong(HoplogConfig.SUSPEND_MAX_WAIT_MS, HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT);
+      this.suspend=true;
+      //this forces the compact method to finish.
+      while (isMajorCompactionActive.get() || isMinorCompactionActive.get()) {
+        if (wait < 0) {
+          wait = Long.getLong(HoplogConfig.SUSPEND_MAX_WAIT_MS, HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT);
+          String act = isMajorCompactionActive.get() ? "MajorC" : "MinorC";
+          logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_SUSPEND_OF_0_FAILED_IN_1, new Object[] {act, wait}));
+          break;
+        }
+        try {
+          TimeUnit.MILLISECONDS.sleep(50);
+          wait -= 50;
+        } catch (InterruptedException e) {
+          break;
+        }
+      }
+    }
+    
+    @Override
+    public void resume() {
+      this.suspend = false;
+    }
+    
+    @Override
+    public boolean isBusy(boolean isMajor) {
+      if (isMajor) {
+        return isMajorCompactionActive.get();
+      } else {
+        return isMinorCompactionActive.get();
+      }
+    }
+    
+    /**
+     * compacts hoplogs. The method takes a minor or major compaction "lock" to
+     * prevent concurrent execution of compaction cycles. A possible improvement
+     * is to allow parallel execution of minor compaction if the sets of
+     * hoplogs being compacted are disjoint.
+     */
+    @Override
+    public boolean compact(boolean isMajor, boolean isForced) throws IOException {
+      if(suspend) {
+        return false;
+      }
+
+      String extension = null;
+      IOOperation compactionStats = null;
+      long startTime = 0; 
+      final AtomicBoolean lock;
+      Hoplog compactedHoplog = null;
+      List<TrackedReference<Hoplog>> targets = null;
+      String user = logger.isDebugEnabled() ? (isMajor ? "MajorC" : "MinorC") : null;
+      
+      if (isMajor) {
+        lock = isMajorCompactionActive;
+        extension = MAJOR_HOPLOG_EXTENSION;
+        compactionStats = stats.getMajorCompaction();
+      } else {
+        lock = isMinorCompactionActive;
+        extension = MINOR_HOPLOG_EXTENSION;
+        compactionStats = stats.getMinorCompaction();
+      }
+
+      // final check before beginning compaction. Return if compaction is active
+      if (! lock.compareAndSet(false, true)) {
+        if (isMajor) {
+          if (logger.isDebugEnabled())
+            logger.debug("{}Major compaction already active. Ignoring new request", logPrefix);
+        } else {
+          if (logger.isDebugEnabled())
+            logger.debug("Minor compaction already active. Ignoring new request", logPrefix);
+        }
+        return false;
+      }
+      
+      try {
+        if(suspend) {
+          return false;
+        }
+        
+        // variables for updating stats
+        startTime = compactionStats.begin();
+        
+        int seqNum = -1;
+        int lastKnownMajorCSeqNum;
+        synchronized (maxMajorCSeqNum) {
+          lastKnownMajorCSeqNum = maxMajorCSeqNum.get();
+          targets = hoplogReadersController.getTrackedSortedOplogList(user);
+          getCompactionTargets(isMajor, targets, lastKnownMajorCSeqNum);
+          if (targets != null && targets.size() > 0) {
+            targets = Collections.unmodifiableList(targets);
+            seqNum = getSequenceNumber(targets.get(0).get());
+            if (isMajor) {
+              maxMajorCSeqNum.set(seqNum);
+            }
+          }
+        }
+        
+        if (targets == null || targets.isEmpty() || (!isMajor && targets.size() == 1 && !isForced)) {
+          if (logger.isDebugEnabled()){
+            logger.debug("{}Skipping compaction, too few hoplops to compact. Major?" + isMajor, logPrefix);
+          }
+            
+          compactionStats.end(0, startTime);
+          return true;
+        }
+        
+        //In case that we only have one major compacted file, we don't need to run major compaction to
+        //generate a copy of the same content
+        if (targets.size() == 1 && !isForced) {
+        String hoplogName = targets.get(0).get().getFileName();
+          if (hoplogName.endsWith(MAJOR_HOPLOG_EXTENSION)){
+            if (logger.isDebugEnabled()){
+              logger.debug("{}Skipping compaction, no need to compact a major compacted file. Major?" + isMajor, logPrefix);
+            }
+            compactionStats.end(0, startTime);
+            return true;
+          }
+        }
+        
+        if (logger.isDebugEnabled()) {
+          for (TrackedReference<Hoplog> target : targets) {
+            if (logger.isDebugEnabled()) {
+              fineLog("Target:", target, " size:", target.get().getSize());
+            }
+          }
+        }
+        
+        // Create a temporary hoplog for compacted hoplog. The compacted hoplog
+        // will have the seq number same as that of youngest target file. Any
+        // hoplog younger than target hoplogs will have a higher sequence number
+        compactedHoplog = getTmpSortedOplog(seqNum, extension);
+        
+        long byteCount;
+        try {
+          byteCount = fillCompactionHoplog(isMajor, targets, compactedHoplog, lastKnownMajorCSeqNum);
+          compactionStats.end(byteCount, startTime);
+        } catch (InterruptedException e) {
+          if (logger.isDebugEnabled())
+            logger.debug("{}Compaction execution suspended", logPrefix);
+          compactionStats.error(startTime);
+          return false;
+        } catch (ForceReattemptException e) {
+          if (logger.isDebugEnabled())
+            logger.debug("{}Compaction execution suspended", logPrefix);
+          compactionStats.error(startTime);
+          return false;
+        }
+        
+        // creation of compacted hoplog completed, its time to use it for
+        // reading. Before using it, make sure minorC and mojorC were not
+        // executing on overlapping sets of files. All targets can be marked for
+        // expiration. Notify listener if configured. Update bucket size
+        synchronized (maxMajorCSeqNum) {
+          if (!isMajor && isMinorMajorOverlap(targets, maxMajorCSeqNum.get())) {
+            // MajorC is higher priority. In case of any overlap kill minorC
+            if (logger.isDebugEnabled())
+              logger.debug("{}Interrupting MinorC for a concurrent MajorC", logPrefix);
+            compactionStats.error(startTime);
+            return false;
+          }
+          addSortedOplog(compactedHoplog, true, false);
+          markSortedOplogForDeletion(targets, true);
+        }
+      } catch (IOException e) {
+        compactionStats.error(startTime);
+        throw e;
+      } finally {
+        if (isMajor) {
+          maxMajorCSeqNum.set(-1);
+        }
+        lock.set(false);
+        hoplogReadersController.releaseHoplogs(targets, user);
+      }
+      
+      incrementDiskUsage(compactedHoplog.getSize());
+      reEstimateBucketSize();
+      
+      notifyCompactionListeners(isMajor);
+      return true;
+    }
+
+    /**
+     * Major compaction compacts all files. Seq number of the youngest file
+     * being MajorCed is known. If MinorC is operating on any file with a seq
+     * number less than this number, there is a overlap
+     * @param num 
+     */
+    boolean isMinorMajorOverlap(List<TrackedReference<Hoplog>> targets, int num) {
+      if (num < 0 || targets == null || targets.isEmpty()) {
+        return false;
+      }
+
+      for (TrackedReference<Hoplog> hop : targets) {
+        if (getSequenceNumber(hop.get()) <= num) {
+          return true;
+        }
+      }
+      
+      return false;
+    }
+
+    /**
+     * Iterates over targets and writes eligible targets to the output hoplog.
+     * Handles creation of iterators and writer and closing it in case of
+     * errors.
+     */
+    public long fillCompactionHoplog(boolean isMajor,
+        List<TrackedReference<Hoplog>> targets, Hoplog output, int majorCSeqNum)
+        throws IOException, InterruptedException, ForceReattemptException {
+
+      HoplogWriter writer = null;
+      ICardinality localHLL = new HyperLogLog(HLL_CONSTANT);
+      HoplogSetIterator mergedIter = null;
+      int byteCount = 0;
+      
+      try {
+        // create a merged iterator over the targets and write entries into
+        // output hoplog
+        mergedIter = new HoplogSetIterator(targets);
+        writer = output.createWriter(mergedIter.getRemainingEntryCount());
+
+        boolean interrupted = false;
+        for (; mergedIter.hasNext(); ) {
+          if (suspend) {
+            interrupted = true;
+            break;
+          } else if (!isMajor &&  maxMajorCSeqNum.get() > majorCSeqNum) {
+            // A new major compaction cycle is starting, quit minorC to avoid
+            // duplicate work and missing deletes
+            if (logger.isDebugEnabled())
+              logger.debug("{}Preempting MinorC, new MajorC cycle detected ", logPrefix);
+            interrupted = true;
+            break;
+          }
+
+          mergedIter.nextBB();
+          
+          ByteBuffer k = mergedIter.getKeyBB();
+          ByteBuffer v = mergedIter.getValueBB();
+          
+          boolean isDeletedEntry = isDeletedEntry(v.array(), v.arrayOffset());
+          if (isMajor && isDeletedEntry) {
+            // its major compaction, time to ignore deleted entries
+            continue;
+          }
+
+          if (!isDeletedEntry) {
+            int hash = MurmurHash.hash(k.array(), k.arrayOffset(), k.remaining(), -1);
+            localHLL.offerHashed(hash);
+          }
+
+          writer.append(k, v);
+          byteCount += (k.remaining() + v.remaining());
+        }
+
+        mergedIter.close();
+        mergedIter = null;
+
+        writer.close(buildMetaData(localHLL));
+        writer = null;
+
+        if (interrupted) {
+          // If we suspended compaction operations, delete the partially written
+          // file and return.
+          output.delete();
+          throw new InterruptedException();
+        }
+        
+        // ping secondaries before making the file a legitimate file to ensure 
+        // that in case of split brain, no other vm has taken up as primary. #50110. 
+        pingSecondaries();
+        
+        makeLegitimate(output);
+        return byteCount;
+      } catch (IOException e) {
+        e = handleWriteHdfsIOError(writer, output, e);
+        writer = null;
+        throw e;
+      } catch (ForceReattemptException e) {
+        output.delete();
+        throw e;
+      }finally {
+        if (mergedIter != null) {
+          mergedIter.close();
+        }
+
+        if (writer != null) {
+          writer.close();
+        }
+      }
+    }
+
+    /**
+     * identifies targets. For major compaction all sorted oplogs will be
+     * iterated on. For minor compaction, policy driven fewer targets will take
+     * place.
+     */
+    protected void getCompactionTargets(boolean major,
+        List<TrackedReference<Hoplog>> targets, int majorCSeqNum) {
+      if (!major) {
+        getMinorCompactionTargets(targets, majorCSeqNum);
+      }
+    }
+
+    /**
+     * list of oplogs most suitable for compaction. The alogrithm selects m
+     * smallest oplogs which are not bigger than X in size. Null if valid
+     * candidates are not found
+     */
+    void getMinorCompactionTargets(List<TrackedReference<Hoplog>> targets, int majorCSeqNum) 
+    {
+      List<TrackedReference<Hoplog>> omittedHoplogs = new ArrayList<TrackedReference<Hoplog>>();
+
+      // reverse the order of hoplogs in list. the oldest file becomes the first file.
+      Collections.reverse(targets);
+
+      // hoplog greater than this size will not be minor-compacted
+      final long MAX_COMPACTION_FILE_SIZE;
+      // maximum number of files to be included in any compaction cycle
+      final int MAX_FILE_COUNT_COMPACTION;
+      // minimum number of files that must be present for compaction to be worth
+      final int MIN_FILE_COUNT_COMPACTION;
+      
+      MAX_COMPACTION_FILE_SIZE = ((long)store.getInputFileSizeMax()) * 1024 *1024;
+      MAX_FILE_COUNT_COMPACTION = store.getInputFileCountMax();
+      MIN_FILE_COUNT_COMPACTION = store.getInputFileCountMin();
+
+      try {
+        // skip till first file smaller than the max compaction file size is
+        // found. And if MajorC is active, move to a file which is also outside
+        // scope of MajorC
+        for (Iterator<TrackedReference<Hoplog>> iterator = targets.iterator(); iterator.hasNext();) {
+          TrackedReference<Hoplog> oplog = iterator.next();
+          if (majorCSeqNum >= getSequenceNumber(oplog.get())) {
+            iterator.remove();
+            omittedHoplogs.add(oplog);
+            if (logger.isDebugEnabled()){
+              fineLog("Overlap with MajorC, excluding hoplog " + oplog.get());
+            }
+            continue;
+          }
+          
+          if (oplog.get().getSize() > MAX_COMPACTION_FILE_SIZE || oplog.get().getFileName().endsWith(MAJOR_HOPLOG_EXTENSION)) {
+          // big file will not be included for minor compaction
+          // major compacted file will not be converted to minor compacted file
+            iterator.remove();
+            omittedHoplogs.add(oplog);
+            if (logger.isDebugEnabled()) {
+              fineLog("Excluding big hoplog from minor cycle:",
+                  oplog.get(), " size:", oplog.get().getSize(), " limit:",
+                  MAX_COMPACTION_FILE_SIZE);
+            }
+          } else {
+            // first small hoplog found, skip the loop
+            break;
+          }
+        }
+
+        // If there are too few files no need to perform compaction
+        if (targets.size() < MIN_FILE_COUNT_COMPACTION) {
+          if (logger.isDebugEnabled()){
+            logger.debug("{}Too few hoplogs for minor cycle:" + targets.size(), logPrefix);
+          }
+          omittedHoplogs.addAll(targets);
+          targets.clear();
+          return;
+        }
+        
+        float maxGain = Float.MIN_VALUE;
+        int bestFrom = -1; 
+        int bestTo = -1; 
+        
+        // for listSize=5 list, minFile=3; maxIndex=5-3. 
+        // so from takes values 0,1,2
+        int maxIndexForFrom = targets.size() - MIN_FILE_COUNT_COMPACTION;
+        for (int from = 0; from <= maxIndexForFrom ; from++) {
+          // for listSize=6 list, minFile=3, maxFile=5; minTo=0+3-1, maxTo=0+5-1
+          // so to takes values 2,3,4
+          int minIndexForTo = from + MIN_FILE_COUNT_COMPACTION - 1;
+          int maxIndexForTo = Math.min(from + MAX_FILE_COUNT_COMPACTION, targets.size());
+          
+          for (int i = minIndexForTo; i < maxIndexForTo; i++) {
+            Float gain = computeGain(from, i, targets);
+            if (gain == null) {
+              continue;
+            }
+            
+            if (gain > maxGain) {
+              maxGain = gain;
+              bestFrom = from;
+              bestTo = i;
+            }
+          }
+        }
+        
+        if (bestFrom == -1) {
+          if (logger.isDebugEnabled())
+            logger.debug("{}Failed to find optimal target set for MinorC", logPrefix);
+          omittedHoplogs.addAll(targets);
+          targets.clear();
+          return;
+        }
+
+        if (logger.isDebugEnabled()) {
+          fineLog("MinorCTarget optimal result from:", bestFrom, " to:", bestTo);
+        }
+
+        // remove hoplogs they do not fall in the bestFrom-bestTo range
+        int i = 0;
+        for (Iterator<TrackedReference<Hoplog>> iter = targets.iterator(); iter.hasNext();) {
+          TrackedReference<Hoplog> hop = iter.next();
+          if (i < bestFrom || i > bestTo) {
+            iter.remove();
+            omittedHoplogs.add(hop);
+          }
+          i++;
+        }
+      } finally {
+        // release readers of targets not included in the compaction cycle 
+        String user = logger.isDebugEnabled() ? "MinorC" : null;
+        hoplogReadersController.releaseHoplogs(omittedHoplogs, user);
+      }
+      
+      // restore the order, youngest file is the first file again
+      Collections.reverse(targets);
+    }
+
+    @Override
+    public HDFSStore getHdfsStore() {
+      return store;
+    }
+  }
+  
+  Float computeGain(int from, int to, List<TrackedReference<Hoplog>> targets) {
+    double SIZE_64K = 64.0 * 1024;
+    // TODO the base for log should depend on the average number of keys a index block will contain
+    double LOG_BASE = Math.log(AVG_NUM_KEYS_PER_INDEX_BLOCK);
+    
+    long totalSize = 0;
+    double costBefore = 0f;
+    for (int i = from; i <= to; i++) {
+      long size = targets.get(i).get().getSize();
+      if (size == 0) {
+        continue;
+      }
+      totalSize += size;
+      
+      // For each hoplog file, read cost is number of index block reads and 1
+      // data block read. Index blocks on an average contain N keys and are
+      // organized in a N-ary tree structure. Hence the number of index block
+      // reads will be logBaseN(number of data blocks)
+      costBefore += Math.ceil(Math.max(1.0, Math.log(size / SIZE_64K) / LOG_BASE)) + 1;
+    }
+    
+    // if the first file is relatively too large this set is bad for compaction
+    long firstFileSize = targets.get(from).get().getSize();
+    if (firstFileSize > (totalSize - firstFileSize) * RATIO) {
+      if (logger.isDebugEnabled()){
+        fineLog("First file too big:", firstFileSize, " totalSize:", totalSize);
+      }
+      return null;
+    }
+        
+    // compute size in mb so that the value of gain is in few decimals
+    long totalSizeInMb = totalSize / 1024 / 1024;
+    if (totalSizeInMb == 0) {
+      // the files are tooooo small, just return the count. The more we compact
+      // the better it is
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}Total size too small:" +totalSize, logPrefix);
+      }
+      return (float) costBefore;
+    }
+    
+    double costAfter = Math.ceil(Math.log(totalSize / SIZE_64K) / LOG_BASE) + 1;
+    return (float) ((costBefore - costAfter) / totalSizeInMb);
+  }
+  
+  /**
+   * Hoplog readers are accessed asynchronously. There could be a window in
+   * which, while a hoplog is being iterated on, it gets compacted and becomes
+   * expired or inactive. The reader of the hoplog must not be closed till the
+   * iterator completes. All such scenarios will be managed by this class. It
+   * will keep all the reader, active and inactive, and reference counter to the
+   * readers. An inactive reader will be closed if the reference count goes down
+   * to 0.
+   * 
+   * One important point, only compaction process makes a hoplog inactive.
+   * Compaction process in a bucket is single threaded. So compaction itself
+   * will not face race condition. Read and scan operations on the bucket will
+   * be affected. So reference counter is incremented for each read and scan.
+   * 
+   */
+  private class HoplogReadersController implements HoplogReaderActivityListener {
+    private Integer maxOpenFilesLimit;
+
+    // sorted collection of all the active oplog files associated with this bucket. Instead of a
+    // queue, a set is used. New files created as part of compaction may be inserted after a few
+    // hoplogs were created. The compacted file is such a case but should not be treated newest.
+    private final ConcurrentSkipListSet<TrackedReference<Hoplog>> hoplogs;
+    
+    // list of all the hoplogs that have been compacted and need to be closed
+    // once the reference count reduces to 0
+    private final ConcurrentHashSet<TrackedReference<Hoplog>> inactiveHoplogs;
+    
+    // ReadWriteLock on list of oplogs to allow for consistent reads and scans
+    // while hoplog set changes. A write lock is needed on completion of
+    // compaction, addition of a new hoplog or on receiving updates message from
+    // other GF nodes
+    private final ReadWriteLock hoplogRWLock = new ReentrantReadWriteLock(true);
+
+    // tracks the number of active readers for hoplogs of this bucket
+    private AtomicInteger activeReaderCount = new AtomicInteger(0);
+    
+    public HoplogReadersController() {
+      HoplogComparator comp = new HoplogComparator();
+      hoplogs = new ConcurrentSkipListSet<TrackedReference<Hoplog>>(comp) {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public boolean add(TrackedReference<Hoplog> e) {
+          // increment number of hoplogs active for this bucket
+          boolean result =  super.add(e);
+          if (result)
+            stats.incActiveFiles(1);
+          return result;
+        }
+        
+        @Override
+        public boolean remove(Object o) {
+          // decrement the number of hoplogs active for this bucket
+          boolean result =  super.remove(o);
+          if (result)
+            stats.incActiveFiles(-1);
+          return result;
+        }
+      };
+      
+      inactiveHoplogs = new ConcurrentHashSet<TrackedReference<Hoplog>>() {
+        private static final long serialVersionUID = 1L;
+        
+        @Override
+        public boolean add(TrackedReference<Hoplog> e) {
+          boolean result =  super.add(e);
+          if (result)
+            stats.incInactiveFiles(1);
+          return result;
+        }
+        
+        @Override
+        public boolean remove(Object o) {
+          boolean result =  super.remove(o);
+          if (result)
+            stats.incInactiveFiles(-1);
+          return result;
+        }
+      };
+      
+      maxOpenFilesLimit = Integer.getInteger(
+          HoplogConfig.BUCKET_MAX_OPEN_HFILES_CONF,
+          HoplogConfig.BUCKET_MAX_OPEN_HFILES_DEFAULT);
+    }
+    
+    Hoplog getOldestHoplog() {
+      if (hoplogs.isEmpty()) {
+        return null;
+      }
+      return hoplogs.last().get();
+    }
+
+    /**
+     * locks sorted oplogs collection and performs add operation
+     * @return if addition was successful
+     */
+    private boolean addSortedOplog(Hoplog so) throws IOException {
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}Try add " + so, logPrefix);
+      }
+      hoplogRWLock.writeLock().lock();
+      try {
+        int size = hoplogs.size();
+        boolean result = hoplogs.add(new TrackedReference<Hoplog>(so));
+        so.setReaderActivityListener(this);
+        if (logger.isDebugEnabled()){
+          fineLog("Added: ", so, " Before:", size, " After:", hoplogs.size());
+        }
+        return result;
+      } finally {
+        hoplogRWLock.writeLock().unlock();
+      }
+    }
+    
+    /**
+     * locks sorted oplogs collection and performs remove operation and updates readers also
+     */
+    private void removeSortedOplog(TrackedReference<Hoplog> so) throws IOException {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Try remove " + so, logPrefix);
+      }
+      hoplogRWLock.writeLock().lock();
+      try {
+        int size = hoplogs.size();
+        boolean result = hoplogs.remove(so);
+        if (result) {
+          inactiveHoplogs.add(so);
+          if (logger.isDebugEnabled()) {
+            fineLog("Removed: ", so, " Before:", size, " After:", hoplogs.size());
+          }
+        } else {
+          if (inactiveHoplogs.contains(so)) {
+            if (logger.isDebugEnabled()) {
+              logger.debug("{}Found a missing active hoplog in inactive list." + so, logPrefix);
+            }
+          } else {
+            so.get().close();
+            logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_MISSING_IN_BUCKET_FORCED_CLOSED, so.get()));
+          }
+        }
+      } finally {
+        hoplogRWLock.writeLock().unlock();
+      }
+    }
+    
+    private  void closeInactiveHoplogs() throws IOException {
+      hoplogRWLock.writeLock().lock();
+      try {
+        for (TrackedReference<Hoplog> hoplog : inactiveHoplogs) {
+          if (logger.isDebugEnabled()){
+            logger.debug("{}Try close inactive " + hoplog, logPrefix);
+          }
+
+          if (!hoplog.inUse()) {
+            int size = inactiveHoplogs.size();            
+            inactiveHoplogs.remove(hoplog);
+            closeReaderAndSuppressError(hoplog.get(), true);
+            if (logger.isDebugEnabled()){
+              fineLog("Closed inactive: ", hoplog.get(), " Before:", size,
+                  " After:", inactiveHoplogs.size());
+            }
+          }
+        }
+      } finally {
+        hoplogRWLock.writeLock().unlock();
+      }
+    }
+    
+    /**
+     * @param target
+     *          name of the hoplog file
+     * @return trackedReference if target exists in inactive hoplog list.
+     * @throws IOException
+     */
+    TrackedReference<Hoplog> getInactiveHoplog(String target) throws IOException {
+      hoplogRWLock.writeLock().lock();
+      try {
+        for (TrackedReference<Hoplog> hoplog : inactiveHoplogs) {
+          if (hoplog.get().getFileName().equals(target)) {
+            if (logger.isDebugEnabled()) {
+              logger.debug("{}Target found in inactive hoplogs list: " + hoplog, logPrefix);
+            }
+            return hoplog;
+          }
+        }
+        if (logger.isDebugEnabled()){
+          logger.debug("{}Target not found in inactive hoplogs list: " + target, logPrefix);
+        }
+        return null;
+      } finally {
+        hoplogRWLock.writeLock().unlock();
+      }
+    }
+    
+    /**
+     * force closes all readers
+     */
+    public void close() throws IOException {
+      hoplogRWLock.writeLock().lock();
+      try {
+        for (TrackedReference<Hoplog> hoplog : hoplogs) {
+          closeReaderAndSuppressError(hoplog.get(), true);
+        }
+        
+        for (TrackedReference<Hoplog> hoplog : inactiveHoplogs) {
+          closeReaderAndSuppressError(hoplog.get(), true);
+        }
+      } finally {
+        hoplogs.clear();
+        inactiveHoplogs.clear();
+        hoplogRWLock.writeLock().unlock();
+      }
+    }
+    
+    /**
+     * locks hoplogs to create a snapshot of active hoplogs. reference of each
+     * reader is incremented to keep it from getting closed
+     * 
+     * @return ordered list of sorted oplogs
+     */
+    private List<TrackedReference<Hoplog>> getTrackedSortedOplogList(String user) {
+      List<TrackedReference<Hoplog>> oplogs = new ArrayList<TrackedReference<Hoplog>>();
+      hoplogRWLock.readLock().lock();
+      try {
+        for (TrackedReference<Hoplog> oplog : hoplogs) {
+          oplog.increment(user);
+          oplogs.add(oplog);
+          if (logger.isDebugEnabled()) {
+            logger.debug("{}Track ref " + oplog, logPrefix);
+          }
+        }
+      } finally {
+        hoplogRWLock.readLock().unlock();
+      }
+      return oplogs;
+    }
+
+    private TrackedReference<Hoplog> trackHoplog(Hoplog hoplog, String user) {
+      hoplogRWLock.readLock().lock();
+      try {
+        for (TrackedReference<Hoplog> oplog : hoplogs) {
+          if (oplog.get().getFileName().equals(hoplog.getFileName())) {
+            oplog.increment(user);
+            if (logger.isDebugEnabled()) {
+              logger.debug("{}Track " + oplog, logPrefix);
+            }
+            return oplog;
+          }
+        }
+      } finally {
+        hoplogRWLock.readLock().unlock();
+      }
+      throw new NoSuchElementException(hoplog.getFileName());
+    }
+    
+    public void releaseHoplogs(List<TrackedReference<Hoplog>> targets, String user) {
+      if (targets == null) {
+        return;
+      }
+      
+      for (int i = targets.size() - 1; i >= 0; i--) {
+        TrackedReference<Hoplog> hoplog = targets.get(i);
+        releaseHoplog(hoplog, user);
+      }
+    }
+
+    public void releaseHoplog(TrackedReference<Hoplog> target, String user) {
+      if (target ==  null) {
+        return;
+      }
+      
+      target.decrement(user);
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}Try release " + target, logPrefix);
+      }
+      if (target.inUse()) {
+        return;
+      }
+      
+      // there are no users of this hoplog. if it is inactive close it.
+      hoplogRWLock.writeLock().lock();
+      try {
+        if (!target.inUse()) {
+          if (inactiveHoplogs.contains(target) ) {
+            int sizeBefore = inactiveHoplogs.size();
+            inactiveHoplogs.remove(target);
+            closeReaderAndSuppressError(target.get(), true);
+            if (logger.isDebugEnabled()) {
+              fineLog("Closed inactive: ", target, " totalBefore:", sizeBefore,
+                  " totalAfter:", inactiveHoplogs.size());
+            }
+          } else if (hoplogs.contains(target)) {
+            closeExcessReaders();              
+          }
+        }
+      } catch (IOException e) {
+        logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_IO_ERROR, 
+            "Close reader: " + target.get().getFileName()), e);
+      } finally {
+        hoplogRWLock.writeLock().unlock();
+      }
+    }
+
+    /*
+     * detects if the total number of open hdfs readers is more than configured
+     * max file limit. In case the limit is exceeded, some readers need to be
+     * closed to avoid dadanode receiver overflow error.
+     */
+    private void closeExcessReaders() throws IOException {
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}Close excess readers. Size:" + hoplogs.size()
+            + " activeReaders:" + activeReaderCount.get() + " limit:"
+            + maxOpenFilesLimit, logPrefix);
+      }
+
+      if (hoplogs.size() <= maxOpenFilesLimit) {
+        return;
+      }
+      
+      if (activeReaderCount.get() <= maxOpenFilesLimit) {
+        return;
+      }
+      
+      for (TrackedReference<Hoplog> hoplog : hoplogs.descendingSet()) {
+        if (!hoplog.inUse() && !hoplog.get().isClosed()) {
+          hoplog.get().close(false);
+          if (logger.isDebugEnabled()) {
+            logger.debug("{}Excess reader closed " + hoplog, logPrefix);
+          }
+        }
+        
+        if (activeReaderCount.get() <= maxOpenFilesLimit) {
+          return;
+        }
+      }
+    }
+
+    @Override
+    public void readerCreated() {
+      activeReaderCount.incrementAndGet();
+      stats.incActiveReaders(1);
+      if (logger.isDebugEnabled())
+        logger.debug("{}ActiveReader++", logPrefix);
+    }
+
+    @Override
+    public void readerClosed() {
+      activeReaderCount.decrementAndGet(); 
+      stats.incActiveReaders(-1);
+      if (logger.isDebugEnabled())
+        logger.debug("{}ActiveReader--", logPrefix);
+    }
+  }
+
+  /**
+   * returns an ordered list of oplogs, FOR TESTING ONLY
+   */
+  public List<TrackedReference<Hoplog>> getSortedOplogs() throws IOException {
+    List<TrackedReference<Hoplog>> oplogs = new ArrayList<TrackedReference<Hoplog>>();
+    for (TrackedReference<Hoplog> oplog : hoplogReadersController.hoplogs) {
+        oplogs.add(oplog);
+    }
+    return oplogs;
+  }
+
+  /**
+   * Merged iterator on a list of hoplogs. 
+   */
+  public class BucketIterator implements HoplogIterator<byte[], SortedHoplogPersistedEvent> {
+    // list of hoplogs to be iterated on.
+    final List<TrackedReference<Hoplog>> hoplogList;
+    HoplogSetIterator mergedIter;
+
+    public BucketIterator(List<TrackedReference<Hoplog>> hoplogs) throws IOException {
+      this.hoplogList = hoplogs;
+      try {
+        mergedIter = new HoplogSetIterator(this.hoplogList);
+        if (logger.isDebugEnabled()) {
+          for (TrackedReference<Hoplog> hoplog : hoplogs) {
+            logger.debug("{}BucketIter target hop:" + hoplog.get().getFileName(), logPrefix);
+          }
+        }
+      } catch (IllegalArgumentException e) {
+        if (IOException.class.isAssignableFrom(e.getCause().getClass())) {
+          throw handleIOError((IOException) e.getCause());
+        } else {
+          throw e;
+        }
+      } catch (IOException e) {
+        throw handleIOError(e);
+      } catch (HDFSIOException e) {
+        throw handleIOError(e);
+      } 
+    }
+
+    @Override
+    public boolean hasNext() {
+      return mergedIter.hasNext();
+    }
+
+    @Override
+    public byte[] next() throws IOException {
+      try {
+        return HFileSortedOplog.byteBufferToArray(mergedIter.next());
+      } catch (IllegalArgumentException e) {
+        if (IOException.class.isAssignableFrom(e.getCause().getClass())) {
+          throw handleIOError((IOException) e.getCause());
+        } else {
+          throw e;
+        }
+      } catch (IOException e) {
+        throw handleIOError(e);
+      }  
+    }
+
+    @Override
+    public byte[] getKey() {
+      // merged iterator returns a byte[]. This needs to be deserialized to the object which was
+      // provided during flush operation
+      return HFileSortedOplog.byteBufferToArray(mergedIter.getKey());
+    }
+
+    @Override
+    public SortedHoplogPersistedEvent getValue() {
+      // merged iterator returns a byte[]. This needs to be deserialized to the
+      // object which was provided during flush operation
+      try {
+        return deserializeValue(HFileSortedOplog.byteBufferToArray(mergedIter.getValue()));
+      } catch (IOException e) {
+        throw new HDFSIOException("Failed to deserialize byte while iterating on partition", e);
+      }
+    }
+
+    @Override
+    public void remove() {
+      mergedIter.remove();
+    }
+
+    @Override
+    public void close() {
+      // TODO release the closed iterators early
+      String user = logger.isDebugEnabled() ? "Scan" : null;
+      hoplogReadersController.releaseHoplogs(hoplogList, user);
+    }
+  }
+  
+  /**
+   * This utility class is used to filter temporary hoplogs in a bucket
+   * directory
+   * 
+   */
+  private static class TmpFilePathFilter implements PathFilter {
+    @Override
+    public boolean accept(Path path) {
+      Matcher matcher = HOPLOG_NAME_PATTERN.matcher(path.getName());
+      if (matcher.matches() && path.getName().endsWith(TEMP_HOPLOG_EXTENSION)) {
+        return true;
+      }
+      return false;
+    }
+  }
+
+  private void fineLog(Object... strings) {
+    if (logger.isDebugEnabled()) {
+      StringBuffer sb = concatString(strings);
+      logger.debug(logPrefix + sb.toString());
+    }
+  }
+
+  private StringBuffer concatString(Object... strings) {
+    StringBuffer sb = new StringBuffer();
+    for (Object str : strings) {
+      sb.append(str.toString());
+    }
+    return sb;
+  }
+
+  @Override
+  public void compactionCompleted(String region, int bucket, boolean isMajor) {
+    // do nothing for compaction events. Hoplog Organizer depends on addition
+    // and deletion of hoplogs only
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/Hoplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/Hoplog.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/Hoplog.java
new file mode 100644
index 0000000..e622749
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/Hoplog.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import com.gemstone.gemfire.internal.hll.ICardinality;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumMap;
+
+
+/**
+ * Ordered sequence file
+ */
+public interface Hoplog extends Closeable, Comparable<Hoplog>  {
+  public static final boolean NOP_WRITE = Boolean.getBoolean("Hoplog.NOP_WRITE");
+  
+  /** the gemfire magic number for sorted oplogs */
+  public static final byte[] MAGIC = new byte[] { 0x47, 0x53, 0x4F, 0x50 };
+
+  /**
+   * @return an instance of cached reader, creates one if does not exist
+   * @throws IOException
+   */
+  HoplogReader getReader() throws IOException;
+
+  /**
+   * Creates a new sorted writer.
+   * 
+   * @param keys
+   *          an estimate of the number of keys to be written
+   * @return the writer
+   * @throws IOException
+   *           error creating writer
+   */
+  HoplogWriter createWriter(int keys) throws IOException;
+
+  /**
+   * @param listener listener of reader's activity
+   */
+  void setReaderActivityListener(HoplogReaderActivityListener listener);
+  
+  /**
+   * @return file name
+   */
+  String getFileName();
+
+  /**
+   * @return Entry count estimate for this hoplog
+   */
+  public ICardinality getEntryCountEstimate() throws IOException;
+
+  /**
+   * renames the file to the input name
+   * 
+   * @throws IOException
+   */
+  void rename(String name) throws IOException;
+
+  /**
+   * Deletes the sorted oplog file
+   */
+  void delete() throws IOException;
+  
+  /**
+   * Returns true if the hoplog is closed for reads.
+   * @return true if closed
+   */
+  boolean isClosed();
+  
+  /**
+   * @param clearCache clear this sorted oplog's cache if true
+   * @throws IOException 
+   */
+  void close(boolean clearCache) throws IOException;
+  
+  /**
+   * @return the modification timestamp of the file
+   */
+  long getModificationTimeStamp();
+  
+  /**
+   * @return the size of file
+   */
+  long getSize();
+
+  /**
+   * Reads sorted oplog file.
+   */
+  public interface HoplogReader extends HoplogSetReader<byte[], byte[]> {
+    /**
+     * Returns a byte buffer based view of the value linked to the key
+     */
+    ByteBuffer get(byte[] key) throws IOException;
+
+    /**
+     * @return Returns the bloom filter associated with this sorted oplog file.
+     */
+    BloomFilter getBloomFilter() throws IOException;
+
+    /**
+     * @return number of KV pairs in the file, including tombstone entries
+     */
+    long getEntryCount();
+
+    /**
+     * Returns the {@link ICardinality} implementation that is useful for
+     * estimating the size of this Hoplog.
+     * 
+     * @return the cardinality estimator
+     */
+    ICardinality getCardinalityEstimator();
+  }
+
+  /**
+   * Provides hoplog's reader's activity related events to owners
+   * 
+   */
+  public interface HoplogReaderActivityListener {
+    /**
+     * Invoked when a reader is created and an active reader did not exist
+     * earlier
+     */
+    public void readerCreated();
+    
+    /**
+     * Invoked when an active reader is closed
+     */
+    public void readerClosed();
+  }
+
+  /**
+   * Writes key/value pairs in a sorted oplog file. Each entry that is appended must have a key that
+   * is greater than or equal to the previous key.
+   */
+  public interface HoplogWriter extends Closeable {
+    /**
+     * Appends another key and value. The key is expected to be greater than or equal to the last
+     * key that was appended.
+     * @param key
+     * @param value
+     */
+    void append(byte[] key, byte[] value) throws IOException;
+
+    /**
+     * Appends another key and value. The key is expected to be greater than or equal to the last
+     * key that was appended.
+     */
+    void append(ByteBuffer key, ByteBuffer value) throws IOException;
+
+    void close(EnumMap<Meta, byte[]> metadata) throws IOException;
+    
+    /**
+     * flushes all outstanding data into the OS buffers on all DN replicas 
+     * @throws IOException
+     */
+    void hsync() throws IOException;
+    
+    /**
+     * Gets the size of the data that has already been written
+     * to the writer.  
+     * 
+     * @return number of bytes already written to the writer
+     */
+    public long getCurrentSize() throws IOException; 
+  }
+
+  /**
+   * Identifies the gemfire sorted oplog versions.
+   */
+  public enum HoplogVersion {
+    V1;
+
+    /**
+     * Returns the version string as bytes.
+     * 
+     * @return the byte form
+     */
+    public byte[] toBytes() {
+      return name().getBytes();
+    }
+
+    /**
+     * Constructs the version from a byte array.
+     * 
+     * @param version
+     *          the byte form of the version
+     * @return the version enum
+     */
+    public static HoplogVersion fromBytes(byte[] version) {
+      return HoplogVersion.valueOf(new String(version));
+    }
+  }
+
+  /**
+   * Names the available metadata keys that will be stored in the sorted oplog.
+   */
+  public enum Meta {
+    /** identifies the soplog as a gemfire file, required */
+    GEMFIRE_MAGIC,
+
+    /** identifies the soplog version, required */
+    SORTED_OPLOG_VERSION,
+    
+    /** identifies the gemfire version the soplog was created with */
+    GEMFIRE_VERSION,
+
+    /** identifies the statistics data */
+    STATISTICS,
+
+    /** identifies the embedded comparator types */
+    COMPARATORS,
+    
+    /** identifies the pdx type data, optional */
+    PDX,
+
+    /**
+     * identifies the hyperLogLog byte[] which estimates the cardinality for
+     * only one hoplog
+     */
+    LOCAL_CARDINALITY_ESTIMATE,
+
+    /**
+     * represents the hyperLogLog byte[] after upgrading the constant from
+     * 0.1 to 0.03 (in gfxd 1.4)
+     */
+    LOCAL_CARDINALITY_ESTIMATE_V2
+    ;
+
+    /**
+     * Converts the metadata name to bytes.
+     */
+    public byte[] toBytes() {
+      return ("gemfire." + name()).getBytes();
+    }
+
+    /**
+     * Converts the byte form of the name to an enum.
+     * 
+     * @param key
+     *          the key as bytes
+     * @return the enum form
+     */
+    public static Meta fromBytes(byte[] key) {
+      return Meta.valueOf(new String(key).substring("gemfire.".length()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogConfig.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogConfig.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogConfig.java
new file mode 100644
index 0000000..7b8415e
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogConfig.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+
+/**
+ * This interface defines all the hoplog configuration related constants. One
+ * location simplifies searching for a constant
+ * 
+ */
+public interface HoplogConfig {
+  // max number of open files per bucket. by default each region has 113
+  // buckets. A typical hdfs deployment has 5 DN each allowing 4096 open
+  // files. The intent is to use around 40 % of these and hence the default
+  // value is 72
+  public static final String BUCKET_MAX_OPEN_HFILES_CONF = "hoplog.bucket.max.open.files";
+  public final Integer BUCKET_MAX_OPEN_HFILES_DEFAULT = 72;
+  
+  public static final String HFILE_BLOCK_SIZE_CONF = "hoplog.hfile.block.size";
+  
+  // Region maintenance activity interval. default is 2 mins
+  public static final String JANITOR_INTERVAL_SECS = "hoplog.janitor.interval.secs";
+  public static final long JANITOR_INTERVAL_SECS_DEFAULT = 120l;
+  
+  // Maximum number of milliseconds to wait for suspension action to complete
+  public static final String SUSPEND_MAX_WAIT_MS = "hoplog.suspend.max.wait.ms";
+  public static final long SUSPEND_MAX_WAIT_MS_DEFAULT = 1000l;
+  
+  // Compaction request queue limit configuraiton
+  public static final String COMPCATION_QUEUE_CAPACITY = "hoplog.compaction.queue.capacity";
+  public static final int COMPCATION_QUEUE_CAPACITY_DEFAULT = 500;
+  
+  // Compaction request queue limit configuraiton
+  public static final String COMPACTION_FILE_RATIO = "hoplog.compaction.file.ratio";
+  public static final float COMPACTION_FILE_RATIO_DEFAULT = 1.3f;
+  
+  //Amount of time before deleting old temporary files
+  public static final String TMP_FILE_EXPIRATION = "hoplog.tmp.file.expiration.ms";
+  public static final long TMP_FILE_EXPIRATION_DEFAULT = 10 * 60 * 1000;
+  
+  // If this property is set as true, GF will let DFS client cache FS objects
+  public static final String USE_FS_CACHE = "hoplog.use.fs.cache";
+
+  // If set hdfs store will be able to connect to local file System
+  public static final String ALLOW_LOCAL_HDFS_PROP = "hoplog.ALLOW_LOCAL_HDFS";
+  
+  // The following constants are used to read kerberos authentication related
+  // configuration. Currently these configurations are provided as client config
+  // file while hdfs store is created
+  public static final String KERBEROS_PRINCIPAL = "gemfirexd.kerberos.principal";
+  public static final String KERBEROS_KEYTAB_FILE= "gemfirexd.kerberos.keytab.file";
+  public static final String PERFORM_SECURE_HDFS_CHECK_PROP = "gemfire.PERFORM_SECURE_HDFS_CHECK";
+  
+  // clean up interval file that exposed to MapReduce jobs
+  public static final String CLEAN_UP_INTERVAL_FILE_NAME = "cleanUpInterval";
+  // Compression settings
+  public static final String COMPRESSION = "hoplog.compression.algorithm";
+  public static final String COMPRESSION_DEFAULT = "NONE";
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogListener.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogListener.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogListener.java
new file mode 100644
index 0000000..7c3de03
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+
+/**
+ * Defines an observer of asynchronous operations on sorted oplog files associated with a bucket.
+ */
+public interface HoplogListener {
+  /**
+   * Notifies creation of new sorted oplog files. A new file will be created after compaction or
+   * other bucket maintenance activities
+   * 
+   * @throws IOException
+   */
+  void hoplogCreated(String regionFolder, int bucketId, Hoplog... oplogs) throws IOException;
+
+  /**
+   * Notifies file deletion. A file becomes redundant after compaction or other bucket maintenance
+   * activities
+   * @throws IOException 
+   */
+  void hoplogDeleted(String regionFolder, int bucketId, Hoplog... oplogs) throws IOException;
+  
+  /**
+   * Notifies completion of a hoplog compaction cycle. 
+   * @param region Region on which compaction was performed
+   * @param bucket bucket id
+   * @param isMajor true if major compaction was executed
+   */
+  void compactionCompleted(String region, int bucket, boolean isMajor);
+}