You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:49:51 UTC

[05/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizerJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizerJUnitTest.java
new file mode 100644
index 0000000..8746a0b
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizerJUnitTest.java
@@ -0,0 +1,1044 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer.HoplogComparator;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.TieredCompactionJUnitTest.TestHoplog;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HdfsSortedOplogOrganizerJUnitTest extends BaseHoplogTestCase {
+  /**
+   * Tests flush operation
+   */
+  public void testFlush() throws Exception {
+    int count = 10;
+    int bucketId = (int) System.nanoTime();
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
+
+    // flush and create hoplog
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    for (int i = 0; i < count; i++) {
+      items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime())));
+    }
+    organizer.flush(items.iterator(), count);
+
+    // check file existence in bucket directory
+    FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + bucketId, 
+                      HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+
+    // only one hoplog should exists
+    assertEquals(1, hoplogs.length);
+    
+    assertEquals(count, organizer.sizeEstimate());
+    assertEquals(0, stats.getActiveReaderCount());
+  }
+
+  /**
+   * Tests reads from a set of hoplogs containing both valid and stale KVs
+   */
+  public void testReopen() throws Exception {
+    int bucketId = (int) System.nanoTime();
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
+    
+    // flush and create hoplog
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    for (int i = 0; i < 100; i++) {
+      items.add(new TestEvent("" + i, ("1-1")));
+    }
+    organizer.flush(items.iterator(), items.size());
+    
+    Hoplog hoplog = organizer.getSortedOplogs().iterator().next().get();
+    byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
+    hoplog.close();
+    
+    for (int i = 0; i < 10; i++) {
+      Path path = new Path(testDataDir, getName() + "/" + bucketId + "/" + hoplog.getFileName());
+      HFileSortedOplog oplog = new HFileSortedOplog(hdfsStore, path, blockCache, stats, storeStats);
+      oplog.getReader().read(keyBytes1);
+      oplog.close(false);
+    }
+  }
+  
+  /**
+   * Tests reads from a set of hoplogs containing both valid and stale KVs
+   */
+  public void testRead() throws Exception {
+    doRead(regionManager);
+  }
+  
+//  public void testNewReaderWithNameNodeHA() throws Exception {
+//    deleteMiniClusterDir();
+//    int nn1port = AvailablePortHelper.getRandomAvailableTCPPort();
+//    int nn2port = AvailablePortHelper.getRandomAvailableTCPPort();
+//    
+//    MiniDFSCluster cluster = initMiniHACluster(nn1port, nn2port);
+//    initClientHAConf(nn1port, nn2port);
+//    
+//    HDFSStoreImpl store1 = (HDFSStoreImpl) hsf.create("Store-1");
+//    regionfactory.setHDFSStoreName(store1.getName());
+//    Region<Object, Object> region1 = regionfactory.create("region-1");
+//    HdfsRegionManager regionManager1 = ((LocalRegion)region1).getHdfsRegionManager();
+//    
+//    HoplogOrganizer<SortedHoplogPersistedEvent> organizer = doRead(regionManager1);
+//    organizer.close();
+//    
+//    dunit.DistributedTestCase.IgnoredException ex = DistributedTestCase.addExpectedException("java.io.EOFException");
+//    NameNode nnode2 = cluster.getNameNode(1);
+//    assertTrue(nnode2.isStandbyState());
+//    cluster.shutdownNameNode(0);
+//    cluster.transitionToActive(1);
+//    assertFalse(nnode2.isStandbyState());
+//    
+//    organizer = new HdfsSortedOplogOrganizer(regionManager1, 0);
+//    byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
+//    byte[] keyBytes3 = BlobHelper.serializeToBlob("3");
+//    byte[] keyBytes4 = BlobHelper.serializeToBlob("4");
+//    assertEquals("2-1", organizer.read(keyBytes1).getValue());
+//    assertEquals("3-3", organizer.read(keyBytes3).getValue());
+//    assertEquals("1-4", organizer.read(keyBytes4).getValue());
+//    ex.remove();
+//
+//    region1.destroyRegion();
+//    store1.destroy();
+//    cluster.shutdown();
+//    FileUtils.deleteDirectory(new File("hdfs-test-cluster"));
+//  }
+  
+//  public void testActiveReaderWithNameNodeHA() throws Exception {
+//    deleteMiniClusterDir();
+//    int nn1port = AvailablePortHelper.getRandomAvailableTCPPort();
+//    int nn2port = AvailablePortHelper.getRandomAvailableTCPPort();
+//    
+//    MiniDFSCluster cluster = initMiniHACluster(nn1port, nn2port);
+//    initClientHAConf(nn1port, nn2port);
+//    
+//    HDFSStoreImpl store1 = (HDFSStoreImpl) hsf.create("Store-1");
+//    regionfactory.setHDFSStoreName(store1.getName());
+//    Region<Object, Object> region1 = regionfactory.create("region-1");
+//    HdfsRegionManager regionManager1 = ((LocalRegion)region1).getHdfsRegionManager();
+//    
+//    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager1, 0);
+//    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+//    for (int i = 100000; i < 101000; i++) {
+//      items.add(new TestEvent(("" + i), (i + " some string " + i)));
+//    }
+//    organizer.flush(items.iterator(), items.size());
+//    organizer.getSortedOplogs().get(0).get().getReader();
+//    
+//    dunit.DistributedTestCase.IgnoredException ex = DistributedTestCase.addExpectedException("java.io.EOFException");
+//    NameNode nnode2 = cluster.getNameNode(1);
+//    assertTrue(nnode2.isStandbyState());
+//    cluster.shutdownNameNode(0);
+//    cluster.transitionToActive(1);
+//    assertFalse(nnode2.isStandbyState());
+//    
+//    for (int i = 100000; i < 100500; i++) {
+//      byte[] keyBytes1 = BlobHelper.serializeToBlob("" + i);
+//      assertEquals(i + " some string " + i, organizer.read(keyBytes1).getValue());
+//    }
+//    ex.remove();
+//    region1.destroyRegion();
+//    store1.destroy();
+//    cluster.shutdown();
+//    FileUtils.deleteDirectory(new File("hdfs-test-cluster"));
+//  }
+  
+//  public void testFlushWithNameNodeHA() throws Exception {
+//    deleteMiniClusterDir();
+//    int nn1port = AvailablePortHelper.getRandomAvailableTCPPort();
+//    int nn2port = AvailablePortHelper.getRandomAvailableTCPPort();
+//    
+//    MiniDFSCluster cluster = initMiniHACluster(nn1port, nn2port);
+//    
+//    initClientHAConf(nn1port, nn2port);
+//    HDFSStoreImpl store1 = (HDFSStoreImpl) hsf.create("Store-1");
+//    
+//    regionfactory.setHDFSStoreName(store1.getName());
+//    Region<Object, Object> region1 = regionfactory.create("region-1");
+//    HdfsRegionManager regionManager1 = ((LocalRegion)region1).getHdfsRegionManager();
+//    
+//    HoplogOrganizer<SortedHoplogPersistedEvent> organizer = new HdfsSortedOplogOrganizer(regionManager1, 0);
+//    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+//    items.add(new TestEvent(("1"), ("1-1")));
+//    organizer.flush(items.iterator(), items.size());
+//
+//    dunit.DistributedTestCase.IgnoredException ex = DistributedTestCase.addExpectedException("java.io.EOFException");
+//    NameNode nnode2 = cluster.getNameNode(1);
+//    assertTrue(nnode2.isStandbyState());
+//    cluster.shutdownNameNode(0);
+//    cluster.transitionToActive(1);
+//    assertFalse(nnode2.isStandbyState());
+//    
+//    items.add(new TestEvent(("4"), ("1-4")));
+//    organizer.flush(items.iterator(), items.size());
+//    byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
+//    byte[] keyBytes4 = BlobHelper.serializeToBlob("4");
+//    assertEquals("1-1", organizer.read(keyBytes1).getValue());
+//    assertEquals("1-4", organizer.read(keyBytes4).getValue());
+//    ex.remove();
+//    
+//    region1.destroyRegion();
+//    store1.destroy();
+//    cluster.shutdown();
+//    FileUtils.deleteDirectory(new File("hdfs-test-cluster"));
+//  }
+
+  public HoplogOrganizer<SortedHoplogPersistedEvent> doRead(HdfsRegionManager rm) throws Exception {
+    HoplogOrganizer<SortedHoplogPersistedEvent> organizer = new HdfsSortedOplogOrganizer(rm, 0);
+
+    // flush and create hoplog
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    items.add(new TestEvent(("1"), ("1-1")));
+    items.add(new TestEvent(("4"), ("1-4")));
+    organizer.flush(items.iterator(), items.size());
+
+    items.clear();
+    items.add(new TestEvent(("1"), ("2-1")));
+    items.add(new TestEvent(("3"), ("2-3")));
+    organizer.flush(items.iterator(), items.size());
+
+    items.clear();
+    items.add(new TestEvent(("3"), ("3-3")));
+    items.add(new TestEvent(("5"), ("3-5")));
+    organizer.flush(items.iterator(), items.size());
+
+    // check file existence in bucket directory
+    FileStatus[] hoplogs = getBucketHoplogs(rm.getStore().getFileSystem(),
+        rm.getRegionFolder() + "/" + 0,
+        HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+
+    // expect 3 files are 3 flushes
+    assertEquals(3, hoplogs.length);
+    byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
+    byte[] keyBytes3 = BlobHelper.serializeToBlob("3");
+    byte[] keyBytes4 = BlobHelper.serializeToBlob("4");
+    // expect key 1 from hoplog 2
+    assertEquals("2-1", organizer.read(keyBytes1).getValue());
+    // expect key 3 from hoplog 3
+    assertEquals("3-3", organizer.read(keyBytes3).getValue());
+    // expect key 4 from hoplog 1
+    assertEquals("1-4", organizer.read(keyBytes4).getValue());
+    return organizer;
+  }
+
+  /**
+   * Tests bucket organizer initialization during startup. Existing hoplogs should identified and
+   * returned
+   */
+  public void testHoplogIdentification() throws Exception {
+    // create one empty file and one directories in bucket directory
+    Path bucketPath = new Path(testDataDir, getName() + "/0");
+    FileSystem fs = hdfsStore.getFileSystem();
+    fs.createNewFile(new Path(bucketPath, "temp_file"));
+    fs.mkdirs(new Path(bucketPath, "temp_dir"));
+
+    // create 2 hoplogs files each of type flush, minor and major hoplog
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    String[] extensions = { HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+        HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+        HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION,
+        HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION,
+        HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION,
+        HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION};
+    for (String string : extensions) {
+      Hoplog oplog = organizer.getTmpSortedOplog(null, string);
+      createHoplog(0, oplog);
+      organizer.makeLegitimate(oplog);
+    }
+
+    // create a temp hoplog
+    Hoplog oplog = organizer.getTmpSortedOplog(null, HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+    createHoplog(0, oplog);
+
+    // bucket directory should have 6 hoplogs, 1 temp log, 1 misc file and 1 directory
+    FileStatus[] results = fs.listStatus(bucketPath);
+    assertEquals(9, results.length);
+
+    // only two are hoplogs
+    List<Hoplog> list = organizer.identifyAndLoadSortedOplogs(true);
+    assertEquals(6, list.size());
+  }
+
+  public void testExpiryMarkerIdentification() throws Exception {
+    // epxired hoplogs from the list below should be deleted
+    String[] files = {
+        "0-1-1231" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+        "0-2-1232" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION,
+        "0-3-1233" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
+        "0-4-1234" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
+        "0-5-1235" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION };
+    
+    Path bucketPath = new Path(testDataDir, getName() + "/0");
+    FileSystem fs = hdfsStore.getFileSystem();
+    for (String file : files) {
+      Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file),
+          blockCache, stats, storeStats);
+      createHoplog(10, oplog);
+    }
+
+    String marker1 = "0-4-1234"
+        + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION
+        + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
+    fs.createNewFile(new Path(bucketPath, marker1));
+    String marker2 = "0-5-1235"
+        + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION
+        + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
+    fs.createNewFile(new Path(bucketPath, marker2));    
+    
+    FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", "");
+    assertEquals(7, hoplogs.length);
+    
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(
+        regionManager, 0);
+    
+    FileStatus[] markers = organizer.getExpiryMarkers();
+    // one hoplog and one exp marker will be deletion targets
+    assertEquals(2, markers.length);
+    for (FileStatus marker : markers) {
+      String name = marker.getPath().getName();
+      assertTrue(name.equals(marker1) || name.equals(marker2));
+    }
+    organizer.close();
+  }
+  
+  public void testExpiredHoplogCleanup() throws Exception {
+    // epxired hoplogs from the list below should be deleted
+    String[] files = {
+        "0-1-0000" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+        "0-1-1111" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+        "0-1-1111" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION
+        + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION,
+        
+        "0-2-0000" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION,
+        "0-2-2222" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION,
+        
+        "0-3-0000" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
+        "0-3-3333" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
+        "0-3-3333" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION
+            + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION,
+        
+        "0-4-4444" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION };
+    
+    Path bucketPath = new Path(testDataDir, getName() + "/0");
+    FileSystem fs = hdfsStore.getFileSystem();
+    for (String file : files) {
+      if (file.endsWith(AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION)) {
+        fs.createNewFile(new Path(bucketPath, file));
+        continue;
+      }
+      Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file),
+          blockCache, stats, storeStats);
+      createHoplog(10, oplog);
+    }
+    
+    FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", "");
+    assertEquals(9, hoplogs.length);
+
+    long target = System.currentTimeMillis();
+    TimeUnit.SECONDS.sleep(1);
+    
+    // all but minor compacted files from below this will not be deleted as it
+    // is after target delete time
+    files = new String[] { 
+        "0-4-4444" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION
+            + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION,
+            
+        "0-5-5555" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION
+            + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION,
+        "0-5-5555" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
+        
+        "0-6-6666" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION
+    };
+    for (String file : files) {
+      if (file.endsWith(AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION)) {
+        fs.createNewFile(new Path(bucketPath, file));
+        continue;
+      }
+      Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file),
+          blockCache, stats, storeStats);
+      createHoplog(10, oplog);
+    }
+    
+    hoplogs = getBucketHoplogs(getName() + "/0", "");
+    assertEquals(13, hoplogs.length);
+    int hopSize = 0;
+    for (FileStatus file : hoplogs) {
+      if(file.getLen() > hopSize) {
+        hopSize = (int) file.getLen();
+      }
+    }
+
+    final AtomicInteger behavior = new AtomicInteger(0);
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0) {
+      @Override
+      protected FileStatus[] getExpiryMarkers() throws IOException {
+        if (behavior.get() == 1) {
+          ArrayList<FileStatus> markers = new ArrayList<FileStatus>();
+          for (FileStatus marker : super.getExpiryMarkers()) {
+            markers.add(marker);
+          }
+          // inject a dummy old expiry marker for major compacted file
+          long age = 2 * HDFSStore.DEFAULT_MAJOR_COMPACTION_INTERVAL_MINS * 60 * 1000;
+          String markerName = "0-2-2222" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION + EXPIRED_HOPLOG_EXTENSION;
+          FileStatus marker = new FileStatus(0, false, 1, 1024, System.currentTimeMillis() - age, new Path(bucketPath, markerName));
+          markers.add(marker);
+          return markers.toArray(new FileStatus[markers.size()]);
+        }
+        return super.getExpiryMarkers();
+      }
+    };
+
+    List<FileStatus> list = organizer.getOptimizationTargets(target);
+    assertEquals(6, list.size());
+
+    behavior.set(1);
+    list = organizer.getOptimizationTargets(target);
+    assertEquals(8, list.size());
+    
+    assertEquals(9 * hopSize, stats.getStoreUsageBytes());
+    int count = organizer.deleteExpiredFiles(list);
+    assertEquals(8, count);
+    assertEquals(5 * hopSize, stats.getStoreUsageBytes());
+    
+    List<FileStatus> tmp = new ArrayList<FileStatus>(Arrays.asList(hoplogs));
+    for (Iterator<FileStatus> iter = tmp.iterator(); iter.hasNext();) {
+      hoplogs = getBucketHoplogs(getName() + "/0", "");
+      FileStatus file = iter.next();
+      for (FileStatus hoplog : hoplogs) {
+        if(hoplog.getPath().getName().startsWith("0-5-5555")) {
+          fail("this file should have been deleted" + hoplog.getPath().getName());
+        }
+
+        if (hoplog.getPath().getName().equals(file.getPath().getName())) {
+          iter.remove();
+          break;
+        }
+      }
+    }
+
+    assertEquals(7, tmp.size());
+    organizer.close();
+  }
+  
+  public void testAlterPurgeInterval() throws Exception {
+    // epxired hoplogs from the list below should be deleted
+    String[] files = {
+        "0-1-0000" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+        "0-1-1111" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+        "0-2-2222" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+        "0-4-4444" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION };
+    
+    Path bucketPath = new Path(testDataDir, getName() + "/0");
+    hdfsStore.getFileSystem();
+    for (String file : files) {
+      Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file),
+          blockCache, stats, storeStats);
+      createHoplog(10, oplog);
+    }
+    
+    FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", "");
+    int hopSize = 0;
+    for (FileStatus file : hoplogs) {
+      if(file.getLen() > hopSize) {
+        hopSize = (int) file.getLen();
+      }
+    }
+
+    final AtomicInteger behavior = new AtomicInteger(0);
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0) {
+      @Override
+      protected FileStatus[] getExpiryMarkers() throws IOException {
+        if (behavior.get() == 1) {
+          ArrayList<FileStatus> markers = new ArrayList<FileStatus>();
+          // inject dummy old expiry markers
+          long age = 120 * 1000; // 120 seconds old
+          String markerName = "0-2-2222" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION + EXPIRED_HOPLOG_EXTENSION;
+          FileStatus marker = new FileStatus(0, false, 1, 1024, System.currentTimeMillis() - age, new Path(bucketPath, markerName));
+          markers.add(marker);
+          markerName = "0-4-4444" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION + EXPIRED_HOPLOG_EXTENSION;
+          marker = new FileStatus(0, false, 1, 1024, System.currentTimeMillis() - age, new Path(bucketPath, markerName));
+          markers.add(marker);
+          return markers.toArray(new FileStatus[markers.size()]);
+        }
+        return super.getExpiryMarkers();
+      }
+    };
+
+    behavior.set(1);
+    int count = organizer.initiateCleanup();
+    assertEquals(0, count);
+    
+    HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+    mutator.setPurgeInterval(1);
+    hdfsStore.alter(mutator);
+    count = organizer.initiateCleanup();
+    assertEquals(4, count);
+  }
+  
+  public void testInUseExpiredHoplogCleanup() throws Exception {
+    Path bucketPath = new Path(testDataDir, getName() + "/0");
+    FileSystem fs = hdfsStore.getFileSystem();
+    
+    String[] files = new String[] {
+        "0-1-1231" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+        "0-2-1232" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION,
+        "0-3-1233" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
+        "0-4-1234" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
+        "0-5-1235" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION };
+    
+    for (String file : files) {
+      Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file),
+          blockCache, stats, storeStats);
+      createHoplog(10, oplog);
+    }
+    
+    final HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(
+        regionManager, 0);
+    List<TrackedReference<Hoplog>> hopRefs = organizer.getSortedOplogs();
+    assertEquals(files.length, hopRefs.size());
+    
+    // this is expiry marker for one of the files that will be compacted below.
+    // While compaction is going on file deletion should not happen
+    files = new String[] { "0-5-1235"
+        + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION
+        + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION };
+    
+    for (String file : files) {
+      fs.createNewFile(new Path(bucketPath, file));
+    }
+    FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", "");
+    assertEquals(hopRefs.size() + files.length, hoplogs.length);
+    
+    TimeUnit.MILLISECONDS.sleep(200);
+    long target = System.currentTimeMillis();
+    List<FileStatus> list = organizer.getOptimizationTargets(target);
+    assertEquals(2, list.size());
+    
+    for (TrackedReference<Hoplog> ref : hopRefs) {
+      ref.increment("test");
+    }
+
+    fs.delete(new Path(bucketPath, files[0]), false);
+    
+    TimeUnit.MILLISECONDS.sleep(50);
+    organizer.markSortedOplogForDeletion(hopRefs, false);
+    
+    list = organizer.getOptimizationTargets(target);
+    assertEquals(0, list.size());
+    organizer.close();
+  }
+  
+  /**
+   * Tests max sequence initialization when file already exists and server starts
+   */
+  public void testSeqInitialization() throws Exception {
+    // create many hoplogs files
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    String[] extensions = { HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+        HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+        HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION,
+        HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION,
+        HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION};
+    for (String string : extensions) {
+      Hoplog oplog = organizer.getTmpSortedOplog(null, string);
+      createHoplog(1, oplog);
+      organizer.makeLegitimate(oplog);
+    }
+
+    // a organizer should start creating files starting at 6 as five files already existed
+    organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    Hoplog oplog = organizer.getTmpSortedOplog(null, HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+    createHoplog(1, oplog);
+    organizer.makeLegitimate(oplog);
+    assertEquals(6, HdfsSortedOplogOrganizer.getSequenceNumber(oplog));
+    organizer.close();
+  }
+
+  /**
+   * Tests temp file creation and making file legitimate
+   */
+  public void testMakeLegitimate() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+    // create empty tmp hoplog
+    Hoplog oplog = organizer.getTmpSortedOplog(null, HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+    createHoplog(0, oplog);
+
+    Path hoplogPath = new Path(testDataDir, getName() + "/0/" + oplog.getFileName());
+    FileSystem fs = hdfsStore.getFileSystem();
+    FileStatus hoplogStatus = fs.getFileStatus(hoplogPath);
+    assertNotNull(hoplogStatus);
+
+    organizer.makeLegitimate(oplog);
+
+    try {
+      hoplogStatus = fs.getFileStatus(hoplogPath);
+      assertNull(hoplogStatus);
+    } catch (FileNotFoundException e) {
+      // tmp file is renamed hence should not exist, exception expected
+    }
+
+    assertTrue(oplog.getFileName().endsWith(HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION));
+    hoplogPath = new Path(testDataDir, getName() + "/0/" + oplog.getFileName());
+    hoplogStatus = fs.getFileStatus(hoplogPath);
+    assertNotNull(hoplogStatus);
+  }
+
+  /**
+   * Tests hoplog file name comparator
+   */
+  public void testHoplogFileComparator() throws IOException {
+    String name1 = "bucket1-10-3.hop";
+    String name2 = "bucket1-1-20.hop";
+    String name3 = "bucket1-30-201.hop";
+    String name4 = "bucket1-100-201.hop";
+
+    TreeSet<TrackedReference<Hoplog>> list = new TreeSet<TrackedReference<Hoplog>>(new HoplogComparator());
+    // insert soplog is the list out of expected order
+    hdfsStore.getFileSystem();
+    list.add(new TrackedReference<Hoplog>(new HFileSortedOplog(hdfsStore, new Path(testDataDir, name2), blockCache, stats, storeStats)));
+    list.add(new TrackedReference<Hoplog>(new HFileSortedOplog(hdfsStore, new Path(testDataDir, name4), blockCache, stats, storeStats)));
+    list.add(new TrackedReference<Hoplog>(new HFileSortedOplog(hdfsStore, new Path(testDataDir, name1), blockCache, stats, storeStats)));
+    list.add(new TrackedReference<Hoplog>(new HFileSortedOplog(hdfsStore, new Path(testDataDir, name3), blockCache, stats, storeStats)));
+
+    Iterator<TrackedReference<Hoplog>> iter = list.iterator();
+    assertEquals(name4, iter.next().get().getFileName());
+    assertEquals(name3, iter.next().get().getFileName());
+    assertEquals(name2, iter.next().get().getFileName());
+    assertEquals(name1, iter.next().get().getFileName());
+  }
+  
+  /**
+   * Tests clear on a set of hoplogs.
+   */
+  public void testClear() throws Exception {
+    int bucketId = (int) System.nanoTime();
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
+
+    // flush and create hoplog
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    items.add(new TestEvent(("1"), ("1-1")));
+    items.add(new TestEvent(("4"), ("1-4")));
+    organizer.flush(items.iterator(), items.size());
+
+    items.clear();
+    items.add(new TestEvent(("1"), ("2-1")));
+    items.add(new TestEvent(("3"), ("2-3")));
+    organizer.flush(items.iterator(), items.size());
+
+    items.clear();
+    items.add(new TestEvent(("3"), ("3-3")));
+    items.add(new TestEvent(("5"), ("3-5")));
+    organizer.flush(items.iterator(), items.size());
+
+    // check file existence in bucket directory
+    FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + bucketId, HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+
+    // expect 3 files are 3 flushes
+    assertEquals(3, hoplogs.length);
+    
+    organizer.clear();
+    
+    // check that all files are now expired
+    hoplogs = getBucketHoplogs(getName() + "/" + bucketId, HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+    FileStatus[] exs = getBucketHoplogs(getName() + "/" + bucketId, HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+    FileStatus[] valids = HdfsSortedOplogOrganizer.filterValidHoplogs(hoplogs, exs);
+    assertEquals(Collections.EMPTY_LIST, Arrays.asList(valids));
+    
+    assertEquals(0, stats.getActiveFileCount());
+    assertEquals(0, stats.getInactiveFileCount());
+  }
+  
+  public void testFixedIntervalMajorCompaction() throws Exception {
+    final AtomicInteger majorCReqCount = new AtomicInteger(0);
+    
+    final Compactor compactor = new AbstractCompactor() {
+      @Override
+      public boolean compact(boolean isMajor, boolean isForced) throws IOException {
+        majorCReqCount.incrementAndGet();
+        return true;
+      }
+    };
+    
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0) {
+      @Override
+      public synchronized Compactor getCompactor() {
+        return compactor;
+      }
+    };
+    
+    regionManager.addOrganizer(0, organizer);
+    
+    System.setProperty(HoplogConfig.JANITOR_INTERVAL_SECS, "1");
+    HDFSRegionDirector.resetJanitor();
+    
+    alterMajorCompaction(hdfsStore, true);
+    
+    // create hoplog in the past, 90 seconds before current time
+    organizer.hoplogCreated(getName(), 0, new TestHoplog(hdfsStore, 100, System.currentTimeMillis() - 90000));
+    TimeUnit.MILLISECONDS.sleep(50);
+    organizer.hoplogCreated(getName(), 0, new TestHoplog(hdfsStore, 100, System.currentTimeMillis() - 90000));
+    
+    List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
+    assertEquals(2, hoplogs.size());
+    
+    for (int i = 0; i < 3; i++) {
+      TimeUnit.SECONDS.sleep(1);
+      assertEquals(0, majorCReqCount.get());
+    }
+    HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+    mutator.setMajorCompactionInterval(1);
+    hdfsStore.alter(mutator);
+    TimeUnit.SECONDS.sleep(5);
+    assertTrue(3 < majorCReqCount.get());
+  }
+  
+ 
+  public void testCorruptHfileBucketFail() throws Exception {
+    // create a corrupt file
+    FileSystem fs = hdfsStore.getFileSystem();
+    for (int i = 0; i < 113; i++) {
+      FSDataOutputStream opStream = fs.create(new Path(testDataDir.getName() + "/region-1/" + i + "/1-1-1.hop"));
+      opStream.writeBytes("Some random corrupt file");
+      opStream.close();
+    }
+      
+    // create region with store
+    regionfactory.setHDFSStoreName(HDFS_STORE_NAME);
+    Region<Object, Object> region1 = regionfactory.create("region-1");
+    IgnoredException ex = IgnoredException.addIgnoredException("CorruptHFileException");
+    try {
+      region1.get("key");
+      fail("get should have failed with corrupt file error");
+    } catch (HDFSIOException e) {
+      // expected
+    } finally {
+      ex.remove();
+    }
+    
+    region1.destroyRegion();
+  }
+
+  public void testMaxOpenReaders() throws Exception {
+    System.setProperty("hoplog.bucket.max.open.files", "5");
+    HoplogOrganizer<? extends PersistedEventImpl> organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    for (int i = 0; i < 10; i++) {
+      items.clear();
+      items.add(new TestEvent("" + i, "" + i));
+      organizer.flush(items.iterator(), items.size());
+    }
+    
+    HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
+    List<TrackedReference<Hoplog>> hoplogs = bucket.getSortedOplogs();
+    int closedCount = 0 ;
+    for (TrackedReference<Hoplog> hoplog : hoplogs) {
+      HFileSortedOplog hfile = (HFileSortedOplog) hoplog.get();
+      if (hfile.isClosed()) { 
+        closedCount++;
+      }
+    }
+    assertEquals(10, closedCount);
+    assertEquals(10, stats.getActiveFileCount());
+    assertEquals(0, stats.getActiveReaderCount());
+    
+    byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
+    organizer.read(keyBytes1).getValue();
+    
+    closedCount = 0 ;
+    for (TrackedReference<Hoplog> hoplog : hoplogs) {
+      HFileSortedOplog hfile = (HFileSortedOplog) hoplog.get();
+      if (hfile.isClosed()) { 
+        closedCount++;
+      }
+    }
+    assertEquals(5, closedCount);
+    assertEquals(10, stats.getActiveFileCount());
+    assertEquals(0, stats.getInactiveFileCount());
+    assertEquals(5, stats.getActiveReaderCount());
+    
+    organizer.getCompactor().compact(false, false);
+    assertEquals(1, stats.getActiveFileCount());
+    assertEquals(0, stats.getActiveReaderCount());
+    assertEquals(0, stats.getInactiveFileCount());
+  }
+
+  public void testConcurrentReadInactiveClose() throws Exception {
+    final HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
+    alterMinorCompaction(hdfsStore, true);
+
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    for (int i = 0; i < 4; i++) {
+      items.clear();
+      items.add(new TestEvent("" + i, "" + i));
+      organizer.flush(items.iterator(), items.size());
+    }
+    
+    final byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
+    class ReadTask implements Runnable {
+      public void run() {
+        try {
+          organizer.read(keyBytes1);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+    ScheduledExecutorService[] readers = new ScheduledExecutorService[10];
+    for (int i = 0; i < readers.length; i++) {
+      readers[i] = Executors.newSingleThreadScheduledExecutor();
+      readers[i].scheduleWithFixedDelay(new ReadTask(), 0, 1, TimeUnit.MILLISECONDS);
+    }
+    
+    for (int i = 0; i < 100; i++) {
+      items.clear();
+      items.add(new TestEvent("" + i, "" + i));
+      organizer.flush(items.iterator(), items.size());
+    }
+    
+    for (int i = 0; i < readers.length; i++) {
+      readers[i].shutdown();
+      readers[i].awaitTermination(1, TimeUnit.SECONDS);
+      TimeUnit.MILLISECONDS.sleep(50);
+    }
+    
+    for (int i = 0; i < 20; i++) {
+      if (stats.getActiveFileCount() < 4) {
+        break;
+      }
+      organizer.getCompactor().compact(false, false);
+    }
+
+    organizer.performMaintenance();
+    TimeUnit.SECONDS.sleep(1);
+    
+    assertTrue("" + stats.getActiveFileCount(), stats.getActiveFileCount() <= 4);
+    assertEquals(stats.getActiveReaderCount(), stats.getActiveReaderCount());
+    assertEquals(0, stats.getInactiveFileCount());
+  }
+  
+  public void testEmptyBucketCleanup() throws Exception {
+    HdfsSortedOplogOrganizer o = new HdfsSortedOplogOrganizer(regionManager, 0);
+    long target = System.currentTimeMillis();
+    o.getOptimizationTargets(target);
+    // making sure empty bucket is not causing IO errors. no assertion needed
+    // for this test case.
+  }
+  
+  public void testExpiredFilterAtStartup() throws Exception {
+    HdfsSortedOplogOrganizer bucket = new HdfsSortedOplogOrganizer(regionManager, 0);
+    
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    items.add(new TestEvent(("1"), ("1-1")));
+    items.add(new TestEvent(("4"), ("1-4")));
+    bucket.flush(items.iterator(), items.size());
+    
+    items.clear();
+    items.add(new TestEvent(("1"), ("2-1")));
+    items.add(new TestEvent(("3"), ("2-3")));
+    bucket.flush(items.iterator(), items.size());
+    
+    FileStatus[] files = getBucketHoplogs(getName() + "/" + 0,
+        HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+    assertEquals(2, files.length);
+    
+    files = getBucketHoplogs(getName() + "/" + 0,
+        HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+    assertEquals(0, files.length);
+    
+    HdfsSortedOplogOrganizer bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0);
+    List<TrackedReference<Hoplog>> hoplogs = bucket2.getSortedOplogs();
+    assertEquals(2, hoplogs.size());
+    
+    bucket.clear();
+    
+    files = getBucketHoplogs(getName() + "/" + 0,
+        HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+    assertEquals(2, files.length);
+    
+    files = getBucketHoplogs(getName() + "/" + 0,
+        HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+    assertEquals(2, files.length);
+    
+    bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0);
+    hoplogs = bucket2.getSortedOplogs();
+    assertEquals(0, hoplogs.size());
+    
+    items.clear();
+    items.add(new TestEvent(("1"), ("2-1")));
+    items.add(new TestEvent(("3"), ("2-3")));
+    bucket.flush(items.iterator(), items.size());
+    
+    bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0);
+    hoplogs = bucket2.getSortedOplogs();
+    assertEquals(1, hoplogs.size());
+    bucket.close();
+    bucket2.close();
+  }
+
+  public void testExpireFilterRetartAfterClear() throws Exception {
+    HdfsSortedOplogOrganizer bucket = new HdfsSortedOplogOrganizer(regionManager, 0);
+    
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    items.add(new TestEvent(("1"), ("1-1")));
+    items.add(new TestEvent(("4"), ("1-4")));
+    bucket.flush(items.iterator(), items.size());
+
+    items.clear();
+    items.add(new TestEvent(("1"), ("2-1")));
+    items.add(new TestEvent(("3"), ("2-3")));
+    bucket.flush(items.iterator(), items.size());
+    
+    FileStatus[] files = getBucketHoplogs(getName() + "/" + 0,
+        HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+    assertEquals(2, files.length);
+    
+    files = getBucketHoplogs(getName() + "/" + 0,
+        HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+    assertEquals(0, files.length);
+    
+    HdfsSortedOplogOrganizer bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0);
+    List<TrackedReference<Hoplog>> hoplogs = bucket2.getSortedOplogs();
+    assertEquals(2, hoplogs.size());
+    
+    bucket.clear();
+    
+    files = getBucketHoplogs(getName() + "/" + 0,
+        HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+    assertEquals(2, files.length);
+    
+    files = getBucketHoplogs(getName() + "/" + 0,
+        HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+    assertEquals(2, files.length);
+    
+    bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0);
+    hoplogs = bucket2.getSortedOplogs();
+    assertEquals(0, hoplogs.size());
+    bucket.close();
+    bucket2.close();
+  }
+  
+  /**
+   * tests maintenance does not fail even if there are no hoplogs
+   */
+  public void testNoFileJanitor() throws Exception {
+    HoplogOrganizer<? extends PersistedEventImpl> organizer;
+    organizer = regionManager.create(0);
+    organizer.performMaintenance();
+  }
+  
+  public void testValidHoplogRegex() {
+    String[] valid = {"1-1-1.hop", "1-1-1.ihop", "1-1-1.chop"};
+    String[] invalid = {"1-1-1.khop", "1-1-1.hop.tmphop", "1-1-1.hop.ehop", "1-1-.hop", "-1-1.hop"};
+    
+    for (String string : valid) {
+      Matcher matcher = HdfsSortedOplogOrganizer.SORTED_HOPLOG_PATTERN.matcher(string);
+      assertTrue(matcher.matches());
+    }
+    
+    for (String string : invalid) {
+      Matcher matcher = HdfsSortedOplogOrganizer.SORTED_HOPLOG_PATTERN.matcher(string);
+      assertFalse(matcher.matches());
+    }
+  }
+  
+  public void testOneHoplogMajorCompaction() throws Exception {
+    HoplogOrganizer<? extends PersistedEventImpl> organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    alterMajorCompaction(hdfsStore, true);
+    
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    items.add(new TestEvent(("1"), ("1-1")));
+    organizer.flush(items.iterator(),items.size());    
+    
+    
+    FileStatus[] files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+    assertEquals(1, files.length);    
+    
+    //Minor compaction will not perform on 1 .hop file
+    organizer.getCompactor().compact(false, false);
+    files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
+    assertEquals(0, files.length);
+    
+    //Major compaction will perform on 1 .hop file
+    organizer.getCompactor().compact(true, false);
+    files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);     
+    assertEquals(1, files.length);
+    String hoplogName =files[0].getPath().getName();    
+    files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
+    assertEquals(0, files.length);
+    
+    organizer.getCompactor().compact(true, false);
+    files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+    assertEquals(1, files.length);
+    assertEquals(hoplogName, files[0].getPath().getName());
+    
+    //Minor compaction does not convert major compacted file
+    organizer.getCompactor().compact(false, false);
+    files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
+    assertEquals(0, files.length);
+    
+    files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+    assertEquals(1, files.length);
+    assertEquals(hoplogName, files[0].getPath().getName());
+    
+    files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+    assertEquals(1, files.length);
+    assertNotSame(hoplogName + HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION, files[0].getPath().getName() );
+  }
+
+  public void testExposeCleanupInterval() throws Exception {
+    FileSystem fs = hdfsStore.getFileSystem();
+    Path cleanUpIntervalPath = new Path(hdfsStore.getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
+    assertTrue(fs.exists(cleanUpIntervalPath));
+    long interval = HDFSStore.DEFAULT_OLD_FILE_CLEANUP_INTERVAL_MINS
+        *60 * 1000;
+    assertEquals(interval, HoplogUtil.readCleanUpIntervalMillis(fs,cleanUpIntervalPath));
+  }
+  
+  @Override
+  protected void setUp() throws Exception {
+    System.setProperty(HoplogConfig.JANITOR_INTERVAL_SECS, "" + HoplogConfig.JANITOR_INTERVAL_SECS_DEFAULT);
+    super.setUp();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HfileSortedOplogJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HfileSortedOplogJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HfileSortedOplogJUnitTest.java
new file mode 100644
index 0000000..7420437
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HfileSortedOplogJUnitTest.java
@@ -0,0 +1,540 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogReader;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest
+;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
+import org.junit.experimental.categories.Category;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HfileSortedOplogJUnitTest extends BaseHoplogTestCase {
+	ArrayList<Object> toBeCleaned = new ArrayList<>();
+  
+  /**
+   * Tests hoplog creation using a writer. If this test fails, all the tests wills fail as hoplog
+   * creation is the first step
+   */
+  public void testHoplogWriter() throws Exception {
+    String hoplogName = getRandomHoplogName();
+    createHoplog(hoplogName, 1);
+    FileStatus hoplogStatus = hdfsStore.getFileSystem().getFileStatus(new Path(testDataDir, hoplogName));
+    assertNotNull(hoplogStatus);
+  }
+
+  /**
+   * Tests hoplog deletion.
+   */
+  public void testDeletion() throws Exception {
+    String hoplogName = getRandomHoplogName();
+    createHoplog(hoplogName, 1);
+    HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+
+    testHoplog.delete();
+
+    try {
+      FileStatus hoplogStatus = hdfsStore.getFileSystem().getFileStatus(new Path(testDataDir, hoplogName));
+      // hoplog should not exists. fail if it does
+      assertNull("File deletion failed", hoplogStatus);
+    } catch (FileNotFoundException e) {
+      // exception expected after deletion
+    }
+  }
+
+  /**
+   * Tests hoplog reader creation and key based gets
+   */
+  public void testHoplogReader() throws Exception {
+    String hop1 = getRandomHoplogName();
+    Map<String, String> map = createHoplog(hop1, 10);
+
+    HFileSortedOplog testHoplog1 = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hop1), blockCache, stats, storeStats);
+    HoplogReader reader = testHoplog1.getReader();
+    // verify that each entry put in the hoplog is returned by reader
+    for (Entry<String, String> entry : map.entrySet()) {
+      byte[] value = reader.read(entry.getKey().getBytes());
+      assertNotNull(value);
+    }
+  }
+
+  /**
+   * Tests full iteration on a hoplog. Ensures all inserted keys are returned and no key is missing
+   */
+  public void testIterator() throws IOException {
+    int count = 10;
+    ByteArrayComparator bac = new ByteArrayComparator();
+
+    String hoplogName = getRandomHoplogName();
+    TreeMap<String, String> sortedMap = createHoplog(hoplogName, count);
+
+    HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+    HoplogReader reader = testHoplog.getReader();
+
+    Iterator<Entry<String, String>> mapIter = sortedMap.entrySet().iterator();
+    HoplogIterator<byte[], byte[]> iter = reader.scan();
+    for (; iter.hasNext();) {
+      byte[] key = iter.next();
+      Entry<String, String> entry = mapIter.next();
+      assertEquals(0, bac.compare(key, iter.getKey()));
+      assertEquals(0, bac.compare(key, entry.getKey().getBytes()));
+      assertEquals(0, bac.compare(iter.getValue(), entry.getValue().getBytes()));
+      count--;
+    }
+    assertEquals(0, count);
+  }
+
+  /**
+   * Tests hoplog iterator. after returning first key, has next should return false and all
+   * subsequent next calls should return null
+   */
+  public void testSingleKVIterator() throws Exception {
+    String hoplogName = getRandomHoplogName();
+    TreeMap<String, String> map = createHoplog(hoplogName, 1);
+    HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+    HoplogReader reader = testHoplog.getReader();
+
+    HoplogIterator<byte[], byte[]> iter = reader.scan();
+    assertNull(iter.getKey());
+    assertNull(iter.getValue());
+    assertTrue(iter.hasNext());
+    assertNull(iter.getKey());
+    assertNull(iter.getValue());
+
+    Entry<String, String> entry = map.firstEntry();
+    iter.next();
+    assertNotNull(iter.getKey());
+    assertEquals(entry.getKey(), new String(iter.getKey()));
+    assertNotNull(iter.getValue());
+    assertEquals(entry.getValue(), new String(iter.getValue()));
+
+    assertFalse(iter.hasNext());
+    try {
+      iter.next();
+      fail();
+    } catch (NoSuchElementException e) {
+    }
+  }
+
+  /**
+   * Tests iteration on a hoplog with no keys, using a scanner. Scanner should not return any value
+   * and hasNext should return false everytime
+   */
+  public void testEmptyFileIterator() throws Exception {
+    String hoplogName = getRandomHoplogName();
+    createHoplog(hoplogName, 0);
+    HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+    HoplogReader reader = testHoplog.getReader();
+    HoplogIterator<byte[], byte[]> iter = reader.scan();
+    assertNull(iter.getKey());
+    assertNull(iter.getValue());
+    assertFalse(iter.hasNext());
+    assertNull(iter.getKey());
+    assertNull(iter.getValue());
+    try {
+      iter.next();
+      fail();
+    } catch (NoSuchElementException e) {
+    }
+  }
+
+  /**
+   * Tests from exclusive iterator
+   */
+  public void testFromExclusiveIterator() throws Exception {
+    fromIterator(false);
+  }
+
+  /**
+   * Tests from inclusive iterator
+   */
+  public void testFromInclusiveIterator() throws Exception {
+    fromIterator(true);
+  }
+
+  /**
+   * Tests from condition based iteration. creates hoplog with 10 KVs. Creates a scanner starting at
+   * a middle key and verifies the count of KVs iterated on
+   */
+  public void fromIterator(boolean includeFrom) throws Exception {
+    int count = 10;
+    ByteArrayComparator bac = new ByteArrayComparator();
+
+    String hoplogName = getRandomHoplogName();
+    // sorted map contains the keys inserted in the hoplog for testing
+    TreeMap<String, String> sortedMap = createHoplog(hoplogName, count);
+
+    HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+    HoplogReader reader = testHoplog.getReader();
+
+    int middleKey = 4;
+    // remove top keys from the sorted map as the hoplog scanner should not
+    // return those
+    Iterator<Entry<String, String>> mapIter = sortedMap.entrySet().iterator();
+    for (int i = 0; i < middleKey; i++) {
+      mapIter.next();
+      count--;
+    }
+    if (!includeFrom) {
+      mapIter.next();
+      count--;
+    }
+
+    // keys are like Key-X, for X=0 till X=9. Start iterator at fifth key,
+    // key-4. if excluding from key, start at sixth key, key-5.
+    HoplogIterator<byte[], byte[]> iter = reader.scan(("key-" + middleKey).getBytes(), includeFrom,
+        null, true);
+
+    for (; iter.hasNext();) {
+      byte[] key = iter.next();
+      Entry<String, String> entry = mapIter.next();
+      // make sure the KV returned by iterator match the inserted KV
+      assertEquals(0, bac.compare(key, iter.getKey()));
+      assertEquals(0, bac.compare(key, entry.getKey().getBytes()));
+      assertEquals(0, bac.compare(iter.getValue(), entry.getValue().getBytes()));
+      count--;
+    }
+    assertEquals(0, count);
+  }
+
+  /**
+   * Tests to exclusive iterator
+   */
+  public void testToExclusiveIterator() throws Exception {
+    toIterator(false);
+  }
+
+  /**
+   * Tests to inclusive iterator
+   */
+  public void testToInclusiveIterator() throws Exception {
+    toIterator(true);
+  }
+
+  /**
+   * Tests to condition based iteration. creates hoplog with 10 KVs. Creates a scanner ending at
+   * a middle key and verifies the count of KVs iterated on
+   */
+  public void toIterator(boolean includeTo) throws Exception {
+    int count = 10;
+    ByteArrayComparator bac = new ByteArrayComparator();
+    
+    String hoplogName = getRandomHoplogName();
+    // sorted map contains the keys inserted in the hoplog for testing
+    TreeMap<String, String> sortedMap = createHoplog(hoplogName, count);
+    Iterator<Entry<String, String>> mapIter = sortedMap.entrySet().iterator();
+    
+    HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+    HoplogReader reader = testHoplog.getReader();
+    
+    int middleKey = 4;
+    // keys are like Key-X, for X=0 till X=9. End iterator at fifth key,
+    // key-4. if excluding to key, end at fourth key, key-3.
+    HoplogIterator<byte[], byte[]> iter = reader.scan(null, true, ("key-" + middleKey).getBytes(), includeTo);
+    
+    for (; iter.hasNext();) {
+      byte[] key = iter.next();
+      Entry<String, String> entry = mapIter.next();
+      // make sure the KV returned by iterator match the inserted KV
+      assertEquals(0, bac.compare(key, iter.getKey()));
+      assertEquals(0, bac.compare(key, entry.getKey().getBytes()));
+      assertEquals(0, bac.compare(iter.getValue(), entry.getValue().getBytes()));
+      
+      count --;
+    }
+    
+    if (includeTo) {
+      count++;
+    }
+
+    assertEquals(10, count + middleKey);
+  }
+  
+  /**
+   * Tests whether sortedoplog supports duplicate keys, required when conflation is disabled
+   */
+  public void testFromToIterator() throws IOException {
+    ByteArrayComparator bac = new ByteArrayComparator();
+    String hoplogName = getRandomHoplogName();
+    HFileSortedOplog hoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+    
+    int count = 5;
+    HoplogWriter writer = hoplog.createWriter(5);
+    for (int i = 0; i < count; i++) {
+      String value = "value-" + (i * 2);
+      // even keys key-[0 2 4 6 8]
+      writer.append(("key-" + (i * 2)).getBytes(), value.getBytes());
+    }
+    writer.close();
+    
+    HoplogReader reader = hoplog.getReader();
+    HoplogIterator<byte[], byte[]> iter = reader.scan("key-1".getBytes(), true, "key-7".getBytes(), true);
+
+    for (int i = 2; i < 7; i += 2) {
+      assertTrue(iter.hasNext());
+      iter.next();
+      assertEquals(0, bac.compare(("key-" + i).getBytes(), iter.getKey()));
+      assertEquals(0, bac.compare(("value-" + i).getBytes(), iter.getValue()));
+      System.out.println(new String(iter.getKey()));
+    }
+    assertFalse(iter.hasNext());
+  }
+  
+  /**
+   * Tests whether sortedoplog supports duplicate keys, required when conflation is disabled
+   */
+  public void testDuplicateKeys() throws IOException {
+    String hoplogName = getRandomHoplogName();
+    HFileSortedOplog hoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+
+    // write duplicate keys
+    int count = 2;
+    HoplogWriter writer = hoplog.createWriter(2);
+    List<String> values = new ArrayList<String>();
+    for(int i = 1; i <= count; i++) {
+      String value = "value" + i;
+      writer.append("key-1".getBytes(), value.getBytes());
+      values.add(value);
+    }
+    writer.close();
+
+    HoplogReader reader = hoplog.getReader();
+    HoplogIterator<byte[], byte[]> scanner = reader.scan();
+    for (byte[] key = null; scanner.hasNext();) {
+      key = scanner.next();
+      count--;
+      assertEquals(0, Bytes.compareTo(key, "key-1".getBytes()));
+      values.remove(new String(scanner.getValue()));
+    }
+    assertEquals(0, count);
+    assertEquals(0, values.size());
+  }
+  
+  public void testOffsetBasedScan() throws Exception {
+    // Each record is 43 bytes. each block is 256 bytes. each block will have 6
+    // records
+     
+    int blocksize = 1 << 8;
+    System.setProperty(HoplogConfig.HFILE_BLOCK_SIZE_CONF,
+        String.valueOf(blocksize));
+
+    int count = 50;
+    String hoplogName = getRandomHoplogName();
+    createHoplog(hoplogName, count);
+
+    HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(
+        testDataDir, hoplogName), blockCache, stats, storeStats);
+
+    HoplogReader reader = testHoplog.getReader();
+    
+    HoplogIterator<byte[], byte[]> scanner = reader.scan(blocksize * 1, blocksize * 2);
+    int range1Count = 0;
+    String range1EndKey = null;
+    for (byte[] key = null; scanner.hasNext();) {
+      key = scanner.next();
+      range1Count++;
+      range1EndKey = new String(key);
+    }
+    int range1EndKeyNum = Integer.valueOf(range1EndKey.substring("Key-".length()));
+
+    scanner = reader.scan(blocksize * 2, blocksize * 1);
+    int range2Count = 0;
+    String range2EndKey = null;
+    for (byte[] key = null; scanner.hasNext();) {
+      key = scanner.next();
+      range2Count++;
+      range2EndKey = new String(key);
+    }
+    
+    assertEquals(range2EndKey, range1EndKey);
+    assertEquals(2, range1Count/range2Count);
+    
+    scanner = reader.scan(blocksize * 3, blocksize * 1);
+    String range3FirstKey = new String(scanner.next());
+    
+    int range3FirstKeyNum = Integer.valueOf(range3FirstKey.substring("Key-"
+        .length()));
+    
+    // range 3 starts at the end of range 1. so the two keys must be consecutive
+    assertEquals(range1EndKeyNum + 1, range3FirstKeyNum);
+    
+    testHoplog.close();
+  }
+  
+  public void testOffsetScanBeyondFileSize() throws Exception {
+    // Each record is 43 bytes. each block is 256 bytes. each block will have 6
+    // records
+    
+    int blocksize = 1 << 8;
+    System.setProperty(HoplogConfig.HFILE_BLOCK_SIZE_CONF,
+        String.valueOf(blocksize));
+    
+    int count = 20;
+    String hoplogName = getRandomHoplogName();
+    createHoplog(hoplogName, count);
+    
+    HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(
+        testDataDir, hoplogName), blockCache, stats, storeStats);
+    
+    HoplogReader reader = testHoplog.getReader();
+    
+    HoplogIterator<byte[], byte[]> scanner = reader.scan(blocksize * 5, blocksize * 2);
+    assertFalse(scanner.hasNext());
+    
+    testHoplog.close();
+  }
+  
+  public void testZeroValueOffsetScan() throws Exception {
+    // Each record is 43 bytes. each block is 256 bytes. each block will have 6
+    // records
+    
+    int blocksize = 1 << 8;
+    System.setProperty(HoplogConfig.HFILE_BLOCK_SIZE_CONF,
+        String.valueOf(blocksize));
+    
+    int count = 20;
+    String hoplogName = getRandomHoplogName();
+    createHoplog(hoplogName, count);
+    
+    HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(
+        testDataDir, hoplogName), blockCache, stats, storeStats);
+    
+    HoplogReader reader = testHoplog.getReader();
+    
+    HoplogIterator<byte[], byte[]> scanner = reader.scan(0, blocksize * 2);
+    assertTrue(scanner.hasNext());
+    int keyNum = Integer.valueOf(new String(scanner.next()).substring("Key-"
+        .length()));
+    assertEquals(100000, keyNum);
+
+    testHoplog.close();
+  }
+  
+  /*
+   * Tests reader succeeds to read data even if FS client is recycled without
+   * this reader knowing
+   */
+  public void testReaderDetectAndUseRecycledFs() throws Exception {
+    HDFSStoreFactoryImpl storeFactory = getCloseableLocalHdfsStoreFactory();
+    HDFSStoreImpl store = (HDFSStoreImpl) storeFactory.create("Store-1");
+    toBeCleaned.add(store);
+
+    HFileSortedOplog hop = new HFileSortedOplog(store, new Path(getName() + "-1-1.hop"), blockCache, stats, storeStats);
+    toBeCleaned.add(hop);
+    TreeMap<String, String> map = createHoplog(10, hop);
+
+    HoplogReader reader = hop.getReader();
+    // verify that each entry put in the hoplog is returned by reader
+    for (Entry<String, String> entry : map.entrySet()) {
+      byte[] value = reader.read(entry.getKey().getBytes());
+      assertNotNull(value);
+    }
+
+    cache.getLogger().info("<ExpectedException action=add>java.io.IOException</ExpectedException>");
+    try {
+      store.getFileSystem().close();
+      store.checkAndClearFileSystem();
+      
+      for (Entry<String, String> entry : map.entrySet()) {
+        reader = hop.getReader();
+        byte[] value = reader.read(entry.getKey().getBytes());
+        assertNotNull(value);
+      }
+    } finally {
+      cache.getLogger().info("<ExpectedException action=remove>java.io.IOException</ExpectedException>");
+    }
+  }
+
+  public void testNewScannerDetechAndUseRecycledFs() throws Exception {
+    HDFSStoreFactoryImpl storeFactory = getCloseableLocalHdfsStoreFactory();
+    HDFSStoreImpl store = (HDFSStoreImpl) storeFactory.create("Store-1");
+    toBeCleaned.add(store);
+
+    HFileSortedOplog hop = new HFileSortedOplog(store, new Path(getName() + "-1-1.hop"), blockCache, stats, storeStats);
+    createHoplog(10, hop);
+
+    HoplogIterator<byte[], byte[]> scanner = hop.getReader().scan();
+    // verify that each entry put in the hoplog is returned by reader
+    int i = 0;
+    while (scanner.hasNext()) {
+      byte[] key = scanner.next();
+      assertNotNull(key);
+      i++;
+    }
+    assertEquals(10, i);
+    // flush block cache
+    hop.close(true);
+    hop.delete();
+    
+    hop = new HFileSortedOplog(store, new Path(getName()+"-1-1.hop"), blockCache, stats, storeStats);
+		createHoplog(10, hop);
+  	toBeCleaned.add(hop);
+    hop.getReader();
+    
+    cache.getLogger().info("<ExpectedException action=add>java.io.IOException</ExpectedException>");
+    try {
+      store.getFileSystem().close();
+      store.checkAndClearFileSystem();
+      
+      scanner = hop.getReader().scan();
+      // verify that each entry put in the hoplog is returned by reader
+      i = 0;
+      while (scanner.hasNext()) {
+        byte[] key = scanner.next();
+        assertNotNull(key);
+        i++;
+      }
+      assertEquals(10, i);
+    } finally {
+      cache.getLogger().info("<ExpectedException action=remove>java.io.IOException</ExpectedException>");
+    }
+  }
+  
+  @Override
+  protected void tearDown() throws Exception {
+    for (Object obj : toBeCleaned) {
+      try {
+        if (HDFSStoreImpl.class.isInstance(obj)) {
+          ((HDFSStoreImpl) obj).clearFolder();
+        } else if (AbstractHoplog.class.isInstance(obj)) {
+          ((AbstractHoplog) obj).close();
+          ((AbstractHoplog) obj).delete();
+        }
+      } catch (Exception e) {
+        System.out.println(e);
+      }
+    }
+    super.tearDown();
+  }
+    
+  private TreeMap<String, String> createHoplog(String hoplogName, int numKeys) throws IOException {
+    HFileSortedOplog hoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+    TreeMap<String, String> map = createHoplog(numKeys, hoplog);
+    return map;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SortedOplogListIterJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SortedOplogListIterJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SortedOplogListIterJUnitTest.java
new file mode 100644
index 0000000..13aa6a9
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SortedOplogListIterJUnitTest.java
@@ -0,0 +1,178 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest
+;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class SortedOplogListIterJUnitTest extends BaseHoplogTestCase {
+  public void testOneIterOneKey() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    items.add(new TestEvent(("0"), ("0")));
+    organizer.flush(items.iterator(), items.size());
+
+    List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs();
+    HoplogSetIterator iter = new HoplogSetIterator(oplogs);
+    assertTrue(iter.hasNext());
+    int count = 0;
+    for (ByteBuffer keyBB = null; iter.hasNext();) {
+      keyBB = iter.next();
+      byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
+      assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
+      count++;
+    }
+    assertEquals(1, count);
+    organizer.close();
+  }
+  
+  public void testOneIterDuplicateKey() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    items.add(new TestEvent(("0"), ("V2")));
+    items.add(new TestEvent(("0"), ("V1")));
+    items.add(new TestEvent(("1"), ("V2")));
+    items.add(new TestEvent(("1"), ("V1")));
+    organizer.flush(items.iterator(), items.size());
+    
+    List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs();
+    HoplogSetIterator iter = new HoplogSetIterator(oplogs);
+    assertTrue(iter.hasNext());
+    int count = 0;
+    for (ByteBuffer keyBB = null; iter.hasNext();) {
+      keyBB = iter.next();
+      byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
+      byte[] value = HFileSortedOplog.byteBufferToArray(iter.getValue());
+      assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
+      assertEquals("V2", ((PersistedEventImpl) SortedHoplogPersistedEvent.fromBytes(value)).getValue());
+      count++;
+    }
+    assertEquals(2, count);
+    organizer.close();
+  }
+  
+  public void testTwoIterSameKey() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    items.add(new TestEvent(("0"), ("V1")));
+    organizer.flush(items.iterator(), items.size());
+    items.clear();
+    items.add(new TestEvent(("0"), ("V2")));
+    organizer.flush(items.iterator(), items.size());
+    
+    List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs();
+    HoplogSetIterator iter = new HoplogSetIterator(oplogs);
+    assertTrue(iter.hasNext());
+    int count = 0;
+    for (ByteBuffer keyBB = null; iter.hasNext();) {
+      keyBB = iter.next();
+      byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
+      byte[] value = HFileSortedOplog.byteBufferToArray(iter.getValue());
+      assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
+      assertEquals("V2", ((PersistedEventImpl) SortedHoplogPersistedEvent.fromBytes(value)).getValue());
+      count++;
+    }
+    assertEquals(1, count);
+    organizer.close();
+  }
+  
+  public void testTwoIterDiffKey() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    items.add(new TestEvent(("0"), ("V1")));
+    organizer.flush(items.iterator(), items.size());
+    items.clear();
+    items.add(new TestEvent(("1"), ("V1")));
+    organizer.flush(items.iterator(), items.size());
+    
+    List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs();
+    HoplogSetIterator iter = new HoplogSetIterator(oplogs);
+    assertTrue(iter.hasNext());
+    int count = 0;
+    for (ByteBuffer keyBB = null; iter.hasNext();) {
+      keyBB = iter.next();
+      byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
+      byte[] value = HFileSortedOplog.byteBufferToArray(iter.getValue());
+      assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
+      assertEquals("V1", ((PersistedEventImpl) SortedHoplogPersistedEvent.fromBytes(value)).getValue());
+      count++;
+    }
+    assertEquals(2, count);
+    organizer.close();
+  }
+  
+  public void testMergedIterator() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+    // #1
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    items.add(new TestEvent(("1"), ("1")));
+    items.add(new TestEvent(("2"), ("1")));
+    items.add(new TestEvent(("3"), ("1")));
+    items.add(new TestEvent(("4"), ("1")));
+    organizer.flush(items.iterator(), items.size());
+
+    // #2
+    items.clear();
+    items.add(new TestEvent(("2"), ("1")));
+    items.add(new TestEvent(("4"), ("1")));
+    items.add(new TestEvent(("6"), ("1")));
+    items.add(new TestEvent(("8"), ("1")));
+    organizer.flush(items.iterator(), items.size());
+
+    // #3
+    items.clear();
+    items.add(new TestEvent(("1"), ("1")));
+    items.add(new TestEvent(("3"), ("1")));
+    items.add(new TestEvent(("5"), ("1")));
+    items.add(new TestEvent(("7"), ("1")));
+    items.add(new TestEvent(("9"), ("1")));
+    organizer.flush(items.iterator(), items.size());
+
+    // #4
+    items.clear();
+    items.add(new TestEvent(("0"), ("1")));
+    items.add(new TestEvent(("1"), ("1")));
+    items.add(new TestEvent(("4"), ("1")));
+    items.add(new TestEvent(("5"), ("1")));
+    organizer.flush(items.iterator(), items.size());
+
+    List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs();
+    HoplogSetIterator iter = new HoplogSetIterator(oplogs);
+    // the iteration pattern for this test should be 0-9:
+    // 0 1 4 5 oplog #4
+    // 1 3 5 7 9 oplog #3
+    // 2 4 6 8 oplog #2
+    // 1 2 3 4 oplog #1
+    int count = 0;
+    for (ByteBuffer keyBB = null; iter.hasNext();) {
+      keyBB = iter.next();
+      byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
+      assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
+      count++;
+    }
+    assertEquals(10, count);
+    organizer.close();
+  }
+}