You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:49:55 UTC

[09/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSConfigJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSConfigJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSConfigJUnitTest.java
new file mode 100644
index 0000000..a1c9eb1
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSConfigJUnitTest.java
@@ -0,0 +1,520 @@
+ /*=========================================================================
+   * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+   * This product is protected by U.S. and international copyright
+   * and intellectual property laws. Pivotal products are covered by
+   * one or more patents listed at http://www.pivotal.io/patents.
+   *=========================================================================
+   */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.CacheXmlException;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.control.HeapMemoryMonitor;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.junit.experimental.categories.Category;
+
+/**
+ * A test class for testing the configuration option for HDFS 
+ * 
+ * @author Hemant Bhanawat
+ * @author Ashvin Agrawal
+ */
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSConfigJUnitTest extends TestCase {
+  private GemFireCacheImpl c;
+
+  public HDFSConfigJUnitTest() {
+    super();
+  }
+
+  @Override
+  public void setUp() {
+    System.setProperty(HoplogConfig.ALLOW_LOCAL_HDFS_PROP, "true");
+    this.c = createCache();
+    AbstractHoplogOrganizer.JUNIT_TEST_RUN = true;
+  }
+
+  @Override
+  public void tearDown() {
+    this.c.close();
+  }
+    
+    public void testHDFSStoreCreation() throws Exception {
+      this.c.close();
+      this.c = createCache();
+      try {
+        HDFSStoreFactory hsf = this.c.createHDFSStoreFactory();
+        HDFSStore store = hsf.create("myHDFSStore");
+        RegionFactory rf1 = this.c.createRegionFactory(RegionShortcut.PARTITION_HDFS);
+        Region r1 = rf1.setHDFSStoreName("myHDFSStore").create("r1");
+       
+        r1.put("k1", "v1");
+        
+        assertTrue("Mismatch in attributes, actual.batchsize: " + store.getBatchSize() + " and expected batchsize: 32", store.getBatchSize()== 32);
+        assertTrue("Mismatch in attributes, actual.isPersistent: " + store.getBufferPersistent() + " and expected isPersistent: false", store.getBufferPersistent()== false);
+        assertEquals(false, r1.getAttributes().getHDFSWriteOnly());
+        assertTrue("Mismatch in attributes, actual.getDiskStoreName: " + store.getDiskStoreName() + " and expected getDiskStoreName: null", store.getDiskStoreName()== null);
+        assertTrue("Mismatch in attributes, actual.getFileRolloverInterval: " + store.getWriteOnlyFileRolloverInterval() + " and expected getFileRolloverInterval: 3600", store.getWriteOnlyFileRolloverInterval() == 3600);
+        assertTrue("Mismatch in attributes, actual.getMaxFileSize: " + store.getWriteOnlyFileRolloverSize() + " and expected getMaxFileSize: 256MB", store.getWriteOnlyFileRolloverSize() == 256);
+        this.c.close();
+        
+        
+        this.c = createCache();
+        hsf = this.c.createHDFSStoreFactory();
+        hsf.create("myHDFSStore");
+        
+        r1 = this.c.createRegionFactory(RegionShortcut.PARTITION_WRITEONLY_HDFS_STORE).setHDFSStoreName("myHDFSStore")
+              .create("r1");
+       
+        r1.put("k1", "v1");
+        assertTrue("Mismatch in attributes, actual.batchsize: " + store.getBatchSize() + " and expected batchsize: 32", store.getBatchSize()== 32);
+        assertTrue("Mismatch in attributes, actual.isPersistent: " + store.getBufferPersistent() + " and expected isPersistent: false", store.getBufferPersistent()== false);
+        assertTrue("Mismatch in attributes, actual.isRandomAccessAllowed: " + r1.getAttributes().getHDFSWriteOnly() + " and expected isRandomAccessAllowed: true", r1.getAttributes().getHDFSWriteOnly()== true);
+        assertTrue("Mismatch in attributes, actual.getDiskStoreName: " + store.getDiskStoreName() + " and expected getDiskStoreName: null", store.getDiskStoreName()== null);
+        assertTrue("Mismatch in attributes, actual.batchInterval: " + store.getBatchInterval() + " and expected batchsize: 60000", store.getBatchInterval()== 60000);
+        assertTrue("Mismatch in attributes, actual.isDiskSynchronous: " + store.getSynchronousDiskWrite() + " and expected isDiskSynchronous: true", store.getSynchronousDiskWrite()== true);
+        
+        this.c.close();
+
+        this.c = createCache();
+        
+        File directory = new File("HDFS" + "_disk_"
+            + System.currentTimeMillis());
+        directory.mkdir();
+        File[] dirs1 = new File[] { directory };
+        DiskStoreFactory dsf = this.c.createDiskStoreFactory();
+        dsf.setDiskDirs(dirs1);
+        dsf.create("mydisk");
+        
+        
+        hsf = this.c.createHDFSStoreFactory();
+        hsf.setBatchSize(50);
+        hsf.setDiskStoreName("mydisk");
+        hsf.setBufferPersistent(true);
+        hsf.setBatchInterval(50);
+        hsf.setSynchronousDiskWrite(false);
+        hsf.setHomeDir("/home/hemant");
+        hsf.setNameNodeURL("mymachine");
+        hsf.setWriteOnlyFileRolloverSize(1);
+        hsf.setWriteOnlyFileRolloverInterval(10);
+        hsf.create("myHDFSStore");
+        
+        
+        r1 = this.c.createRegionFactory(RegionShortcut.PARTITION_WRITEONLY_HDFS_STORE).setHDFSStoreName("myHDFSStore")
+            .setHDFSWriteOnly(true).create("r1");
+       
+        r1.put("k1", "v1");
+        store = c.findHDFSStore(r1.getAttributes().getHDFSStoreName());
+        
+        assertTrue("Mismatch in attributes, actual.batchsize: " + store.getBatchSize() + " and expected batchsize: 50", store.getBatchSize()== 50);
+        assertTrue("Mismatch in attributes, actual.isPersistent: " + store.getBufferPersistent() + " and expected isPersistent: true", store.getBufferPersistent()== true);
+        assertTrue("Mismatch in attributes, actual.isRandomAccessAllowed: " + r1.getAttributes().getHDFSWriteOnly() + " and expected isRandomAccessAllowed: true", r1.getAttributes().getHDFSWriteOnly()== true);
+        assertTrue("Mismatch in attributes, actual.getDiskStoreName: " + store.getDiskStoreName() + " and expected getDiskStoreName: mydisk", store.getDiskStoreName()== "mydisk");
+        assertTrue("Mismatch in attributes, actual.HDFSStoreName: " + r1.getAttributes().getHDFSStoreName() + " and expected getDiskStoreName: myHDFSStore", r1.getAttributes().getHDFSStoreName()== "myHDFSStore");
+        assertTrue("Mismatch in attributes, actual.getFolderPath: " + ((GemFireCacheImpl)this.c).findHDFSStore("myHDFSStore").getHomeDir() + " and expected getDiskStoreName: /home/hemant", ((GemFireCacheImpl)this.c).findHDFSStore("myHDFSStore").getHomeDir()== "/home/hemant");
+        assertTrue("Mismatch in attributes, actual.getNamenode: " + ((GemFireCacheImpl)this.c).findHDFSStore("myHDFSStore").getNameNodeURL()+ " and expected getDiskStoreName: mymachine", ((GemFireCacheImpl)this.c).findHDFSStore("myHDFSStore").getNameNodeURL()== "mymachine");
+        assertTrue("Mismatch in attributes, actual.batchInterval: " + store.getBatchInterval() + " and expected batchsize: 50 ", store.getBatchSize()== 50);
+        assertTrue("Mismatch in attributes, actual.isDiskSynchronous: " + store.getSynchronousDiskWrite() + " and expected isPersistent: false", store.getSynchronousDiskWrite()== false);
+        assertTrue("Mismatch in attributes, actual.getFileRolloverInterval: " + store.getWriteOnlyFileRolloverInterval() + " and expected getFileRolloverInterval: 10", store.getWriteOnlyFileRolloverInterval() == 10);
+        assertTrue("Mismatch in attributes, actual.getMaxFileSize: " + store.getWriteOnlyFileRolloverSize() + " and expected getMaxFileSize: 1MB", store.getWriteOnlyFileRolloverSize() == 1);
+        this.c.close();
+      } finally {
+        this.c.close();
+      }
+    }
+       
+    public void testCacheXMLParsing() throws Exception {
+      try {
+        this.c.close();
+
+        Region r1 = null;
+
+        // use a cache.xml to recover
+        this.c = createCache();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PrintWriter pw = new PrintWriter(new OutputStreamWriter(baos), true); 
+        pw.println("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
+//      pw.println("<?xml version=\"1.0\"?>");
+//      pw.println("<!DOCTYPE cache PUBLIC");
+//      pw.println("  \"-//GemStone Systems, Inc.//GemFire Declarative Caching 7.5//EN\"");
+//      pw.println("  \"http://www.gemstone.com/dtd/cache7_5.dtd\">");
+        pw.println("<cache ");
+        pw.println("xmlns=\"http://schema.pivotal.io/gemfire/cache\"");
+        pw.println("xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"");
+        pw.println(" xsi:schemaLocation=\"http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-9.0.xsd\"");
+        pw.println("version=\"9.0\">");
+
+        pw.println("  <hdfs-store name=\"myHDFSStore\" namenode-url=\"mynamenode\"  home-dir=\"mypath\" />");
+        pw.println("  <region name=\"r1\" refid=\"PARTITION_HDFS\">");
+        pw.println("    <region-attributes hdfs-store-name=\"myHDFSStore\"/>");
+        pw.println("  </region>");
+        pw.println("</cache>");
+        pw.close();
+        byte[] bytes = baos.toByteArray();  
+        this.c.loadCacheXml(new ByteArrayInputStream(bytes));
+        
+        r1 = this.c.getRegion("/r1");
+        HDFSStoreImpl store = c.findHDFSStore(r1.getAttributes().getHDFSStoreName());
+        r1.put("k1", "v1");
+        assertTrue("Mismatch in attributes, actual.batchsize: " + store.getBatchSize() + " and expected batchsize: 32", store.getBatchSize()== 32);
+        assertTrue("Mismatch in attributes, actual.isPersistent: " + store.getBufferPersistent() + " and expected isPersistent: false", store.getBufferPersistent()== false);
+        assertEquals(false, r1.getAttributes().getHDFSWriteOnly());
+        assertTrue("Mismatch in attributes, actual.getDiskStoreName: " + store.getDiskStoreName() + " and expected getDiskStoreName: null", store.getDiskStoreName()== null);
+        assertTrue("Mismatch in attributes, actual.getFileRolloverInterval: " + store.getWriteOnlyFileRolloverInterval() + " and expected getFileRolloverInterval: 3600", store.getWriteOnlyFileRolloverInterval() == 3600);
+        assertTrue("Mismatch in attributes, actual.getMaxFileSize: " + store.getWriteOnlyFileRolloverSize() + " and expected getMaxFileSize: 256MB", store.getWriteOnlyFileRolloverSize() == 256);
+        
+        this.c.close();
+        
+        // use a cache.xml to recover
+        this.c = createCache();
+        baos = new ByteArrayOutputStream();
+        pw = new PrintWriter(new OutputStreamWriter(baos), true);
+        pw.println("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
+//      pw.println("<?xml version=\"1.0\"?>");
+//      pw.println("<!DOCTYPE cache PUBLIC");
+//      pw.println("  \"-//GemStone Systems, Inc.//GemFire Declarative Caching 7.5//EN\"");
+//      pw.println("  \"http://www.gemstone.com/dtd/cache7_5.dtd\">");
+        pw.println("<cache ");
+        pw.println("xmlns=\"http://schema.pivotal.io/gemfire/cache\"");
+        pw.println("xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"");
+        pw.println(" xsi:schemaLocation=\"http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-9.0.xsd\"");
+        pw.println("version=\"9.0\">");
+        pw.println("  <hdfs-store name=\"myHDFSStore\" namenode-url=\"mynamenode\"  home-dir=\"mypath\" />");
+        pw.println("  <region name=\"r1\" refid=\"PARTITION_WRITEONLY_HDFS_STORE\">");
+        pw.println("    <region-attributes hdfs-store-name=\"myHDFSStore\"/>");
+        pw.println("  </region>");
+        pw.println("</cache>");
+        pw.close();
+        bytes = baos.toByteArray();  
+        this.c.loadCacheXml(new ByteArrayInputStream(bytes));
+        
+        r1 = this.c.getRegion("/r1");
+        store = c.findHDFSStore(r1.getAttributes().getHDFSStoreName());
+        r1.put("k1", "v1");
+        assertTrue("Mismatch in attributes, actual.batchsize: " + store.getBatchSize() + " and expected batchsize: 32", store.getBatchSize()== 32);
+        assertTrue("Mismatch in attributes, actual.isPersistent: " + store.getBufferPersistent() + " and expected isPersistent: false", store.getBufferPersistent()== false);
+        assertTrue("Mismatch in attributes, actual.isRandomAccessAllowed: " + r1.getAttributes().getHDFSWriteOnly() + " and expected isRandomAccessAllowed: false", r1.getAttributes().getHDFSWriteOnly()== false);
+        assertTrue("Mismatch in attributes, actual.getDiskStoreName: " + store.getDiskStoreName() + " and expected getDiskStoreName: null", store.getDiskStoreName()== null);
+        
+        this.c.close();
+        
+        // use a cache.xml to recover
+        this.c = createCache();
+        baos = new ByteArrayOutputStream();
+        pw = new PrintWriter(new OutputStreamWriter(baos), true);
+        pw.println("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
+//        pw.println("<?xml version=\"1.0\"?>");
+//        pw.println("<!DOCTYPE cache PUBLIC");
+//        pw.println("  \"-//GemStone Systems, Inc.//GemFire Declarative Caching 7.5//EN\"");
+//        pw.println("  \"http://www.gemstone.com/dtd/cache7_5.dtd\">");
+        pw.println("<cache ");
+        pw.println("xmlns=\"http://schema.pivotal.io/gemfire/cache\"");
+        pw.println("xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"");
+        pw.println(" xsi:schemaLocation=\"http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-9.0.xsd\"");
+        pw.println("version=\"9.0\">");
+
+        pw.println("  <disk-store name=\"mydiskstore\"/>");
+        pw.println("  <hdfs-store name=\"myHDFSStore\" namenode-url=\"mynamenode\"  home-dir=\"mypath\" max-write-only-file-size=\"1\" write-only-file-rollover-interval=\"10\" ");
+        pw.println("    batch-size=\"151\" buffer-persistent =\"true\" disk-store=\"mydiskstore\" synchronous-disk-write=\"false\" batch-interval=\"50\"");
+        pw.println("  />");
+        pw.println("  <region name=\"r1\" refid=\"PARTITION_WRITEONLY_HDFS_STORE\">");
+        pw.println("    <region-attributes hdfs-store-name=\"myHDFSStore\" hdfs-write-only=\"false\">");
+        pw.println("    </region-attributes>");
+        pw.println("  </region>");
+        pw.println("</cache>");
+        pw.close();
+        bytes = baos.toByteArray();
+        this.c.loadCacheXml(new ByteArrayInputStream(bytes));
+        
+        r1 = this.c.getRegion("/r1");
+        store = c.findHDFSStore(r1.getAttributes().getHDFSStoreName());
+        r1.put("k1", "v1");
+        assertTrue("Mismatch in attributes, actual.batchsize: " + store.getBatchSize() + " and expected batchsize: 151", store.getBatchSize()== 151);
+        assertTrue("Mismatch in attributes, actual.isPersistent: " + store.getBufferPersistent() + " and expected isPersistent: true", store.getBufferPersistent()== true);
+        assertTrue("Mismatch in attributes, actual.isRandomAccessAllowed: " + r1.getAttributes().getHDFSWriteOnly() + " and expected isRandomAccessAllowed: true", r1.getAttributes().getHDFSWriteOnly()== false);
+        assertTrue("Mismatch in attributes, actual.getDiskStoreName: " + store.getDiskStoreName() + " and expected getDiskStoreName: mydiskstore", store.getDiskStoreName().equals("mydiskstore"));
+        assertTrue("Mismatch in attributes, actual.HDFSStoreName: " + r1.getAttributes().getHDFSStoreName() + " and expected getDiskStoreName: myHDFSStore", r1.getAttributes().getHDFSStoreName().equals("myHDFSStore"));
+        assertTrue("Mismatch in attributes, actual.getFolderPath: " + ((GemFireCacheImpl)this.c).findHDFSStore("myHDFSStore").getHomeDir() + " and expected getDiskStoreName: mypath", ((GemFireCacheImpl)this.c).findHDFSStore("myHDFSStore").getHomeDir().equals("mypath"));
+        assertTrue("Mismatch in attributes, actual.getNamenode: " + ((GemFireCacheImpl)this.c).findHDFSStore("myHDFSStore").getNameNodeURL()+ " and expected getDiskStoreName: mynamenode", ((GemFireCacheImpl)this.c).findHDFSStore("myHDFSStore").getNameNodeURL().equals("mynamenode"));
+        assertTrue("Mismatch in attributes, actual.batchInterval: " + store.getBatchInterval() + " and expected batchsize: 50", store.getBatchInterval()== 50);
+        assertTrue("Mismatch in attributes, actual.isDiskSynchronous: " + store.getSynchronousDiskWrite() + " and expected isDiskSynchronous: false", store.getSynchronousDiskWrite()== false);
+        assertTrue("Mismatch in attributes, actual.getFileRolloverInterval: " + store.getWriteOnlyFileRolloverInterval() + " and expected getFileRolloverInterval: 10", store.getWriteOnlyFileRolloverInterval() == 10);
+        assertTrue("Mismatch in attributes, actual.getMaxFileSize: " + store.getWriteOnlyFileRolloverSize() + " and expected getMaxFileSize: 1MB", store.getWriteOnlyFileRolloverSize() == 1);
+        
+        this.c.close();
+      } finally {
+          this.c.close();
+      }
+    }
+   
+  /**
+   * Validates if hdfs store conf is getting completely and correctly parsed
+   */
+  public void testHdfsStoreConfFullParsing() {
+    String conf = createStoreConf("123");
+    this.c.loadCacheXml(new ByteArrayInputStream(conf.getBytes()));
+    HDFSStoreImpl store = ((GemFireCacheImpl)this.c).findHDFSStore("store");
+    assertEquals("namenode url mismatch.", "url", store.getNameNodeURL());
+    assertEquals("home-dir mismatch.", "dir", store.getHomeDir());
+    assertEquals("hdfs-client-config-file mismatch.", "client", store.getHDFSClientConfigFile());
+    assertEquals("read-cache-size mismatch.", 24.5f, store.getBlockCacheSize());
+    
+    assertFalse("compaction auto-compact mismatch.", store.getMinorCompaction());
+    assertTrue("compaction auto-major-compact mismatch.", store.getMajorCompaction());
+    assertEquals("compaction max-concurrency", 23, store.getMinorCompactionThreads());
+    assertEquals("compaction max-major-concurrency", 27, store.getMajorCompactionThreads());
+    assertEquals("compaction major-interval", 711, store.getPurgeInterval());
+  }
+  
+  /**
+   * Validates that the config defaults are set even with minimum XML configuration 
+   */
+  public void testHdfsStoreConfMinParse() {
+    this.c.loadCacheXml(new ByteArrayInputStream(XML_MIN_CONF.getBytes()));
+    HDFSStoreImpl store = ((GemFireCacheImpl)this.c).findHDFSStore("store");
+    assertEquals("namenode url mismatch.", "url", store.getNameNodeURL());
+    assertEquals("home-dir mismatch.", "gemfire", store.getHomeDir());
+    
+    assertTrue("compaction auto-compact mismatch.", store.getMinorCompaction());
+    assertTrue("compaction auto-major-compact mismatch.", store.getMajorCompaction());
+    assertEquals("compaction max-input-file-size mismatch.", 512, store.getInputFileSizeMax());
+    assertEquals("compaction min-input-file-count.", 4, store.getInputFileCountMin());
+    assertEquals("compaction max-iteration-size.", 10, store.getInputFileCountMax());
+    assertEquals("compaction max-concurrency", 10, store.getMinorCompactionThreads());
+    assertEquals("compaction max-major-concurrency", 2, store.getMajorCompactionThreads());
+    assertEquals("compaction major-interval", 720, store.getMajorCompactionInterval());
+    assertEquals("compaction cleanup-interval", 30, store.getPurgeInterval());
+  }
+  
+  /**
+   * Validates that cache creation fails if a compaction configuration is
+   * provided which is not applicable to the selected compaction strategy
+   */
+  public void testHdfsStoreInvalidCompactionConf() {
+    String conf = createStoreConf("123");
+    try {
+      this.c.loadCacheXml(new ByteArrayInputStream(conf.getBytes()));
+      // expected
+    } catch (CacheXmlException e) {
+      fail();
+    }
+  }
+  
+  /**
+   * Validates that cache creation fails if a compaction configuration is
+   * provided which is not applicable to the selected compaction strategy
+   */
+  public void testInvalidConfigCheck() throws Exception {
+    this.c.close();
+
+    this.c = createCache();
+
+    HDFSStoreFactory hsf; 
+    hsf = this.c.createHDFSStoreFactory();
+    
+    try {
+      hsf.setInputFileSizeMax(-1);
+      fail("validation failed");
+    } catch (IllegalArgumentException e) {
+      //expected
+    }
+    try {
+      hsf.setInputFileCountMin(-1);
+      fail("validation failed");
+    } catch (IllegalArgumentException e) {
+      //expected
+    }
+    try {
+      hsf.setInputFileCountMax(-1);
+      //expected
+      fail("validation failed");
+    } catch (IllegalArgumentException e) {
+    }
+    try {
+      hsf.setMinorCompactionThreads(-1);
+      fail("validation failed");
+    } catch (IllegalArgumentException e) {
+      //expected
+    }
+    try {
+      hsf.setMajorCompactionInterval(-1);
+      fail("validation failed");
+    } catch (IllegalArgumentException e) {
+      //expected
+    }
+    try {
+      hsf.setMajorCompactionThreads(-1);
+      fail("validation failed");
+    } catch (IllegalArgumentException e) {
+      //expected
+    }
+    try {
+      hsf.setPurgeInterval(-1);
+      fail("validation failed");
+    } catch (IllegalArgumentException e) {
+      //expected
+    }
+    try {
+      hsf.setInputFileCountMin(2);
+      hsf.setInputFileCountMax(1);
+      hsf.create("test");
+      fail("validation failed");
+    } catch (IllegalArgumentException e) {
+      //expected
+    }
+    try {
+      hsf.setInputFileCountMax(1);
+      hsf.setInputFileCountMin(2);
+      hsf.create("test");
+      fail("validation failed");
+    } catch (IllegalArgumentException e) {
+      //expected
+    }
+  }
+  
+  /**
+   * Validates cache creation fails if invalid integer size configuration is provided
+   * @throws Exception
+   */
+  public void testHdfsStoreConfInvalidInt() throws Exception {
+    String conf = createStoreConf("NOT_INTEGER");
+    try {
+      this.c.loadCacheXml(new ByteArrayInputStream(conf.getBytes()));
+      fail();
+    } catch (CacheXmlException e) {
+      // expected
+    }
+  }
+  
+
+  private static String XML_MIN_CONF = "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \n"
+  + "<cache \n"
+  + "xmlns=\"http://schema.pivotal.io/gemfire/cache\"\n"
+  + "xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n"
+  + " xsi:schemaLocation=\"http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-9.0.xsd\"\n"
+  + "version=\"9.0\">" +
+          "  <hdfs-store name=\"store\" namenode-url=\"url\" />" +
+          "</cache>";
+   
+  private static String XML_FULL_CONF = "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \n"
+                                        + "<cache \n"
+                                        + "xmlns=\"http://schema.pivotal.io/gemfire/cache\"\n"
+                                        + "xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n"
+                                        + " xsi:schemaLocation=\"http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-9.0.xsd\"\n"
+                                        + "version=\"9.0\">"
+      + "  <hdfs-store name=\"store\" namenode-url=\"url\" "
+      + "              home-dir=\"dir\" "
+      + "              read-cache-size=\"24.5\" "
+      + "              max-write-only-file-size=\"FILE_SIZE_CONF\" "
+      + "              minor-compaction-threads = \"23\""
+      + "              major-compaction-threads = \"27\""
+      + "              major-compaction=\"true\" "
+      + "              minor-compaction=\"false\" "
+      + "              major-compaction-interval=\"781\" "
+      + "              purge-interval=\"711\" hdfs-client-config-file=\"client\" />\n"
+      + "</cache>";
+  // potential replacement targets
+  String FILE_SIZE_CONF_SUBSTRING = "FILE_SIZE_CONF";
+  
+  private String createStoreConf(String fileSize) {
+    String result = XML_FULL_CONF;
+    
+    String replaceWith = (fileSize == null) ? "123" : fileSize;
+    result = result.replaceFirst(FILE_SIZE_CONF_SUBSTRING, replaceWith);
+
+    return result;
+  }
+  
+  public void _testBlockCacheConfiguration() throws Exception {
+    this.c.close();
+    this.c = createCache();
+    try {
+      HDFSStoreFactory hsf = this.c.createHDFSStoreFactory();
+      
+      //Configure a block cache to cache about 20 blocks.
+      long heapSize = HeapMemoryMonitor.getTenuredPoolMaxMemory();
+      int blockSize = StoreFile.DEFAULT_BLOCKSIZE_SMALL;
+      int blockCacheSize = 5 * blockSize;
+      int entrySize = blockSize / 2;
+      
+      
+      float percentage = 100 * (float) blockCacheSize / (float) heapSize;
+      hsf.setBlockCacheSize(percentage);
+      HDFSStoreImpl store = (HDFSStoreImpl) hsf.create("myHDFSStore");
+      RegionFactory rf1 = this.c.createRegionFactory(RegionShortcut.PARTITION_HDFS);
+      //Create a region that evicts everything
+      LocalRegion r1 = (LocalRegion) rf1.setHDFSStoreName("myHDFSStore").setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(1)).create("r1");
+     
+      //Populate about many times our block cache size worth of data
+      //We want to try to cache at least 5 blocks worth of index and metadata
+      byte[] value = new byte[entrySize];
+      int numEntries = 10 * blockCacheSize / entrySize;
+      for(int i = 0; i < numEntries; i++) {
+        r1.put(i, value);
+      }
+
+      //Wait for the events to be written to HDFS.
+      Set<String> queueIds = r1.getAsyncEventQueueIds();
+      assertEquals(1, queueIds.size());
+      AsyncEventQueueImpl queue = (AsyncEventQueueImpl) c.getAsyncEventQueue(queueIds.iterator().next());
+      long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(120);
+      while(queue.size() > 0 && System.nanoTime() < end) {
+        Thread.sleep(10);
+      }
+      assertEquals(0, queue.size());
+      
+      
+      Thread.sleep(10000);
+
+      //Do some reads to cache some blocks. Note that this doesn't
+      //end up caching data blocks, just index and bloom filters blocks.
+      for(int i = 0; i < numEntries; i++) {
+        r1.get(i);
+      }
+      
+      long statSize = store.getStats().getBlockCache().getBytesCached();
+      assertTrue("Block cache stats expected to be near " + blockCacheSize + " was " + statSize, 
+          blockCacheSize / 2  < statSize &&
+          statSize <=  2 * blockCacheSize);
+      
+      long currentSize = store.getBlockCache().getCurrentSize();
+      assertTrue("Block cache size expected to be near " + blockCacheSize + " was " + currentSize, 
+          blockCacheSize / 2  < currentSize &&
+          currentSize <= 2 * blockCacheSize);
+      
+    } finally {
+      this.c.close();
+    }
+  }
+
+  protected GemFireCacheImpl createCache() {
+    return (GemFireCacheImpl) new CacheFactory().set("mcast-port", "0").set("log-level", "info")
+    .create();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSetJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSetJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSetJUnitTest.java
new file mode 100644
index 0000000..75dfa93
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSetJUnitTest.java
@@ -0,0 +1,227 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.asyncqueue.internal.ParallelAsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.internal.SortedListForAsyncQueueJUnitTest.KeyValue;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.CachedDeserializable;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion.IteratorType;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@SuppressWarnings("rawtypes")
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSEntriesSetJUnitTest extends TestCase {
+  private GemFireCacheImpl cache;
+  private HDFSStoreImpl store;
+  private PartitionedRegion region;
+  private BucketRegion bucket;
+  private HDFSParallelGatewaySenderQueue queue;
+  
+  private HDFSBucketRegionQueue brq;
+  private HoplogOrganizer hdfs;
+  
+  public void setUp() throws Exception {
+    System.setProperty(HoplogConfig.ALLOW_LOCAL_HDFS_PROP, "true");
+    cache = (GemFireCacheImpl) new CacheFactory()
+        .set("mcast-port", "0")
+        .set("log-level", "info")
+        .create();
+    
+    HDFSStoreFactory hsf = this.cache.createHDFSStoreFactory();
+    hsf.setHomeDir("hoplogs");
+    store = (HDFSStoreImpl) hsf.create("test");
+
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf.setTotalNumBuckets(1);
+    
+    RegionFactory rf = cache.createRegionFactory(RegionShortcut.PARTITION_HDFS);
+    region = (PartitionedRegion) rf.setHDFSStoreName("test").setPartitionAttributes(paf.create()).create("test");
+    
+    // prime the region so buckets get created
+    region.put("test", "test");
+    GatewaySenderAttributes g = new GatewaySenderAttributes();
+    g.isHDFSQueue = true;
+    g.id = "HDFSEntriesSetJUnitTest_Queue";
+    ParallelAsyncEventQueueImpl gatewaySender = new ParallelAsyncEventQueueImpl(cache, g);
+    Set<Region> set = new HashSet<Region>();
+    set.add(region);
+    
+    queue = new HDFSParallelGatewaySenderQueue(gatewaySender, set, 0, 1);
+    brq = (HDFSBucketRegionQueue)((PartitionedRegion) queue.getRegion()).getDataStore().getLocalBucketById(0);
+    bucket = region.getDataStore().getLocalBucketById(0);
+        
+    HdfsRegionManager mgr = HDFSRegionDirector.getInstance().manageRegion(region, "test", null);
+    hdfs =  mgr.<SortedHoplogPersistedEvent>create(0);
+    AbstractHoplogOrganizer.JUNIT_TEST_RUN = true;
+  }
+  
+  public void tearDown() throws Exception {
+    store.getFileSystem().delete(new Path("hoplogs"), true);
+    hdfs.close();
+    
+    cache.close();
+  }
+  
+  public void testEmptyIterator() throws Exception {
+    checkIteration(Collections.<String>emptyList(), new KeyValue[] { }, new KeyValue[] { });
+  }
+  
+  public void testQueueOnlyIterator() throws Exception {
+    KeyValue[] qvals = new KeyValue[] {
+      new KeyValue("K0", "0"),
+      new KeyValue("K1", "1"),
+      new KeyValue("K2", "2"),
+      new KeyValue("K3", "3"),
+      new KeyValue("K4", "4")
+    };
+    checkIteration(getExpected(), qvals, new KeyValue[] { });
+  }
+  
+  public void testHdfsOnlyIterator() throws Exception {
+    KeyValue[] hvals = new KeyValue[] {
+      new KeyValue("K0", "0"),
+      new KeyValue("K1", "1"),
+      new KeyValue("K2", "2"),
+      new KeyValue("K3", "3"),
+      new KeyValue("K4", "4")
+    };
+    checkIteration(getExpected(), new KeyValue[] { }, hvals);
+  }
+  
+  public void testUnevenIterator() throws Exception {
+    KeyValue[] qvals = new KeyValue[] {
+        new KeyValue("K0", "0"),
+        new KeyValue("K2", "2"),
+      };
+
+    KeyValue[] hvals = new KeyValue[] {
+      new KeyValue("K1", "1"),
+      new KeyValue("K3", "3"),
+      new KeyValue("K4", "4")
+    };
+    
+    checkIteration(getExpected(), qvals, hvals);
+  }
+
+  public void testEitherOrIterator() throws Exception {
+    KeyValue[] qvals = new KeyValue[] {
+        new KeyValue("K0", "0"),
+        new KeyValue("K2", "2"),
+        new KeyValue("K4", "4")
+      };
+
+    KeyValue[] hvals = new KeyValue[] {
+      new KeyValue("K1", "1"),
+      new KeyValue("K3", "3")
+    };
+    
+    checkIteration(getExpected(), qvals, hvals);
+  }
+
+  public void testDuplicateIterator() throws Exception {
+    KeyValue[] qvals = new KeyValue[] {
+        new KeyValue("K0", "0"),
+        new KeyValue("K1", "1"),
+        new KeyValue("K2", "2"),
+        new KeyValue("K3", "3"),
+        new KeyValue("K4", "4"),
+        new KeyValue("K4", "4")
+      };
+
+    KeyValue[] hvals = new KeyValue[] {
+        new KeyValue("K0", "0"),
+        new KeyValue("K1", "1"),
+        new KeyValue("K2", "2"),
+        new KeyValue("K3", "3"),
+        new KeyValue("K4", "4"),
+        new KeyValue("K4", "4")
+    };
+    
+    checkIteration(getExpected(), qvals, hvals);
+  }
+
+  private List<String> getExpected() {
+    List<String> expected = new ArrayList<String>();
+    expected.add("0");
+    expected.add("1");
+    expected.add("2");
+    expected.add("3");
+    expected.add("4");
+    return expected;
+  }
+  
+  private void checkIteration(List<String> expected, KeyValue[] qvals, KeyValue[] hvals) 
+  throws Exception {
+    int seq = 0;
+    List<PersistedEventImpl> evts = new ArrayList<PersistedEventImpl>();
+    for (KeyValue kv : hvals) {
+      evts.add(new SortedHDFSQueuePersistedEvent(getNewEvent(kv.key, kv.value, seq++)));
+    }
+    hdfs.flush(evts.iterator(), evts.size());
+
+    for (KeyValue kv : qvals) {
+      queue.put(getNewEvent(kv.key, kv.value, seq++));
+    }
+
+    List<String> actual = new ArrayList<String>();
+    Iterator vals = new HDFSEntriesSet(bucket, brq, hdfs, IteratorType.VALUES, null).iterator();
+    while (vals.hasNext()) {
+      Object val = vals.next();
+      if(val instanceof CachedDeserializable) {
+        val = ((CachedDeserializable) val).getDeserializedForReading();
+      }
+      actual.add((String) val);
+    }
+    
+    assertEquals(expected, actual);
+  }
+  
+  private HDFSGatewayEventImpl getNewEvent(Object key, Object value, long seq) throws Exception {
+    EntryEventImpl evt = EntryEventImpl.create(region, Operation.CREATE,
+        key, value, null, false, (DistributedMember) cache.getMyId());
+    
+    evt.setEventId(new EventID(cache.getDistributedSystem()));
+    HDFSGatewayEventImpl event = new HDFSGatewayEventImpl(EnumListenerEvent.AFTER_CREATE, evt, null, true, 0);
+    event.setShadowKey(seq);
+    
+    return event;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HdfsStoreMutatorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HdfsStoreMutatorJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HdfsStoreMutatorJUnitTest.java
new file mode 100644
index 0000000..b8cbb0d
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HdfsStoreMutatorJUnitTest.java
@@ -0,0 +1,191 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.BaseHoplogTestCase;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HdfsStoreMutatorJUnitTest extends BaseHoplogTestCase {
+  public void testMutatorInitialState() {
+    HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+    assertEquals(-1, mutator.getWriteOnlyFileRolloverInterval());
+    assertEquals(-1, mutator.getWriteOnlyFileRolloverSize());
+    
+    assertEquals(-1, mutator.getInputFileCountMax());
+    assertEquals(-1, mutator.getInputFileSizeMax());
+    assertEquals(-1, mutator.getInputFileCountMin());
+    assertEquals(-1, mutator.getMinorCompactionThreads());
+    assertNull(mutator.getMinorCompaction());
+    
+    assertEquals(-1, mutator.getMajorCompactionInterval());
+    assertEquals(-1, mutator.getMajorCompactionThreads());
+    assertNull(mutator.getMajorCompaction());
+    
+    assertEquals(-1, mutator.getPurgeInterval());
+    
+    assertEquals(-1, mutator.getBatchSize());
+    assertEquals(-1, mutator.getBatchInterval());
+  }
+  
+  public void testMutatorSetInvalidValue() {
+    HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+
+    try {
+      mutator.setWriteOnlyFileRolloverInterval(-3);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+    try {
+      mutator.setWriteOnlyFileRolloverSize(-5);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+    
+    try {
+      mutator.setInputFileCountMin(-1);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+    try {
+      mutator.setInputFileCountMax(-1);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+    try {
+      mutator.setInputFileSizeMax(-1);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+    try {
+      mutator.setMinorCompactionThreads(-9);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+    try {
+      mutator.setMajorCompactionInterval(-6);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+    try {
+      mutator.setMajorCompactionThreads(-1);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+    try {
+      mutator.setPurgeInterval(-4);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+/*    try {
+      qMutator.setBatchSizeMB(-985);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+    try {
+      qMutator.setBatchTimeInterval(-695);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+*/    
+    try {
+      mutator.setInputFileCountMin(10);
+      mutator.setInputFileCountMax(5);
+      hdfsStore.alter(mutator);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+  }
+  
+  public void testMutatorReturnsUpdatedValues() {
+    HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+    
+    mutator.setWriteOnlyFileRolloverInterval(121);
+    mutator.setWriteOnlyFileRolloverSize(234);
+    
+    mutator.setInputFileCountMax(87);
+    mutator.setInputFileSizeMax(45);
+    mutator.setInputFileCountMin(34);
+    mutator.setMinorCompactionThreads(843);
+    mutator.setMinorCompaction(false);
+
+    mutator.setMajorCompactionInterval(26);
+    mutator.setMajorCompactionThreads(92);
+    mutator.setMajorCompaction(false);
+    
+    mutator.setPurgeInterval(328);
+    
+    mutator.setBatchSize(985);
+    mutator.setBatchInterval(695);
+    
+    assertEquals(121, mutator.getWriteOnlyFileRolloverInterval());
+    assertEquals(234, mutator.getWriteOnlyFileRolloverSize());
+    
+    assertEquals(87, mutator.getInputFileCountMax());
+    assertEquals(45, mutator.getInputFileSizeMax());
+    assertEquals(34, mutator.getInputFileCountMin());
+    assertEquals(843, mutator.getMinorCompactionThreads());
+    assertFalse(mutator.getMinorCompaction());
+    
+    assertEquals(26, mutator.getMajorCompactionInterval());
+    assertEquals(92, mutator.getMajorCompactionThreads());
+    assertFalse(mutator.getMajorCompaction());
+    
+    assertEquals(328, mutator.getPurgeInterval());
+    
+    assertEquals(985, mutator.getBatchSize());
+    assertEquals(695, mutator.getBatchInterval());
+    
+    // repeat the cycle once more
+    mutator.setWriteOnlyFileRolloverInterval(14);
+    mutator.setWriteOnlyFileRolloverSize(56);
+    
+    mutator.setInputFileCountMax(93);
+    mutator.setInputFileSizeMax(85);
+    mutator.setInputFileCountMin(64);
+    mutator.setMinorCompactionThreads(59);
+    mutator.setMinorCompaction(true);
+    
+    mutator.setMajorCompactionInterval(26);
+    mutator.setMajorCompactionThreads(92);
+    mutator.setMajorCompaction(false);
+    
+    mutator.setPurgeInterval(328);
+    
+    assertEquals(14, mutator.getWriteOnlyFileRolloverInterval());
+    assertEquals(56, mutator.getWriteOnlyFileRolloverSize());
+    
+    assertEquals(93, mutator.getInputFileCountMax());
+    assertEquals(85, mutator.getInputFileSizeMax());
+    assertEquals(64, mutator.getInputFileCountMin());
+    assertEquals(59, mutator.getMinorCompactionThreads());
+    assertTrue(mutator.getMinorCompaction());
+    
+    assertEquals(26, mutator.getMajorCompactionInterval());
+    assertEquals(92, mutator.getMajorCompactionThreads());
+    assertFalse(mutator.getMajorCompaction());
+    
+    assertEquals(328, mutator.getPurgeInterval());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionRecoveryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionRecoveryDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionRecoveryDUnitTest.java
new file mode 100644
index 0000000..290f8d1
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionRecoveryDUnitTest.java
@@ -0,0 +1,415 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.File;
+import java.io.IOException;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.internal.FileUtil;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.VM;
+
+/**
+ * A class for testing the recovery after restart for GemFire cluster that has
+ * HDFS regions
+ * 
+ * @author Hemant Bhanawat
+ */
+@SuppressWarnings({ "serial", "deprecation", "rawtypes" })
+public class RegionRecoveryDUnitTest extends CacheTestCase {
+  public RegionRecoveryDUnitTest(String name) {
+    super(name);
+  }
+
+  private static String homeDir = null;
+
+  @Override
+  public void preTearDownCacheTestCase() throws Exception {
+    for (int h = 0; h < Host.getHostCount(); h++) {
+      Host host = Host.getHost(h);
+      SerializableCallable cleanUp = cleanUpStores();
+      for (int v = 0; v < host.getVMCount(); v++) {
+        VM vm = host.getVM(v);
+        vm.invoke(cleanUp);
+      }
+    }
+    super.preTearDownCacheTestCase();
+  }
+
+  public SerializableCallable cleanUpStores() throws Exception {
+    SerializableCallable cleanUp = new SerializableCallable() {
+      public Object call() throws Exception {
+        if (homeDir != null) {
+          // Each VM will try to delete the same directory. But that's okay as
+          // the subsequent invocations will be no-ops.
+          FileUtil.delete(new File(homeDir));
+          homeDir = null;
+        }
+        return 0;
+      }
+    };
+    return cleanUp;
+  }
+
+  /**
+   * Tests a basic restart of the system. Events if in HDFS should be read back.
+   * The async queue is not persisted so we wait until async queue persists the
+   * items to HDFS.
+   * 
+   * @throws Exception
+   */
+  public void testBasicRestart() throws Exception {
+    disconnectFromDS();
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+
+    // Going two level up to avoid home directories getting created in
+    // VM-specific directory. This avoids failures in those tests where
+    // datastores are restarted and bucket ownership changes between VMs.
+    homeDir = "../../testBasicRestart";
+    String uniqueName = "testBasicRestart";
+
+    createServerRegion(vm0, 11, 1, 500, 500, homeDir, uniqueName);
+    createServerRegion(vm1, 11, 1, 500, 500, homeDir, uniqueName);
+    createServerRegion(vm2, 11, 1, 500, 500, homeDir, uniqueName);
+    createServerRegion(vm3, 11, 1, 500, 500, homeDir, uniqueName);
+
+    doPuts(vm0, uniqueName, 1, 50);
+    doPuts(vm1, uniqueName, 40, 100);
+    doPuts(vm2, uniqueName, 40, 100);
+    doPuts(vm3, uniqueName, 90, 150);
+
+    cacheClose(vm0, true);
+    cacheClose(vm1, true);
+    cacheClose(vm2, true);
+    cacheClose(vm3, true);
+
+    createServerRegion(vm0, 11, 1, 500, 500, homeDir, uniqueName);
+    createServerRegion(vm1, 11, 1, 500, 500, homeDir, uniqueName);
+    createServerRegion(vm2, 11, 1, 500, 500, homeDir, uniqueName);
+    createServerRegion(vm3, 11, 1, 500, 500, homeDir, uniqueName);
+
+    verifyGetsForValue(vm0, uniqueName, 1, 50, false);
+    verifyGetsForValue(vm1, uniqueName, 40, 100, false);
+    verifyGetsForValue(vm2, uniqueName, 40, 100, false);
+    verifyGetsForValue(vm3, uniqueName, 90, 150, false);
+
+    cacheClose(vm0, false);
+    cacheClose(vm1, false);
+    cacheClose(vm2, false);
+    cacheClose(vm3, false);
+
+    disconnectFromDS();
+
+  }
+
+  /**
+   * Servers are stopped and restarted. Disabled due to bug 48067.
+   */
+  public void testPersistedAsyncQueue_Restart() throws Exception {
+    disconnectFromDS();
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+
+    // Going two level up to avoid home directories getting created in
+    // VM-specific directory. This avoids failures in those tests where
+    // datastores are restarted and bucket ownership changes between VMs.
+    homeDir = "../../testPersistedAsyncQueue_Restart";
+    String uniqueName = "testPersistedAsyncQueue_Restart";
+
+    // create cache and region
+    createPersistedServerRegion(vm0, 11, 1, 2000, 5, homeDir, uniqueName);
+    createPersistedServerRegion(vm1, 11, 1, 2000, 5, homeDir, uniqueName);
+    createPersistedServerRegion(vm2, 11, 1, 2000, 5, homeDir, uniqueName);
+    createPersistedServerRegion(vm3, 11, 1, 2000, 5, homeDir, uniqueName);
+
+    // do some puts
+    AsyncInvocation a0 = doAsyncPuts(vm0, uniqueName, 1, 50);
+    AsyncInvocation a1 = doAsyncPuts(vm1, uniqueName, 40, 100);
+    AsyncInvocation a2 = doAsyncPuts(vm2, uniqueName, 40, 100);
+    AsyncInvocation a3 = doAsyncPuts(vm3, uniqueName, 90, 150);
+
+    a3.join();
+    a2.join();
+    a1.join();
+    a0.join();
+
+    // close the cache
+    cacheClose(vm0, true);
+    cacheClose(vm1, true);
+    cacheClose(vm2, true);
+    cacheClose(vm3, true);
+
+    // recreate the cache and regions
+    a3 = createAsyncPersistedServerRegion(vm3, 11, 1, 2000, 5, homeDir, uniqueName);
+    a2 = createAsyncPersistedServerRegion(vm2, 11, 1, 2000, 5, homeDir, uniqueName);
+    a1 = createAsyncPersistedServerRegion(vm1, 11, 1, 2000, 5, homeDir, uniqueName);
+    a0 = createAsyncPersistedServerRegion(vm0, 11, 1, 2000, 5, homeDir, uniqueName);
+
+    a3.join();
+    a2.join();
+    a1.join();
+    a0.join();
+
+    // these gets should probably fetch the data from async queue
+    verifyGetsForValue(vm0, uniqueName, 1, 50, false);
+    verifyGetsForValue(vm1, uniqueName, 40, 100, false);
+    verifyGetsForValue(vm2, uniqueName, 40, 100, false);
+    verifyGetsForValue(vm3, uniqueName, 90, 150, false);
+
+    // these gets wait for sometime before fetching the data. this will ensure
+    // that the reads are done from HDFS
+    verifyGetsForValue(vm0, uniqueName, 1, 50, true);
+    verifyGetsForValue(vm1, uniqueName, 40, 100, true);
+    verifyGetsForValue(vm2, uniqueName, 40, 100, true);
+    verifyGetsForValue(vm3, uniqueName, 90, 150, true);
+
+    cacheClose(vm0, false);
+    cacheClose(vm1, false);
+    cacheClose(vm2, false);
+    cacheClose(vm3, false);
+
+    disconnectFromDS();
+  }
+
+  /**
+   * Stops a single server. A different node becomes primary for the buckets on
+   * the stopped node. Everything should work fine. Disabled due to bug 48067
+   * 
+   */
+  public void testPersistedAsyncQueue_ServerRestart() throws Exception {
+    disconnectFromDS();
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+
+    // Going two level up to avoid home directories getting created in
+    // VM-specific directory. This avoids failures in those tests where
+    // datastores are restarted and bucket ownership changes between VMs.
+    homeDir = "../../testPAQ_ServerRestart";
+    String uniqueName = "testPAQ_ServerRestart";
+
+    createPersistedServerRegion(vm0, 11, 1, 2000, 5, homeDir, uniqueName);
+    createPersistedServerRegion(vm1, 11, 1, 2000, 5, homeDir, uniqueName);
+    createPersistedServerRegion(vm2, 11, 1, 2000, 5, homeDir, uniqueName);
+    createPersistedServerRegion(vm3, 11, 1, 2000, 5, homeDir, uniqueName);
+
+    AsyncInvocation a0 = doAsyncPuts(vm0, uniqueName, 1, 50);
+    AsyncInvocation a1 = doAsyncPuts(vm1, uniqueName, 50, 75);
+    AsyncInvocation a2 = doAsyncPuts(vm2, uniqueName, 75, 100);
+    AsyncInvocation a3 = doAsyncPuts(vm3, uniqueName, 100, 150);
+
+    a3.join();
+    a2.join();
+    a1.join();
+    a0.join();
+
+    cacheClose(vm0, false);
+
+    // these gets should probably fetch the data from async queue
+    verifyGetsForValue(vm1, uniqueName, 1, 50, false);
+    verifyGetsForValue(vm2, uniqueName, 40, 100, false);
+    verifyGetsForValue(vm3, uniqueName, 70, 150, false);
+
+    // these gets wait for sometime before fetching the data. this will ensure
+    // that
+    // the reads are done from HDFS
+    verifyGetsForValue(vm2, uniqueName, 1, 100, true);
+    verifyGetsForValue(vm3, uniqueName, 40, 150, true);
+
+    cacheClose(vm1, false);
+    cacheClose(vm2, false);
+    cacheClose(vm3, false);
+
+    disconnectFromDS();
+  }
+
+  private int createPersistedServerRegion(final VM vm, final int totalnumOfBuckets,
+      final int batchSize, final int batchInterval, final int maximumEntries, 
+      final String folderPath, final String uniqueName) throws IOException {
+    
+    return (Integer) vm.invoke(new PersistedRegionCreation(vm, totalnumOfBuckets,
+      batchSize, batchInterval, maximumEntries, folderPath, uniqueName));
+  }
+  private AsyncInvocation createAsyncPersistedServerRegion(final VM vm, final int totalnumOfBuckets,
+      final int batchSize, final int batchInterval, final int maximumEntries, final String folderPath, 
+      final String uniqueName) throws IOException {
+    
+    return (AsyncInvocation) vm.invokeAsync(new PersistedRegionCreation(vm, totalnumOfBuckets,
+      batchSize, batchInterval, maximumEntries, folderPath, uniqueName));
+  }
+  
+  class PersistedRegionCreation extends SerializableCallable {
+    private VM vm;
+    private int totalnumOfBuckets;
+    private int batchSize;
+    private int maximumEntries;
+    private String folderPath;
+    private String uniqueName;
+    private int batchInterval;
+
+    PersistedRegionCreation(final VM vm, final int totalnumOfBuckets,
+        final int batchSize, final int batchInterval, final int maximumEntries,
+        final String folderPath, final String uniqueName) throws IOException {
+      this.vm = vm;
+      this.totalnumOfBuckets = totalnumOfBuckets;
+      this.batchSize = batchSize;
+      this.maximumEntries = maximumEntries;
+      this.folderPath = new File(folderPath).getCanonicalPath();
+      this.uniqueName = uniqueName;
+      this.batchInterval = batchInterval;
+    }
+
+    public Object call() throws Exception {
+
+      AttributesFactory af = new AttributesFactory();
+      af.setDataPolicy(DataPolicy.HDFS_PARTITION);
+      PartitionAttributesFactory paf = new PartitionAttributesFactory();
+      paf.setTotalNumBuckets(totalnumOfBuckets);
+      paf.setRedundantCopies(1);
+
+      af.setPartitionAttributes(paf.create());
+
+      HDFSStoreFactory hsf = getCache().createHDFSStoreFactory();
+      hsf.setHomeDir(folderPath);
+      homeDir = folderPath; // for clean-up in tearDown2()
+      hsf.setBatchSize(batchSize);
+      hsf.setBatchInterval(batchInterval);
+      hsf.setBufferPersistent(true);
+      hsf.setDiskStoreName(uniqueName + vm.getPid());
+
+      getCache().createDiskStoreFactory().create(uniqueName + vm.getPid());
+
+      af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(maximumEntries, EvictionAction.LOCAL_DESTROY));
+      af.setHDFSStoreName(uniqueName);
+      af.setHDFSWriteOnly(false);
+
+      hsf.create(uniqueName);
+
+      createRootRegion(uniqueName, af.create());
+
+      return 0;
+    }
+  };
+
+  private int createServerRegion(final VM vm, final int totalnumOfBuckets,
+      final int batchSize, final int batchInterval, final int maximumEntries,
+      final String folderPath, final String uniqueName) {
+    SerializableCallable createRegion = new SerializableCallable() {
+      public Object call() throws Exception {
+        AttributesFactory af = new AttributesFactory();
+        af.setDataPolicy(DataPolicy.HDFS_PARTITION);
+        PartitionAttributesFactory paf = new PartitionAttributesFactory();
+        paf.setTotalNumBuckets(totalnumOfBuckets);
+        paf.setRedundantCopies(1);
+        af.setPartitionAttributes(paf.create());
+
+        HDFSStoreFactory hsf = getCache().createHDFSStoreFactory();
+        homeDir = new File(folderPath).getCanonicalPath();
+        hsf.setHomeDir(homeDir);
+        hsf.setBatchSize(batchSize);
+        hsf.setBatchInterval(batchInterval);
+        hsf.setBufferPersistent(false);
+        hsf.setMaxMemory(1);
+        hsf.create(uniqueName);
+        af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(maximumEntries, EvictionAction.LOCAL_DESTROY));
+
+        af.setHDFSWriteOnly(false);
+        af.setHDFSStoreName(uniqueName);
+        createRootRegion(uniqueName, af.create());
+
+        return 0;
+      }
+    };
+
+    return (Integer) vm.invoke(createRegion);
+  }
+
+  private void cacheClose(VM vm, final boolean sleep) {
+    vm.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        if (sleep)
+          Thread.sleep(2000);
+        getCache().getLogger().info("Cache close in progress ");
+        getCache().close();
+        getCache().getDistributedSystem().disconnect();
+        getCache().getLogger().info("Cache closed");
+        return null;
+      }
+    });
+
+  }
+
+  private void doPuts(VM vm, final String regionName, final int start, final int end) throws Exception {
+    vm.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        Region r = getRootRegion(regionName);
+        getCache().getLogger().info("Putting entries ");
+        for (int i = start; i < end; i++) {
+          r.put("K" + i, "V" + i);
+        }
+        return null;
+      }
+
+    });
+  }
+
+  private AsyncInvocation doAsyncPuts(VM vm, final String regionName,
+      final int start, final int end) throws Exception {
+    return vm.invokeAsync(new SerializableCallable() {
+      public Object call() throws Exception {
+        Region r = getRootRegion(regionName);
+        getCache().getLogger().info("Putting entries ");
+        for (int i = start; i < end; i++) {
+          r.put("K" + i, "V" + i);
+        }
+        return null;
+      }
+
+    });
+  }
+
+  private void verifyGetsForValue(VM vm, final String regionName, final int start, final int end, final boolean sleep) throws Exception {
+    vm.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        if (sleep) {
+          Thread.sleep(2000);
+        }
+        getCache().getLogger().info("Getting entries ");
+        Region r = getRootRegion(regionName);
+        for (int i = start; i < end; i++) {
+          String k = "K" + i;
+          Object s = r.get(k);
+          String v = "V" + i;
+          assertTrue("The expected key " + v+ " didn't match the received value " + s, v.equals(s));
+        }
+        return null;
+      }
+
+    });
+
+  }
+}