Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:49:55 UTC
[09/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSConfigJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSConfigJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSConfigJUnitTest.java
new file mode 100644
index 0000000..a1c9eb1
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSConfigJUnitTest.java
@@ -0,0 +1,520 @@
+ /*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.CacheXmlException;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.control.HeapMemoryMonitor;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.junit.experimental.categories.Category;
+
+/**
+ * A test class for testing the configuration options for HDFS
+ *
+ * @author Hemant Bhanawat
+ * @author Ashvin Agrawal
+ */
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSConfigJUnitTest extends TestCase {
+ private GemFireCacheImpl c;
+
+ public HDFSConfigJUnitTest() {
+ super();
+ }
+
+ @Override
+ public void setUp() {
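+    // ALLOW_LOCAL_HDFS_PROP lets the HDFS store target the local file system,
+    // so this test runs without a live HDFS cluster.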
+ System.setProperty(HoplogConfig.ALLOW_LOCAL_HDFS_PROP, "true");
+ this.c = createCache();
+ AbstractHoplogOrganizer.JUNIT_TEST_RUN = true;
+ }
+
+ @Override
+ public void tearDown() {
+ this.c.close();
+ }
+
+ public void testHDFSStoreCreation() throws Exception {
+ this.c.close();
+ this.c = createCache();
+ try {
+ HDFSStoreFactory hsf = this.c.createHDFSStoreFactory();
+ HDFSStore store = hsf.create("myHDFSStore");
+ RegionFactory rf1 = this.c.createRegionFactory(RegionShortcut.PARTITION_HDFS);
+ Region r1 = rf1.setHDFSStoreName("myHDFSStore").create("r1");
+
+ r1.put("k1", "v1");
+
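+      // Nothing was configured on the factory, so verify the defaults:
+      // batch size 32, non-persistent buffer, a 3600 second file rollover
+      // interval, and a 256MB rollover size.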
+ assertTrue("Mismatch in attributes, actual.batchsize: " + store.getBatchSize() + " and expected batchsize: 32", store.getBatchSize()== 32);
+ assertTrue("Mismatch in attributes, actual.isPersistent: " + store.getBufferPersistent() + " and expected isPersistent: false", store.getBufferPersistent()== false);
+ assertEquals(false, r1.getAttributes().getHDFSWriteOnly());
+ assertTrue("Mismatch in attributes, actual.getDiskStoreName: " + store.getDiskStoreName() + " and expected getDiskStoreName: null", store.getDiskStoreName()== null);
+ assertTrue("Mismatch in attributes, actual.getFileRolloverInterval: " + store.getWriteOnlyFileRolloverInterval() + " and expected getFileRolloverInterval: 3600", store.getWriteOnlyFileRolloverInterval() == 3600);
+ assertTrue("Mismatch in attributes, actual.getMaxFileSize: " + store.getWriteOnlyFileRolloverSize() + " and expected getMaxFileSize: 256MB", store.getWriteOnlyFileRolloverSize() == 256);
+ this.c.close();
+
+
+ this.c = createCache();
+ hsf = this.c.createHDFSStoreFactory();
+ hsf.create("myHDFSStore");
+
+ r1 = this.c.createRegionFactory(RegionShortcut.PARTITION_WRITEONLY_HDFS_STORE).setHDFSStoreName("myHDFSStore")
+ .create("r1");
+
+ r1.put("k1", "v1");
+ assertTrue("Mismatch in attributes, actual.batchsize: " + store.getBatchSize() + " and expected batchsize: 32", store.getBatchSize()== 32);
+ assertTrue("Mismatch in attributes, actual.isPersistent: " + store.getBufferPersistent() + " and expected isPersistent: false", store.getBufferPersistent()== false);
+      assertTrue("Mismatch in attributes, actual.HDFSWriteOnly: " + r1.getAttributes().getHDFSWriteOnly() + " and expected HDFSWriteOnly: true", r1.getAttributes().getHDFSWriteOnly()== true);
+ assertTrue("Mismatch in attributes, actual.getDiskStoreName: " + store.getDiskStoreName() + " and expected getDiskStoreName: null", store.getDiskStoreName()== null);
+      assertTrue("Mismatch in attributes, actual.batchInterval: " + store.getBatchInterval() + " and expected batchInterval: 60000", store.getBatchInterval()== 60000);
+ assertTrue("Mismatch in attributes, actual.isDiskSynchronous: " + store.getSynchronousDiskWrite() + " and expected isDiskSynchronous: true", store.getSynchronousDiskWrite()== true);
+
+ this.c.close();
+
+ this.c = createCache();
+
+      File directory = new File("HDFS_disk_" + System.currentTimeMillis());
+ directory.mkdir();
+ File[] dirs1 = new File[] { directory };
+ DiskStoreFactory dsf = this.c.createDiskStoreFactory();
+ dsf.setDiskDirs(dirs1);
+ dsf.create("mydisk");
+
+
+ hsf = this.c.createHDFSStoreFactory();
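+      // Third pass: override every buffer, disk, and rollover attribute and
+      // verify that the created store reports the configured values.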
+ hsf.setBatchSize(50);
+ hsf.setDiskStoreName("mydisk");
+ hsf.setBufferPersistent(true);
+ hsf.setBatchInterval(50);
+ hsf.setSynchronousDiskWrite(false);
+ hsf.setHomeDir("/home/hemant");
+ hsf.setNameNodeURL("mymachine");
+ hsf.setWriteOnlyFileRolloverSize(1);
+ hsf.setWriteOnlyFileRolloverInterval(10);
+ hsf.create("myHDFSStore");
+
+
+ r1 = this.c.createRegionFactory(RegionShortcut.PARTITION_WRITEONLY_HDFS_STORE).setHDFSStoreName("myHDFSStore")
+ .setHDFSWriteOnly(true).create("r1");
+
+ r1.put("k1", "v1");
+ store = c.findHDFSStore(r1.getAttributes().getHDFSStoreName());
+
+ assertTrue("Mismatch in attributes, actual.batchsize: " + store.getBatchSize() + " and expected batchsize: 50", store.getBatchSize()== 50);
+ assertTrue("Mismatch in attributes, actual.isPersistent: " + store.getBufferPersistent() + " and expected isPersistent: true", store.getBufferPersistent()== true);
+      assertTrue("Mismatch in attributes, actual.HDFSWriteOnly: " + r1.getAttributes().getHDFSWriteOnly() + " and expected HDFSWriteOnly: true", r1.getAttributes().getHDFSWriteOnly()== true);
+      assertTrue("Mismatch in attributes, actual.getDiskStoreName: " + store.getDiskStoreName() + " and expected getDiskStoreName: mydisk", "mydisk".equals(store.getDiskStoreName()));
+      assertTrue("Mismatch in attributes, actual.HDFSStoreName: " + r1.getAttributes().getHDFSStoreName() + " and expected HDFSStoreName: myHDFSStore", "myHDFSStore".equals(r1.getAttributes().getHDFSStoreName()));
+      assertTrue("Mismatch in attributes, actual.getHomeDir: " + ((GemFireCacheImpl)this.c).findHDFSStore("myHDFSStore").getHomeDir() + " and expected getHomeDir: /home/hemant", "/home/hemant".equals(((GemFireCacheImpl)this.c).findHDFSStore("myHDFSStore").getHomeDir()));
+      assertTrue("Mismatch in attributes, actual.getNameNodeURL: " + ((GemFireCacheImpl)this.c).findHDFSStore("myHDFSStore").getNameNodeURL() + " and expected getNameNodeURL: mymachine", "mymachine".equals(((GemFireCacheImpl)this.c).findHDFSStore("myHDFSStore").getNameNodeURL()));
+      assertTrue("Mismatch in attributes, actual.batchInterval: " + store.getBatchInterval() + " and expected batchInterval: 50", store.getBatchInterval()== 50);
+      assertTrue("Mismatch in attributes, actual.isDiskSynchronous: " + store.getSynchronousDiskWrite() + " and expected isDiskSynchronous: false", store.getSynchronousDiskWrite()== false);
+ assertTrue("Mismatch in attributes, actual.getFileRolloverInterval: " + store.getWriteOnlyFileRolloverInterval() + " and expected getFileRolloverInterval: 10", store.getWriteOnlyFileRolloverInterval() == 10);
+ assertTrue("Mismatch in attributes, actual.getMaxFileSize: " + store.getWriteOnlyFileRolloverSize() + " and expected getMaxFileSize: 1MB", store.getWriteOnlyFileRolloverSize() == 1);
+ this.c.close();
+ } finally {
+ this.c.close();
+ }
+ }
+
+ public void testCacheXMLParsing() throws Exception {
+ try {
+ this.c.close();
+
+ Region r1 = null;
+
+ // use a cache.xml to recover
+ this.c = createCache();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter pw = new PrintWriter(new OutputStreamWriter(baos), true);
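+      // Build a minimal cache.xml in memory: one hdfs-store plus a
+      // PARTITION_HDFS region that references it by name.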
+ pw.println("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
+// pw.println("<?xml version=\"1.0\"?>");
+// pw.println("<!DOCTYPE cache PUBLIC");
+// pw.println(" \"-//GemStone Systems, Inc.//GemFire Declarative Caching 7.5//EN\"");
+// pw.println(" \"http://www.gemstone.com/dtd/cache7_5.dtd\">");
+ pw.println("<cache ");
+ pw.println("xmlns=\"http://schema.pivotal.io/gemfire/cache\"");
+ pw.println("xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"");
+ pw.println(" xsi:schemaLocation=\"http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-9.0.xsd\"");
+ pw.println("version=\"9.0\">");
+
+ pw.println(" <hdfs-store name=\"myHDFSStore\" namenode-url=\"mynamenode\" home-dir=\"mypath\" />");
+ pw.println(" <region name=\"r1\" refid=\"PARTITION_HDFS\">");
+ pw.println(" <region-attributes hdfs-store-name=\"myHDFSStore\"/>");
+ pw.println(" </region>");
+ pw.println("</cache>");
+ pw.close();
+ byte[] bytes = baos.toByteArray();
+ this.c.loadCacheXml(new ByteArrayInputStream(bytes));
+
+ r1 = this.c.getRegion("/r1");
+ HDFSStoreImpl store = c.findHDFSStore(r1.getAttributes().getHDFSStoreName());
+ r1.put("k1", "v1");
+ assertTrue("Mismatch in attributes, actual.batchsize: " + store.getBatchSize() + " and expected batchsize: 32", store.getBatchSize()== 32);
+ assertTrue("Mismatch in attributes, actual.isPersistent: " + store.getBufferPersistent() + " and expected isPersistent: false", store.getBufferPersistent()== false);
+ assertEquals(false, r1.getAttributes().getHDFSWriteOnly());
+ assertTrue("Mismatch in attributes, actual.getDiskStoreName: " + store.getDiskStoreName() + " and expected getDiskStoreName: null", store.getDiskStoreName()== null);
+ assertTrue("Mismatch in attributes, actual.getFileRolloverInterval: " + store.getWriteOnlyFileRolloverInterval() + " and expected getFileRolloverInterval: 3600", store.getWriteOnlyFileRolloverInterval() == 3600);
+ assertTrue("Mismatch in attributes, actual.getMaxFileSize: " + store.getWriteOnlyFileRolloverSize() + " and expected getMaxFileSize: 256MB", store.getWriteOnlyFileRolloverSize() == 256);
+
+ this.c.close();
+
+ // use a cache.xml to recover
+ this.c = createCache();
+ baos = new ByteArrayOutputStream();
+ pw = new PrintWriter(new OutputStreamWriter(baos), true);
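+      // Same minimal store, but the region now uses the write-only HDFS
+      // region shortcut.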
+ pw.println("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
+// pw.println("<?xml version=\"1.0\"?>");
+// pw.println("<!DOCTYPE cache PUBLIC");
+// pw.println(" \"-//GemStone Systems, Inc.//GemFire Declarative Caching 7.5//EN\"");
+// pw.println(" \"http://www.gemstone.com/dtd/cache7_5.dtd\">");
+ pw.println("<cache ");
+ pw.println("xmlns=\"http://schema.pivotal.io/gemfire/cache\"");
+ pw.println("xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"");
+ pw.println(" xsi:schemaLocation=\"http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-9.0.xsd\"");
+ pw.println("version=\"9.0\">");
+ pw.println(" <hdfs-store name=\"myHDFSStore\" namenode-url=\"mynamenode\" home-dir=\"mypath\" />");
+ pw.println(" <region name=\"r1\" refid=\"PARTITION_WRITEONLY_HDFS_STORE\">");
+ pw.println(" <region-attributes hdfs-store-name=\"myHDFSStore\"/>");
+ pw.println(" </region>");
+ pw.println("</cache>");
+ pw.close();
+ bytes = baos.toByteArray();
+ this.c.loadCacheXml(new ByteArrayInputStream(bytes));
+
+ r1 = this.c.getRegion("/r1");
+ store = c.findHDFSStore(r1.getAttributes().getHDFSStoreName());
+ r1.put("k1", "v1");
+ assertTrue("Mismatch in attributes, actual.batchsize: " + store.getBatchSize() + " and expected batchsize: 32", store.getBatchSize()== 32);
+ assertTrue("Mismatch in attributes, actual.isPersistent: " + store.getBufferPersistent() + " and expected isPersistent: false", store.getBufferPersistent()== false);
+      assertTrue("Mismatch in attributes, actual.HDFSWriteOnly: " + r1.getAttributes().getHDFSWriteOnly() + " and expected HDFSWriteOnly: false", r1.getAttributes().getHDFSWriteOnly()== false);
+ assertTrue("Mismatch in attributes, actual.getDiskStoreName: " + store.getDiskStoreName() + " and expected getDiskStoreName: null", store.getDiskStoreName()== null);
+
+ this.c.close();
+
+ // use a cache.xml to recover
+ this.c = createCache();
+ baos = new ByteArrayOutputStream();
+ pw = new PrintWriter(new OutputStreamWriter(baos), true);
+ pw.println("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
+// pw.println("<?xml version=\"1.0\"?>");
+// pw.println("<!DOCTYPE cache PUBLIC");
+// pw.println(" \"-//GemStone Systems, Inc.//GemFire Declarative Caching 7.5//EN\"");
+// pw.println(" \"http://www.gemstone.com/dtd/cache7_5.dtd\">");
+ pw.println("<cache ");
+ pw.println("xmlns=\"http://schema.pivotal.io/gemfire/cache\"");
+ pw.println("xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"");
+ pw.println(" xsi:schemaLocation=\"http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-9.0.xsd\"");
+ pw.println("version=\"9.0\">");
+
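+      // Fully specified hdfs-store backed by a named disk store; every
+      // attribute set here is asserted after recovery below.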
+ pw.println(" <disk-store name=\"mydiskstore\"/>");
+ pw.println(" <hdfs-store name=\"myHDFSStore\" namenode-url=\"mynamenode\" home-dir=\"mypath\" max-write-only-file-size=\"1\" write-only-file-rollover-interval=\"10\" ");
+ pw.println(" batch-size=\"151\" buffer-persistent =\"true\" disk-store=\"mydiskstore\" synchronous-disk-write=\"false\" batch-interval=\"50\"");
+ pw.println(" />");
+ pw.println(" <region name=\"r1\" refid=\"PARTITION_WRITEONLY_HDFS_STORE\">");
+ pw.println(" <region-attributes hdfs-store-name=\"myHDFSStore\" hdfs-write-only=\"false\">");
+ pw.println(" </region-attributes>");
+ pw.println(" </region>");
+ pw.println("</cache>");
+ pw.close();
+ bytes = baos.toByteArray();
+ this.c.loadCacheXml(new ByteArrayInputStream(bytes));
+
+ r1 = this.c.getRegion("/r1");
+ store = c.findHDFSStore(r1.getAttributes().getHDFSStoreName());
+ r1.put("k1", "v1");
+ assertTrue("Mismatch in attributes, actual.batchsize: " + store.getBatchSize() + " and expected batchsize: 151", store.getBatchSize()== 151);
+ assertTrue("Mismatch in attributes, actual.isPersistent: " + store.getBufferPersistent() + " and expected isPersistent: true", store.getBufferPersistent()== true);
+      assertTrue("Mismatch in attributes, actual.HDFSWriteOnly: " + r1.getAttributes().getHDFSWriteOnly() + " and expected HDFSWriteOnly: false", r1.getAttributes().getHDFSWriteOnly()== false);
+      assertTrue("Mismatch in attributes, actual.getDiskStoreName: " + store.getDiskStoreName() + " and expected getDiskStoreName: mydiskstore", store.getDiskStoreName().equals("mydiskstore"));
+      assertTrue("Mismatch in attributes, actual.HDFSStoreName: " + r1.getAttributes().getHDFSStoreName() + " and expected HDFSStoreName: myHDFSStore", r1.getAttributes().getHDFSStoreName().equals("myHDFSStore"));
+      assertTrue("Mismatch in attributes, actual.getHomeDir: " + ((GemFireCacheImpl)this.c).findHDFSStore("myHDFSStore").getHomeDir() + " and expected getHomeDir: mypath", ((GemFireCacheImpl)this.c).findHDFSStore("myHDFSStore").getHomeDir().equals("mypath"));
+      assertTrue("Mismatch in attributes, actual.getNameNodeURL: " + ((GemFireCacheImpl)this.c).findHDFSStore("myHDFSStore").getNameNodeURL() + " and expected getNameNodeURL: mynamenode", ((GemFireCacheImpl)this.c).findHDFSStore("myHDFSStore").getNameNodeURL().equals("mynamenode"));
+      assertTrue("Mismatch in attributes, actual.batchInterval: " + store.getBatchInterval() + " and expected batchInterval: 50", store.getBatchInterval()== 50);
+ assertTrue("Mismatch in attributes, actual.isDiskSynchronous: " + store.getSynchronousDiskWrite() + " and expected isDiskSynchronous: false", store.getSynchronousDiskWrite()== false);
+ assertTrue("Mismatch in attributes, actual.getFileRolloverInterval: " + store.getWriteOnlyFileRolloverInterval() + " and expected getFileRolloverInterval: 10", store.getWriteOnlyFileRolloverInterval() == 10);
+ assertTrue("Mismatch in attributes, actual.getMaxFileSize: " + store.getWriteOnlyFileRolloverSize() + " and expected getMaxFileSize: 1MB", store.getWriteOnlyFileRolloverSize() == 1);
+
+ this.c.close();
+ } finally {
+ this.c.close();
+ }
+ }
+
+  /**
+   * Validates that the hdfs store configuration is parsed completely and correctly
+   */
+ public void testHdfsStoreConfFullParsing() {
+ String conf = createStoreConf("123");
+ this.c.loadCacheXml(new ByteArrayInputStream(conf.getBytes()));
+ HDFSStoreImpl store = ((GemFireCacheImpl)this.c).findHDFSStore("store");
+ assertEquals("namenode url mismatch.", "url", store.getNameNodeURL());
+ assertEquals("home-dir mismatch.", "dir", store.getHomeDir());
+ assertEquals("hdfs-client-config-file mismatch.", "client", store.getHDFSClientConfigFile());
+ assertEquals("read-cache-size mismatch.", 24.5f, store.getBlockCacheSize());
+
+ assertFalse("compaction auto-compact mismatch.", store.getMinorCompaction());
+ assertTrue("compaction auto-major-compact mismatch.", store.getMajorCompaction());
+ assertEquals("compaction max-concurrency", 23, store.getMinorCompactionThreads());
+ assertEquals("compaction max-major-concurrency", 27, store.getMajorCompactionThreads());
+    assertEquals("compaction major-interval", 781, store.getMajorCompactionInterval());
+    assertEquals("compaction purge-interval", 711, store.getPurgeInterval());
+ }
+
+ /**
+ * Validates that the config defaults are set even with minimum XML configuration
+ */
+ public void testHdfsStoreConfMinParse() {
+ this.c.loadCacheXml(new ByteArrayInputStream(XML_MIN_CONF.getBytes()));
+ HDFSStoreImpl store = ((GemFireCacheImpl)this.c).findHDFSStore("store");
+ assertEquals("namenode url mismatch.", "url", store.getNameNodeURL());
+ assertEquals("home-dir mismatch.", "gemfire", store.getHomeDir());
+
+ assertTrue("compaction auto-compact mismatch.", store.getMinorCompaction());
+ assertTrue("compaction auto-major-compact mismatch.", store.getMajorCompaction());
+ assertEquals("compaction max-input-file-size mismatch.", 512, store.getInputFileSizeMax());
+ assertEquals("compaction min-input-file-count.", 4, store.getInputFileCountMin());
+ assertEquals("compaction max-iteration-size.", 10, store.getInputFileCountMax());
+ assertEquals("compaction max-concurrency", 10, store.getMinorCompactionThreads());
+ assertEquals("compaction max-major-concurrency", 2, store.getMajorCompactionThreads());
+ assertEquals("compaction major-interval", 720, store.getMajorCompactionInterval());
+ assertEquals("compaction cleanup-interval", 30, store.getPurgeInterval());
+ }
+
+  /**
+   * Validates that a well-formed compaction configuration is accepted, i.e.
+   * loading it does not raise a CacheXmlException
+   */
+  public void testHdfsStoreInvalidCompactionConf() {
+    String conf = createStoreConf("123");
+    try {
+      this.c.loadCacheXml(new ByteArrayInputStream(conf.getBytes()));
+      // expected: a valid configuration loads without error
+    } catch (CacheXmlException e) {
+      fail();
+    }
+  }
+
+  /**
+   * Validates that store creation fails if negative or mutually inconsistent
+   * compaction configuration values are provided
+   */
+ public void testInvalidConfigCheck() throws Exception {
+ this.c.close();
+
+ this.c = createCache();
+
+ HDFSStoreFactory hsf;
+ hsf = this.c.createHDFSStoreFactory();
+
+ try {
+ hsf.setInputFileSizeMax(-1);
+ fail("validation failed");
+ } catch (IllegalArgumentException e) {
+ //expected
+ }
+ try {
+ hsf.setInputFileCountMin(-1);
+ fail("validation failed");
+ } catch (IllegalArgumentException e) {
+ //expected
+ }
+    try {
+      hsf.setInputFileCountMax(-1);
+      fail("validation failed");
+    } catch (IllegalArgumentException e) {
+      //expected
+    }
+ try {
+ hsf.setMinorCompactionThreads(-1);
+ fail("validation failed");
+ } catch (IllegalArgumentException e) {
+ //expected
+ }
+ try {
+ hsf.setMajorCompactionInterval(-1);
+ fail("validation failed");
+ } catch (IllegalArgumentException e) {
+ //expected
+ }
+ try {
+ hsf.setMajorCompactionThreads(-1);
+ fail("validation failed");
+ } catch (IllegalArgumentException e) {
+ //expected
+ }
+ try {
+ hsf.setPurgeInterval(-1);
+ fail("validation failed");
+ } catch (IllegalArgumentException e) {
+ //expected
+ }
+ try {
+ hsf.setInputFileCountMin(2);
+ hsf.setInputFileCountMax(1);
+ hsf.create("test");
+ fail("validation failed");
+ } catch (IllegalArgumentException e) {
+ //expected
+ }
+ try {
+ hsf.setInputFileCountMax(1);
+ hsf.setInputFileCountMin(2);
+ hsf.create("test");
+ fail("validation failed");
+ } catch (IllegalArgumentException e) {
+ //expected
+ }
+ }
+
+  /**
+   * Validates that cache creation fails if an invalid integer is supplied for
+   * the file size configuration
+   * @throws Exception
+   */
+ public void testHdfsStoreConfInvalidInt() throws Exception {
+ String conf = createStoreConf("NOT_INTEGER");
+ try {
+ this.c.loadCacheXml(new ByteArrayInputStream(conf.getBytes()));
+ fail();
+ } catch (CacheXmlException e) {
+ // expected
+ }
+ }
+
+
+ private static String XML_MIN_CONF = "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \n"
+ + "<cache \n"
+ + "xmlns=\"http://schema.pivotal.io/gemfire/cache\"\n"
+ + "xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n"
+ + " xsi:schemaLocation=\"http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-9.0.xsd\"\n"
+ + "version=\"9.0\">" +
+ " <hdfs-store name=\"store\" namenode-url=\"url\" />" +
+ "</cache>";
+
+ private static String XML_FULL_CONF = "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \n"
+ + "<cache \n"
+ + "xmlns=\"http://schema.pivotal.io/gemfire/cache\"\n"
+ + "xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n"
+ + " xsi:schemaLocation=\"http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-9.0.xsd\"\n"
+ + "version=\"9.0\">"
+ + " <hdfs-store name=\"store\" namenode-url=\"url\" "
+ + " home-dir=\"dir\" "
+ + " read-cache-size=\"24.5\" "
+ + " max-write-only-file-size=\"FILE_SIZE_CONF\" "
+ + " minor-compaction-threads = \"23\""
+ + " major-compaction-threads = \"27\""
+ + " major-compaction=\"true\" "
+ + " minor-compaction=\"false\" "
+ + " major-compaction-interval=\"781\" "
+ + " purge-interval=\"711\" hdfs-client-config-file=\"client\" />\n"
+ + "</cache>";
+  // replacement target token in XML_FULL_CONF
+  private static final String FILE_SIZE_CONF_SUBSTRING = "FILE_SIZE_CONF";
+
+ private String createStoreConf(String fileSize) {
+ String result = XML_FULL_CONF;
+
+ String replaceWith = (fileSize == null) ? "123" : fileSize;
+ result = result.replaceFirst(FILE_SIZE_CONF_SUBSTRING, replaceWith);
+
+ return result;
+ }
+
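+  // JUnit 3 only runs methods whose names start with "test", so the leading
+  // underscore effectively disables this block cache test.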
+ public void _testBlockCacheConfiguration() throws Exception {
+ this.c.close();
+ this.c = createCache();
+ try {
+ HDFSStoreFactory hsf = this.c.createHDFSStoreFactory();
+
+      //Configure a block cache sized to hold about 5 blocks.
+ long heapSize = HeapMemoryMonitor.getTenuredPoolMaxMemory();
+ int blockSize = StoreFile.DEFAULT_BLOCKSIZE_SMALL;
+ int blockCacheSize = 5 * blockSize;
+ int entrySize = blockSize / 2;
+
+
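+      // Convert the desired byte size into a percentage of the tenured pool,
+      // which is the unit setBlockCacheSize() works in.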
+ float percentage = 100 * (float) blockCacheSize / (float) heapSize;
+ hsf.setBlockCacheSize(percentage);
+ HDFSStoreImpl store = (HDFSStoreImpl) hsf.create("myHDFSStore");
+ RegionFactory rf1 = this.c.createRegionFactory(RegionShortcut.PARTITION_HDFS);
+ //Create a region that evicts everything
+ LocalRegion r1 = (LocalRegion) rf1.setHDFSStoreName("myHDFSStore").setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(1)).create("r1");
+
+      //Populate the region with several times the block cache size worth of
+      //data. We want to try to cache at least 5 blocks worth of index and metadata.
+ byte[] value = new byte[entrySize];
+ int numEntries = 10 * blockCacheSize / entrySize;
+ for(int i = 0; i < numEntries; i++) {
+ r1.put(i, value);
+ }
+
+ //Wait for the events to be written to HDFS.
+ Set<String> queueIds = r1.getAsyncEventQueueIds();
+ assertEquals(1, queueIds.size());
+ AsyncEventQueueImpl queue = (AsyncEventQueueImpl) c.getAsyncEventQueue(queueIds.iterator().next());
+ long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(120);
+ while(queue.size() > 0 && System.nanoTime() < end) {
+ Thread.sleep(10);
+ }
+ assertEquals(0, queue.size());
+
+
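+      // Extra settle time after the queue drains before issuing reads.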
+ Thread.sleep(10000);
+
+ //Do some reads to cache some blocks. Note that this doesn't
+ //end up caching data blocks, just index and bloom filters blocks.
+ for(int i = 0; i < numEntries; i++) {
+ r1.get(i);
+ }
+
+ long statSize = store.getStats().getBlockCache().getBytesCached();
+ assertTrue("Block cache stats expected to be near " + blockCacheSize + " was " + statSize,
+ blockCacheSize / 2 < statSize &&
+ statSize <= 2 * blockCacheSize);
+
+ long currentSize = store.getBlockCache().getCurrentSize();
+ assertTrue("Block cache size expected to be near " + blockCacheSize + " was " + currentSize,
+ blockCacheSize / 2 < currentSize &&
+ currentSize <= 2 * blockCacheSize);
+
+ } finally {
+ this.c.close();
+ }
+ }
+
+ protected GemFireCacheImpl createCache() {
+ return (GemFireCacheImpl) new CacheFactory().set("mcast-port", "0").set("log-level", "info")
+ .create();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSetJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSetJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSetJUnitTest.java
new file mode 100644
index 0000000..75dfa93
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSetJUnitTest.java
@@ -0,0 +1,227 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.asyncqueue.internal.ParallelAsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.internal.SortedListForAsyncQueueJUnitTest.KeyValue;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.CachedDeserializable;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion.IteratorType;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@SuppressWarnings("rawtypes")
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSEntriesSetJUnitTest extends TestCase {
+ private GemFireCacheImpl cache;
+ private HDFSStoreImpl store;
+ private PartitionedRegion region;
+ private BucketRegion bucket;
+ private HDFSParallelGatewaySenderQueue queue;
+
+ private HDFSBucketRegionQueue brq;
+ private HoplogOrganizer hdfs;
+
+ public void setUp() throws Exception {
+ System.setProperty(HoplogConfig.ALLOW_LOCAL_HDFS_PROP, "true");
+ cache = (GemFireCacheImpl) new CacheFactory()
+ .set("mcast-port", "0")
+ .set("log-level", "info")
+ .create();
+
+ HDFSStoreFactory hsf = this.cache.createHDFSStoreFactory();
+ hsf.setHomeDir("hoplogs");
+ store = (HDFSStoreImpl) hsf.create("test");
+
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setTotalNumBuckets(1);
+
+ RegionFactory rf = cache.createRegionFactory(RegionShortcut.PARTITION_HDFS);
+ region = (PartitionedRegion) rf.setHDFSStoreName("test").setPartitionAttributes(paf.create()).create("test");
+
+ // prime the region so buckets get created
+ region.put("test", "test");
+ GatewaySenderAttributes g = new GatewaySenderAttributes();
+ g.isHDFSQueue = true;
+ g.id = "HDFSEntriesSetJUnitTest_Queue";
+ ParallelAsyncEventQueueImpl gatewaySender = new ParallelAsyncEventQueueImpl(cache, g);
+ Set<Region> set = new HashSet<Region>();
+ set.add(region);
+
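+    // Stand up an HDFS parallel gateway sender queue over the region's single
+    // bucket so queued events can later be merged with flushed hoplog data.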
+ queue = new HDFSParallelGatewaySenderQueue(gatewaySender, set, 0, 1);
+ brq = (HDFSBucketRegionQueue)((PartitionedRegion) queue.getRegion()).getDataStore().getLocalBucketById(0);
+ bucket = region.getDataStore().getLocalBucketById(0);
+
+ HdfsRegionManager mgr = HDFSRegionDirector.getInstance().manageRegion(region, "test", null);
+ hdfs = mgr.<SortedHoplogPersistedEvent>create(0);
+ AbstractHoplogOrganizer.JUNIT_TEST_RUN = true;
+ }
+
+ public void tearDown() throws Exception {
+ store.getFileSystem().delete(new Path("hoplogs"), true);
+ hdfs.close();
+
+ cache.close();
+ }
+
+ public void testEmptyIterator() throws Exception {
+ checkIteration(Collections.<String>emptyList(), new KeyValue[] { }, new KeyValue[] { });
+ }
+
+ public void testQueueOnlyIterator() throws Exception {
+ KeyValue[] qvals = new KeyValue[] {
+ new KeyValue("K0", "0"),
+ new KeyValue("K1", "1"),
+ new KeyValue("K2", "2"),
+ new KeyValue("K3", "3"),
+ new KeyValue("K4", "4")
+ };
+ checkIteration(getExpected(), qvals, new KeyValue[] { });
+ }
+
+ public void testHdfsOnlyIterator() throws Exception {
+ KeyValue[] hvals = new KeyValue[] {
+ new KeyValue("K0", "0"),
+ new KeyValue("K1", "1"),
+ new KeyValue("K2", "2"),
+ new KeyValue("K3", "3"),
+ new KeyValue("K4", "4")
+ };
+ checkIteration(getExpected(), new KeyValue[] { }, hvals);
+ }
+
+ public void testUnevenIterator() throws Exception {
+ KeyValue[] qvals = new KeyValue[] {
+ new KeyValue("K0", "0"),
+ new KeyValue("K2", "2"),
+ };
+
+ KeyValue[] hvals = new KeyValue[] {
+ new KeyValue("K1", "1"),
+ new KeyValue("K3", "3"),
+ new KeyValue("K4", "4")
+ };
+
+ checkIteration(getExpected(), qvals, hvals);
+ }
+
+ public void testEitherOrIterator() throws Exception {
+ KeyValue[] qvals = new KeyValue[] {
+ new KeyValue("K0", "0"),
+ new KeyValue("K2", "2"),
+ new KeyValue("K4", "4")
+ };
+
+ KeyValue[] hvals = new KeyValue[] {
+ new KeyValue("K1", "1"),
+ new KeyValue("K3", "3")
+ };
+
+ checkIteration(getExpected(), qvals, hvals);
+ }
+
+ public void testDuplicateIterator() throws Exception {
+ KeyValue[] qvals = new KeyValue[] {
+ new KeyValue("K0", "0"),
+ new KeyValue("K1", "1"),
+ new KeyValue("K2", "2"),
+ new KeyValue("K3", "3"),
+ new KeyValue("K4", "4"),
+ new KeyValue("K4", "4")
+ };
+
+ KeyValue[] hvals = new KeyValue[] {
+ new KeyValue("K0", "0"),
+ new KeyValue("K1", "1"),
+ new KeyValue("K2", "2"),
+ new KeyValue("K3", "3"),
+ new KeyValue("K4", "4"),
+ new KeyValue("K4", "4")
+ };
+
+ checkIteration(getExpected(), qvals, hvals);
+ }
+
+ private List<String> getExpected() {
+ List<String> expected = new ArrayList<String>();
+ expected.add("0");
+ expected.add("1");
+ expected.add("2");
+ expected.add("3");
+ expected.add("4");
+ return expected;
+ }
+
+ private void checkIteration(List<String> expected, KeyValue[] qvals, KeyValue[] hvals)
+ throws Exception {
+ int seq = 0;
+ List<PersistedEventImpl> evts = new ArrayList<PersistedEventImpl>();
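+    // Flush hvals to HDFS through the hoplog organizer; they become visible
+    // only via the HDFS side of the entry set.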
+ for (KeyValue kv : hvals) {
+ evts.add(new SortedHDFSQueuePersistedEvent(getNewEvent(kv.key, kv.value, seq++)));
+ }
+ hdfs.flush(evts.iterator(), evts.size());
+
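+    // qvals remain buffered in the gateway sender queue.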
+ for (KeyValue kv : qvals) {
+ queue.put(getNewEvent(kv.key, kv.value, seq++));
+ }
+
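+    // Iterating VALUES over the entry set must merge the queued events with
+    // the flushed hoplog events into a single view.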
+ List<String> actual = new ArrayList<String>();
+ Iterator vals = new HDFSEntriesSet(bucket, brq, hdfs, IteratorType.VALUES, null).iterator();
+ while (vals.hasNext()) {
+ Object val = vals.next();
+ if(val instanceof CachedDeserializable) {
+ val = ((CachedDeserializable) val).getDeserializedForReading();
+ }
+ actual.add((String) val);
+ }
+
+ assertEquals(expected, actual);
+ }
+
+ private HDFSGatewayEventImpl getNewEvent(Object key, Object value, long seq) throws Exception {
+ EntryEventImpl evt = EntryEventImpl.create(region, Operation.CREATE,
+ key, value, null, false, (DistributedMember) cache.getMyId());
+
+ evt.setEventId(new EventID(cache.getDistributedSystem()));
+ HDFSGatewayEventImpl event = new HDFSGatewayEventImpl(EnumListenerEvent.AFTER_CREATE, evt, null, true, 0);
+ event.setShadowKey(seq);
+
+ return event;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HdfsStoreMutatorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HdfsStoreMutatorJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HdfsStoreMutatorJUnitTest.java
new file mode 100644
index 0000000..b8cbb0d
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HdfsStoreMutatorJUnitTest.java
@@ -0,0 +1,191 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.BaseHoplogTestCase;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HdfsStoreMutatorJUnitTest extends BaseHoplogTestCase {
+ public void testMutatorInitialState() {
+ HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+ assertEquals(-1, mutator.getWriteOnlyFileRolloverInterval());
+ assertEquals(-1, mutator.getWriteOnlyFileRolloverSize());
+
+ assertEquals(-1, mutator.getInputFileCountMax());
+ assertEquals(-1, mutator.getInputFileSizeMax());
+ assertEquals(-1, mutator.getInputFileCountMin());
+ assertEquals(-1, mutator.getMinorCompactionThreads());
+ assertNull(mutator.getMinorCompaction());
+
+ assertEquals(-1, mutator.getMajorCompactionInterval());
+ assertEquals(-1, mutator.getMajorCompactionThreads());
+ assertNull(mutator.getMajorCompaction());
+
+ assertEquals(-1, mutator.getPurgeInterval());
+
+ assertEquals(-1, mutator.getBatchSize());
+ assertEquals(-1, mutator.getBatchInterval());
+ }
+
+ public void testMutatorSetInvalidValue() {
+ HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+
+ try {
+ mutator.setWriteOnlyFileRolloverInterval(-3);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ try {
+ mutator.setWriteOnlyFileRolloverSize(-5);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+
+ try {
+ mutator.setInputFileCountMin(-1);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ try {
+ mutator.setInputFileCountMax(-1);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ try {
+ mutator.setInputFileSizeMax(-1);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ try {
+ mutator.setMinorCompactionThreads(-9);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ try {
+ mutator.setMajorCompactionInterval(-6);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ try {
+ mutator.setMajorCompactionThreads(-1);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ try {
+ mutator.setPurgeInterval(-4);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+/* try {
+ qMutator.setBatchSizeMB(-985);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ try {
+ qMutator.setBatchTimeInterval(-695);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+*/
+ try {
+ mutator.setInputFileCountMin(10);
+ mutator.setInputFileCountMax(5);
+ hdfsStore.alter(mutator);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ }
+
+ public void testMutatorReturnsUpdatedValues() {
+ HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+
+ mutator.setWriteOnlyFileRolloverInterval(121);
+ mutator.setWriteOnlyFileRolloverSize(234);
+
+ mutator.setInputFileCountMax(87);
+ mutator.setInputFileSizeMax(45);
+ mutator.setInputFileCountMin(34);
+ mutator.setMinorCompactionThreads(843);
+ mutator.setMinorCompaction(false);
+
+ mutator.setMajorCompactionInterval(26);
+ mutator.setMajorCompactionThreads(92);
+ mutator.setMajorCompaction(false);
+
+ mutator.setPurgeInterval(328);
+
+ mutator.setBatchSize(985);
+ mutator.setBatchInterval(695);
+
+ assertEquals(121, mutator.getWriteOnlyFileRolloverInterval());
+ assertEquals(234, mutator.getWriteOnlyFileRolloverSize());
+
+ assertEquals(87, mutator.getInputFileCountMax());
+ assertEquals(45, mutator.getInputFileSizeMax());
+ assertEquals(34, mutator.getInputFileCountMin());
+ assertEquals(843, mutator.getMinorCompactionThreads());
+ assertFalse(mutator.getMinorCompaction());
+
+ assertEquals(26, mutator.getMajorCompactionInterval());
+ assertEquals(92, mutator.getMajorCompactionThreads());
+ assertFalse(mutator.getMajorCompaction());
+
+ assertEquals(328, mutator.getPurgeInterval());
+
+ assertEquals(985, mutator.getBatchSize());
+ assertEquals(695, mutator.getBatchInterval());
+
+ // repeat the cycle once more
+ mutator.setWriteOnlyFileRolloverInterval(14);
+ mutator.setWriteOnlyFileRolloverSize(56);
+
+ mutator.setInputFileCountMax(93);
+ mutator.setInputFileSizeMax(85);
+ mutator.setInputFileCountMin(64);
+ mutator.setMinorCompactionThreads(59);
+ mutator.setMinorCompaction(true);
+
+ mutator.setMajorCompactionInterval(26);
+ mutator.setMajorCompactionThreads(92);
+ mutator.setMajorCompaction(false);
+
+ mutator.setPurgeInterval(328);
+
+ assertEquals(14, mutator.getWriteOnlyFileRolloverInterval());
+ assertEquals(56, mutator.getWriteOnlyFileRolloverSize());
+
+ assertEquals(93, mutator.getInputFileCountMax());
+ assertEquals(85, mutator.getInputFileSizeMax());
+ assertEquals(64, mutator.getInputFileCountMin());
+ assertEquals(59, mutator.getMinorCompactionThreads());
+ assertTrue(mutator.getMinorCompaction());
+
+ assertEquals(26, mutator.getMajorCompactionInterval());
+ assertEquals(92, mutator.getMajorCompactionThreads());
+ assertFalse(mutator.getMajorCompaction());
+
+ assertEquals(328, mutator.getPurgeInterval());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionRecoveryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionRecoveryDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionRecoveryDUnitTest.java
new file mode 100644
index 0000000..290f8d1
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionRecoveryDUnitTest.java
@@ -0,0 +1,415 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.File;
+import java.io.IOException;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.internal.FileUtil;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.VM;
+
+/**
+ * A class for testing recovery after restart of a GemFire cluster that has
+ * HDFS regions
+ *
+ * @author Hemant Bhanawat
+ */
+@SuppressWarnings({ "serial", "deprecation", "rawtypes" })
+public class RegionRecoveryDUnitTest extends CacheTestCase {
+ public RegionRecoveryDUnitTest(String name) {
+ super(name);
+ }
+
+ private static String homeDir = null;
+
+ @Override
+ public void preTearDownCacheTestCase() throws Exception {
+ for (int h = 0; h < Host.getHostCount(); h++) {
+ Host host = Host.getHost(h);
+ SerializableCallable cleanUp = cleanUpStores();
+ for (int v = 0; v < host.getVMCount(); v++) {
+ VM vm = host.getVM(v);
+ vm.invoke(cleanUp);
+ }
+ }
+ super.preTearDownCacheTestCase();
+ }
+
+ public SerializableCallable cleanUpStores() throws Exception {
+ SerializableCallable cleanUp = new SerializableCallable() {
+ public Object call() throws Exception {
+ if (homeDir != null) {
+ // Each VM will try to delete the same directory. But that's okay as
+ // the subsequent invocations will be no-ops.
+ FileUtil.delete(new File(homeDir));
+ homeDir = null;
+ }
+ return 0;
+ }
+ };
+ return cleanUp;
+ }
+
+  /**
+   * Tests a basic restart of the system. Events already flushed to HDFS
+   * should be read back. The async queue is not persisted, so we wait until
+   * it has pushed its items to HDFS before closing the caches.
+   *
+   * @throws Exception
+   */
+ public void testBasicRestart() throws Exception {
+ disconnectFromDS();
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+ VM vm3 = host.getVM(3);
+
+    // Going two levels up to avoid home directories getting created in the
+    // VM-specific directory. This avoids failures in those tests where
+    // datastores are restarted and bucket ownership changes between VMs.
+ homeDir = "../../testBasicRestart";
+ String uniqueName = "testBasicRestart";
+
+ createServerRegion(vm0, 11, 1, 500, 500, homeDir, uniqueName);
+ createServerRegion(vm1, 11, 1, 500, 500, homeDir, uniqueName);
+ createServerRegion(vm2, 11, 1, 500, 500, homeDir, uniqueName);
+ createServerRegion(vm3, 11, 1, 500, 500, homeDir, uniqueName);
+
+ doPuts(vm0, uniqueName, 1, 50);
+ doPuts(vm1, uniqueName, 40, 100);
+ doPuts(vm2, uniqueName, 40, 100);
+ doPuts(vm3, uniqueName, 90, 150);
+
+ cacheClose(vm0, true);
+ cacheClose(vm1, true);
+ cacheClose(vm2, true);
+ cacheClose(vm3, true);
+
+ createServerRegion(vm0, 11, 1, 500, 500, homeDir, uniqueName);
+ createServerRegion(vm1, 11, 1, 500, 500, homeDir, uniqueName);
+ createServerRegion(vm2, 11, 1, 500, 500, homeDir, uniqueName);
+ createServerRegion(vm3, 11, 1, 500, 500, homeDir, uniqueName);
+
+ verifyGetsForValue(vm0, uniqueName, 1, 50, false);
+ verifyGetsForValue(vm1, uniqueName, 40, 100, false);
+ verifyGetsForValue(vm2, uniqueName, 40, 100, false);
+ verifyGetsForValue(vm3, uniqueName, 90, 150, false);
+
+ cacheClose(vm0, false);
+ cacheClose(vm1, false);
+ cacheClose(vm2, false);
+ cacheClose(vm3, false);
+
+ disconnectFromDS();
+
+ }
+
+ /**
+ * Servers are stopped and restarted. Disabled due to bug 48067.
+ */
+ public void testPersistedAsyncQueue_Restart() throws Exception {
+ disconnectFromDS();
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+ VM vm3 = host.getVM(3);
+
+    // Going two levels up to avoid home directories getting created in the
+    // VM-specific directory. This avoids failures in those tests where
+    // datastores are restarted and bucket ownership changes between VMs.
+ homeDir = "../../testPersistedAsyncQueue_Restart";
+ String uniqueName = "testPersistedAsyncQueue_Restart";
+
+ // create cache and region
+ createPersistedServerRegion(vm0, 11, 1, 2000, 5, homeDir, uniqueName);
+ createPersistedServerRegion(vm1, 11, 1, 2000, 5, homeDir, uniqueName);
+ createPersistedServerRegion(vm2, 11, 1, 2000, 5, homeDir, uniqueName);
+ createPersistedServerRegion(vm3, 11, 1, 2000, 5, homeDir, uniqueName);
+
+ // do some puts
+ AsyncInvocation a0 = doAsyncPuts(vm0, uniqueName, 1, 50);
+ AsyncInvocation a1 = doAsyncPuts(vm1, uniqueName, 40, 100);
+ AsyncInvocation a2 = doAsyncPuts(vm2, uniqueName, 40, 100);
+ AsyncInvocation a3 = doAsyncPuts(vm3, uniqueName, 90, 150);
+
+ a3.join();
+ a2.join();
+ a1.join();
+ a0.join();
+
+ // close the cache
+ cacheClose(vm0, true);
+ cacheClose(vm1, true);
+ cacheClose(vm2, true);
+ cacheClose(vm3, true);
+
+ // recreate the cache and regions
+ a3 = createAsyncPersistedServerRegion(vm3, 11, 1, 2000, 5, homeDir, uniqueName);
+ a2 = createAsyncPersistedServerRegion(vm2, 11, 1, 2000, 5, homeDir, uniqueName);
+ a1 = createAsyncPersistedServerRegion(vm1, 11, 1, 2000, 5, homeDir, uniqueName);
+ a0 = createAsyncPersistedServerRegion(vm0, 11, 1, 2000, 5, homeDir, uniqueName);
+
+ a3.join();
+ a2.join();
+ a1.join();
+ a0.join();
+
+ // these gets should probably fetch the data from async queue
+ verifyGetsForValue(vm0, uniqueName, 1, 50, false);
+ verifyGetsForValue(vm1, uniqueName, 40, 100, false);
+ verifyGetsForValue(vm2, uniqueName, 40, 100, false);
+ verifyGetsForValue(vm3, uniqueName, 90, 150, false);
+
+    // these gets wait for some time before fetching the data; this ensures
+    // that the reads are served from HDFS
+ verifyGetsForValue(vm0, uniqueName, 1, 50, true);
+ verifyGetsForValue(vm1, uniqueName, 40, 100, true);
+ verifyGetsForValue(vm2, uniqueName, 40, 100, true);
+ verifyGetsForValue(vm3, uniqueName, 90, 150, true);
+
+ cacheClose(vm0, false);
+ cacheClose(vm1, false);
+ cacheClose(vm2, false);
+ cacheClose(vm3, false);
+
+ disconnectFromDS();
+ }
+
+ /**
+ * Stops a single server. A different node becomes primary for the buckets on
+ * the stopped node. Everything should work fine. Disabled due to bug 48067
+ *
+ */
+ public void testPersistedAsyncQueue_ServerRestart() throws Exception {
+ disconnectFromDS();
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+ VM vm3 = host.getVM(3);
+
+    // Going two levels up to avoid home directories getting created in the
+    // VM-specific directory. This avoids failures in those tests where
+    // datastores are restarted and bucket ownership changes between VMs.
+ homeDir = "../../testPAQ_ServerRestart";
+ String uniqueName = "testPAQ_ServerRestart";
+
+ createPersistedServerRegion(vm0, 11, 1, 2000, 5, homeDir, uniqueName);
+ createPersistedServerRegion(vm1, 11, 1, 2000, 5, homeDir, uniqueName);
+ createPersistedServerRegion(vm2, 11, 1, 2000, 5, homeDir, uniqueName);
+ createPersistedServerRegion(vm3, 11, 1, 2000, 5, homeDir, uniqueName);
+
+ AsyncInvocation a0 = doAsyncPuts(vm0, uniqueName, 1, 50);
+ AsyncInvocation a1 = doAsyncPuts(vm1, uniqueName, 50, 75);
+ AsyncInvocation a2 = doAsyncPuts(vm2, uniqueName, 75, 100);
+ AsyncInvocation a3 = doAsyncPuts(vm3, uniqueName, 100, 150);
+
+ a3.join();
+ a2.join();
+ a1.join();
+ a0.join();
+
+ cacheClose(vm0, false);
+
+ // these gets should probably fetch the data from async queue
+ verifyGetsForValue(vm1, uniqueName, 1, 50, false);
+ verifyGetsForValue(vm2, uniqueName, 40, 100, false);
+ verifyGetsForValue(vm3, uniqueName, 70, 150, false);
+
+    // these gets wait for some time before fetching the data; this ensures
+    // that the reads are served from HDFS
+ verifyGetsForValue(vm2, uniqueName, 1, 100, true);
+ verifyGetsForValue(vm3, uniqueName, 40, 150, true);
+
+ cacheClose(vm1, false);
+ cacheClose(vm2, false);
+ cacheClose(vm3, false);
+
+ disconnectFromDS();
+ }
+
+ private int createPersistedServerRegion(final VM vm, final int totalnumOfBuckets,
+ final int batchSize, final int batchInterval, final int maximumEntries,
+ final String folderPath, final String uniqueName) throws IOException {
+
+ return (Integer) vm.invoke(new PersistedRegionCreation(vm, totalnumOfBuckets,
+ batchSize, batchInterval, maximumEntries, folderPath, uniqueName));
+ }
+ private AsyncInvocation createAsyncPersistedServerRegion(final VM vm, final int totalnumOfBuckets,
+ final int batchSize, final int batchInterval, final int maximumEntries, final String folderPath,
+ final String uniqueName) throws IOException {
+
+ return (AsyncInvocation) vm.invokeAsync(new PersistedRegionCreation(vm, totalnumOfBuckets,
+ batchSize, batchInterval, maximumEntries, folderPath, uniqueName));
+ }
+
+ class PersistedRegionCreation extends SerializableCallable {
+ private VM vm;
+ private int totalnumOfBuckets;
+ private int batchSize;
+ private int maximumEntries;
+ private String folderPath;
+ private String uniqueName;
+ private int batchInterval;
+
+ PersistedRegionCreation(final VM vm, final int totalnumOfBuckets,
+ final int batchSize, final int batchInterval, final int maximumEntries,
+ final String folderPath, final String uniqueName) throws IOException {
+ this.vm = vm;
+ this.totalnumOfBuckets = totalnumOfBuckets;
+ this.batchSize = batchSize;
+ this.maximumEntries = maximumEntries;
+ this.folderPath = new File(folderPath).getCanonicalPath();
+ this.uniqueName = uniqueName;
+ this.batchInterval = batchInterval;
+ }
+
+ public Object call() throws Exception {
+
+ AttributesFactory af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.HDFS_PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setTotalNumBuckets(totalnumOfBuckets);
+ paf.setRedundantCopies(1);
+
+ af.setPartitionAttributes(paf.create());
+
+ HDFSStoreFactory hsf = getCache().createHDFSStoreFactory();
+ hsf.setHomeDir(folderPath);
+      homeDir = folderPath; // for clean-up in preTearDownCacheTestCase()
+ hsf.setBatchSize(batchSize);
+ hsf.setBatchInterval(batchInterval);
+ hsf.setBufferPersistent(true);
+ hsf.setDiskStoreName(uniqueName + vm.getPid());
+
+ getCache().createDiskStoreFactory().create(uniqueName + vm.getPid());
+
+ af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(maximumEntries, EvictionAction.LOCAL_DESTROY));
+ af.setHDFSStoreName(uniqueName);
+ af.setHDFSWriteOnly(false);
+
+ hsf.create(uniqueName);
+
+ createRootRegion(uniqueName, af.create());
+
+ return 0;
+ }
+  }
+
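+  // Unlike PersistedRegionCreation above, this variant uses a non-persistent
+  // buffer with a tiny queue memory cap, so events reach HDFS quickly.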
+ private int createServerRegion(final VM vm, final int totalnumOfBuckets,
+ final int batchSize, final int batchInterval, final int maximumEntries,
+ final String folderPath, final String uniqueName) {
+ SerializableCallable createRegion = new SerializableCallable() {
+ public Object call() throws Exception {
+ AttributesFactory af = new AttributesFactory();
+ af.setDataPolicy(DataPolicy.HDFS_PARTITION);
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setTotalNumBuckets(totalnumOfBuckets);
+ paf.setRedundantCopies(1);
+ af.setPartitionAttributes(paf.create());
+
+ HDFSStoreFactory hsf = getCache().createHDFSStoreFactory();
+ homeDir = new File(folderPath).getCanonicalPath();
+ hsf.setHomeDir(homeDir);
+ hsf.setBatchSize(batchSize);
+ hsf.setBatchInterval(batchInterval);
+ hsf.setBufferPersistent(false);
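+        // A 1MB cap on queue memory keeps the buffer tiny so events flush to
+        // HDFS quickly.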
+ hsf.setMaxMemory(1);
+ hsf.create(uniqueName);
+ af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(maximumEntries, EvictionAction.LOCAL_DESTROY));
+
+ af.setHDFSWriteOnly(false);
+ af.setHDFSStoreName(uniqueName);
+ createRootRegion(uniqueName, af.create());
+
+ return 0;
+ }
+ };
+
+ return (Integer) vm.invoke(createRegion);
+ }
+
+ private void cacheClose(VM vm, final boolean sleep) {
+ vm.invoke(new SerializableCallable() {
+ public Object call() throws Exception {
+ if (sleep)
+ Thread.sleep(2000);
+ getCache().getLogger().info("Cache close in progress ");
+ getCache().close();
+ getCache().getDistributedSystem().disconnect();
+ getCache().getLogger().info("Cache closed");
+ return null;
+ }
+ });
+
+ }
+
+ private void doPuts(VM vm, final String regionName, final int start, final int end) throws Exception {
+ vm.invoke(new SerializableCallable() {
+ public Object call() throws Exception {
+ Region r = getRootRegion(regionName);
+ getCache().getLogger().info("Putting entries ");
+ for (int i = start; i < end; i++) {
+ r.put("K" + i, "V" + i);
+ }
+ return null;
+ }
+
+ });
+ }
+
+ private AsyncInvocation doAsyncPuts(VM vm, final String regionName,
+ final int start, final int end) throws Exception {
+ return vm.invokeAsync(new SerializableCallable() {
+ public Object call() throws Exception {
+ Region r = getRootRegion(regionName);
+ getCache().getLogger().info("Putting entries ");
+ for (int i = start; i < end; i++) {
+ r.put("K" + i, "V" + i);
+ }
+ return null;
+ }
+
+ });
+ }
+
+ private void verifyGetsForValue(VM vm, final String regionName, final int start, final int end, final boolean sleep) throws Exception {
+ vm.invoke(new SerializableCallable() {
+ public Object call() throws Exception {
+ if (sleep) {
+ Thread.sleep(2000);
+ }
+ getCache().getLogger().info("Getting entries ");
+ Region r = getRootRegion(regionName);
+ for (int i = start; i < end; i++) {
+ String k = "K" + i;
+ Object s = r.get(k);
+ String v = "V" + i;
+          assertTrue("The expected value " + v + " didn't match the received value " + s, v.equals(s));
+ }
+ return null;
+ }
+
+ });
+
+ }
+}