You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:49:50 UTC

[04/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/TieredCompactionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/TieredCompactionJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/TieredCompactionJUnitTest.java
new file mode 100644
index 0000000..7b45952
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/TieredCompactionJUnitTest.java
@@ -0,0 +1,904 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HdfsSortedOplogOrganizer.HoplogCompactor;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest
+;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class TieredCompactionJUnitTest extends BaseHoplogTestCase {
+  static long ONE_MB = 1024 * 1024;
+  static long TEN_MB = 10 * ONE_MB;
+  
+  @Override
+  protected void configureHdfsStoreFactory() throws Exception {
+    super.configureHdfsStoreFactory();
+    
+    hsf.setInputFileCountMin(3);
+    hsf.setMinorCompaction(false);
+    hsf.setMajorCompaction(false);
+  }
+  
+  public void testMinorCompaction() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+    // #1
+    ArrayList<QueuedPersistentEvent> items = new ArrayList<QueuedPersistentEvent>();
+    items.add(new TestEvent("1", "1"));
+    items.add(new TestEvent("2", "1"));
+    items.add(new TestEvent("3", "1"));
+    items.add(new TestEvent("4", "1"));
+    organizer.flush(items.iterator(), items.size());
+
+    // #2
+    items.clear();
+    items.add(new TestEvent("2", "1"));
+    items.add(new TestEvent("4", "1"));
+    items.add(new TestEvent("6", "1"));
+    items.add(new TestEvent("8", "1"));
+    organizer.flush(items.iterator(), items.size());
+
+    // #3
+    items.clear();
+    items.add(new TestEvent("1", "1"));
+    items.add(new TestEvent("3", "1"));
+    items.add(new TestEvent("5", "1"));
+    items.add(new TestEvent("7", "1"));
+    items.add(new TestEvent("9", "1"));
+    organizer.flush(items.iterator(), items.size());
+
+    // #4
+    items.clear();
+    items.add(new TestEvent("0", "1"));
+    items.add(new TestEvent("1", "1"));
+    items.add(new TestEvent("4", "1"));
+    items.add(new TestEvent("5", "1"));
+    organizer.flush(items.iterator(), items.size());
+
+    // check file existence in bucket directory, expect 4 hoplgos
+    FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+    assertEquals(4, hoplogs.length);
+
+    // After compaction expect 1 hoplog only. It should have the same sequence number as that of the
+    // youngest file compacted, which should be 4 in this case
+    organizer.getCompactor().compact(false, false);
+    hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
+    assertEquals(1, hoplogs.length);
+    assertEquals(1, organizer.getSortedOplogs().size());
+    Hoplog hoplog = new HFileSortedOplog(hdfsStore, hoplogs[0].getPath(), blockCache, stats, storeStats);
+    assertEquals(4, HdfsSortedOplogOrganizer.getSequenceNumber(hoplog));
+
+    // iterate on oplogs to validate data in files
+    HoplogSetIterator iter = new HoplogSetIterator(organizer.getSortedOplogs());
+    // the iteration pattern for this test should be 0-9:
+    // 0 1 4 5 oplog #4
+    // 1 3 5 7 9 oplog #3
+    // 2 4 6 8 oplog #2
+    // 1 2 3 4 oplog #1
+    int count = 0;
+    for (ByteBuffer keyBB = null; iter.hasNext();) {
+      keyBB = iter.next();
+      byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
+      assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
+      count++;
+    }
+    assertEquals(10, count);
+
+    // there must be 4 expired hoplogs now
+    hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+    assertEquals(4, hoplogs.length);
+    organizer.close();
+  }
+  
+  public void testIterativeMinorCompaction() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+    // #1
+    ArrayList<QueuedPersistentEvent> items = new ArrayList<QueuedPersistentEvent>();
+    items.add(new TestEvent("1", "1"));
+    items.add(new TestEvent("2", "1"));
+    organizer.flush(items.iterator(), items.size());
+
+    items.clear();
+    items.add(new TestEvent("1", "2"));
+    items.add(new TestEvent("3", "2"));
+    organizer.flush(items.iterator(), items.size());
+
+    items.clear();
+    items.add(new TestEvent("4", "3"));
+    items.add(new TestEvent("5", "3"));
+    organizer.flush(items.iterator(), items.size());
+    
+    // check file existence in bucket directory
+    FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+    assertEquals(3, hoplogs.length);
+
+    organizer.getCompactor().compact(false, false);
+    
+    FileStatus[] expired = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+    assertEquals(3, expired.length);
+    FileStatus[] valids = HdfsSortedOplogOrganizer.filterValidHoplogs(hoplogs, expired);
+    assertEquals(0, valids.length);
+    // After compaction expect 1 hoplog only.
+    hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
+    assertEquals(1, hoplogs.length);
+    
+    items.clear();
+    items.add(new TestEvent("4", "4"));
+    items.add(new TestEvent("6", "4"));
+    organizer.flush(items.iterator(), items.size());
+
+    items.clear();
+    items.add(new TestEvent("7", "5"));
+    items.add(new TestEvent("8", "5"));
+    organizer.flush(items.iterator(), items.size());
+    
+    hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+    assertEquals(5, hoplogs.length);
+    
+    organizer.getCompactor().compact(false, false);
+    expired = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+    assertEquals(6, expired.length);
+    valids = HdfsSortedOplogOrganizer.filterValidHoplogs(hoplogs, expired);
+    assertEquals(0, valids.length);    
+    hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
+    assertEquals(2, hoplogs.length);
+    valids = HdfsSortedOplogOrganizer.filterValidHoplogs(hoplogs, expired);
+    assertEquals(1, valids.length);
+    
+    assertEquals("2", organizer.read(BlobHelper.serializeToBlob("1")).getValue());
+    assertEquals("1", organizer.read(BlobHelper.serializeToBlob("2")).getValue());
+    assertEquals("2", organizer.read(BlobHelper.serializeToBlob("3")).getValue());
+    assertEquals("4", organizer.read(BlobHelper.serializeToBlob("4")).getValue());
+    assertEquals("3", organizer.read(BlobHelper.serializeToBlob("5")).getValue());
+    assertEquals("4", organizer.read(BlobHelper.serializeToBlob("6")).getValue());
+    assertEquals("5", organizer.read(BlobHelper.serializeToBlob("7")).getValue());
+    organizer.close();
+  }
+
+  public void testMajorCompactionWithDelete() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+    // #1
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    items.add(new TestEvent("1", "1"));
+    items.add(new TestEvent("2", "1"));
+    items.add(new TestEvent("3", "1"));
+    items.add(new TestEvent("4", "1"));
+    items.add(new TestEvent("4", "10", Operation.DESTROY));
+    organizer.flush(items.iterator(), items.size());
+
+    // #2
+    items.clear();
+    items.add(new TestEvent("2", "1", Operation.DESTROY));
+    items.add(new TestEvent("4", "1", Operation.DESTROY));
+    items.add(new TestEvent("6", "1", Operation.INVALIDATE));
+    items.add(new TestEvent("8", "1"));
+    organizer.flush(items.iterator(), items.size());
+
+    // #3
+    items.clear();
+    items.add(new TestEvent("1", "1"));
+    items.add(new TestEvent("3", "1"));
+    items.add(new TestEvent("5", "1"));
+    items.add(new TestEvent("7", "1"));
+    items.add(new TestEvent("9", "1", Operation.DESTROY));
+    organizer.flush(items.iterator(), items.size());
+
+    // #4
+    items.clear();
+    items.add(new TestEvent("0", "1", Operation.DESTROY));
+    items.add(new TestEvent("1", "1"));
+    items.add(new TestEvent("4", "1"));
+    items.add(new TestEvent("5", "1"));
+    organizer.flush(items.iterator(), items.size());
+
+    // check file existence in bucket directory, expect 4 hoplgos
+    FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+    assertEquals(4, hoplogs.length);
+
+    // After compaction expect 1 hoplog only. It should have the same sequence number as that of the
+    // youngest file compacted, which should be 4 in this case
+    organizer.getCompactor().compact(true, false);
+    hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+    assertEquals(1, hoplogs.length);
+    assertEquals(1, organizer.getSortedOplogs().size());
+    Hoplog hoplog = new HFileSortedOplog(hdfsStore, hoplogs[0].getPath(), blockCache, stats, storeStats);
+    assertEquals(4, HdfsSortedOplogOrganizer.getSequenceNumber(hoplog));
+
+    // iterate on oplogs to validate data in files
+    HoplogSetIterator iter = new HoplogSetIterator(organizer.getSortedOplogs());
+    int count = 0;
+
+    // entries in () are destroyed or invalidated
+    // 1, 2, 3, 4, (11)
+    // (2), (4), (6), 8
+    // 1, 3, 5, 7, (9)
+    // (0), 1, 4, 5
+    String[] expectedValues = { "1", "3", "4", "5", "7", "8" };
+    for (ByteBuffer keyBB = null; iter.hasNext();) {
+      keyBB = iter.next();
+      byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
+      assertEquals(expectedValues[count], BlobHelper.deserializeBlob(key));
+      count++;
+    }
+    assertEquals(6, count);
+
+    // there must be 4 expired hoplogs now
+    hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+    assertEquals(4, hoplogs.length);
+    organizer.close();
+  }
+  
+  public void testGainComputation() throws Exception {
+    HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
+    HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
+    ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
+    for (int i = 0; i < 10; i++) {
+      targets.add(new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, i * TEN_MB)));
+    }    
+
+    // each read has cost 3. Four files read cost is 3 * 4. Reduce read cost of
+    // file after compaction
+    float expect = (float) ((3 * 4.0 - 3) / (20 + 30 + 40 + 50));
+    float result = bucket.computeGain(2, 5, targets);
+    assertTrue(Math.abs(expect - result) < (expect/1000));
+    
+    // each read has cost 3 except 10MB file with read cost 2. 9 files read cost
+    // is 3 * 9. Reduce read cost of file after compaction.
+    expect = (float) ((3 * 9 - 3 - 1.0) / (10 + 20 + 30 + 40 + 50 + 60 + 70 + 80 + 90));
+    result = bucket.computeGain(0, 9, targets);
+    assertTrue(Math.abs(expect - result) < (expect/1000));
+  }
+
+  public void testGainComputeSmallFile() throws Exception {
+    HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
+    HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
+    
+    ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
+    for (int i = 0; i < 10; i++) {
+      targets.add(new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, i * TEN_MB / 1024)));
+    }
+
+    float result = bucket.computeGain(2, 5, targets);
+    assertTrue(Math.abs(8.0 - result) < (1.0/1000));
+  }
+  
+  public void testGainComputeMixedFiles() throws Exception {
+    HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
+    HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
+    
+    ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
+    for (int i = 0; i < 10; i++) {
+      targets.add(new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, i * TEN_MB / 1024)));
+    }
+    TestHoplog midHop = (TestHoplog) targets.get(4).get();
+    // one more than other files
+    midHop.size = 5  * TEN_MB;
+    
+    float expect = (float) ((4 * 2 - 3 + 1.0) / 50);
+    float result = bucket.computeGain(2, 5, targets);
+    System.out.println(expect);
+    System.out.println(result);
+    assertTrue(Math.abs(expect - result) < (expect/1000));
+  }
+  
+  public void testGainComputeBadRatio() throws Exception {
+    HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
+    HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
+    ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
+    for (int i = 0; i < 10; i++) {
+      targets.add(new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, i * TEN_MB)));
+    }
+
+    TestHoplog firstHop = (TestHoplog) targets.get(2).get();
+    // one more than other files
+    firstHop.size = (1 + 30 + 40 + 50)  * TEN_MB;
+    Float result = bucket.computeGain(2, 5, targets);
+    assertNull(result);
+  }
+  
+  public void testMinorCompactionTargetMaxSize() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+
+    ArrayList<TrackedReference<TestHoplog>> targets = new ArrayList<TrackedReference<TestHoplog>>();
+    for (int i = 0; i < 5; i++) {
+      TrackedReference<TestHoplog> hop = new TrackedReference<TestHoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
+      hop.increment();
+      targets.add(hop);
+    }
+    TrackedReference<TestHoplog> oldestHop = targets.get(targets.size() - 1);
+    TestHoplog thirdHop = (TestHoplog) targets.get(2).get();
+
+    // oldest is more than max size is ignored 
+    oldestHop.get().size = HDFSStore.DEFAULT_INPUT_FILE_SIZE_MAX_MB * ONE_MB + 100;
+    List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
+    compactor.getMinorCompactionTargets(list, -1);
+    assertEquals(4, list.size());
+    for (TrackedReference<Hoplog> ref : list) {
+      assertTrue(((TestHoplog)ref.get()).size - TEN_MB < 5 );
+    }
+    
+    // third is more than max size but is not ignored
+    thirdHop.size = HDFSStore.DEFAULT_INPUT_FILE_SIZE_MAX_MB * ONE_MB + 100;
+    oldestHop.increment();
+    list = (List<TrackedReference<Hoplog>>) targets.clone();
+    compactor.getMinorCompactionTargets(list, -1);
+    assertEquals(4, list.size());
+    int i = 0;
+    for (TrackedReference<Hoplog> ref : list) {
+      if (i != 2) {
+        assertTrue(((TestHoplog) ref.get()).size - TEN_MB < 5);
+      } else {
+        assertTrue(((TestHoplog) ref.get()).size > HDFSStore.DEFAULT_INPUT_FILE_SIZE_MAX_MB * ONE_MB);
+      }
+      i++;
+    }
+  }
+  
+  public void testAlterMaxInputFileSize() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+
+    assertTrue(TEN_MB * 2 < hdfsStore.getInputFileSizeMax() * ONE_MB);
+    
+    ArrayList<TrackedReference<TestHoplog>> targets = new ArrayList<TrackedReference<TestHoplog>>();
+    for (int i = 0; i < 5; i++) {
+      TrackedReference<TestHoplog> hop = new TrackedReference<TestHoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
+      hop.increment();
+      targets.add(hop);
+    }
+    
+    List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
+    compactor.getMinorCompactionTargets(list, -1);
+    assertEquals(targets.size(), list.size());
+    
+    HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+    mutator.setInputFileSizeMax(1);
+    hdfsStore.alter(mutator);
+    
+    compactor.getMinorCompactionTargets(list, -1);
+    assertEquals(0, list.size());
+  }
+  
+  public void testAlterInputFileCount() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+    
+    assertTrue(2 < hdfsStore.getInputFileCountMax());
+    
+    ArrayList<TrackedReference<TestHoplog>> targets = new ArrayList<TrackedReference<TestHoplog>>();
+    for (int i = 0; i < 5; i++) {
+      TrackedReference<TestHoplog> hop = new TrackedReference<TestHoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
+      hop.increment();
+      targets.add(hop);
+    }
+    
+    List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
+    compactor.getMinorCompactionTargets(list, -1);
+    assertEquals(targets.size(), list.size());
+    
+    HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+    mutator.setInputFileCountMax(2);
+    mutator.setInputFileCountMin(2);
+    hdfsStore.alter(mutator);
+    
+    compactor.getMinorCompactionTargets(list, -1);
+    assertEquals(2, list.size());
+  }
+  
+  public void testAlterMajorCompactionInterval() throws Exception {
+    final AtomicInteger majorCReqCount = new AtomicInteger(0);
+    
+    final Compactor compactor = new AbstractCompactor() {
+      @Override
+      public boolean compact(boolean isMajor, boolean isForced) throws IOException {
+        majorCReqCount.incrementAndGet();
+        return true;
+      }
+    };
+    
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0) {
+      @Override
+      public synchronized Compactor getCompactor() {
+        return compactor;
+      }
+    };
+
+    // create hoplog in the past, 90 seconds before current time
+    organizer.hoplogCreated(getName(), 0, new TestHoplog(hdfsStore, ONE_MB, System.currentTimeMillis() - 90000));
+    TimeUnit.MILLISECONDS.sleep(50);
+    organizer.hoplogCreated(getName(), 0, new TestHoplog(hdfsStore, ONE_MB, System.currentTimeMillis() - 90000));
+    
+    alterMajorCompaction(hdfsStore, true);
+    
+    List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
+    assertEquals(2, hoplogs.size());
+    
+    organizer.performMaintenance();
+    TimeUnit.MILLISECONDS.sleep(100);
+    assertEquals(0, majorCReqCount.get());
+    
+    HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+    mutator.setMajorCompactionInterval(1);
+    hdfsStore.alter(mutator);
+    
+    organizer.performMaintenance();
+    TimeUnit.MILLISECONDS.sleep(100);
+    assertEquals(1, majorCReqCount.get());
+  }
+
+  public void testMinorCompactionTargetMinCount() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+    
+    ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
+    for (int i = 0; i < 2; i++) {
+      TrackedReference<Hoplog> hop = new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
+      hop.increment();
+      targets.add(hop);
+    }
+    compactor.getMinorCompactionTargets(targets, -1);
+    assertEquals(0, targets.size());
+  }
+  
+  public void testMinorCompactionLessTargetsStatsUpdate() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    items.add(new TestEvent("1", "1"));
+    organizer.flush(items.iterator(), items.size());
+
+    items.clear();
+    items.add(new TestEvent("2", "2", Operation.DESTROY));
+    organizer.flush(items.iterator(), items.size());
+    
+    TimeUnit.SECONDS.sleep(1);
+    List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
+    assertEquals(2, hoplogs.size());
+    
+    organizer.performMaintenance();
+    hoplogs = organizer.getSortedOplogs();
+    assertEquals(2, hoplogs.size());
+  }
+  
+  public void testMinorCompactionTargetsOptimizer() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+
+    ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
+    for (int i = 0; i < 6; i++) {
+      TrackedReference<Hoplog> hop = new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
+      hop.increment();
+      targets.add(hop);
+    }
+    List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
+    compactor.getMinorCompactionTargets(list, -1);
+    assertEquals(6, list.size());
+    
+    TestHoplog fifthHop = (TestHoplog) targets.get(4).get();
+    // fifth hop needs additional block read as it has more than max keys size 
+    fifthHop.size = (HdfsSortedOplogOrganizer.AVG_NUM_KEYS_PER_INDEX_BLOCK * 5 + 1) * 64 * 1024;
+    list = (List<TrackedReference<Hoplog>>) targets.clone();
+    compactor.getMinorCompactionTargets(list, -1);
+    assertEquals(4, list.size());
+    for (TrackedReference<Hoplog> ref : list) {
+      assertTrue(((TestHoplog)ref.get()).size - TEN_MB < 4 );
+    }
+  }
+  
+  public void testTargetsReleasedBadRatio() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+
+    ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
+    for (int i = 0; i < 3; i++) {
+      TrackedReference<Hoplog> hop = new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
+      hop.increment();
+      targets.add(hop);
+    }
+    TestHoplog oldestHop = (TestHoplog) targets.get(2).get();
+    oldestHop.size = (1 + 30)  * TEN_MB;
+    
+    List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
+    compactor.getMinorCompactionTargets(list, -1);
+    assertEquals(0, list.size());
+    assertEquals(3, targets.size());
+    for (TrackedReference<Hoplog> ref : targets) {
+      assertEquals(0, ref.uses());
+    }
+  }
+  
+  public void testMinorCTargetsIgnoreMajorC() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    for (int i = 0; i < 7; i++) {
+      items.clear();
+      items.add(new TestEvent("1" + i, "1" + i));
+      organizer.flush(items.iterator(), items.size());
+    }
+    
+    HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+    List<TrackedReference<Hoplog>> targets = organizer.getSortedOplogs();
+    compactor.getMinorCompactionTargets(targets, -1);
+    assertEquals(7, targets.size());
+    
+    targets = organizer.getSortedOplogs();
+    for (TrackedReference<Hoplog> ref : targets) {
+      ref.increment();
+    }
+    compactor.getMinorCompactionTargets(targets, 2);
+    assertEquals((7 - 2), targets.size());
+    targets = organizer.getSortedOplogs();
+    for (int i = 0; i < targets.size(); i++) {
+      if (i + 1 <= (7 - 2)) {
+        assertEquals(1, targets.get(i).uses());
+      } else {
+        assertEquals(0, targets.get(i).uses());
+      }
+    }
+    
+    targets = organizer.getSortedOplogs();
+    for (TrackedReference<Hoplog> ref : targets) {
+      if (ref.uses() == 0) {
+        ref.increment();
+      }
+      assertEquals(1, ref.uses());
+    }
+    compactor.getMinorCompactionTargets(targets, 7);
+    assertEquals(0, targets.size());
+    
+    targets = organizer.getSortedOplogs();
+    for (int i = 0; i < targets.size(); i++) {
+      assertEquals(0, targets.get(i).uses());
+    }
+  }
+  
+  public void testTargetOverlap() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    for (int i = 0; i < 7; i++) {
+      items.clear();
+      items.add(new TestEvent("1" + i, "1" + i));
+      organizer.flush(items.iterator(), items.size());
+    }
+    
+    HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+    List<TrackedReference<Hoplog>> targets = organizer.getSortedOplogs();
+    assertTrue(compactor.isMinorMajorOverlap(targets, 8));
+    assertTrue(compactor.isMinorMajorOverlap(targets, 7));
+    assertTrue(compactor.isMinorMajorOverlap(targets, 6));
+    assertTrue(compactor.isMinorMajorOverlap(targets, 1));
+    assertFalse(compactor.isMinorMajorOverlap(targets, 0));
+    assertFalse(compactor.isMinorMajorOverlap(targets, -1));
+    
+    targets.remove(targets.size() -1); // remove the last one 
+    targets.remove(targets.size() -1); // remove the last one again
+    assertFalse(compactor.isMinorMajorOverlap(targets, 1));
+    assertFalse(compactor.isMinorMajorOverlap(targets, 2));
+    assertTrue(compactor.isMinorMajorOverlap(targets, 3));
+    
+    targets.remove(3); // remove from the middle, seq num 4
+    assertTrue(compactor.isMinorMajorOverlap(targets, 4));
+    assertTrue(compactor.isMinorMajorOverlap(targets, 3));
+  }
+  
+  public void testSuspendMinorByMajor() throws Exception {
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    for (int i = 0; i < 5; i++) {
+      items.clear();
+      items.add(new TestEvent("1" + i, "1" + i));
+      organizer.flush(items.iterator(), items.size());
+    }
+    
+    HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+
+    Hoplog hoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir + "/"
+        + getName() + "-" + System.currentTimeMillis() + "-1.ihop.tmp"), blockCache, stats, storeStats);
+    compactor.fillCompactionHoplog(false, organizer.getSortedOplogs(), hoplog, -1);
+    
+    cache.getLogger().info("<ExpectedException action=add>java.lang.InterruptedException</ExpectedException>");
+    try {
+      compactor.maxMajorCSeqNum.set(3);
+      compactor.fillCompactionHoplog(false, organizer.getSortedOplogs(), hoplog, -1);
+      fail();
+    } catch (InterruptedException e) {
+      // expected
+    }
+    cache.getLogger().info("<ExpectedException action=remove>java.lang.InterruptedException</ExpectedException>");
+    organizer.close();
+  }
+  
+  public void testMajorCompactionSetsSeqNum() throws Exception {
+    final CountDownLatch compactionStartedLatch = new CountDownLatch(1);
+    final CountDownLatch waitLatch = new CountDownLatch(1);
+    class MyOrganizer extends HdfsSortedOplogOrganizer {
+      final HoplogCompactor compactor = new MyCompactor();
+      public MyOrganizer(HdfsRegionManager region, int bucketId) throws IOException {
+        super(region, bucketId);
+      }
+      public synchronized Compactor getCompactor() {
+        return compactor;
+      }
+      class MyCompactor extends HoplogCompactor {
+        @Override
+        public long fillCompactionHoplog(boolean isMajor,
+            List<TrackedReference<Hoplog>> targets, Hoplog output,
+            int majorCSeqNum) throws IOException, InterruptedException {
+          compactionStartedLatch.countDown();
+          waitLatch.await();
+          long byteCount = 0;
+          try {
+            byteCount = super.fillCompactionHoplog(isMajor, targets, output, majorCSeqNum);
+          } catch (ForceReattemptException e) {
+            // we do not expect this in a unit test. 
+          }
+          return byteCount;
+        }
+      }
+    }
+    
+    final HdfsSortedOplogOrganizer organizer = new MyOrganizer(regionManager, 0);
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    for (int i = 0; i < 3; i++) {
+      items.clear();
+      items.add(new TestEvent("1" + i, "1" + i));
+      organizer.flush(items.iterator(), items.size());
+    }
+    
+    Thread t = new Thread(new Runnable() {
+      public void run() {
+        try {
+          organizer.getCompactor().compact(true, false);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    t.start();
+    compactionStartedLatch.await();
+    assertEquals(3, ((HoplogCompactor)organizer.getCompactor()).maxMajorCSeqNum.get());
+    waitLatch.countDown();
+    t.join();
+  }
+  
+  public void testMinorWatchesMajorsSeqNum() throws Exception {
+    final CountDownLatch majorCStartedLatch = new CountDownLatch(1);
+    final CountDownLatch majorCWaitLatch = new CountDownLatch(1);
+    
+    final CountDownLatch minorCStartedLatch = new CountDownLatch(1);
+    final List<TrackedReference<Hoplog>> minorTargets = new ArrayList<TrackedReference<Hoplog>>();
+    
+    class MyOrganizer extends HdfsSortedOplogOrganizer {
+      final HoplogCompactor compactor = new MyCompactor();
+      public MyOrganizer(HdfsRegionManager region, int bucketId) throws IOException {
+        super(region, bucketId);
+      }
+      public synchronized Compactor getCompactor() {
+        return compactor;
+      }
+      class MyCompactor extends HoplogCompactor {
+        @Override
+        public long fillCompactionHoplog(boolean isMajor,
+            List<TrackedReference<Hoplog>> targets, Hoplog output,
+            int majorCSeqNum) throws IOException, InterruptedException {
+          if (isMajor) {
+            majorCStartedLatch.countDown();
+            majorCWaitLatch.await();
+          } else {
+            minorCStartedLatch.countDown();
+            minorTargets.addAll(targets);
+          }
+          long byteCount =0;
+          try {
+            byteCount = super.fillCompactionHoplog(isMajor, targets, output, majorCSeqNum);
+          } catch (ForceReattemptException e) {
+            // we do not expect this in a unit test. 
+          }
+          return byteCount;
+        }
+      }
+    }
+    
+    final HdfsSortedOplogOrganizer organizer = new MyOrganizer(regionManager, 0);
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    for (int i = 0; i < 3; i++) {
+      items.clear();
+      items.add(new TestEvent("1" + i, "1" + i));
+      organizer.flush(items.iterator(), items.size());
+    }
+    
+    Thread majorCThread = new Thread(new Runnable() {
+      public void run() {
+        try {
+          organizer.getCompactor().compact(true, false);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    majorCThread.start();
+    majorCStartedLatch.await();
+    assertEquals(3, ((HoplogCompactor)organizer.getCompactor()).maxMajorCSeqNum.get());
+
+    // create more files for minor C
+    for (int i = 0; i < 4; i++) {
+      items.clear();
+      items.add(new TestEvent("1" + i, "1" + i));
+      organizer.flush(items.iterator(), items.size());
+    }
+    
+    Thread minorCThread = new Thread(new Runnable() {
+      public void run() {
+        try {
+          organizer.getCompactor().compact(false, false);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    minorCThread.start();
+    minorCThread.join();
+    assertEquals(4, minorTargets.size());
+    for (TrackedReference<Hoplog> ref : minorTargets) {
+      assertTrue(organizer.getSequenceNumber(ref.get()) >= 4);
+    }
+    
+    majorCWaitLatch.countDown();
+    majorCThread.join();
+  }
+  
+  public void testTimeBoundedSuspend() throws Exception {
+    final AtomicBoolean barrier = new AtomicBoolean(true);
+    
+    class MyOrganizer extends HdfsSortedOplogOrganizer {
+      public MyOrganizer(HdfsRegionManager region, int bucketId) throws IOException {
+        super(region, bucketId);
+      }
+      public synchronized Compactor getCompactor() {
+        return new MyCompactor();
+      }
+      class MyCompactor extends HoplogCompactor {
+        public long fillCompactionHoplog(boolean isMajor, List<TrackedReference<Hoplog>> targets, Hoplog output)
+            throws IOException, InterruptedException {
+          barrier.set(false);
+          TimeUnit.SECONDS.sleep(5 * HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT);
+          long byteCount =0;
+          try {
+            byteCount = super.fillCompactionHoplog(isMajor, targets, output, -1);
+          } catch (ForceReattemptException e) {
+            // we do not expect this in a unit test. 
+          }
+          return byteCount;
+        }
+      }
+    }
+    
+    HdfsSortedOplogOrganizer organizer = new MyOrganizer(regionManager, 0);
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    for (int i = 0; i < 4; i++) {
+      items.clear();
+      items.add(new TestEvent("1" + i, "1" + i));
+      organizer.flush(items.iterator(), items.size());
+    }
+
+    final HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+    ExecutorService service = Executors.newCachedThreadPool();
+    service.execute(new Runnable() {
+      public void run() {
+        try {
+          compactor.compact(false, false);
+        } catch (Exception e) {
+        }
+      }
+    });
+    
+    final AtomicLong start = new AtomicLong(0);
+    final AtomicLong end = new AtomicLong(0);
+    service.execute(new Runnable() {
+      public void run() {
+        while (barrier.get()) {
+          try {
+            TimeUnit.MILLISECONDS.sleep(50);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+        
+        start.set(System.currentTimeMillis());
+        compactor.suspend();
+        end.set(System.currentTimeMillis());
+      }
+    });
+    
+    for (long i = 0; i < 5; i++) {
+      if (end.get() == 0) {
+        TimeUnit.MILLISECONDS.sleep(HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT / 2);
+      } else {
+        break;
+      }
+    }
+    
+    assertTrue(end.get() - start.get() < 100 + HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT);
+  }
+  
+  public static class TestHoplog extends AbstractHoplog {
+    long size;
+    long creationTime;
+    TestHoplog(HDFSStoreImpl store, long size) throws IOException {
+      this(store, size, 0);
+    }
+    
+    TestHoplog(HDFSStoreImpl store, long size, long creationTime) throws IOException {
+      super(store, new Path("1-" + creationTime + "-1.hop"), null);
+      this.size = size;
+      this.creationTime = creationTime;
+    }
+    
+    @Override
+    public long getSize() {
+      return size;
+    }
+    @Override
+    public long getModificationTimeStamp() {
+      if (creationTime > 0) {
+        return creationTime;
+      }
+      return super.getModificationTimeStamp();
+    }
+    @Override
+    public String toString() {
+      long name = size -  TEN_MB;
+      if (name < 0) name = size - (TEN_MB / 1024);
+      return name + "";
+    }
+    public boolean isClosed() {
+      return false;
+    }
+    public void close() throws IOException {
+    }
+    public HoplogReader getReader() throws IOException {
+      return null;
+    }
+    public HoplogWriter createWriter(int keys) throws IOException {
+      return null;
+    }
+    public void close(boolean clearCache) throws IOException {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKeyJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKeyJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKeyJUnitTest.java
new file mode 100644
index 0000000..fe15305
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKeyJUnitTest.java
@@ -0,0 +1,50 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import junit.framework.TestCase;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class GFKeyJUnitTest extends TestCase {
+  public void testSerde() throws Exception {
+    String str = "str";
+    GFKey key = new GFKey();
+    key.setKey(str);
+    
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    key.write(dos);
+    
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    DataInputStream dis = new DataInputStream(bais);
+    key.readFields(dis);
+    
+    assertEquals(str, key.getKey());
+  }
+  
+  public void testCompare() {
+    GFKey keya = new GFKey();
+    keya.setKey("a");
+    
+    GFKey keyb = new GFKey();
+    keyb.setKey("b");
+    
+    assertEquals(-1, keya.compareTo(keyb));
+    assertEquals(1, keyb.compareTo(keya));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIteratorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIteratorJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIteratorJUnitTest.java
new file mode 100644
index 0000000..5ebb00e
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIteratorJUnitTest.java
@@ -0,0 +1,265 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.BaseHoplogTestCase;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HFileSortedOplog;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSSplitIteratorJUnitTest extends BaseHoplogTestCase {
+  public void test1Hop1BlockIter() throws Exception {
+    Path path = new Path(testDataDir, "region/0/1-1-1.hop");
+    Hoplog oplog = new HFileSortedOplog(hdfsStore, path, blockCache, stats,
+        storeStats);
+    createHoplog(10, oplog);
+
+    Path[] paths = {path};
+    long[] starts = {0};
+    long[] lengths = {oplog.getSize()};
+    HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
+        hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+    
+    int count = 0;
+    while (iter.hasNext()) {
+      boolean success = iter.next();
+      assertTrue(success);
+      assertEquals("key-" + count, new String((byte[])iter.getKey()));
+      count++;
+    }
+    assertEquals(10, count);
+  }
+  
+  public void test1HopNBlockIter() throws Exception {
+    Path path = new Path(testDataDir, "region/0/1-1-1.hop");
+    Hoplog oplog = new HFileSortedOplog(hdfsStore, path,
+        blockCache, stats, storeStats);
+    createHoplog(2000, oplog);
+    
+    FileSystem fs = hdfsStore.getFileSystem();
+    Reader reader = HFile.createReader(fs, path, new CacheConfig(fs.getConf()));
+    BlockIndexReader bir = reader.getDataBlockIndexReader();
+    int blockCount = bir.getRootBlockCount();
+    reader.close();
+    
+    // make sure there are more than 1 hfile blocks in the hoplog
+    assertTrue(1 < blockCount);
+
+    Path[] paths = {path};
+    long half = oplog.getSize()/2;
+    long[] starts = {0};
+    long[] lengths = {half};
+    HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
+        hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+    
+    int count = 0;
+    while (iter.hasNext()) {
+      boolean success = iter.next();
+      assertTrue(success);
+      assertEquals("key-" + (count + 100000), new String((byte[])iter.getKey()));
+      count++;
+    }
+    // the number of iterations should be less than number of keys inserted in
+    // the hoplog
+    assertTrue(count < 2000 && count > 0);
+
+    paths = new Path[] {path};
+    starts = new long[] {half + 1};
+    lengths = new long[] {oplog.getSize()};
+    iter = HDFSSplitIterator.newInstance(
+        hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+    
+    while (iter.hasNext()) {
+      boolean success = iter.next();
+      assertTrue(success);
+      assertEquals("key-" + (count + 100000), new String((byte[])iter.getKey()));
+      count++;
+    }
+    assertEquals(2000, count);
+
+    paths = new Path[] {path, path};
+    starts = new long[] {0, half + 1};
+    lengths = new long[] {half, oplog.getSize()};
+    iter = HDFSSplitIterator.newInstance(
+        hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+    
+    count = 0;
+    while (iter.hasNext()) {
+      boolean success = iter.next();
+      assertTrue(success);
+      assertEquals("key-" + (count + 100000), new String((byte[])iter.getKey()));
+      count++;
+    }
+    assertEquals(2000, count);
+  }
+
+  /*
+   * This tests iterates over 3 hoplog files. The three hoplog files have the
+   * same content. Duplicate keys should not get discarded
+   */
+  public void testNHoplogNBlockIter() throws Exception {
+    Path path1 = new Path(testDataDir, "region/0/1-1-1.hop");
+    Hoplog oplog = new HFileSortedOplog(hdfsStore, path1,
+        blockCache, stats, storeStats);
+    createHoplog(2000, oplog);
+    
+    FileSystem fs = hdfsStore.getFileSystem();
+    Reader reader = HFile.createReader(fs, path1, new CacheConfig(fs.getConf()));
+    BlockIndexReader bir = reader.getDataBlockIndexReader();
+    int blockCount = bir.getRootBlockCount();
+    reader.close();
+    
+    // make sure there are more than 1 hfile blocks in the hoplog
+    assertTrue(1 < blockCount);
+    
+    Path path2 = new Path(testDataDir, "region/0/1-2-1.hop");
+    oplog = new HFileSortedOplog(hdfsStore, path2,
+        blockCache, stats, storeStats);
+    createHoplog(2000, oplog);
+
+    Path path3 = new Path(testDataDir, "region/0/1-3-1.hop");
+    oplog = new HFileSortedOplog(hdfsStore, path3,
+        blockCache, stats, storeStats);
+    createHoplog(2000, oplog);
+    
+    Path[] paths = {path1, path2, path3, path1, path2, path3};
+    long half = oplog.getSize()/2;
+    long[] starts = {0, 0, 0, half + 1, half + 1, half + 1};
+    long[] lengths = {half, half, half, oplog.getSize(), oplog.getSize(), oplog.getSize()};
+    HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
+        hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+    
+    int[] keyCounts = new int[2000];
+    while (iter.hasNext()) {
+      boolean success = iter.next();
+      assertTrue(success);
+      String key = new String((byte[])iter.getKey()).substring("key-".length());
+      keyCounts[Integer.valueOf(key) - 100000] ++;
+    }
+    
+    for (int i : keyCounts) {
+      assertEquals(3, i);
+    }
+  }
+  
+  public void testMRLikeNHopIter() throws Exception {
+    Path path1 = new Path(testDataDir, "region/0/1-1-1.hop");
+    Hoplog oplog = new HFileSortedOplog(hdfsStore, path1,
+        blockCache, stats, storeStats);
+    createHoplog(10, oplog);
+    
+    Path path2 = new Path(testDataDir, "region/0/1-2-1.hop");
+    oplog = new HFileSortedOplog(hdfsStore, path2,
+        blockCache, stats, storeStats);
+    createHoplog(10, oplog);
+    
+    Path path3 = new Path(testDataDir, "region/0/1-3-1.hop");
+    oplog = new HFileSortedOplog(hdfsStore, path3,
+        blockCache, stats, storeStats);
+    createHoplog(10, oplog);
+    
+    Path[] paths = {path1, path2, path3};
+    long[] starts = {0, 0, 0};
+    long[] lengths = {oplog.getSize(), oplog.getSize(), oplog.getSize()};
+    HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
+        hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+    
+    int[] keyCounts = new int[10];
+    while (iter.hasNext()) {
+      boolean success = iter.next();
+      assertTrue(success);
+      // extra has next before key read
+      iter.hasNext(); 
+      String key = new String((byte[])iter.getKey()).substring("key-".length());
+      System.out.println(key);
+      keyCounts[Integer.valueOf(key)] ++;
+    }
+    
+    for (int i : keyCounts) {
+      assertEquals(3, i);
+    }
+  }
+  
+  public void test1Hop1BlockIterSkipDeletedHoplogs() throws Exception {
+    FileSystem fs = hdfsStore.getFileSystem();
+    Path path = new Path(testDataDir, "region/0/1-1-1.hop");
+    Hoplog oplog = new HFileSortedOplog(hdfsStore, path,
+        blockCache, stats, storeStats);
+    createHoplog(10, oplog);
+
+    Path[] paths = {path};
+    long[] starts = {0};
+    long[] lengths = {oplog.getSize()};
+    
+    //Delete the Hoplog file
+    fs.delete(path, true);
+    
+    HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
+        hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+    assertFalse(iter.hasNext());
+    
+  }
+  
+  public void testMRLikeNHopIterSkipDeletedHoplogs() throws Exception {
+    FileSystem fs = hdfsStore.getFileSystem();
+    //Create Hoplogs
+    Path path1 = new Path(testDataDir, "region/0/1-1-1.hop");
+    Hoplog oplog = new HFileSortedOplog(hdfsStore, path1,
+        blockCache, stats, storeStats);
+    createHoplog(10, oplog);
+    
+    Path path2 = new Path(testDataDir, "region/0/1-2-1.hop");
+    oplog = new HFileSortedOplog(hdfsStore, path2,
+        blockCache, stats, storeStats);
+    createHoplog(10, oplog);
+    
+    Path path3 = new Path(testDataDir, "region/0/1-3-1.hop");
+    oplog = new HFileSortedOplog(hdfsStore, path3,
+        blockCache, stats, storeStats);
+    createHoplog(10, oplog);
+    
+    Path[] paths = {path1, path2, path3};
+    long[] starts = {0, 0, 0};
+    long[] lengths = {oplog.getSize(), oplog.getSize(), oplog.getSize()};
+    HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
+        hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+    int count = 0;
+    while (iter.hasNext()) {
+      boolean success = iter.next();
+      assertTrue(success);
+      count++;
+    }
+    assertEquals(30, count);
+    
+    for(int i = 0; i < 3; ++i){
+      fs.delete(paths[i], true);
+      iter = HDFSSplitIterator.newInstance(
+          hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+      count = 0;
+      while (iter.hasNext()) {
+        boolean success = iter.next();
+        assertTrue(success);
+        count++;
+      }
+      assertEquals(20, count);
+      oplog = new HFileSortedOplog(hdfsStore, paths[i],
+          blockCache, stats, storeStats);
+      createHoplog(10, oplog);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtilJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtilJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtilJUnitTest.java
new file mode 100644
index 0000000..a209b6e
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtilJUnitTest.java
@@ -0,0 +1,305 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.BaseHoplogTestCase;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HdfsSortedOplogOrganizer;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HoplogUtilJUnitTest extends BaseHoplogTestCase {
+  Path regionPath = null;
+  
+  @Override
+  protected void configureHdfsStoreFactory() throws Exception {
+    super.configureHdfsStoreFactory();
+    
+    hsf.setInputFileCountMin(3);
+    hsf.setMinorCompaction(false);
+    hsf.setMajorCompaction(false);
+  }
+  
+  public void testHoplogListingMultiBucket() throws Exception {
+    createHoplogs();
+
+    Collection<FileStatus> hoplogs = HoplogUtil.getAllRegionHoplogs(
+        regionPath, hdfsStore.getFileSystem(),
+        AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+
+    assertEquals(5, hdfsStore.getFileSystem().listStatus(regionPath).length);
+    assertEquals(15, hoplogs.size());
+  }
+
+  public void testHoplogListingMixFileTypes() throws Exception {
+    createHoplogs();
+
+    HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+    organizer.getCompactor().compact(false, false);
+
+    Collection<FileStatus> hoplogs = HoplogUtil.getAllRegionHoplogs(
+        regionPath, hdfsStore.getFileSystem(),
+        AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+
+    assertEquals(7,
+        hdfsStore.getFileSystem().listStatus(new Path(regionPath, "0")).length);
+    assertEquals(15, hoplogs.size());
+  }
+
+  public void testHoplogListingEmptyBucket() throws Exception {
+    createHoplogs();
+
+    hdfsStore.getFileSystem().mkdirs(new Path(regionPath, "100"));
+
+    Collection<FileStatus> hoplogs = HoplogUtil.getAllRegionHoplogs(
+        regionPath, hdfsStore.getFileSystem(),
+        AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+
+    assertEquals(6, hdfsStore.getFileSystem().listStatus(regionPath).length);
+    assertEquals(15, hoplogs.size());
+  }
+
+  public void testHoplogListingInvalidBucket() throws Exception {
+    createHoplogs();
+
+    hdfsStore.getFileSystem().rename(new Path(regionPath, "0"),
+        new Path(regionPath, "not_a_bucket"));
+
+    Collection<FileStatus> hoplogs = HoplogUtil.getAllRegionHoplogs(
+        regionPath, hdfsStore.getFileSystem(),
+        AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+
+    assertEquals(5, hdfsStore.getFileSystem().listStatus(regionPath).length);
+    assertEquals(12, hoplogs.size());
+  }
+
+  public void testHoplogListingInvalidFiles() throws Exception {
+    createHoplogs();
+
+    Path bucketPath = new Path(regionPath, "0");
+    FSDataOutputStream stream = hdfsStore.getFileSystem().create(
+        new Path(bucketPath, "not_a_hoplog"));
+    stream.close();
+
+    Collection<FileStatus> hoplogs = HoplogUtil.getAllRegionHoplogs(
+        regionPath, hdfsStore.getFileSystem(),
+        AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+
+    assertEquals(4, hdfsStore.getFileSystem().listStatus(bucketPath).length);
+    assertEquals(15, hoplogs.size());
+  }
+
+  public void testTimeRange() throws Exception {
+    createHoplogs();
+    // rename hoplogs for testing purpose
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(
+        regionManager, 0);
+    List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
+    assertEquals(3, hoplogs.size());
+    hoplogs.get(0).get().rename("0-300-1.hop");
+    hoplogs.get(1).get().rename("0-310-1.hop");
+    hoplogs.get(2).get().rename("0-320-1.hop");
+    organizer.close();
+
+    organizer = new HdfsSortedOplogOrganizer(regionManager, 3);
+    hoplogs = organizer.getSortedOplogs();
+    assertEquals(3, hoplogs.size());
+    hoplogs.get(0).get().rename("0-600-1.hop");
+    hoplogs.get(1).get().rename("0-610-1.hop");
+    hoplogs.get(2).get().rename("0-620-1.hop");
+    organizer.close();
+
+    organizer = new HdfsSortedOplogOrganizer(regionManager, 6);
+    hoplogs = organizer.getSortedOplogs();
+    assertEquals(3, hoplogs.size());
+    hoplogs.get(0).get().rename("0-100-1.hop");
+    hoplogs.get(1).get().rename("0-110-1.hop");
+    hoplogs.get(2).get().rename("0-120-1.hop");
+
+    Collection<FileStatus> filtered = HoplogUtil.getRegionHoplogs(
+        regionPath, hdfsStore.getFileSystem(),
+        AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, 300, 305);
+    assertEquals(5, filtered.size());
+    assertTrue(containsHoplogWithName(filtered, "0-300-1.hop"));
+    assertTrue(containsHoplogWithName(filtered, "0-310-1.hop"));
+    assertTrue(containsHoplogWithName(filtered, "0-600-1.hop"));
+
+    filtered = HoplogUtil.getRegionHoplogs(regionPath,
+        hdfsStore.getFileSystem(),
+        AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, 250, 310);
+    assertEquals(6, filtered.size());
+    assertTrue(containsHoplogWithName(filtered, "0-300-1.hop"));
+    assertTrue(containsHoplogWithName(filtered, "0-310-1.hop"));
+    assertTrue(containsHoplogWithName(filtered, "0-320-1.hop"));
+
+    filtered = HoplogUtil.getRegionHoplogs(regionPath,
+        hdfsStore.getFileSystem(),
+        AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, 301, 311);
+    assertEquals(5, filtered.size());
+    assertTrue(containsHoplogWithName(filtered, "0-310-1.hop"));
+    assertTrue(containsHoplogWithName(filtered, "0-320-1.hop"));
+
+    filtered = HoplogUtil.getRegionHoplogs(regionPath,
+        hdfsStore.getFileSystem(),
+        AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, 301, 309);
+    assertEquals(4, filtered.size());
+    assertTrue(containsHoplogWithName(filtered, "0-310-1.hop"));
+    organizer.close();
+  }
+  
+  public void testExcludeSoonCleanedHoplogs() throws Exception {
+    FileSystem fs = hdfsStore.getFileSystem();
+    Path cleanUpIntervalPath = new Path(hdfsStore.getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(
+        regionManager, 0);
+    //delete the auto generated clean up interval file   
+    if (fs.exists(cleanUpIntervalPath)){
+      fs.delete(cleanUpIntervalPath, true);
+    }
+    
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    int count = 10;
+    for (int fileCount = 0; fileCount < 3; fileCount++) {
+      items.clear();
+      for (int itemCount = 0; itemCount < count; itemCount++) {
+        items.add(new TestEvent(("key-" + itemCount), "value"));
+      }
+      organizer.flush(items.iterator(), count);
+    }
+    List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
+    
+    for(TrackedReference<Hoplog> hoplog : hoplogs) {
+      Path p = new Path(testDataDir, getName() + "/0/" +
+          hoplog.get().getFileName() + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+      fs.createNewFile(p);
+    }
+    Collection<FileStatus> files = HoplogUtil.getAllRegionHoplogs(
+        regionPath, hdfsStore.getFileSystem(),
+        AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+    assertEquals(3, files.size());
+    
+    TimeUnit.MINUTES.sleep(2);
+    //No clean up interval file, all expired files will be included
+    files = HoplogUtil.getAllRegionHoplogs(
+        regionPath, hdfsStore.getFileSystem(),
+        AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+    assertEquals(3, files.size());
+    
+    
+    long interval = 1 * 60 * 1000;
+    HoplogUtil.exposeCleanupIntervalMillis(fs,cleanUpIntervalPath,interval);
+    
+    files = HoplogUtil.getAllRegionHoplogs(
+        regionPath, hdfsStore.getFileSystem(),
+        AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+    assertEquals(0, files.size());
+    organizer.close();  
+  }
+  
+  
+  public void testCheckpointSelection() throws Exception {
+    createHoplogs();
+    // rename hoplogs for testing purpose
+    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(
+        regionManager, 0);
+    List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
+    assertEquals(3, hoplogs.size());
+    hoplogs.get(0).get().rename("0-300-1.chop");
+    hoplogs.get(1).get().rename("0-310-1.hop");
+    hoplogs.get(2).get().rename("0-320-1.hop"); // checkpoint file
+    organizer.close();
+    
+    organizer = new HdfsSortedOplogOrganizer(regionManager, 3);
+    hoplogs = organizer.getSortedOplogs();
+    assertEquals(3, hoplogs.size());
+    hoplogs.get(0).get().rename("0-600-1.hop");
+    hoplogs.get(1).get().rename("0-610-1.chop");
+    hoplogs.get(2).get().rename("0-620-1.hop");
+    organizer.close();
+    
+    organizer = new HdfsSortedOplogOrganizer(regionManager, 6);
+    hoplogs = organizer.getSortedOplogs();
+    assertEquals(3, hoplogs.size());
+    hoplogs.get(0).get().rename("0-100-1.hop");
+    hoplogs.get(1).get().rename("0-110-1.hop");
+    hoplogs.get(2).get().rename("0-120-1.chop");
+    
+    Collection<FileStatus> filtered = HoplogUtil.filterHoplogs(
+        hdfsStore.getFileSystem(), regionPath, 290, 305, false);
+    assertEquals(4, filtered.size());
+    assertTrue(containsHoplogWithName(filtered, "0-310-1.hop"));
+    assertTrue(containsHoplogWithName(filtered, "0-600-1.hop"));
+    
+    filtered = HoplogUtil.filterHoplogs(hdfsStore.getFileSystem(),
+        regionPath, 290, 305, true);
+    assertEquals(3, filtered.size());
+    assertTrue(containsHoplogWithName(filtered, "0-300-1.chop"));
+    assertTrue(containsHoplogWithName(filtered, "0-610-1.chop"));
+    assertTrue(containsHoplogWithName(filtered, "0-120-1.chop"));
+    organizer.close();
+  }
+  
+  private boolean containsHoplogWithName(Collection<FileStatus> filtered,
+      String name) {
+    for (FileStatus file : filtered) {
+      if (file.getPath().getName().equals(name)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private void createHoplogs() throws IOException, Exception {
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    int count = 10;
+    for (int bucketId = 0; bucketId < 15; bucketId += 3) {
+      HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager,
+          bucketId);
+      for (int fileCount = 0; fileCount < 3; fileCount++) {
+        items.clear();
+        for (int itemCount = 0; itemCount < count; itemCount++) {
+          items.add(new TestEvent(("key-" + itemCount), "value"));
+        }
+        organizer.flush(items.iterator(), count);
+      }
+    }
+  }
+  
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    regionPath = new Path(testDataDir, getName());
+  }
+  
+  @Override 
+  protected void tearDown() throws Exception{
+    FileSystem fs = hdfsStore.getFileSystem();
+    Path cleanUpIntervalPath = new Path(hdfsStore.getHomeDir(),HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
+    if (fs.exists(cleanUpIntervalPath)){
+      fs.delete(cleanUpIntervalPath, true);
+    }  
+    super.tearDown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache30/Bug38741DUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/Bug38741DUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/Bug38741DUnitTest.java
index 7e4acbf..f1b9746 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/Bug38741DUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/Bug38741DUnitTest.java
@@ -308,7 +308,7 @@ public class Bug38741DUnitTest extends ClientServerTestCase {
             BucketRegion br = (BucketRegion) r;
             try {
               KeyInfo keyInfo = new KeyInfo(k1, null, bucketId);
-              RawValue rv = br.getSerialized(keyInfo, false, false, null, null, false);
+              RawValue rv = br.getSerialized(keyInfo, false, false, null, null, false, false);
               Object val = rv.getRawValue();
               assertTrue(val instanceof CachedDeserializable);
               CachedDeserializable cd = (CachedDeserializable)val;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSQueueRegionOperationsJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSQueueRegionOperationsJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSQueueRegionOperationsJUnitTest.java
new file mode 100644
index 0000000..5e2ba4f
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSQueueRegionOperationsJUnitTest.java
@@ -0,0 +1,33 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest
+;
+
+/**
+ * Tests regions operations when entries are not yet persisted
+ * in HDFS but are in HDFSAsyncQueue
+ * @author sbawaska
+ */
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSQueueRegionOperationsJUnitTest extends
+    HDFSRegionOperationsJUnitTest {
+
+  @Override
+  protected int getBatchTimeInterval() {
+    return 50*1000;
+  }
+
+  @Override
+  protected void sleep(String regionPath) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSQueueRegionOperationsOffHeapJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSQueueRegionOperationsOffHeapJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSQueueRegionOperationsOffHeapJUnitTest.java
new file mode 100644
index 0000000..f28c138
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSQueueRegionOperationsOffHeapJUnitTest.java
@@ -0,0 +1,54 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.Properties;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.PartitionAttributes;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest
+;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSQueueRegionOperationsOffHeapJUnitTest extends HDFSQueueRegionOperationsJUnitTest {
+  static {
+    System.setProperty("gemfire.trackOffHeapRefCounts", "true");
+  }
+  
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+    OffHeapTestUtil.checkOrphans();
+  }
+  @Override
+  protected Region<Integer, String> createRegion(String regionName) {
+    RegionFactory<Integer, String> rf = cache.createRegionFactory(RegionShortcut.PARTITION_HDFS);
+    PartitionAttributes prAttr = new PartitionAttributesFactory().setTotalNumBuckets(10).create();
+    rf.setPartitionAttributes(prAttr);
+    rf.setOffHeap(true);
+    rf.setHDFSStoreName(hdfsStore.getName());
+    Region<Integer, String> r = rf.create(regionName);
+//    addListener(r);
+    
+    ((PartitionedRegion) r).setQueryHDFS(true);
+    return r;
+  }
+  @Override
+  protected Properties getDSProps() {
+    Properties props = super.getDSProps();
+    props.setProperty("off-heap-memory-size", "50m");
+    return props;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSRegionOperationsJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSRegionOperationsJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSRegionOperationsJUnitTest.java
new file mode 100644
index 0000000..6cf9c6a
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSRegionOperationsJUnitTest.java
@@ -0,0 +1,542 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.FixMethodOrder;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.MethodSorters;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.EvictionAlgorithm;
+import com.gemstone.gemfire.cache.PartitionAttributes;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
+import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
+import com.gemstone.gemfire.internal.hll.HyperLogLog;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest
+;
+
+/**
+ * Tests that region operations work as expected when data is in HDFS.
+ * This test explicitly clears in-memory ConcurrentHashMap that back
+ * AbstractRegionMap before validating region operations.
+ * 
+ * @author sbawaska
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSRegionOperationsJUnitTest extends TestCase {
+
+  protected Cache cache;
+  protected HDFSStore hdfsStore;
+
+  public void setUp() throws Exception {
+    Properties props = getDSProps();
+    cache = new CacheFactory(props).create();
+    System.setProperty(HoplogConfig.ALLOW_LOCAL_HDFS_PROP, "true");
+    String storeName = getName()+"-store";
+    HDFSStoreFactory hsf = cache.createHDFSStoreFactory();
+    hsf.setHomeDir(getName()+"-test");
+    hsf.setBatchInterval(getBatchTimeInterval());
+    hdfsStore = hsf.create(storeName);
+  }
+
+  protected Properties getDSProps() {
+    Properties props = new Properties();
+    props.put("mcast-port", "0");
+    props.put("locators", "");
+    props.put("log-level", "config");
+    return props;
+  }
+
+  public void tearDown() throws Exception {
+    for (Region r : cache.rootRegions()) {
+      if (r != null) {
+        r.close();
+      }
+    }
+
+    if (cache.getRegion(getName()) != null) {
+      cache.getRegion(getName()).destroyRegion();
+    }
+    DiskStore ds = cache.findDiskStore(null);
+    if (ds != null) {
+      ds.destroy();
+    }
+    
+    ((HDFSStoreImpl)hdfsStore).getFileSystem().delete(new Path(hdfsStore.getHomeDir()), true);
+  }
+
+  protected int getBatchTimeInterval() {
+    return 1000;
+  }
+
+  protected Region<Integer, String> createRegion(String regionName) {
+    RegionFactory<Integer, String> rf = cache.createRegionFactory(RegionShortcut.PARTITION_HDFS);
+    PartitionAttributes prAttr = new PartitionAttributesFactory().setTotalNumBuckets(10).create();
+    rf.setPartitionAttributes(prAttr);
+    rf.setHDFSStoreName(hdfsStore.getName());
+    Region<Integer, String> r = rf.create(regionName);
+    
+    ((PartitionedRegion) r).setQueryHDFS(true);
+    return r;
+  }
+
+  protected void clearBackingCHM(Region<Integer, String> r) {
+    PartitionedRegion pr = (PartitionedRegion)r;
+    for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) {
+      assertTrue(br.getRegionMap() instanceof HDFSRegionMap);
+      ((AbstractRegionMap)br.getRegionMap())._getMap().clear();
+      // wait here to make sure that the queue has been flushed
+    }
+    sleep(pr.getFullPath());
+  }
+
+  protected void sleep(String regionPath) {
+    String qname = HDFSStoreFactoryImpl.getEventQueueName(regionPath);
+    GemFireCacheImpl.getExisting().waitForSenderQueueFlush(qname, true, 30);
+  }
+
+  public void test010PUTDMLSupport() {
+    Region<Integer, String> r = createRegion(getName());
+    SortedOplogStatistics stats = HDFSRegionDirector.getInstance().getHdfsRegionStats("/" + getName());
+    assertEquals(0, stats.getRead().getCount());
+    for (int i=0; i<100; i++) {
+      r.put(i, "value"+i);
+    }
+    assertEquals(100, stats.getRead().getCount());
+    sleep(r.getFullPath());
+    clearBackingCHM(r);
+    LocalRegion lr = (LocalRegion) r;
+    for (int i=0; i<200; i++) {
+      EntryEventImpl ev = lr.newPutEntryEvent(i, "value"+i, null);
+      lr.validatedPut(ev, System.currentTimeMillis());
+    }
+    // verify that read count on HDFS does not change
+    assertEquals(100, stats.getRead().getCount());
+    sleep(r.getFullPath());
+    clearBackingCHM(r);
+    for (int i=0; i<200; i++) {
+      assertEquals("value"+i, r.get(i));
+    }
+    if (getBatchTimeInterval() > 1000) {
+      // reads from async queue
+      assertEquals(100, stats.getRead().getCount());
+    } else {
+      assertEquals(300, stats.getRead().getCount());
+    }
+  }
+
+  public void test020GetOperationalData() throws Exception {
+    Region<Integer, String> r = createRegion(getName());
+    SortedOplogStatistics stats = HDFSRegionDirector.getInstance().getHdfsRegionStats("/" + getName());
+    assertEquals(0, stats.getRead().getCount());
+    for (int i=0; i<100; i++) {
+      r.put(i, "value"+i);
+    }
+    int expectedReadsFromHDFS = 100;
+    assertEquals(expectedReadsFromHDFS, stats.getRead().getCount());
+    sleep(r.getFullPath());
+    clearBackingCHM(r);
+    LocalRegion lr = (LocalRegion) r;
+    for (int i=0; i<200; i++) {
+      if (i < 100) {
+        assertEquals("value"+i, r.get(i));
+      } else {
+        assertNull(r.get(i));
+      }
+    }
+    if (getBatchTimeInterval() > 1000) {
+      // reads from async queue
+      expectedReadsFromHDFS = 200; // initial 100 + 100 for misses
+    } else {
+      expectedReadsFromHDFS = 300; // initial 100 + 200 for reads
+    }
+    assertEquals(expectedReadsFromHDFS, stats.getRead().getCount());
+    for (int i=0; i<200; i++) {
+      assertNull(lr.get(i, null, true, false, false, null, null, false, false/*allowReadFromHDFS*/));
+    }
+    // no increase in HDFS reads
+    assertEquals(expectedReadsFromHDFS, stats.getRead().getCount());
+    
+    /**MergeGemXDHDFSToGFE Have not merged this API as this api is not called by any code*/ 
+    //   test the dataView API
+    //for (int i=0; i<200; i++) {
+    //  assertNull(lr.getDataView().getLocally(i, null, i%10, lr, true, true, null, null, false, false/*allowReadFromHDFS*/));
+    //}
+    // no increase in HDFS reads
+    assertEquals(expectedReadsFromHDFS, stats.getRead().getCount());
+  }
+  
+  public void test030RemoveOperationalData() throws Exception {
+    Region<Integer, String> r = createRegion(getName());
+    SortedOplogStatistics stats = HDFSRegionDirector.getInstance().getHdfsRegionStats("/" + getName());
+    assertEquals(0, stats.getRead().getCount());
+    for (int i=0; i<100; i++) {
+      r.put(i, "value"+i);
+    }
+    int expectedReadsFromHDFS = 100;
+    assertEquals(expectedReadsFromHDFS, stats.getRead().getCount());
+    sleep(r.getFullPath());
+    PartitionedRegion lr = (PartitionedRegion) r;
+    for(int i =0; i < 50; i++) {
+      lr.getBucketRegion(i).customEvictDestroy(i);
+    }
+    for (int i=0; i<200; i++) {
+      if (i < 100) {
+        assertEquals("value"+i, r.get(i));
+      } else {
+        assertNull(r.get(i));
+      }
+    }
+    if (getBatchTimeInterval() > 1000) {
+      // reads from async queue
+      expectedReadsFromHDFS = 200; // initial 100 + 100 for misses
+    } else {
+      expectedReadsFromHDFS = 250; // initial 100 + 200 for reads + 50 for 
+    }
+    assertEquals(expectedReadsFromHDFS, stats.getRead().getCount());
+    for (int i=0; i<50; i++) {
+      assertNull(lr.get(i, null, true, false, false, null,  null, false, false/*allowReadFromHDFS*/));
+    }
+    for (int i=50; i<100; i++) {
+      assertEquals("value"+i, lr.get(i, null, true, false, false, null,null, false, false/*allowReadFromHDFS*/));
+    }
+    for (int i=100; i<200; i++) {
+      assertNull(lr.get(i, null, true, false, false, null,  null, false, false/*allowReadFromHDFS*/));
+    }
+    // no increase in HDFS reads
+    assertEquals(expectedReadsFromHDFS, stats.getRead().getCount());
+  }
+
+  public void _test040NoAutoEviction() throws Exception {
+    if (!cache.isClosed()) {
+      tearDown();
+      cache.close();
+      System.setProperty("gemfire.disableAutoEviction", "true");
+      setUp();
+    }
+    Region<Integer, String> r = createRegion(getName());
+    System.setProperty("gemfire.disableAutoEviction", "false");
+    for (int i =0; i<5; i++) {
+      r.put(i, "value"+i);
+    }
+    PartitionedRegion pr = (PartitionedRegion) r;
+    BucketRegion br = pr.getBucketRegion(1);
+    assertNotNull(br.getAttributes().getEvictionAttributes());
+    assertEquals(EvictionAlgorithm.NONE, br.getAttributes().getEvictionAttributes().getAlgorithm());
+
+    GemFireCacheImpl cache = (GemFireCacheImpl) r.getCache();
+    assertEquals(0.0f, cache.getResourceManager().getEvictionHeapPercentage());
+  }
+
+  public void test050LRURegionAttributesForPR() {
+    RegionFactory<Integer, String> rf = cache.createRegionFactory();
+    rf.setHDFSStoreName(hdfsStore.getName());
+    rf.setDataPolicy(DataPolicy.HDFS_PARTITION);
+    verifyLRURegionAttributesForPR(rf.create(getName()));
+  }
+
+  public void test060LRURegionAttributesForRegionShortcutPR() {
+    verifyLRURegionAttributesForPR(createRegion(getName()));
+  }
+
+  private void verifyLRURegionAttributesForPR(Region r) {
+    for (int i =0; i<200; i++) {
+      r.put(i, "value"+i);
+    }
+    RegionAttributes<Integer, String> ra = r.getAttributes();
+    assertNotNull(ra.getEvictionAttributes());
+    // default eviction action for region shortcut
+    assertEquals(EvictionAction.OVERFLOW_TO_DISK, ra.getEvictionAttributes().getAction());
+
+    GemFireCacheImpl cache = (GemFireCacheImpl) r.getCache();
+    assertEquals(80.0f, cache.getResourceManager().getEvictionHeapPercentage());
+    DiskStore ds = cache.findDiskStore(null);
+    assertNotNull(ds);
+    Set s = cache.getResourceManager().getResourceListeners(ResourceType.HEAP_MEMORY);
+    Iterator it = s.iterator();
+    boolean regionFound = false;
+    while (it.hasNext()) {
+      Object o = it.next();
+      if (o instanceof PartitionedRegion) {
+        PartitionedRegion pr = (PartitionedRegion) o;
+        if (getName().equals(pr.getName())) {
+          regionFound = true;
+        } else {
+          continue;
+        }
+        for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) {
+          assertNotNull(br.getAttributes().getEvictionAttributes());
+          assertEquals(EvictionAlgorithm.LRU_HEAP, br.getAttributes().getEvictionAttributes().getAlgorithm());
+          assertEquals(EvictionAction.OVERFLOW_TO_DISK, br.getAttributes().getEvictionAttributes().getAction());
+        }
+      }
+    }
+    assertTrue(regionFound);
+
+  }
+
+  public void test070SizeEstimate() {
+    Region<Integer, String> r = createRegion(getName());
+    int size = 226;
+    Random rand = new Random();
+    for (int i=0; i<size; i++) {
+      r.put(rand.nextInt(), "value"+i);
+    }
+    // size before flush
+    LocalRegion lr = (LocalRegion) r;
+    long estimate = lr.sizeEstimate();
+    double err = Math.abs(estimate - size) / (double) size;
+    // on a busy system flush might start before we call estimateSize, so rather than equality,
+    // test for error margin. fixes bug 49381
+    assertTrue("size:"+size+" estimate:"+estimate, err < 0.02 * 10); // each bucket can have an error of 0.02
+
+    // size after flush
+    sleep(r.getFullPath());
+    estimate = lr.sizeEstimate();
+    err = Math.abs(estimate - size) / (double) size;
+    assertTrue("size:"+size+" estimate:"+estimate, err < 0.02 * 10); // each bucket can have an error of 0.02
+  }
+
+  public void test080PutGet() throws InterruptedException {
+    Region<Integer, String> r = createRegion(getName());
+    for (int i=0; i<100; i++) {
+      r.put(i, "value"+i);
+    }
+    clearBackingCHM(r);
+    for (int i=0; i<100; i++) {
+      assertEquals("value"+i, r.get(i));
+    }
+    
+    //Do a put while there are entries in the map
+    r.put(0, "value"+0);
+    
+    r.destroy(1, "value"+1);
+  }
+
+  public void test090Delete() {
+    Region<Integer, String> r = createRegion(getName());
+    for (int i=0; i<11; i++) {
+      r.put(i, "value"+i);
+    }
+    clearBackingCHM(r);
+    int delKey = 9;
+    r.destroy(delKey);
+    assertNull(r.get(delKey));
+    assertFalse(r.containsKey(delKey));
+  }
+
+  public void test100Invalidate() {
+    Region<Integer, String> r = createRegion(getName());
+    for (int i=0; i<100; i++) {
+      r.put(i, "value"+i);
+    }
+    clearBackingCHM(r);
+    int invKey = 9;
+    r.invalidate(invKey);
+    assertNull(r.get(invKey));
+    assertTrue(r.containsKey(invKey));
+  }
+
+  public void test110Size() {
+    Region<Integer, String> r = createRegion(getName());
+    for (int i=0; i<100; i++) {
+      r.put(i, "value"+i);
+    }
+    clearBackingCHM(r);
+    assertEquals(100, r.size());
+    r.destroy(45);
+    assertEquals(99, r.size());
+    r.invalidate(55);
+    r.invalidate(65);
+    assertEquals(99, r.size());
+  }
+
+  public void test120KeyIterator() {
+    Region<Integer, String> r = createRegion(getName());
+    for (int i=0; i<100; i++) {
+      r.put(i, "value"+i);
+    }
+    clearBackingCHM(r);
+    Set<Integer> keys = r.keySet();
+    int c = 0;
+    for (int i : keys) {
+//      assertEquals(c, i);
+      c++;
+    }
+    assertEquals(100, c);
+    assertEquals(100, keys.size());
+    int delKey = 88;
+    r.destroy(delKey);
+    r.invalidate(39);
+    keys = r.keySet();
+    c = 0;
+    for (int i : keys) {
+      if (c == delKey) {
+        c++;
+      }
+//      assertEquals(c, i);
+      c++;
+    }
+    assertEquals(99, keys.size());
+  }
+
+  public void test130EntriesIterator() {
+    Region<Integer, String> r = createRegion(getName());
+    for (int i=0; i<100; i++) {
+      r.put(i, "value"+i);
+    }
+    clearBackingCHM(r);
+    Set<Entry<Integer, String>> entries = r.entrySet();
+    int c = 0;
+    for (Entry<Integer, String> e : entries) {
+//      assertEquals(c, (int) e.getKey());
+      assertEquals("value"+e.getKey(), e.getValue());
+      c++;
+    }
+    assertEquals(100, c);
+    assertEquals(100, entries.size());
+    int delKey = 88;
+    r.destroy(delKey);
+    int invKey = 39;
+    r.invalidate(invKey);
+    entries = r.entrySet();
+    c = 0;
+    for (Entry<Integer, String> e : entries) {
+      if (c == delKey) {
+        c++;
+      } else if (e.getKey() == invKey) {
+//        assertEquals(c, (int) e.getKey());
+        assertNull(e.getValue());
+      } else {
+//        assertEquals(c, (int) e.getKey());
+        assertEquals("value"+e.getKey(), e.getValue());
+      }
+      c++;
+    }
+    assertEquals(99, entries.size());
+  }
+
+  public void test140ContainsKey() {
+    Region<Integer, String> r = createRegion(getName());
+    for (int i=0; i<100; i++) {
+      r.put(i, "value"+i);
+    }
+    clearBackingCHM(r);
+    assertTrue(r.containsKey(80));
+    r.destroy(80);
+    assertFalse(r.containsKey(80));
+    r.invalidate(64);
+    assertTrue(r.containsKey(64));
+  }
+
+  public void test150ContainsValue() {
+    Region<Integer, String> r = createRegion(getName());
+    for (int i=0; i<100; i++) {
+      r.put(i, "value"+i);
+    }
+    clearBackingCHM(r);
+    assertTrue(r.containsValue("value45"));
+    r.destroy(45);
+    assertFalse(r.containsValue("value45"));
+    r.invalidate(64);
+    assertFalse(r.containsValue("value64"));
+  }
+
+  public void test160DestroyRegion() {
+    Region<Integer, String> r = createRegion(getName());
+    for (int i=0; i<100; i++) {
+      r.put(i, "value"+i);
+    }
+    clearBackingCHM(r);
+    r.destroyRegion();
+    try {
+      r.get(3);
+      fail("expected exception not thrown");
+    } catch (RegionDestroyedException expected) {
+    }
+  }
+
+  public void test170PutIfAbsent() {
+    Region<Integer, String> r = createRegion(getName());
+    r.put(1, "value1");
+    clearBackingCHM(r);
+    assertEquals("value1", r.putIfAbsent(1, "value2"));
+  }
+
+  public void test180Replace() {
+    Region<Integer, String> r = createRegion(getName());
+    assertNull(r.replace(1, "value"));
+    r.put(1, "value1");
+    clearBackingCHM(r);
+    assertEquals("value1", r.replace(1, "value2"));
+  }
+
+  public void test190ReplaceKVV() {
+    Region<Integer, String> r = createRegion(getName());
+    assertFalse(r.replace(1, "oldValue", "newValue"));
+    r.put(1, "value1");
+    clearBackingCHM(r);
+    assertTrue(r.replace(1, "value1", "value2"));
+  }
+
+  public void test200Accuracy() throws IOException {
+    double sum=0.0;
+    int iter = 10;
+    for (int t=0; t<iter; t++) {
+      Random r = new Random();
+      HashSet<Integer> vals = new HashSet<Integer>();
+      HyperLogLog hll = new HyperLogLog(0.03);
+      //HyperLogLog hll = new HyperLogLog(0.1);
+      double accuracy = 0.0;
+      for (int i = 0; i < 2 * 1000000; i++) {
+        int val = r.nextInt();
+        vals.add(val);
+        hll.offer(val);
+      }
+      long size = vals.size();
+      long est = hll.cardinality();
+      
+      accuracy = 100.0 * (size - est) / est;
+      System.out.printf("Accuracy is %f hll size is %d\n", accuracy, hll.getBytes().length);
+      sum+=Math.abs(accuracy);
+    }
+    double avgAccuracy = sum/(iter*1.0);
+    System.out.println("Avg accuracy is:"+avgAccuracy);
+    assertTrue(avgAccuracy < 6);
+  }
+}