You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:49:52 UTC

[06/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManagerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManagerJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManagerJUnitTest.java
new file mode 100644
index 0000000..011d82b
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManagerJUnitTest.java
@@ -0,0 +1,449 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSCompactionManager.CompactionRequest;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest
+;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSCompactionManagerJUnitTest extends BaseHoplogTestCase {
+  /**
+   * Verifies that minor and major compaction requests are handled by separate
+   * executors: after one blocking request of each kind has been submitted, the
+   * minor and the major compactor must each report exactly one active task.
+   */
+  public void testMinMajCompactionIsolation() throws Exception {
+    // compactor that parks the executing thread. Nothing in this test notifies
+    // the monitors, so a request that started running stays active.
+    Compactor compactor = new AbstractCompactor() {
+      Object minor = new Object();
+      Object major = new Object();
+      public boolean compact(boolean isMajor, boolean isForced) throws IOException {
+        try {
+          if (isMajor) {
+            synchronized (major) {
+              major.wait();
+            }
+          } else {
+            synchronized (minor) {
+              minor.wait();
+            }
+          }
+        } catch (InterruptedException e) {
+          // do not swallow the interruption: restore the flag so the owner of
+          // the worker thread can still observe it
+          Thread.currentThread().interrupt();
+        }
+        return true;
+      }
+    };
+
+    // both compaction kinds are altered with 'true' (the other tests in this
+    // class describe that as "enabled"); no request has been submitted yet, so
+    // both executors must be idle
+    HDFSCompactionManager instance = HDFSCompactionManager.getInstance(hdfsStore);
+    alterMinorCompaction(hdfsStore, true);
+    alterMajorCompaction(hdfsStore, true);
+
+    assertEquals(0, instance.getMinorCompactor().getActiveCount());
+    assertEquals(0, instance.getMajorCompactor().getActiveCount());
+
+    //minor request
+    CompactionRequest cr = new CompactionRequest("region", 0, compactor, false);
+    HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
+    //major request
+    cr = new CompactionRequest("region", 0, compactor, true);
+    HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
+
+    // give the executors a moment to start the requests; each one must then be
+    // blocked inside compact() on its own executor
+    TimeUnit.MILLISECONDS.sleep(50);
+    assertEquals(1, instance.getMinorCompactor().getActiveCount());
+    assertEquals(1, instance.getMajorCompactor().getActiveCount());
+  }
+
+  /**
+   * Tests pausing of minor compaction. While it is enabled, submitted requests
+   * get executed; after it has been disabled, requests are submitted until one
+   * is rejected and none of them may execute.
+   */
+  public void testAlterAutoMinorCompaction() throws Exception {
+    // every compact() call bumps this counter, so it equals the number of executed requests
+    final AtomicInteger executions = new AtomicInteger(0);
+    Compactor countingCompactor = new AbstractCompactor() {
+      public boolean compact(boolean isMajor, boolean isForced) throws IOException {
+        executions.incrementAndGet();
+        return true;
+      }
+    };
+
+    // compaction is enabled: both requests are expected to run, taking the counter to 2
+    alterMinorCompaction(hdfsStore, true);
+    CompactionRequest request = new CompactionRequest("region", 0, countingCompactor, false);
+    HDFSCompactionManager.getInstance(hdfsStore).submitRequest(request);
+    request = new CompactionRequest("region", 1, countingCompactor, false);
+    HDFSCompactionManager.getInstance(hdfsStore).submitRequest(request);
+
+    // poll at most 20 times, 50 ms apart, and stop as soon as both executions were counted
+    for (int pollsLeft = 20; pollsLeft > 0 && executions.get() != 2; pollsLeft--) {
+      System.out.println("waiting one small cycle for dummy request to complete");
+      TimeUnit.MILLISECONDS.sleep(50);
+    }
+    assertEquals(2, executions.get());
+
+    // compaction works. now disable it and keep submitting until a request is
+    // turned away (submitRequest returns null)
+    alterMinorCompaction(hdfsStore, false);
+    int bucket = 0;
+    boolean accepted;
+    do {
+      bucket++;
+      request = new CompactionRequest("region", bucket, countingCompactor, false);
+      accepted = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(request) != null;
+    } while (accepted);
+
+    // the execution counter must not have moved while compaction was disabled
+    TimeUnit.MILLISECONDS.sleep(500);
+    assertEquals(2, executions.get());
+  }
+  /**
+   * Major compaction counterpart of the minor pause test. While major
+   * compaction is enabled, submitted requests get executed; after it has been
+   * disabled, requests are submitted until one is rejected and none may execute.
+   */
+  public void testAlterAutoMajorCompaction() throws Exception {
+    // every compact() call bumps this counter, so it equals the number of executed requests
+    final AtomicInteger executions = new AtomicInteger(0);
+    Compactor countingCompactor = new AbstractCompactor() {
+      public boolean compact(boolean isMajor, boolean isForced) throws IOException {
+        executions.incrementAndGet();
+        return true;
+      }
+    };
+
+    // compaction is enabled: both requests are expected to run, taking the counter to 2
+    alterMajorCompaction(hdfsStore, true);
+    CompactionRequest request = new CompactionRequest("region", 0, countingCompactor, true);
+    HDFSCompactionManager.getInstance(hdfsStore).submitRequest(request);
+    request = new CompactionRequest("region", 1, countingCompactor, true);
+    HDFSCompactionManager.getInstance(hdfsStore).submitRequest(request);
+
+    // poll at most 20 times, 50 ms apart, and stop as soon as both executions were counted
+    for (int pollsLeft = 20; pollsLeft > 0 && executions.get() != 2; pollsLeft--) {
+      System.out.println("waiting one small cycle for dummy request to complete");
+      TimeUnit.MILLISECONDS.sleep(50);
+    }
+    assertEquals(2, executions.get());
+
+    // compaction works. now disable it and keep submitting until a request is
+    // turned away (submitRequest returns null)
+    alterMajorCompaction(hdfsStore, false);
+    int bucket = 0;
+    boolean accepted;
+    do {
+      bucket++;
+      request = new CompactionRequest("region", bucket, countingCompactor, true);
+      accepted = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(request) != null;
+      System.out.println("success: " + accepted);
+    } while (accepted);
+
+    // the execution counter must not have moved while compaction was disabled
+    TimeUnit.MILLISECONDS.sleep(500);
+    assertEquals(2, executions.get());
+  }
+  
+  /**
+   * Exercises request admission in two phases. Phase one submits 15 major
+   * requests for distinct buckets, each with its own blocking compactor, and
+   * expects the first 12 to be accepted and the rest rejected. Phase two
+   * submits ten major requests for bucket 0 that all share one compactor
+   * instance: the first must be accepted and every request from the third on
+   * must be rejected (the second may go either way).
+   */
+   public void testDuplicateRequests() throws Exception {
+    // doubles as the monitor the compactors block on and as the "barrier is open" flag
+    final AtomicBoolean barrierOpen = new AtomicBoolean(false);
+    class TestCompactor extends AbstractCompactor {
+      AtomicBoolean busy = new AtomicBoolean(false);
+      public boolean compact(boolean isMajor, boolean isForced) throws IOException {
+        synchronized (barrierOpen) {
+          busy.set(true);
+          // barrier already open: return right away (busy is left set on this path)
+          if (barrierOpen.get()) {
+            return false;
+          }
+          try {
+            // block until the test thread calls notifyAll() on the barrier
+            barrierOpen.wait();
+          } catch (InterruptedException e) {
+            return false;
+          }
+          busy.set(false);
+        }
+        return true;
+      }
+      public boolean isBusy(boolean isMajor) {return busy.get();}
+    };
+    
+    // NOTE(review): this system property is never restored, so the value stays
+    // visible to whatever runs later in the same JVM - confirm that is intended
+    System.setProperty(HoplogConfig.COMPCATION_QUEUE_CAPACITY, "10");
+
+    alterMinorCompaction(hdfsStore, true);
+    alterMajorCompaction(hdfsStore, true);
+    // capacity is 10, thread num is 2, so only the first 12 request will be
+    // submitted
+    for (int i = 0; i < 15; i++) {
+      CompactionRequest cr = new CompactionRequest("region", i, new TestCompactor(), true);
+      // a null return from submitRequest is treated as a rejection
+      boolean success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) != null;
+      if (success) {
+        assertTrue("failed for " + i, i < 12);
+      } else {
+        assertTrue("failed for " + i, i >= 12);
+      }
+    }
+    
+    // open the barrier and wake every blocked compactor; compactors that start
+    // afterwards see the open barrier and return immediately
+    synchronized (barrierOpen) {
+      barrierOpen.set(true);
+      barrierOpen.notifyAll();
+    }
+    TimeUnit.MILLISECONDS.sleep(100);
+    // NOTE(review): the barrier is closed again without holding its monitor and
+    // after a fixed 100 ms pause - presumably all phase-one requests are done by
+    // then; confirm
+    barrierOpen.set(false);
+    
+    HDFSCompactionManager.getInstance(hdfsStore).reset();
+    // phase two: one shared compactor, always bucket 0, 20 ms between submissions
+    TestCompactor compactor = new TestCompactor();
+    for (int i = 0; i < 10; i++) {
+      TimeUnit.MILLISECONDS.sleep(20);
+      CompactionRequest cr = new CompactionRequest("region", 0, compactor, true);
+      boolean success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) != null;
+      if (success) {
+        // acceptance is only tolerated for the first two submissions
+        assertTrue("failed for " + i, i < 2);
+      } else {
+        // the very first submission must never be rejected
+        assertTrue("failed for " + i, i > 0);
+      }
+    }
+  }
+
+  /**
+   * Flushes two hoplogs into bucket 0, submits a regular major compaction
+   * request and expects no major or expired hoplog afterwards; then calls
+   * forceCompaction and expects one major hoplog plus two expired ones.
+   */
+  public void testForceCompactionWithAutoDisabled() throws Exception {
+    HoplogOrganizer<? extends PersistedEventImpl> organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    final String bucketDir = getName() + "/0";
+
+    // two single-entry flushes
+    ArrayList<TestEvent> batch = new ArrayList<TestEvent>();
+    batch.add(new TestEvent("1", "1-1"));
+    organizer.flush(batch.iterator(), batch.size());
+
+    batch.clear();
+    batch.add(new TestEvent("2", "2-1"));
+    organizer.flush(batch.iterator(), batch.size());
+
+    // both flush hoplogs exist, nothing has been compacted yet
+    FileStatus[] hoplogs = getBucketHoplogs(bucketDir, AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+    assertEquals(2, hoplogs.length);
+    hoplogs = getBucketHoplogs(bucketDir, AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+    assertEquals(0, hoplogs.length);
+
+    // a regular (non-forced) major request must leave the bucket untouched
+    CompactionRequest request = new CompactionRequest(getName(), 0, organizer.getCompactor(), true);
+    HDFSCompactionManager.getInstance(hdfsStore).submitRequest(request);
+    TimeUnit.MILLISECONDS.sleep(500);
+
+    hoplogs = getBucketHoplogs(bucketDir, AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+    assertEquals(0, hoplogs.length);
+    hoplogs = getBucketHoplogs(bucketDir, AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+    assertEquals(0, hoplogs.length);
+
+    // forcing the compaction produces one major hoplog and expires both inputs
+    organizer.forceCompaction(true);
+    TimeUnit.MILLISECONDS.sleep(500);
+
+    hoplogs = getBucketHoplogs(bucketDir, AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+    assertEquals(1, hoplogs.length);
+    hoplogs = getBucketHoplogs(bucketDir, AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+    assertEquals(2, hoplogs.length);
+  }
+
+  /**
+   * Tests forced major compaction of bucket 0. The first forced request
+   * compacts the two flushed hoplogs, a repeated forced request leaves the
+   * files unchanged, and a request created with the additional version-upgrade
+   * flag compacts again even though only one major hoplog is left.
+   */
+  public void testForceCompaction() throws Exception {
+    HoplogOrganizer<? extends PersistedEventImpl> organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    items.add(new TestEvent(("1"), ("1-1")));
+    organizer.flush(items.iterator(), items.size());
+
+    items.clear();
+    items.add(new TestEvent(("2"), ("2-1")));
+    organizer.flush(items.iterator(), items.size());
+
+    FileStatus[] files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+    assertEquals(2, files.length);
+    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+    assertEquals(0, files.length);
+
+    // isForced is true for user submitted compaction requests (through system procedure)
+    // we do not want to compact an already compacted file
+    CompactionRequest cr = new CompactionRequest(getName(), 0, organizer.getCompactor(), true, true/*isForced*/);
+    Future<CompactionStatus> status = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
+    // Block until the request has completed. The previous code called
+    // equals(true) on the result and discarded the outcome, which asserted
+    // nothing; the effect of the compaction is verified through the files below.
+    // NOTE(review): consider asserting the outcome carried by CompactionStatus too.
+    assertNotNull(status.get());
+
+    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+    assertEquals(1, files.length);
+    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+    assertEquals(2, files.length);
+
+    // second request to force compact does not do anything: wait for it and
+    // check that the file counts did not change
+    status = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
+    assertNotNull(status.get());
+
+    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+    assertEquals(1, files.length);
+    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+    assertEquals(2, files.length);
+
+    // upon version upgrade force compaction is allowed
+    cr = new CompactionRequest(getName(), 0, organizer.getCompactor(), true, true, true);
+    status = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
+    assertNotNull(status.get());
+
+    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+    assertEquals(2, files.length);
+    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+    assertEquals(3, files.length); // + 1 for old major hoplog
+  }
+
+  /**
+   * Submits the same minor request twice, one after the other, and expects
+   * both submissions to be accepted and executed.
+   */
+  public void testSameBucketSeqRequest() throws Exception {
+    // set to 1 by every compact() call; the test resets it to 0 between the two rounds
+    final AtomicInteger ranFlag = new AtomicInteger(0);
+    Compactor flaggingCompactor = new AbstractCompactor() {
+      public boolean compact(boolean isMajor, boolean isForced) throws IOException {
+        ranFlag.set(1);
+        return true;
+      }
+    };
+
+    HDFSCompactionManager.getInstance(hdfsStore).reset();
+    alterMinorCompaction(hdfsStore, true);
+    alterMajorCompaction(hdfsStore, true);
+    CompactionRequest request = new CompactionRequest("region", 0, flaggingCompactor, false);
+    assertEquals(0, ranFlag.get());
+
+    // first round: submit, then wait until the flag could be flipped back from 1 to 0
+    boolean accepted = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(request) != null;
+    assertEquals(true, accepted);
+    while (!ranFlag.compareAndSet(1, 0)) {
+      TimeUnit.MILLISECONDS.sleep(20);
+    }
+    assertEquals(0, ranFlag.get());
+
+    // second round: the very same request object is submitted again
+    accepted = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(request) != null;
+    assertEquals(true, accepted);
+
+    // check up to 10 times, 20 ms apart, whether the request has run
+    int checks = 0;
+    do {
+      TimeUnit.MILLISECONDS.sleep(20);
+      checks++;
+    } while (checks < 10 && ranFlag.get() != 1);
+    assertEquals(1, ranFlag.get());
+  }
+  
+  /** Runs the shared thread-alteration scenario for minor compaction without the decrease step. */
+  public void testAlterMinorThreadsIncrease() throws Exception {
+    final boolean major = false;
+    final boolean shrink = false;
+    doAlterCompactionThreads(major, shrink);
+  }
+  /** Runs the shared thread-alteration scenario for minor compaction with the decrease step. */
+  public void testAlterMinorThreadsDecrease() throws Exception {
+    final boolean major = false;
+    final boolean shrink = true;
+    doAlterCompactionThreads(major, shrink);
+  }
+  /** Runs the shared thread-alteration scenario for major compaction without the decrease step. */
+  public void testAlterMajorThreadsIncrease() throws Exception {
+    final boolean major = true;
+    final boolean shrink = false;
+    doAlterCompactionThreads(major, shrink);
+  }
+  /** Runs the shared thread-alteration scenario for major compaction with the decrease step. */
+  public void testAlterMajorThreadsDecrease() throws Exception {
+    final boolean major = true;
+    final boolean shrink = true;
+    doAlterCompactionThreads(major, shrink);
+  }
+  
+  public void doAlterCompactionThreads(final boolean testMajor, boolean decrease) throws Exception {
+    final AtomicBoolean barrierOpen = new AtomicBoolean(false);
+    final AtomicInteger counter = new AtomicInteger(0);
+    class TestCompactor extends AbstractCompactor {
+      public boolean compact(boolean isMajor, boolean isForced) throws IOException {
+        synchronized (barrierOpen) {
+          if ((testMajor && !isMajor)  || (!testMajor && isMajor)) {
+            return true;
+          }
+          if (barrierOpen.get()) {
+            return false;
+          }
+          try {
+            barrierOpen.wait();
+          } catch (InterruptedException e) {
+            return false;
+          }
+          counter.incrementAndGet();
+        }
+        return true;
+      }
+    };
+    
+    System.setProperty(HoplogConfig.COMPCATION_QUEUE_CAPACITY, "1");
+
+    HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+    int defaultThreadCount = 10;
+    if (testMajor) {
+      alterMajorCompaction(hdfsStore, true);
+      defaultThreadCount = 2;
+      mutator.setMajorCompactionThreads(15);
+      if (decrease) {
+        mutator.setMajorCompactionThreads(1);
+      }
+    } else {
+      alterMinorCompaction(hdfsStore, true);
+      mutator.setMinorCompactionThreads(15);
+      if (decrease) {
+        mutator.setMinorCompactionThreads(1);
+      }
+    }
+    
+    // capacity is 1, thread num is 10 or 2, so only the first 11 or 3 request will be
+    // submitted
+    cache.getLogger().info("<ExpectedException action=add>java.util.concurrent.RejectedExecutionException</ExpectedException>");
+    for (int i = 0; i < 15; i++) {
+      CompactionRequest cr = new CompactionRequest("region", i, new TestCompactor(), testMajor);
+      boolean success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) != null;
+      if (success) {
+        assertTrue("failed for " + i, i <= defaultThreadCount);
+      } else {
+        assertTrue("failed for " + i, i > defaultThreadCount);
+      }
+    }
+    
+    TimeUnit.MILLISECONDS.sleep(500);
+    assertEquals(0, counter.get());
+    synchronized (barrierOpen) {
+      barrierOpen.set(true);
+      barrierOpen.notifyAll();
+    }
+    TimeUnit.MILLISECONDS.sleep(500);
+    assertEquals(defaultThreadCount, counter.get());
+    
+    hdfsStore.alter(mutator);
+
+    counter.set(0);
+    barrierOpen.set(false);
+    for (int i = 0; i < 15; i++) {
+      TimeUnit.MILLISECONDS.sleep(100);
+      CompactionRequest cr = new CompactionRequest("region", i, new TestCompactor(), testMajor);
+      boolean success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) != null;
+      if (decrease) {
+        if (i > 3) {
+          assertFalse("failed for " + i, success);
+        }
+      } else {
+        assertTrue("failed for " + i, success);
+      }
+    }
+    TimeUnit.MILLISECONDS.sleep(500);
+    synchronized (barrierOpen) {
+      barrierOpen.set(true);
+      barrierOpen.notifyAll();
+    }
+    TimeUnit.MILLISECONDS.sleep(500);
+    if (decrease) {
+      assertTrue(counter.get() < 4);
+    } else {
+      assertEquals(15, counter.get());
+    }
+
+    cache.getLogger().info("<ExpectedException action=remove>java.util.concurrent.RejectedExecutionException</ExpectedException>");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirectorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirectorJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirectorJUnitTest.java
new file mode 100644
index 0000000..dc7b987
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirectorJUnitTest.java
@@ -0,0 +1,97 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.hdfs.internal.HoplogListenerForRegion;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest
+;
+
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSRegionDirectorJUnitTest extends BaseHoplogTestCase {
+  /**
+   * Walks one bucket through create, close and re-create and checks the bucket
+   * count reported by the director after each step. Once the region has been
+   * cleared from the director, asking for its bucket count must throw.
+   */
+  public void testDirector() throws Exception {
+    final int bucket = 0;
+    final String regionPath = "/" + getName();
+    HdfsRegionManager manager = regionManager;
+
+    // nothing has been created yet
+    assertEquals(0, director.getBucketCount(regionPath));
+
+    // creating the bucket makes it count
+    manager.create(bucket);
+    assertEquals(1, director.getBucketCount(regionPath));
+
+    // closing it brings the count back to zero
+    manager.close(bucket);
+    assertEquals(0, director.getBucketCount(regionPath));
+
+    // re-create, then drop the whole region from the director
+    manager.create(bucket);
+    assertEquals(1, director.getBucketCount(regionPath));
+    director.clear(regionPath);
+    try {
+      assertEquals(0, director.getBucketCount(regionPath));
+      fail("The region is no longer managed, hence an exception is expected");
+    } catch (IllegalStateException expected) {
+      // the region is no longer managed, so the lookup is expected to throw
+    }
+  }
+  
+  /**
+   * Registers a hoplog listener on the region, flushes four single-entry
+   * hoplogs into bucket 0, runs one minor compaction and expects exactly one
+   * compactionCompleted callback.
+   */
+  public void testCompactionEvents() throws Exception {
+    final AtomicInteger completedCompactions = new AtomicInteger(0);
+    // only compaction completion is of interest; create/delete callbacks are ignored
+    HoplogListener countingListener = new HoplogListener() {
+      public void compactionCompleted(String region, int bucket, boolean isMajor) {
+        completedCompactions.incrementAndGet();
+      }
+      public void hoplogCreated(String regionFolder, int bucketId, Hoplog... oplogs)
+          throws IOException {
+      }
+      public void hoplogDeleted(String regionFolder, int bucketId, Hoplog... oplogs)
+          throws IOException {
+      }
+    };
+
+    HoplogListenerForRegion listeners = ((LocalRegion)region).getHoplogListener();
+    listeners.addListener(countingListener);
+
+    HoplogOrganizer bucket = regionManager.create(0);
+    // flushes #1 to #4: one event per flush, keys "1".."4", value always "1"
+    ArrayList<PersistedEventImpl> items = new ArrayList<PersistedEventImpl>();
+    for (int key = 1; key <= 4; key++) {
+      items.clear();
+      items.add(new TestEvent(String.valueOf(key), "1"));
+      bucket.flush(items.iterator(), items.size());
+    }
+
+    bucket.getCompactor().compact(false, false);
+    assertEquals(1, completedCompactions.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStatsJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStatsJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStatsJUnitTest.java
new file mode 100644
index 0000000..1d17232
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStatsJUnitTest.java
@@ -0,0 +1,250 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSStatsJUnitTest extends BaseHoplogTestCase {
+  /**
+   * Checks the store usage statistic: zero before any flush, positive after
+   * five flushes, doubled while a second organizer for the same bucket is open
+   * and back to the original value once that organizer is closed.
+   */
+  public void testStoreUsageStats() throws Exception {
+    HoplogOrganizer bucket = regionManager.create(0);
+
+    long usageBaseline = 0;
+    assertEquals(usageBaseline, stats.getStoreUsageBytes());
+
+    // 5 flushes of 100 events each, keys key-0 .. key-499
+    for (int batch = 0; batch < 5; batch++) {
+      ArrayList<TestEvent> events = new ArrayList<TestEvent>();
+      for (int offset = 0; offset < 100; offset++) {
+        int keyIndex = batch * 100 + offset;
+        events.add(new TestEvent("key-" + keyIndex, "value-" + System.nanoTime()));
+      }
+      bucket.flush(events.iterator(), 100);
+    }
+
+    assertTrue(0 < stats.getStoreUsageBytes());
+    usageBaseline = stats.getStoreUsageBytes();
+
+    // a second organizer on the same bucket doubles the reported usage ...
+    HdfsSortedOplogOrganizer secondOrganizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    assertEquals(2, stats.getStoreUsageBytes() / usageBaseline);
+
+    // ... and closing it takes the usage back to the baseline
+    secondOrganizer.close();
+    assertEquals(1, stats.getStoreUsageBytes() / usageBaseline);
+  }
+  
+  /**
+   * Checks flush and minor compaction statistics: five flushes of 100 events
+   * each are counted one by one, and a following minor compaction is counted
+   * once and reports as many bytes as the flushes did.
+   */
+  public void testWriteStats() throws Exception {
+    HoplogOrganizer bucket = regionManager.create(0);
+
+    // nothing flushed so far
+    assertEquals(0, stats.getFlush().getCount());
+    assertEquals(0, stats.getFlush().getBytes());
+    assertEquals(0, stats.getActiveFileCount());
+
+    // running total of raw key and value bytes handed to flush()
+    int payloadBytes = 0;
+    for (int batch = 0; batch < 5; batch++) {
+      ArrayList<TestEvent> events = new ArrayList<TestEvent>();
+      for (int offset = 0; offset < 100; offset++) {
+        String key = "key-" + (batch * 100 + offset);
+        String value = "value-" + System.nanoTime();
+        events.add(new TestEvent(key, value));
+        payloadBytes += key.getBytes().length + value.getBytes().length;
+      }
+      bucket.flush(events.iterator(), 100);
+
+      // one more flush and one more active file; flushed bytes exceed the raw payload
+      int flushesSoFar = batch + 1;
+      assertEquals(flushesSoFar, stats.getFlush().getCount());
+      assertTrue(stats.getFlush().getBytes() > payloadBytes);
+      assertEquals(flushesSoFar, stats.getActiveFileCount());
+    }
+
+    // no compaction has run yet
+    assertEquals(0, stats.getMinorCompaction().getCount());
+    assertEquals(0, stats.getMinorCompaction().getBytes());
+    assertEquals(0, stats.getInactiveFileCount());
+
+    // one minor compaction leaves a single active file and no inactive ones
+    bucket.getCompactor().compact(false, false);
+    assertEquals(1, stats.getMinorCompaction().getCount());
+    assertEquals(1, stats.getActiveFileCount());
+    assertEquals(0, stats.getInactiveFileCount());
+    assertEquals(stats.getMinorCompaction().getBytes(), stats.getFlush().getBytes());
+  }
+  
+  /**
+   * Checks the active and inactive file counters: five flushed files are
+   * active; a major compaction performed while a scanner is open leaves one
+   * active file and five inactive ones; closing the scanner drops the
+   * inactive count to zero.
+   */
+  public void testInactiveFileStats() throws Exception {
+    HoplogOrganizer bucket = regionManager.create(0);
+    assertEquals(0, stats.getActiveFileCount());
+    assertEquals(0, stats.getInactiveFileCount());
+
+    // 5 flushes of 100 events each; the list is reused between flushes
+    ArrayList<TestEvent> events = new ArrayList<TestEvent>();
+    for (int batch = 0; batch < 5; batch++) {
+      events.clear();
+      for (int offset = 0; offset < 100; offset++) {
+        int keyIndex = batch * 100 + offset;
+        events.add(new TestEvent("key-" + keyIndex, "value-" + System.nanoTime()));
+      }
+      bucket.flush(events.iterator(), 100);
+    }
+
+    assertEquals(5, stats.getActiveFileCount());
+    assertEquals(0, stats.getInactiveFileCount());
+
+    // open a scanner first, then compact while it is still open
+    HoplogIterator<byte[], PersistedEventImpl> openScanner = bucket.scan();
+    bucket.getCompactor().compact(true, false);
+    assertEquals(1, stats.getActiveFileCount());
+    assertEquals(5, stats.getInactiveFileCount());
+
+    // after the scanner is closed no inactive file is reported any more
+    openScanner.close();
+    assertEquals(1, stats.getActiveFileCount());
+    assertEquals(0, stats.getInactiveFileCount());
+  }
+
+  /**
+   * Checks the read statistics: after one flush of 100 events, each of five
+   * reads must raise the read count by one and the bytes read by more than
+   * the size of a sample key plus value.
+   */
+  public void testReadStats() throws Exception {
+    HoplogOrganizer<SortedHoplogPersistedEvent> bucket = regionManager.create(0);
+
+    ArrayList<TestEvent> events = new ArrayList<TestEvent>();
+    for (int k = 0; k < 100; k++) {
+      events.add(new TestEvent("key-" + k, "value-" + System.nanoTime()));
+    }
+    bucket.flush(events.iterator(), 100);
+
+    // nothing has been read yet
+    assertEquals(0, stats.getRead().getCount());
+    assertEquals(0, stats.getRead().getBytes());
+
+    // lower bound for the growth per read: size of a sample key plus a sample value
+    int minimumGrowth = "key-1".getBytes().length + "value=1233232".getBytes().length;
+    for (int k = 0; k < 5; k++) {
+      long bytesBefore = stats.getRead().getBytes();
+      PersistedEventImpl event = bucket.read(BlobHelper.serializeToBlob("key-" + k));
+      assertNotNull(event);
+      assertEquals(k + 1, stats.getRead().getCount());
+      long lowerBound = minimumGrowth + bytesBefore;
+      assertTrue(lowerBound < stats.getRead().getBytes());
+    }
+
+    // Disabled block cache checks, kept for reference:
+    //Make sure the block cache stats are being updated.
+//    assertTrue(storeStats.getBlockCache().getMisses() > 0);
+//    assertTrue(storeStats.getBlockCache().getBytesCached() > 0);
+//    assertTrue(storeStats.getBlockCache().getCached() > 0);
+
+    //Do a duplicate read to make sure we get a hit in the cache
+//    bucket.read(BlobHelper.serializeToBlob("key-" + 0));
+//    assertTrue(storeStats.getBlockCache().getHits() > 0);
+  }
+
+  public void testBloomStats() throws Exception {
+    HoplogOrganizer bucket = regionManager.create(0);
+
+    // create 10 hoplogs
+    for (int j = 0; j < 5; j++) {
+      ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+      for (int i = 0; i < 100; i++) {
+        String key = ("key-" + (j * 100 + i));
+        String value = ("value-" + System.nanoTime());
+        items.add(new TestEvent(key, value));
+      }
+      bucket.flush(items.iterator(), 100);
+    }
+
+    // initially bloom stat will be zero
+    // reading key in first hop will increase bloom hit by 1 (key 0 to 99)
+    // reading key in 5 hoplog will increase bloom hit by 5 (key 400 to 499)
+    assertEquals(0, stats.getBloom().getCount());
+    bucket.read(BlobHelper.serializeToBlob("key-450"));
+    assertEquals(1, stats.getBloom().getCount());
+    bucket.read(BlobHelper.serializeToBlob("key-50"));
+    assertEquals(6, stats.getBloom().getCount());
+  }
+  
+  /**
+   * Checks the scan statistics of a single hoplog holding five entries:
+   * iterations and bytes are recorded while scanning, whereas scan count and
+   * scan time are only recorded once the scanner has been closed.
+   */
+  public void testScanStats() throws Exception {
+    Path hoplogPath = new Path(testDataDir, "H-1-1.hop");
+    HFileSortedOplog hoplog = new HFileSortedOplog(hdfsStore, hoplogPath, blockCache, stats, storeStats);
+    createHoplog(5, hoplog);
+
+    // all scan statistics start at zero
+    assertEquals(0, stats.getScan().getCount());
+    assertEquals(0, stats.getScan().getBytes());
+    assertEquals(0, stats.getScan().getTime());
+    assertEquals(0, stats.getScan().getIterations());
+    assertEquals(0, stats.getScan().getIterationTime());
+
+    // opening a scanner alone does not count as a completed scan
+    HoplogIterator<byte[], byte[]> scanner = hoplog.getReader().scan();
+    assertEquals(0, stats.getScan().getCount());
+
+    // walk all entries and add up the key and value sizes
+    int scannedBytes = 0;
+    while (scanner.hasNext()) {
+      byte[] key = scanner.next();
+      byte[] value = scanner.getValue();
+      scannedBytes += key.length + value.length;
+    }
+    assertEquals(scannedBytes, stats.getScan().getBytes());
+    assertEquals(5, stats.getScan().getIterations());
+    assertTrue(0 < stats.getScan().getIterationTime());
+
+    // the scanner is still open: no completed scan, no scan time, one scan in progress
+    assertEquals(0, stats.getScan().getCount());
+    assertEquals(0, stats.getScan().getTime());
+    assertEquals(1, stats.getScan().getInProgress());
+
+    // closing the scanner completes the scan
+    scanner.close();
+    assertEquals(1, stats.getScan().getCount());
+    assertTrue(0 < stats.getScan().getTime());
+    assertTrue(stats.getScan().getIterationTime() <= stats.getScan().getTime());
+  }
+  
+  /**
+   * Validates that two buckets of the same region update the same statistics:
+   * a flush into either bucket raises the shared flush count and the shared
+   * active file count by one.
+   */
+  public void testRegionBucketShareStats() throws Exception {
+    HoplogOrganizer firstBucket = regionManager.create(0);
+    HoplogOrganizer secondBucket = regionManager.create(1);
+
+    // nothing flushed so far
+    assertEquals(0, stats.getFlush().getCount());
+    assertEquals(0, stats.getActiveFileCount());
+
+    // 100 events into the first bucket
+    ArrayList<TestEvent> events = new ArrayList<TestEvent>();
+    for (int k = 0; k < 100; k++) {
+      events.add(new TestEvent("key-" + k, "value-" + System.nanoTime()));
+    }
+    firstBucket.flush(events.iterator(), 100);
+    assertEquals(1, stats.getFlush().getCount());
+    assertEquals(1, stats.getActiveFileCount());
+
+    // 100 fresh events into the second bucket, counted by the very same stats
+    events.clear();
+    for (int k = 0; k < 100; k++) {
+      events.add(new TestEvent("key-" + k, "value-" + System.nanoTime()));
+    }
+    secondBucket.flush(events.iterator(), 100);
+    assertEquals(2, stats.getFlush().getCount());
+    assertEquals(2, stats.getActiveFileCount());
+  }
+
+  /**
+   * Creates the cache used by these tests with multicast disabled, info level
+   * logging and time statistics enabled, stores it in the {@code cache} field
+   * and returns it.
+   */
+  @Override
+  protected Cache createCache() {
+    CacheFactory factory = new CacheFactory();
+    factory = factory.set("mcast-port", "0");
+    factory = factory.set("log-level", "info");
+    factory = factory.set("enable-time-statistics", "true");
+    // optional, currently switched off:
+//    factory = factory.set("statistic-archive-file", "statArchive.gfs");
+    cache = factory.create();
+    return cache;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizerJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizerJUnitTest.java
new file mode 100644
index 0000000..ab1ccac
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizerJUnitTest.java
@@ -0,0 +1,297 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.SequenceFileHoplog.SequenceFileIterator;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+/**
+ * Tests hoplog functionality for streaming ingest.
+ * 
+ * @author hemantb
+ *
+ */
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSUnsortedHoplogOrganizerJUnitTest extends BaseHoplogTestCase {
+ 
+  /**
+   * Flushes one batch of events through an unsorted hoplog organizer and
+   * verifies that exactly one sequence hoplog shows up in the bucket directory
+   * and that its keys read back in order starting at key-0.
+   */
+  public void testFlush() throws Exception {
+    final int eventCount = 10;
+    // NOTE(review): the nanoTime-derived id presumably keeps this bucket's
+    // directory separate from other tests - confirm
+    final int bucket = (int) System.nanoTime();
+    HDFSUnsortedHoplogOrganizer hoplogOrganizer = new HDFSUnsortedHoplogOrganizer(regionManager, bucket);
+
+    // build the batch, flush it and close the writer
+    ArrayList<TestEvent> batch = new ArrayList<TestEvent>();
+    int n = 0;
+    while (n < eventCount) {
+      batch.add(new TestEvent(("key-" + n), ("value-" + System.nanoTime())));
+      n++;
+    }
+    hoplogOrganizer.flush(batch.iterator(), eventCount);
+    hoplogOrganizer.closeCurrentWriter();
+
+    // look for sequence hoplogs in the bucket directory
+    String bucketDir = getName() + "/" + bucket;
+    FileStatus[] seqFiles = getBucketHoplogs(bucketDir,
+        HdfsSortedOplogOrganizer.SEQ_HOPLOG_EXTENSION);
+
+    // exactly one hoplog should exist, and it should be readable from key-0
+    assertEquals(1, seqFiles.length);
+    readSequenceFile(hdfsStore.getFileSystem(), seqFiles[0].getPath(), 0);
+  }
+  
+  /**
+   * Verifies that altering the write-only file rollover interval of the HDFS
+   * store changes how many sequence files the organizer produces.
+   * <p>
+   * Three batches flushed about 1.1 seconds apart under the initial
+   * configuration are expected to end up in a single sequence file. After the
+   * interval is altered to 1 (NOTE(review): presumably seconds - confirm
+   * against HDFSStoreMutator), two more batches flushed the same way are
+   * expected to bring the total to three files.
+   */
+  public void testAlterRollOverInterval() throws Exception {
+    HDFSUnsortedHoplogOrganizer organizer = new HDFSUnsortedHoplogOrganizer(regionManager, 0);
+
+    // flush 3 times with small delays. Only one seq file will be created
+    flushBatchesWithPause(organizer, 3);
+    organizer.closeCurrentWriter();
+
+    FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + 0,
+        HdfsSortedOplogOrganizer.SEQ_HOPLOG_EXTENSION);
+
+    // only one hoplog should exist
+    assertEquals(1, hoplogs.length);
+    readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0);
+
+    // lower the rollover interval on the live store
+    HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+    mutator.setWriteOnlyFileRolloverInterval(1);
+    hdfsStore.alter(mutator);
+
+    TimeUnit.MILLISECONDS.sleep(1100);
+    flushBatchesWithPause(organizer, 2);
+    organizer.closeCurrentWriter();
+    hoplogs = getBucketHoplogs(getName() + "/" + 0,
+        HdfsSortedOplogOrganizer.SEQ_HOPLOG_EXTENSION);
+    assertEquals(3, hoplogs.length);
+  }
+
+  /**
+   * Flushes the given number of 10-event batches through the organizer,
+   * sleeping 1100 ms after every flush. Batch j carries keys key-(10*j) to
+   * key-(10*j+9).
+   */
+  private void flushBatchesWithPause(HDFSUnsortedHoplogOrganizer organizer, int batches) throws Exception {
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    for (int j = 0; j < batches; j++) {
+      items.clear();
+      for (int i = 0; i < 10; i++) {
+        items.add(new TestEvent(("key-" + (i + 10 * j)), ("value-" + System.nanoTime())));
+      }
+      organizer.flush(items.iterator(), 10);
+      TimeUnit.MILLISECONDS.sleep(1100);
+    }
+  }
+  
+  /**
+   * Flushes 10000 events into a single sequence file and then scans several
+   * byte ranges of it, checking for each range which keys are returned. The
+   * ranges are built from the sync positions reported by
+   * getSequenceFileDetails.
+   */
+  public void testSequenceFileScan() throws Exception {
+    final int eventCount = 10000;
+    final int bucket = (int) System.nanoTime();
+    HDFSUnsortedHoplogOrganizer hoplogOrganizer = new HDFSUnsortedHoplogOrganizer(regionManager, bucket);
+
+    // build the batch, flush it and close the writer
+    ArrayList<TestEvent> batch = new ArrayList<TestEvent>();
+    int n = 0;
+    while (n < eventCount) {
+      batch.add(new TestEvent(("key-" + n), ("value-" + System.nanoTime())));
+      n++;
+    }
+    hoplogOrganizer.flush(batch.iterator(), eventCount);
+    hoplogOrganizer.closeCurrentWriter();
+
+    // exactly one sequence hoplog should exist in the bucket directory
+    FileStatus[] seqFiles = getBucketHoplogs(getName() + "/" + bucket,
+        HdfsSortedOplogOrganizer.SEQ_HOPLOG_EXTENSION);
+    assertEquals(1, seqFiles.length);
+    Path seqFile = seqFiles[0].getPath();
+
+    SequenceFileDetails syncInfo = getSequenceFileDetails(hdfsStore.getFileSystem(), seqFile);
+
+    // The range ends before a sync: reading should continue until that sync.
+    readSequenceFile(hdfsStore.getFileSystem(), seqFile,
+        0, syncInfo.indexOfKeyBeforeSecondSync,
+        0, syncInfo.posBeforeSecondSync);
+
+    // The range starts inside the header: reading should start from the first
+    // key and go to the next sync point.
+    readSequenceFile(hdfsStore.getFileSystem(), seqFile,
+        0, syncInfo.indexOfKeyBeforeSecondSync,
+        10, syncInfo.posAfterFirstSync);
+
+    // Start and end of the range lie between two sync markers: no key should
+    // be read. With endIndex = index - 1 the final check in readSequenceFile
+    // only passes when the loop never advanced the index.
+    readSequenceFile(hdfsStore.getFileSystem(), seqFile,
+        29, 28,
+        syncInfo.posAfterFirstSync, syncInfo.posBeforeSecondSync - syncInfo.posAfterFirstSync);
+
+    // The range starts after a sync and its length reaches beyond the file
+    // size: all records after the next sync should be read.
+    readSequenceFile(hdfsStore.getFileSystem(), seqFile,
+        syncInfo.indexOfKeyAfterFirstSync, 9999,
+        syncInfo.posBeforeFirstSync, 10000000);
+
+    // A length of -1 scans the whole file: all records should be read.
+    readSequenceFile(hdfsStore.getFileSystem(), seqFile, 0, 9999, 0, -1);
+  }
+  
+  /**
+   * Plain holder for the positions and key indexes that
+   * getSequenceFileDetails records around the first two points at which the
+   * sequence file iterator reports a sync. Fields that are never assigned keep
+   * their default value 0.
+   */
+  class SequenceFileDetails {
+    // cursor position recorded before reading the record on which the
+    // iterator first reported a sync
+    public int posBeforeFirstSync;
+    // numeric suffix of that record's "key-N" key
+    public int indexOfKeyBeforeFirstSync;
+    
+    // iterator position reported right after reading that same record
+    public int posAfterFirstSync;
+    // indexOfKeyBeforeFirstSync + 1
+    public int indexOfKeyAfterFirstSync; 
+    
+    // cursor position recorded before reading the record on which the
+    // iterator reported a sync for the second time
+    public int posBeforeSecondSync;
+    // numeric suffix of that record's "key-N" key
+    public int indexOfKeyBeforeSecondSync;
+  }
+  
+  /**
+   * Scans the given sequence file from its beginning and records the
+   * positions and key indexes around the first two records for which the
+   * iterator reports that a sync was seen. Scanning stops at the second such
+   * record. Keys are expected to look like "key-N".
+   *
+   * @param inputFS file system holding the sequence file
+   * @param sequenceFileName path of the sequence file to inspect
+   * @return the recorded details; fields belonging to a sync that was never
+   *         reached are left at 0
+   * @throws Exception if the file cannot be opened or scanned, or a key cannot
+   *         be deserialized or parsed
+   */
+  public SequenceFileDetails getSequenceFileDetails(FileSystem inputFS, Path sequenceFileName) throws Exception {
+    SequenceFileDetails fd = new SequenceFileDetails();
+    SequenceFileHoplog hoplog = new SequenceFileHoplog(inputFS, sequenceFileName, null);
+    try {
+      SequenceFileIterator iter = (SequenceFileIterator) hoplog.getReader().scan();
+      try {
+        int currentKeyStartPos = 0;
+        int cursorPos = 0;
+        boolean firstSyncSeen = false;
+        while (iter.hasNext()) {
+          iter.next();
+          // the record just read started where the cursor stood before next()
+          currentKeyStartPos = cursorPos;
+          cursorPos = (int) iter.getPosition();
+          if (!iter.syncSeen()) {
+            continue;
+          }
+
+          // only keys at a sync boundary are needed, so decode just those;
+          // substring(4) strips the "key-" prefix
+          String currentKey = (String) CacheServerHelper.deserialize(iter.getKey());
+          int keyIndex = Integer.parseInt(currentKey.substring(4));
+          if (firstSyncSeen) {
+            fd.posBeforeSecondSync = currentKeyStartPos;
+            fd.indexOfKeyBeforeSecondSync = keyIndex;
+            break;
+          }
+          fd.posBeforeFirstSync = currentKeyStartPos;
+          fd.indexOfKeyBeforeFirstSync = keyIndex;
+
+          fd.posAfterFirstSync = cursorPos;
+          fd.indexOfKeyAfterFirstSync = keyIndex + 1;
+          firstSyncSeen = true;
+        }
+      } finally {
+        // release the iterator even when scanning fails part way through
+        iter.close();
+      }
+    } finally {
+      hoplog.close();
+    }
+    return fd;
+  }
+  
+  /**
+   * Verifies the effect of clear() on an unsorted hoplog organizer: after one
+   * closed and one still open hoplog have been written, clear() is expected to
+   * leave an expired marker file for each of the two hoplogs. A second clear()
+   * must not change that, and both hoplogs must still be readable afterwards.
+   */
+  public void testClear() throws Exception {
+    int count = 10;
+    int bucketId = (int) System.nanoTime();
+    HDFSUnsortedHoplogOrganizer organizer = new HDFSUnsortedHoplogOrganizer(regionManager, bucketId);
+
+    // flush and create hoplog
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    for (int i = 0; i < count; i++) {
+      items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime())));
+    }
+    organizer.flush(items.iterator(), count);
+    organizer.closeCurrentWriter();
+    // check file existence in bucket directory
+    FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + bucketId,
+        AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION);
+    assertEquals(1, hoplogs.length);
+    readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0);
+
+    // write another batch but do not close the writer
+    organizer.flush(items.iterator(), count);
+
+    organizer.clear();
+    assertBothHoplogsExpired(bucketId);
+
+    // a second clear should be harmless and should not result in extra files
+    organizer.clear();
+    hoplogs = assertBothHoplogsExpired(bucketId);
+
+    // both batches used the same events, so each file starts at key-0
+    readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0);
+    readSequenceFile(hdfsStore.getFileSystem(), hoplogs[1].getPath(), 0);
+  }
+
+  /**
+   * Asserts that the bucket directory holds exactly two sequence hoplogs and
+   * exactly two expired marker files, and that for each hoplog one of the
+   * markers is named after it (hoplog name plus the expired extension).
+   *
+   * @return the two sequence hoplogs that were found
+   */
+  private FileStatus[] assertBothHoplogsExpired(int bucketId) throws Exception {
+    FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + bucketId,
+        AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION);
+    FileStatus[] expiredhoplogs = getBucketHoplogs(getName() + "/" + bucketId,
+        AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+
+    // two expired hoplogs should exist
+    assertEquals(2, expiredhoplogs.length);
+    assertEquals(2, hoplogs.length);
+
+    // every hoplog must have a marker carrying its own name
+    for (FileStatus hoplog : hoplogs) {
+      String expectedMarker = hoplog.getPath().getName() + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
+      assertTrue("No expired marker found for hoplog " + hoplog.getPath().getName(),
+          expiredhoplogs[0].getPath().getName().equals(expectedMarker)
+              || expiredhoplogs[1].getPath().getName().equals(expectedMarker));
+    }
+    return hoplogs;
+  }
+  
+  /**
+   * Reads the whole sequence file and asserts that its keys are consecutive
+   * "key-N" keys starting at {@code index}. The index of the last key is not
+   * checked.
+   */
+  public void readSequenceFile(FileSystem inputFS, Path sequenceFileName, int index) throws IOException {
+    final int noEndIndexCheck = -1;
+    final int fromStart = 0;
+    final int wholeFile = -1;
+    readSequenceFile(inputFS, sequenceFileName, index, noEndIndexCheck, fromStart, wholeFile);
+  }
+  /**
+   * Reads the sequence file, or a byte range of it, and asserts that the keys
+   * returned are consecutive "key-N" keys starting at {@code index}. Every
+   * value is also decoded as an unsorted hoplog persisted event.
+   *
+   * @param inputFS file system holding the sequence file
+   * @param sequenceFileName path of the sequence file to read
+   * @param index numeric suffix expected on the first key returned
+   * @param endIndex numeric suffix expected on the last key returned, or -1 to
+   *        skip that check
+   * @param startoffset start offset handed to the range scan; ignored when
+   *        {@code length} is -1
+   * @param length length handed to the range scan, or -1 to scan the whole
+   *        file
+   * @throws IOException if the file cannot be opened or closed
+   */
+  public void readSequenceFile(FileSystem inputFS, Path sequenceFileName, int index, int endIndex,
+      int startoffset, int length) throws IOException {
+    SequenceFileHoplog hoplog = new SequenceFileHoplog(inputFS, sequenceFileName, null);
+    try {
+      HoplogIterator<byte[], byte[]> iter = null;
+      if (length == -1) {
+        iter = hoplog.getReader().scan();
+      } else {
+        iter = hoplog.getReader().scan(startoffset, length);
+      }
+
+      try {
+        while (iter.hasNext()) {
+          iter.next();
+          // result is not needed; decoding only checks that the value parses
+          UnsortedHoplogPersistedEvent.fromBytes(iter.getValue());
+          String stringkey = (String) CacheServerHelper.deserialize(iter.getKey());
+          assertTrue("Expected key: key-" + index + ". Actual key: " + stringkey, stringkey.equals("key-" + index));
+          index++;
+        }
+        if (endIndex != -1) {
+          assertTrue("The keys should have been until key-" + endIndex + " but they are until key-" + (index - 1),
+              index == endIndex + 1);
+        }
+      } catch (Exception e) {
+        // report read/decode problems as a test failure; assertion errors are
+        // not Exceptions and pass through this handler untouched
+        assertTrue(e.toString(), false);
+      } finally {
+        // failed assertions bypass the catch above, so closing has to happen
+        // here to avoid leaking the iterator
+        iter.close();
+      }
+    } finally {
+      hoplog.close();
+    }
+  }
+
+}