Posted to commits@geode.apache.org by as...@apache.org on 2015/10/21 17:58:58 UTC
[11/15] incubator-geode git commit: GEODE-429: Remove HDFS persistence DataPolicy
GEODE-429: Remove HDFS persistence DataPolicy
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/1b4fd2fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/1b4fd2fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/1b4fd2fe
Branch: refs/heads/feature/GEODE-409
Commit: 1b4fd2fe872af1520027b8e0a84ffe84b9613f27
Parents: 12318e9
Author: Ashvin Agrawal <as...@apache.org>
Authored: Mon Oct 19 14:49:31 2015 -0700
Committer: Ashvin Agrawal <as...@apache.org>
Committed: Wed Oct 21 08:55:23 2015 -0700
----------------------------------------------------------------------
.../com/gemstone/gemfire/cache/DataPolicy.java | 19 +-
.../internal/cache/PartitionedRegionHelper.java | 2 -
.../cache/xmlcache/CacheXmlGenerator.java | 4 -
.../internal/cache/xmlcache/CacheXmlParser.java | 6 -
.../ColocatedRegionWithHDFSDUnitTest.java | 2 +-
.../hdfs/internal/RegionRecoveryDUnitTest.java | 415 -----
.../internal/RegionWithHDFSBasicDUnitTest.java | 1594 ------------------
.../RegionWithHDFSOffHeapBasicDUnitTest.java | 114 --
...RegionWithHDFSPersistenceBasicDUnitTest.java | 77 -
.../HDFSQueueRegionOperationsJUnitTest.java | 33 -
...FSQueueRegionOperationsOffHeapJUnitTest.java | 54 -
.../cache/HDFSRegionOperationsJUnitTest.java | 542 ------
.../HDFSRegionOperationsOffHeapJUnitTest.java | 78 -
13 files changed, 5 insertions(+), 2935 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1b4fd2fe/gemfire-core/src/main/java/com/gemstone/gemfire/cache/DataPolicy.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/DataPolicy.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/DataPolicy.java
index 4ffeaba..9223aa4 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/DataPolicy.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/DataPolicy.java
@@ -88,18 +88,6 @@ public class DataPolicy implements java.io.Serializable {
*/
public static final DataPolicy PERSISTENT_PARTITION = new DataPolicy(6, "PERSISTENT_PARTITION");
- /**
- * In addition to <code>PARTITION</code> also causes data to be stored to
- * HDFS. The region initialization may use the data stored on HDFS.
- */
- public static final DataPolicy HDFS_PARTITION = new DataPolicy(7, "HDFS_PARTITION");
-
- /**
- * In addition to <code>HDFS_PARTITION</code> also causes data to be stored on local
- * disk. The data can be evicted from the local disk and still be read
- * from HDFS.
- */
- public static final DataPolicy HDFS_PERSISTENT_PARTITION = new DataPolicy(10, "HDFS_PERSISTENT_PARTITION");
/**
* The data policy used by default; it is {@link #NORMAL}.
*/
@@ -169,7 +157,7 @@ public class DataPolicy implements java.io.Serializable {
* @since 6.5
*/
public boolean withPersistence() {
- return this == PERSISTENT_PARTITION || this == PERSISTENT_REPLICATE || this == HDFS_PERSISTENT_PARTITION;
+ return this == PERSISTENT_PARTITION || this == PERSISTENT_REPLICATE;
}
/** Return whether this policy does partitioning.
@@ -179,7 +167,7 @@ public class DataPolicy implements java.io.Serializable {
* @since 6.5
*/
public boolean withPartitioning() {
- return this == PARTITION || this == PERSISTENT_PARTITION || this == HDFS_PARTITION || this==HDFS_PERSISTENT_PARTITION;
+ return this == PARTITION || this == PERSISTENT_PARTITION;
}
/** Return whether this policy does preloaded.
@@ -254,7 +242,8 @@ public class DataPolicy implements java.io.Serializable {
* @see #HDFS_PARTITION
*/
public boolean withHDFS() {
- return this == HDFS_PARTITION || this == HDFS_PERSISTENT_PARTITION;
+// return this == HDFS_PARTITION || this == HDFS_PERSISTENT_PARTITION;
+ return false;
}
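
With the two HDFS constants gone, withPersistence() and withPartitioning() reduce to the remaining partition/replicate policies, and withHDFS() is hard-wired to false. A minimal sketch of how calling code can branch on these predicates (the probe class below is illustrative, not part of this commit):

    import com.gemstone.gemfire.cache.DataPolicy;

    public class DataPolicyProbe {
      // Describe a policy using the predicates touched by this commit.
      public static String describe(DataPolicy dp) {
        StringBuilder sb = new StringBuilder(dp.toString());
        if (dp.withPartitioning()) {
          sb.append(" [partitioned]"); // PARTITION or PERSISTENT_PARTITION
        }
        if (dp.withPersistence()) {
          sb.append(" [persistent]");  // PERSISTENT_PARTITION or PERSISTENT_REPLICATE
        }
        if (dp.withHDFS()) {
          sb.append(" [hdfs]");        // unreachable after this commit
        }
        return sb.toString();
      }

      public static void main(String[] args) {
        // Prints: PERSISTENT_PARTITION [partitioned] [persistent]
        System.out.println(describe(DataPolicy.PERSISTENT_PARTITION));
      }
    }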
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1b4fd2fe/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java
index 965f96c..10dc256 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegionHelper.java
@@ -115,8 +115,6 @@ public class PartitionedRegionHelper
Set policies = new HashSet();
policies.add(DEFAULT_DATA_POLICY);
policies.add(DataPolicy.PERSISTENT_PARTITION);
- policies.add(DataPolicy.HDFS_PARTITION);
- policies.add(DataPolicy.HDFS_PERSISTENT_PARTITION);
// policies.add(DataPolicy.NORMAL);
ALLOWED_DATA_POLICIES = Collections.unmodifiableSet(policies);
}
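
After this change, ALLOWED_DATA_POLICIES holds only the default PR policy and PERSISTENT_PARTITION. A hedged sketch of the validation this set enables (assuming DEFAULT_DATA_POLICY is DataPolicy.PARTITION; the validate helper is illustrative, not the actual PartitionedRegionHelper API):

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    import com.gemstone.gemfire.cache.DataPolicy;

    public class AllowedPolicySketch {
      static final Set<DataPolicy> ALLOWED_DATA_POLICIES;
      static {
        Set<DataPolicy> policies = new HashSet<DataPolicy>();
        policies.add(DataPolicy.PARTITION);            // the PR default
        policies.add(DataPolicy.PERSISTENT_PARTITION);
        ALLOWED_DATA_POLICIES = Collections.unmodifiableSet(policies);
      }

      // Illustrative guard: reject any policy a partitioned region cannot use.
      static void validate(DataPolicy dp) {
        if (!ALLOWED_DATA_POLICIES.contains(dp)) {
          throw new IllegalStateException(dp + " is not allowed for a partitioned region");
        }
      }
    }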
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1b4fd2fe/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
index ee4e0ae..3b587b3 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
@@ -1904,10 +1904,6 @@ public class CacheXmlGenerator extends CacheXml implements XMLReader {
dpString = PERSISTENT_REPLICATE_DP;
} else if (dp == DataPolicy.PERSISTENT_PARTITION) {
dpString = PERSISTENT_PARTITION_DP;
- } else if (dp == DataPolicy.HDFS_PARTITION) {
- dpString = HDFS_PARTITION_DP;
- } else if (dp == DataPolicy.HDFS_PERSISTENT_PARTITION) {
- dpString = HDFS_PERSISTENT_PARTITION_DP;
} else if (dp.isPartition()) {
if (this.version.compareTo(CacheXmlVersion.VERSION_5_1) >= 0) {
dpString = PARTITION_DP;
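
With the HDFS branches removed, the generator's mapping from DataPolicy to the cache.xml data-policy attribute collapses to the standard strings. A condensed, non-version-aware sketch (the attribute strings are assumed to match the *_DP constants used above):

    import com.gemstone.gemfire.cache.DataPolicy;

    public class DataPolicyXmlSketch {
      static String dataPolicyString(DataPolicy dp) {
        if (dp == DataPolicy.PERSISTENT_REPLICATE) {
          return "persistent-replicate"; // PERSISTENT_REPLICATE_DP
        } else if (dp == DataPolicy.PERSISTENT_PARTITION) {
          return "persistent-partition"; // PERSISTENT_PARTITION_DP
        } else if (dp.isPartition()) {
          return "partition";            // PARTITION_DP, cache.xml 5.1 and later
        }
        throw new IllegalArgumentException("unmapped data policy: " + dp);
      }
    }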
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1b4fd2fe/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
index f0b3612..2e77d3c 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
@@ -1261,12 +1261,6 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
else if (dp.equals(PERSISTENT_PARTITION_DP)) {
attrs.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
}
- else if (dp.equals(HDFS_PARTITION_DP)) {
- attrs.setDataPolicy(DataPolicy.HDFS_PARTITION);
- }
- else if (dp.equals(HDFS_PERSISTENT_PARTITION_DP)) {
- attrs.setDataPolicy(DataPolicy.HDFS_PERSISTENT_PARTITION);
- }
else {
throw new InternalGemFireException(LocalizedStrings.CacheXmlParser_UNKNOWN_DATA_POLICY_0.toLocalizedString(dp));
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1b4fd2fe/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/ColocatedRegionWithHDFSDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/ColocatedRegionWithHDFSDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/ColocatedRegionWithHDFSDUnitTest.java
index 3b0be6b..44206dc 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/ColocatedRegionWithHDFSDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/ColocatedRegionWithHDFSDUnitTest.java
@@ -57,7 +57,7 @@ public class ColocatedRegionWithHDFSDUnitTest extends RegionWithHDFSTestBase {
hsf.create(uniqueName);
AttributesFactory af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.HDFS_PARTITION);
+ af.setDataPolicy(DataPolicy.PARTITION);
PartitionAttributesFactory paf = new PartitionAttributesFactory();
paf.setTotalNumBuckets(totalnumOfBuckets);
paf.setRedundantCopies(1);
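
The colocated-region test keeps running against a plain partitioned region. For reference, a condensed sketch of the attributes it now builds (the setColocatedWith call is an assumption drawn from the test's name; this hunk only shows the data-policy switch):

    import com.gemstone.gemfire.cache.AttributesFactory;
    import com.gemstone.gemfire.cache.DataPolicy;
    import com.gemstone.gemfire.cache.PartitionAttributesFactory;

    AttributesFactory af = new AttributesFactory();
    af.setDataPolicy(DataPolicy.PARTITION); // was DataPolicy.HDFS_PARTITION
    PartitionAttributesFactory paf = new PartitionAttributesFactory();
    paf.setTotalNumBuckets(11);      // illustrative bucket count
    paf.setRedundantCopies(1);
    paf.setColocatedWith("/parent"); // assumed: colocate with an existing PR
    af.setPartitionAttributes(paf.create());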
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1b4fd2fe/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionRecoveryDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionRecoveryDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionRecoveryDUnitTest.java
deleted file mode 100644
index 61ff18d..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionRecoveryDUnitTest.java
+++ /dev/null
@@ -1,415 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.File;
-import java.io.IOException;
-
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.EvictionAction;
-import com.gemstone.gemfire.cache.EvictionAttributes;
-import com.gemstone.gemfire.cache.PartitionAttributesFactory;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
-import com.gemstone.gemfire.cache30.CacheTestCase;
-import com.gemstone.gemfire.internal.FileUtil;
-
-import dunit.AsyncInvocation;
-import dunit.Host;
-import dunit.SerializableCallable;
-import dunit.VM;
-
-/**
- * A class for testing recovery after restart of a GemFire cluster that has
- * HDFS regions
- *
- * @author Hemant Bhanawat
- */
-@SuppressWarnings({ "serial", "deprecation", "rawtypes" })
-public class RegionRecoveryDUnitTest extends CacheTestCase {
- public RegionRecoveryDUnitTest(String name) {
- super(name);
- }
-
- private static String homeDir = null;
-
- public void tearDown2() throws Exception {
- for (int h = 0; h < Host.getHostCount(); h++) {
- Host host = Host.getHost(h);
- SerializableCallable cleanUp = cleanUpStores();
- for (int v = 0; v < host.getVMCount(); v++) {
- VM vm = host.getVM(v);
- vm.invoke(cleanUp);
- }
- }
- super.tearDown2();
- }
-
- public SerializableCallable cleanUpStores() throws Exception {
- SerializableCallable cleanUp = new SerializableCallable() {
- public Object call() throws Exception {
- if (homeDir != null) {
- // Each VM will try to delete the same directory. But that's okay as
- // the subsequent invocations will be no-ops.
- FileUtil.delete(new File(homeDir));
- homeDir = null;
- }
- return 0;
- }
- };
- return cleanUp;
- }
-
- /**
- * Tests a basic restart of the system. Events already persisted to HDFS should
- * be read back. The async queue itself is not persisted, so we wait until the
- * async queue has flushed its items to HDFS.
- *
- * @throws Exception
- */
- public void testBasicRestart() throws Exception {
- disconnectFromDS();
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
- VM vm2 = host.getVM(2);
- VM vm3 = host.getVM(3);
-
- // Going two level up to avoid home directories getting created in
- // VM-specific directory. This avoids failures in those tests where
- // datastores are restarted and bucket ownership changes between VMs.
- homeDir = "../../testBasicRestart";
- String uniqueName = "testBasicRestart";
-
- createServerRegion(vm0, 11, 1, 500, 500, homeDir, uniqueName);
- createServerRegion(vm1, 11, 1, 500, 500, homeDir, uniqueName);
- createServerRegion(vm2, 11, 1, 500, 500, homeDir, uniqueName);
- createServerRegion(vm3, 11, 1, 500, 500, homeDir, uniqueName);
-
- doPuts(vm0, uniqueName, 1, 50);
- doPuts(vm1, uniqueName, 40, 100);
- doPuts(vm2, uniqueName, 40, 100);
- doPuts(vm3, uniqueName, 90, 150);
-
- cacheClose(vm0, true);
- cacheClose(vm1, true);
- cacheClose(vm2, true);
- cacheClose(vm3, true);
-
- createServerRegion(vm0, 11, 1, 500, 500, homeDir, uniqueName);
- createServerRegion(vm1, 11, 1, 500, 500, homeDir, uniqueName);
- createServerRegion(vm2, 11, 1, 500, 500, homeDir, uniqueName);
- createServerRegion(vm3, 11, 1, 500, 500, homeDir, uniqueName);
-
- verifyGetsForValue(vm0, uniqueName, 1, 50, false);
- verifyGetsForValue(vm1, uniqueName, 40, 100, false);
- verifyGetsForValue(vm2, uniqueName, 40, 100, false);
- verifyGetsForValue(vm3, uniqueName, 90, 150, false);
-
- cacheClose(vm0, false);
- cacheClose(vm1, false);
- cacheClose(vm2, false);
- cacheClose(vm3, false);
-
- disconnectFromDS();
-
- }
-
- /**
- * Servers are stopped and restarted. Disabled due to bug 48067.
- */
- public void testPersistedAsyncQueue_Restart() throws Exception {
- disconnectFromDS();
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
- VM vm2 = host.getVM(2);
- VM vm3 = host.getVM(3);
-
- // Going two level up to avoid home directories getting created in
- // VM-specific directory. This avoids failures in those tests where
- // datastores are restarted and bucket ownership changes between VMs.
- homeDir = "../../testPersistedAsyncQueue_Restart";
- String uniqueName = "testPersistedAsyncQueue_Restart";
-
- // create cache and region
- createPersistedServerRegion(vm0, 11, 1, 2000, 5, homeDir, uniqueName);
- createPersistedServerRegion(vm1, 11, 1, 2000, 5, homeDir, uniqueName);
- createPersistedServerRegion(vm2, 11, 1, 2000, 5, homeDir, uniqueName);
- createPersistedServerRegion(vm3, 11, 1, 2000, 5, homeDir, uniqueName);
-
- // do some puts
- AsyncInvocation a0 = doAsyncPuts(vm0, uniqueName, 1, 50);
- AsyncInvocation a1 = doAsyncPuts(vm1, uniqueName, 40, 100);
- AsyncInvocation a2 = doAsyncPuts(vm2, uniqueName, 40, 100);
- AsyncInvocation a3 = doAsyncPuts(vm3, uniqueName, 90, 150);
-
- a3.join();
- a2.join();
- a1.join();
- a0.join();
-
- // close the cache
- cacheClose(vm0, true);
- cacheClose(vm1, true);
- cacheClose(vm2, true);
- cacheClose(vm3, true);
-
- // recreate the cache and regions
- a3 = createAsyncPersistedServerRegion(vm3, 11, 1, 2000, 5, homeDir, uniqueName);
- a2 = createAsyncPersistedServerRegion(vm2, 11, 1, 2000, 5, homeDir, uniqueName);
- a1 = createAsyncPersistedServerRegion(vm1, 11, 1, 2000, 5, homeDir, uniqueName);
- a0 = createAsyncPersistedServerRegion(vm0, 11, 1, 2000, 5, homeDir, uniqueName);
-
- a3.join();
- a2.join();
- a1.join();
- a0.join();
-
- // these gets should fetch the data from the async queue
- verifyGetsForValue(vm0, uniqueName, 1, 50, false);
- verifyGetsForValue(vm1, uniqueName, 40, 100, false);
- verifyGetsForValue(vm2, uniqueName, 40, 100, false);
- verifyGetsForValue(vm3, uniqueName, 90, 150, false);
-
- // these gets wait for some time before fetching the data. This ensures
- // that the reads are done from HDFS
- verifyGetsForValue(vm0, uniqueName, 1, 50, true);
- verifyGetsForValue(vm1, uniqueName, 40, 100, true);
- verifyGetsForValue(vm2, uniqueName, 40, 100, true);
- verifyGetsForValue(vm3, uniqueName, 90, 150, true);
-
- cacheClose(vm0, false);
- cacheClose(vm1, false);
- cacheClose(vm2, false);
- cacheClose(vm3, false);
-
- disconnectFromDS();
- }
-
- /**
- * Stops a single server. A different node becomes primary for the buckets on
- * the stopped node. Everything should work fine. Disabled due to bug 48067
- *
- */
- public void testPersistedAsyncQueue_ServerRestart() throws Exception {
- disconnectFromDS();
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
- VM vm2 = host.getVM(2);
- VM vm3 = host.getVM(3);
-
- // Going two level up to avoid home directories getting created in
- // VM-specific directory. This avoids failures in those tests where
- // datastores are restarted and bucket ownership changes between VMs.
- homeDir = "../../testPAQ_ServerRestart";
- String uniqueName = "testPAQ_ServerRestart";
-
- createPersistedServerRegion(vm0, 11, 1, 2000, 5, homeDir, uniqueName);
- createPersistedServerRegion(vm1, 11, 1, 2000, 5, homeDir, uniqueName);
- createPersistedServerRegion(vm2, 11, 1, 2000, 5, homeDir, uniqueName);
- createPersistedServerRegion(vm3, 11, 1, 2000, 5, homeDir, uniqueName);
-
- AsyncInvocation a0 = doAsyncPuts(vm0, uniqueName, 1, 50);
- AsyncInvocation a1 = doAsyncPuts(vm1, uniqueName, 50, 75);
- AsyncInvocation a2 = doAsyncPuts(vm2, uniqueName, 75, 100);
- AsyncInvocation a3 = doAsyncPuts(vm3, uniqueName, 100, 150);
-
- a3.join();
- a2.join();
- a1.join();
- a0.join();
-
- cacheClose(vm0, false);
-
- // these gets should fetch the data from the async queue
- verifyGetsForValue(vm1, uniqueName, 1, 50, false);
- verifyGetsForValue(vm2, uniqueName, 40, 100, false);
- verifyGetsForValue(vm3, uniqueName, 70, 150, false);
-
- // these gets wait for some time before fetching the data. This ensures
- // that the reads are done from HDFS
- verifyGetsForValue(vm2, uniqueName, 1, 100, true);
- verifyGetsForValue(vm3, uniqueName, 40, 150, true);
-
- cacheClose(vm1, false);
- cacheClose(vm2, false);
- cacheClose(vm3, false);
-
- disconnectFromDS();
- }
-
- private int createPersistedServerRegion(final VM vm, final int totalnumOfBuckets,
- final int batchSize, final int batchInterval, final int maximumEntries,
- final String folderPath, final String uniqueName) throws IOException {
-
- return (Integer) vm.invoke(new PersistedRegionCreation(vm, totalnumOfBuckets,
- batchSize, batchInterval, maximumEntries, folderPath, uniqueName));
- }
- private AsyncInvocation createAsyncPersistedServerRegion(final VM vm, final int totalnumOfBuckets,
- final int batchSize, final int batchInterval, final int maximumEntries, final String folderPath,
- final String uniqueName) throws IOException {
-
- return (AsyncInvocation) vm.invokeAsync(new PersistedRegionCreation(vm, totalnumOfBuckets,
- batchSize, batchInterval, maximumEntries, folderPath, uniqueName));
- }
-
- class PersistedRegionCreation extends SerializableCallable {
- private VM vm;
- private int totalnumOfBuckets;
- private int batchSize;
- private int maximumEntries;
- private String folderPath;
- private String uniqueName;
- private int batchInterval;
-
- PersistedRegionCreation(final VM vm, final int totalnumOfBuckets,
- final int batchSize, final int batchInterval, final int maximumEntries,
- final String folderPath, final String uniqueName) throws IOException {
- this.vm = vm;
- this.totalnumOfBuckets = totalnumOfBuckets;
- this.batchSize = batchSize;
- this.maximumEntries = maximumEntries;
- this.folderPath = new File(folderPath).getCanonicalPath();
- this.uniqueName = uniqueName;
- this.batchInterval = batchInterval;
- }
-
- public Object call() throws Exception {
-
- AttributesFactory af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.HDFS_PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- paf.setTotalNumBuckets(totalnumOfBuckets);
- paf.setRedundantCopies(1);
-
- af.setPartitionAttributes(paf.create());
-
- HDFSStoreFactory hsf = getCache().createHDFSStoreFactory();
- hsf.setHomeDir(folderPath);
- homeDir = folderPath; // for clean-up in tearDown2()
- hsf.setBatchSize(batchSize);
- hsf.setBatchInterval(batchInterval);
- hsf.setBufferPersistent(true);
- hsf.setDiskStoreName(uniqueName + vm.getPid());
-
- getCache().createDiskStoreFactory().create(uniqueName + vm.getPid());
-
- af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(maximumEntries, EvictionAction.LOCAL_DESTROY));
- af.setHDFSStoreName(uniqueName);
- af.setHDFSWriteOnly(false);
-
- hsf.create(uniqueName);
-
- createRootRegion(uniqueName, af.create());
-
- return 0;
- }
- };
-
- private int createServerRegion(final VM vm, final int totalnumOfBuckets,
- final int batchSize, final int batchInterval, final int maximumEntries,
- final String folderPath, final String uniqueName) {
- SerializableCallable createRegion = new SerializableCallable() {
- public Object call() throws Exception {
- AttributesFactory af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.HDFS_PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- paf.setTotalNumBuckets(totalnumOfBuckets);
- paf.setRedundantCopies(1);
- af.setPartitionAttributes(paf.create());
-
- HDFSStoreFactory hsf = getCache().createHDFSStoreFactory();
- homeDir = new File(folderPath).getCanonicalPath();
- hsf.setHomeDir(homeDir);
- hsf.setBatchSize(batchSize);
- hsf.setBatchInterval(batchInterval);
- hsf.setBufferPersistent(false);
- hsf.setMaxMemory(1);
- hsf.create(uniqueName);
- af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(maximumEntries, EvictionAction.LOCAL_DESTROY));
-
- af.setHDFSWriteOnly(false);
- af.setHDFSStoreName(uniqueName);
- createRootRegion(uniqueName, af.create());
-
- return 0;
- }
- };
-
- return (Integer) vm.invoke(createRegion);
- }
-
- private void cacheClose(VM vm, final boolean sleep) {
- vm.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- if (sleep)
- Thread.sleep(2000);
- getCache().getLogger().info("Cache close in progress ");
- getCache().close();
- getCache().getDistributedSystem().disconnect();
- getCache().getLogger().info("Cache closed");
- return null;
- }
- });
-
- }
-
- private void doPuts(VM vm, final String regionName, final int start, final int end) throws Exception {
- vm.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- Region r = getRootRegion(regionName);
- getCache().getLogger().info("Putting entries ");
- for (int i = start; i < end; i++) {
- r.put("K" + i, "V" + i);
- }
- return null;
- }
-
- });
- }
-
- private AsyncInvocation doAsyncPuts(VM vm, final String regionName,
- final int start, final int end) throws Exception {
- return vm.invokeAsync(new SerializableCallable() {
- public Object call() throws Exception {
- Region r = getRootRegion(regionName);
- getCache().getLogger().info("Putting entries ");
- for (int i = start; i < end; i++) {
- r.put("K" + i, "V" + i);
- }
- return null;
- }
-
- });
- }
-
- private void verifyGetsForValue(VM vm, final String regionName, final int start, final int end, final boolean sleep) throws Exception {
- vm.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- if (sleep) {
- Thread.sleep(2000);
- }
- getCache().getLogger().info("Getting entries ");
- Region r = getRootRegion(regionName);
- for (int i = start; i < end; i++) {
- String k = "K" + i;
- Object s = r.get(k);
- String v = "V" + i;
- assertTrue("The expected key " + v+ " didn't match the received value " + s, v.equals(s));
- }
- return null;
- }
-
- });
-
- }
-}
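
The deleted recovery test followed a simple lifecycle: create the HDFS regions on four VMs, put overlapping key ranges, close the caches, recreate the regions, then verify every key reads back. Its verification loop, condensed here without the dunit scaffolding (a sketch grounded in verifyGetsForValue above):

    import com.gemstone.gemfire.cache.Region;

    public class RecoveryCheckSketch {
      // Every key "K<i>" written before the restart must read back as "V<i>".
      static void verifyRange(Region<String, String> r, int start, int end) {
        for (int i = start; i < end; i++) {
          String expected = "V" + i;
          String actual = r.get("K" + i);
          if (!expected.equals(actual)) {
            throw new AssertionError("key K" + i + ": expected " + expected
                + " but got " + actual);
          }
        }
      }
    }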
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1b4fd2fe/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java
deleted file mode 100644
index 5a58dc5..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSBasicDUnitTest.java
+++ /dev/null
@@ -1,1594 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.Delta;
-import com.gemstone.gemfire.InvalidDeltaException;
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.EvictionAction;
-import com.gemstone.gemfire.cache.EvictionAttributes;
-import com.gemstone.gemfire.cache.PartitionAttributesFactory;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
-import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
-import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import dunit.AsyncInvocation;
-import dunit.DistributedTestCase;
-import dunit.Host;
-import dunit.SerializableCallable;
-import dunit.SerializableRunnable;
-import dunit.VM;
-
-/**
- * A class for testing the basic HDFS functionality
- *
- * @author Hemant Bhanawat
- */
-@SuppressWarnings({ "serial", "rawtypes", "deprecation", "unchecked", "unused" })
-public class RegionWithHDFSBasicDUnitTest extends RegionWithHDFSTestBase {
-
- private static final Logger logger = LogService.getLogger();
-
- private ExpectedException ee0;
- private ExpectedException ee1;
-
- public RegionWithHDFSBasicDUnitTest(String name) {
- super(name);
- }
-
- public void setUp() throws Exception {
- super.setUp();
- ee0 = DistributedTestCase.addExpectedException("com.gemstone.gemfire.cache.RegionDestroyedException");
- ee1 = DistributedTestCase.addExpectedException("com.gemstone.gemfire.cache.RegionDestroyedException");
- }
-
- public void tearDown2() throws Exception {
- ee0.remove();
- ee1.remove();
- super.tearDown2();
- }
-
- @Override
- protected SerializableCallable getCreateRegionCallable(
- final int totalnumOfBuckets, final int batchSizeMB,
- final int maximumEntries, final String folderPath,
- final String uniqueName, final int batchInterval,
- final boolean queuePersistent, final boolean writeonly,
- final long timeForRollover, final long maxFileSize) {
- SerializableCallable createRegion = new SerializableCallable("Create HDFS region") {
- public Object call() throws Exception {
- AttributesFactory af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.HDFS_PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- paf.setTotalNumBuckets(totalnumOfBuckets);
- paf.setRedundantCopies(1);
-
- af.setHDFSStoreName(uniqueName);
- af.setPartitionAttributes(paf.create());
-
- HDFSStoreFactory hsf = getCache().createHDFSStoreFactory();
- // Going two level up to avoid home directories getting created in
- // VM-specific directory. This avoids failures in those tests where
- // datastores are restarted and bucket ownership changes between VMs.
- homeDir = new File(tmpDir + "/../../" + folderPath).getCanonicalPath();
- logger.info("Setting homeDir to {}", homeDir);
- hsf.setHomeDir(homeDir);
- hsf.setBatchSize(batchSizeMB);
- hsf.setBufferPersistent(queuePersistent);
- hsf.setMaxMemory(3);
- hsf.setBatchInterval(batchInterval);
- if (timeForRollover != -1) {
- hsf.setWriteOnlyFileRolloverInterval((int) timeForRollover);
- System.setProperty("gemfire.HDFSRegionDirector.FILE_ROLLOVER_TASK_INTERVAL_SECONDS", "1");
- }
- if (maxFileSize != -1) {
- hsf.setWriteOnlyFileRolloverSize((int) maxFileSize);
- }
- hsf.create(uniqueName);
-
- af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(maximumEntries, EvictionAction.LOCAL_DESTROY));
-
- af.setHDFSWriteOnly(writeonly);
- Region r = createRootRegion(uniqueName, af.create());
- ((LocalRegion) r).setIsTest();
-
- return 0;
- }
- };
- return createRegion;
- }
-
- @Override
- protected void doPuts(final String uniqueName, int start, int end) {
- Region r = getRootRegion(uniqueName);
- for (int i = start; i < end; i++) {
- r.put("K" + i, "V" + i);
- }
- }
-
- @Override
- protected void doPutAll(final String uniqueName, Map map) {
- Region r = getRootRegion(uniqueName);
- r.putAll(map);
- }
-
- @Override
- protected void doDestroys(final String uniqueName, int start, int end) {
- Region r = getRootRegion(uniqueName);
- for (int i = start; i < end; i++) {
- r.destroy("K" + i);
- }
- }
-
- @Override
- protected void checkWithGet(String uniqueName, int start, int end, boolean expectValue) {
- Region r = getRootRegion(uniqueName);
- for (int i = start; i < end; i++) {
- String expected = expectValue ? "V" + i : null;
- assertEquals("Mismatch on key " + i, expected, r.get("K" + i));
- }
- }
-
- @Override
- protected void checkWithGetAll(String uniqueName, ArrayList arrayl) {
- Region r = getRootRegion(uniqueName);
- Map map = r.getAll(arrayl);
- logger.info("Read entries {}", map.size());
- for (Object e : map.keySet()) {
- String v = e.toString().replaceFirst("K", "V");
- assertTrue( "Reading entries failed for key " + e + " where value = " + map.get(e), v.equals(map.get(e)));
- }
- }
-
- /**
- * Tests that gets go to the primary even if the value resides on a secondary.
- */
- public void testValueFetchedFromLocal() {
- disconnectFromDS();
-
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
- String homeDir = "./testValueFetchedFromLocal";
-
- createServerRegion(vm0, 7, 1, 50, homeDir, "testValueFetchedFromLocal", 1000);
- createServerRegion(vm1, 7, 1, 50, homeDir, "testValueFetchedFromLocal", 1000);
-
- vm0.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- Region r = getRootRegion("testValueFetchedFromLocal");
- for (int i = 0; i < 25; i++) {
- r.put("K" + i, "V" + i);
- }
- return null;
- }
- });
- vm1.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- Region r = getRootRegion("testValueFetchedFromLocal");
- for (int i = 0; i < 25; i++) {
- String s = null;
- String k = "K" + i;
- s = (String) r.get(k);
- String v = "V" + i;
- assertTrue( "The expected key " + v+ " didn't match the received value " + s, v.equals(s));
- }
- // with only two members and 1 redundant copy, we will have all data locally; make sure that some
- // get operations result in a remote get operation
- assertTrue( "gets should always go to primary", ((LocalRegion)r).getCountNotFoundInLocal() != 0 );
- return null;
- }
- });
-
- vm0.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- Region r = getRootRegion("testValueFetchedFromLocal");
- assertTrue( "HDFS queue or HDFS should not have been accessed. They were accessed " + ((LocalRegion)r).getCountNotFoundInLocal() + " times",
- ((LocalRegion)r).getCountNotFoundInLocal() == 0 );
- return null;
- }
- });
- }
-
- public void testHDFSQueueSizeTest() {
- disconnectFromDS();
-
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
- String homeDir = "./testHDFSQueueSize";
-
- createServerRegion(vm0, 1, 10, 50, homeDir, "testHDFSQueueSize", 100000);
- createServerRegion(vm1, 1, 10, 50, homeDir, "testHDFSQueueSize", 100000);
-
- vm0.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- Region r = getRootRegion("testHDFSQueueSize");
- byte[] b = new byte[1024];
- byte[] k = new byte[1];
- for (int i = 0; i < 1; i++) {
- r.put(k, b);
- }
- ConcurrentParallelGatewaySenderQueue hdfsqueue = (ConcurrentParallelGatewaySenderQueue)((AbstractGatewaySender)((PartitionedRegion)r).getHDFSEventQueue().getSender()).getQueue();
- HDFSBucketRegionQueue hdfsBQ = (HDFSBucketRegionQueue)((PartitionedRegion)hdfsqueue.getRegion()).getDataStore().getLocalBucketById(0);
- if (hdfsBQ.getBucketAdvisor().isPrimary()) {
- assertTrue("size should not as expected on primary " + hdfsBQ.queueSizeInBytes.get(), hdfsBQ.queueSizeInBytes.get() > 1024 && hdfsBQ.queueSizeInBytes.get() < 1150);
- } else {
- assertTrue("size should be 0 on secondary", hdfsBQ.queueSizeInBytes.get()==0);
- }
- return null;
-
- }
- });
- vm1.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- Region r = getRootRegion("testHDFSQueueSize");
- ConcurrentParallelGatewaySenderQueue hdfsqueue = (ConcurrentParallelGatewaySenderQueue)((AbstractGatewaySender)((PartitionedRegion)r).getHDFSEventQueue().getSender()).getQueue();
- HDFSBucketRegionQueue hdfsBQ = (HDFSBucketRegionQueue)((PartitionedRegion)hdfsqueue.getRegion()).getDataStore().getLocalBucketById(0);
- if (hdfsBQ.getBucketAdvisor().isPrimary()) {
- assertTrue("size should not as expected on primary " + hdfsBQ.queueSizeInBytes.get(), hdfsBQ.queueSizeInBytes.get() > 1024 && hdfsBQ.queueSizeInBytes.get() < 1150);
- } else {
- assertTrue("size should be 0 on secondary", hdfsBQ.queueSizeInBytes.get()==0);
- }
- return null;
-
- }
- });
- }
-
- /**
- * Does puts for a write-only HDFS store
- */
- public void testBasicPutsForWriteOnlyHDFSStore() {
- disconnectFromDS();
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
- String homeDir = "./testPutsForWriteOnlyHDFSStore";
-
- createServerRegion(vm0, 7, 1, 20, homeDir, "testPutsForWriteOnlyHDFSStore",
- 100, true, false);
- createServerRegion(vm1, 7, 1, 20, homeDir, "testPutsForWriteOnlyHDFSStore",
- 100, true, false);
-
- // Do some puts
- vm0.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- Region r = getRootRegion("testPutsForWriteOnlyHDFSStore");
- for (int i = 0; i < 200; i++) {
- r.put("K" + i, "V" + i);
- }
- return null;
- }
- });
-
- vm1.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- Region r = getRootRegion("testPutsForWriteOnlyHDFSStore");
-
- for (int i = 200; i < 400; i++) {
- r.put("K" + i, "V" + i);
- }
-
- return null;
- }
- });
-
- }
-
- /**
- * Does puts and delta updates of CustomerDelta values
- */
- public void testDelta() {
- disconnectFromDS();
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
- String homeDir = "./testDelta";
-
- // Expected from com.gemstone.gemfire.internal.cache.ServerPingMessage.send()
- ExpectedException ee1 = DistributedTestCase.addExpectedException("java.lang.InterruptedException");
- ExpectedException ee2 = DistributedTestCase.addExpectedException("java.lang.InterruptedException");
-
- createServerRegion(vm0, 7, 1, 20, homeDir, "testDelta", 100);
- createServerRegion(vm1, 7, 1, 20, homeDir, "testDelta", 100);
-
- // Do some puts
- vm0.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- Region r = getRootRegion("testDelta");
- for (int i = 0; i < 100; i++) {
- r.put("K" + i, new CustomerDelta("V" + i, "address"));
- }
- for (int i = 0; i < 50; i++) {
- CustomerDelta cd = new CustomerDelta("V" + i, "address");
- cd.setAddress("updated address");
- r.put("K" + i, cd);
- }
- return null;
- }
- });
-
- vm1.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- Region r = getRootRegion("testDelta");
-
- for (int i = 100; i < 200; i++) {
- r.put("K" + i, new CustomerDelta("V" + i, "address"));
- }
- for (int i = 100; i < 150; i++) {
- CustomerDelta cd = new CustomerDelta("V" + i, "address");
- cd.setAddress("updated address");
- r.put("K" + i, cd);
- }
-
- return null;
- }
- });
- vm1.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- Region r = getRootRegion("testDelta");
- for (int i = 0; i < 50; i++) {
- CustomerDelta custDela = new CustomerDelta ("V" + i, "updated address" );
- String k = "K" + i;
- CustomerDelta s = (CustomerDelta) r.get(k);
-
- assertTrue( "The expected value " + custDela + " didn't match the received value " + s, custDela.equals(s));
- }
- for (int i = 50; i < 100; i++) {
- CustomerDelta custDela = new CustomerDelta("V" + i, "address");
- String k = "K" + i;
- CustomerDelta s = (CustomerDelta) r.get(k);
-
- assertTrue( "The expected value " + custDela + " didn't match the received value " + s, custDela.equals(s));
- }
- for (int i = 100; i < 150; i++) {
- CustomerDelta custDela = new CustomerDelta ("V" + i, "updated address" );
- String k = "K" + i;
- CustomerDelta s = (CustomerDelta) r.get(k);
-
- assertTrue( "The expected value " + custDela + " didn't match the received value " + s, custDela.equals(s));
- }
- for (int i = 150; i < 200; i++) {
- CustomerDelta custDela = new CustomerDelta ("V" + i, "address" );
- String k = "K" + i;
- CustomerDelta s = (CustomerDelta) r.get(k);
-
- assertTrue( "The expected value " + custDela + " didn't match the received value " + s, custDela.equals(s));
- }
- return null;
- }
- });
- ee1.remove();
- ee2.remove();
-
- }
-
- /**
- * Puts byte arrays and fetches them back to ensure that serialization of byte
- * arrays is proper
- *
- */
- public void testByteArrays() {
- disconnectFromDS();
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
- String homeDir = "./testByteArrays";
-
- createServerRegion(vm0, 7, 1, 20, homeDir, "testByteArrays", 100);
- createServerRegion(vm1, 7, 1, 20, homeDir, "testByteArrays", 100);
-
- // Do some puts
- vm0.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- Region r = getRootRegion("testByteArrays");
- byte[] b1 = { 0x11, 0x44, 0x77 };
- byte[] b2 = { 0x22, 0x55 };
- byte[] b3 = { 0x33 };
- for (int i = 0; i < 100; i++) {
- int x = i % 3;
- if (x == 0) {
- r.put("K" + i, b1);
- } else if (x == 1) {
- r.put("K" + i, b2);
- } else {
- r.put("K" + i, b3);
- }
- }
- return null;
- }
- });
-
- vm1.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- Region r = getRootRegion("testByteArrays");
-
- byte[] b1 = { 0x11, 0x44, 0x77 };
- byte[] b2 = { 0x22, 0x55 };
- byte[] b3 = { 0x33 };
- for (int i = 100; i < 200; i++) {
- int x = i % 3;
- if (x == 0) {
- r.put("K" + i, b1);
- } else if (x == 1) {
- r.put("K" + i, b2);
- } else {
- r.put("K" + i, b3);
- }
- }
- return null;
- }
- });
- vm1.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- Region r = getRootRegion("testByteArrays");
- byte[] b1 = { 0x11, 0x44, 0x77 };
- byte[] b2 = { 0x22, 0x55 };
- byte[] b3 = { 0x33 };
- for (int i = 0; i < 200; i++) {
- int x = i % 3;
- String k = "K" + i;
- byte[] s = (byte[]) r.get(k);
- if (x == 0) {
- assertTrue( "The expected value didn't match the received value of byte array" , Arrays.equals(b1, s));
- } else if (x == 1) {
- assertTrue( "The expected value didn't match the received value of byte array" , Arrays.equals(b2, s));
- } else {
- assertTrue( "The expected value didn't match the received value of byte array" , Arrays.equals(b3, s));
- }
-
- }
- return null;
- }
- });
- }
-
- private static class CustomerDelta implements Serializable, Delta {
- private String name;
- private String address;
- private boolean nameChanged;
- private boolean addressChanged;
-
- public CustomerDelta(CustomerDelta o) {
- this.address = o.address;
- this.name = o.name;
- }
-
- public CustomerDelta(String name, String address) {
- this.name = name;
- this.address = address;
- }
-
- public void fromDelta(DataInput in) throws IOException,
- InvalidDeltaException {
- boolean nameC = in.readBoolean();
- if (nameC) {
- this.name = in.readUTF();
- }
- boolean addressC = in.readBoolean();
- if (addressC) {
- this.address = in.readUTF();
- }
- }
-
- public boolean hasDelta() {
- return nameChanged || addressChanged;
- }
-
- public void toDelta(DataOutput out) throws IOException {
- out.writeBoolean(nameChanged);
- if (this.nameChanged) {
- out.writeUTF(name);
- }
- out.writeBoolean(addressChanged);
- if (this.addressChanged) {
- out.writeUTF(address);
- }
- }
-
- public void setName(String name) {
- this.nameChanged = true;
- this.name = name;
- }
-
- public String getName() {
- return name;
- }
-
- public void setAddress(String address) {
- this.addressChanged = true;
- this.address = address;
- }
-
- public String getAddress() {
- return address;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof CustomerDelta)) {
- return false;
- }
- CustomerDelta other = (CustomerDelta) obj;
- return this.name.equals(other.name) && this.address.equals(other.address);
- }
-
- @Override
- public int hashCode() {
- return this.address.hashCode() + this.name.hashCode();
- }
-
- @Override
- public String toString() {
- return "name=" + this.name + "address=" + address;
- }
- }
-
- public void testClearRegionDataInQueue() throws Throwable {
- doTestClearRegion(100000, false);
-
- }
-
- public void testClearRegionDataInHDFS() throws Throwable {
- doTestClearRegion(1, true);
- }
-
- public void doTestClearRegion(int batchInterval, boolean waitForWriteToHDFS) throws Throwable {
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
-
- final int numEntries = 400;
-
- String name = getName();
- final String folderPath = "./" + name;
- // Create some regions. Note that we want a large batch interval
- // so that we will have some entries sitting in the queue when
- // we do a clear.
- final String uniqueName = name;
- createServerRegion(vm0, 7, 31, 40, folderPath, uniqueName, batchInterval,
- false, true);
- createServerRegion(vm1, 7, 31, 40, folderPath, uniqueName, batchInterval,
- false, true);
-
- doPuts(vm0, uniqueName, numEntries);
-
- // Make sure some files have been written to hdfs.
- if (waitForWriteToHDFS) {
- verifyDataInHDFS(vm0, uniqueName, true, true, waitForWriteToHDFS, numEntries);
- }
-
- // Do a clear
- simulateClear(uniqueName, vm0, vm1);
-
- validateEmpty(vm0, numEntries, uniqueName);
- validateEmpty(vm1, numEntries, uniqueName);
-
- // Double check that there is no data in hdfs now
- verifyDataInHDFS(vm0, uniqueName, false, false, waitForWriteToHDFS, numEntries);
- verifyDataInHDFS(vm1, uniqueName, false, false, waitForWriteToHDFS, numEntries);
-
- closeCache(vm0);
- closeCache(vm1);
-
- AsyncInvocation async0 = createServerRegionAsync(vm0, 7, 31, 200, folderPath,
- uniqueName, 100000, false, true);
- AsyncInvocation async1 = createServerRegionAsync(vm1, 7, 31, 200, folderPath,
- uniqueName, 100000, false, true);
- async0.getResult();
- async1.getResult();
-
- validateEmpty(vm0, numEntries, uniqueName);
- validateEmpty(vm1, numEntries, uniqueName);
- }
-
- private void simulateClear(final String name, VM... vms) throws Throwable {
- simulateClearForTests(true);
- try {
-
- // Gemfire PRs don't support clear
- // gemfirexd does a clear by taking gemfirexd ddl locks
- // and then clearing each primary bucket on the primary.
- // Simulate that by clearing all primaries on each vm.
- // See GemFireContainer.clear
-
- SerializableCallable clear = new SerializableCallable("clear") {
- public Object call() throws Exception {
- PartitionedRegion r = (PartitionedRegion) getRootRegion(name);
-
- r.clearLocalPrimaries();
-
- return null;
- }
- };
-
- // Invoke the clears concurrently
- AsyncInvocation[] async = new AsyncInvocation[vms.length];
- for (int i = 0; i < vms.length; i++) {
- async[i] = vms[i].invokeAsync(clear);
- }
-
- // Get the clear results.
- for (int i = 0; i < async.length; i++) {
- async[i].getResult();
- }
-
- } finally {
- simulateClearForTests(false);
- }
- }
-
- protected void simulateClearForTests(final boolean isGfxd) {
- SerializableRunnable setGfxd = new SerializableRunnable() {
- @Override
- public void run() {
- if (isGfxd) {
- LocalRegion.simulateClearForTests(true);
- } else {
- LocalRegion.simulateClearForTests(false);
- }
- }
- };
- setGfxd.run();
- invokeInEveryVM(setGfxd);
- }
-
- /**
- * Test that we can locally destroy a region on a member without causing
- * problems with the data in HDFS. This was disabled due to ticket 47793.
- *
- * @throws InterruptedException
- */
- public void testLocalDestroy() throws InterruptedException {
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
- int numEntries = 200;
-
- final String folderPath = "./testLocalDestroy";
- final String uniqueName = "testLocalDestroy";
-
- createServerRegion(vm0, 7, 31, 40, folderPath, uniqueName, 1, false, true);
- createServerRegion(vm1, 7, 31, 40, folderPath, uniqueName, 1, false, true);
-
- doPuts(vm0, uniqueName, numEntries);
-
- // Make sure some files have been written to hdfs and wait for
- // the queue to drain.
- verifyDataInHDFS(vm0, uniqueName, true, true, true, numEntries);
-
- validate(vm0, uniqueName, numEntries);
-
- SerializableCallable localDestroy = new SerializableCallable("local destroy") {
- public Object call() throws Exception {
- Region r = getRootRegion(uniqueName);
- r.localDestroyRegion();
- return null;
- }
- };
-
- vm0.invoke(localDestroy);
-
- verifyNoQOrPR(vm0);
-
- validate(vm1, uniqueName, numEntries);
-
- vm1.invoke(localDestroy);
-
- verifyNoQOrPR(vm1);
-
- closeCache(vm0);
- closeCache(vm1);
-
- // Restart vm0 and see if the data is still available from HDFS
- createServerRegion(vm0, 7, 31, 40, folderPath, uniqueName, 1, false, true);
-
- validate(vm0, uniqueName, numEntries);
- }
-
- /**
- * Test that doing a destroyRegion removes all data from HDFS.
- *
- * @throws InterruptedException
- */
- public void testGlobalDestroyWithHDFSData() throws InterruptedException {
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
-
- final String folderPath = "./testGlobalDestroyWithHDFSData";
- final String uniqueName = "testGlobalDestroyWithHDFSData";
- int numEntries = 200;
-
- createServerRegion(vm0, 7, 31, 40, folderPath, uniqueName, 1, false, true);
- createServerRegion(vm1, 7, 31, 40, folderPath, uniqueName, 1, false, true);
-
- doPuts(vm0, uniqueName, numEntries);
-
- // Make sure some files have been written to hdfs.
- verifyDataInHDFS(vm0, uniqueName, true, true, false, numEntries);
-
- SerializableCallable globalDestroy = new SerializableCallable("destroy") {
- public Object call() throws Exception {
- Region r = getRootRegion(uniqueName);
- r.destroyRegion();
- return null;
- }
- };
-
- vm0.invoke(globalDestroy);
-
- // make sure data is not in HDFS
- verifyNoQOrPR(vm0);
- verifyNoQOrPR(vm1);
- verifyNoHDFSData(vm0, uniqueName);
- verifyNoHDFSData(vm1, uniqueName);
-
- closeCache(vm0);
- closeCache(vm1);
-
- // Restart vm0 and make sure it's still empty
- createServerRegion(vm0, 7, 31, 40, folderPath, uniqueName, 1, false, true);
- createServerRegion(vm1, 7, 31, 40, folderPath, uniqueName, 1, false, true);
-
- // make sure it's empty
- validateEmpty(vm0, numEntries, uniqueName);
- validateEmpty(vm1, numEntries, uniqueName);
-
- }
-
- /**
- * Test that doing a destroyRegion removes all data from HDFS.
- */
- public void _testGlobalDestroyWithQueueData() {
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
-
- final String folderPath = "./testGlobalDestroyWithQueueData";
- final String uniqueName = "testGlobalDestroyWithQueueData";
- int numEntries = 200;
-
- // set a large queue timeout so that data is still in the queue
- createServerRegion(vm0, 7, 31, 40, folderPath, uniqueName, 10000, false,
- true);
- createServerRegion(vm1, 7, 31, 40, folderPath, uniqueName, 10000, false,
- true);
-
- doPuts(vm0, uniqueName, numEntries);
-
- SerializableCallable globalDestroy = new SerializableCallable("destroy") {
- public Object call() throws Exception {
- Region r = getRootRegion(uniqueName);
- r.destroyRegion();
- return null;
- }
- };
-
- vm0.invoke(globalDestroy);
-
- // make sure data is not in HDFS
- verifyNoQOrPR(vm0);
- verifyNoQOrPR(vm1);
- verifyNoHDFSData(vm0, uniqueName);
- verifyNoHDFSData(vm1, uniqueName);
-
- closeCache(vm0);
- closeCache(vm1);
-
- // Restart vm0 and make sure it's still empty
- createServerRegion(vm0, 7, 31, 40, folderPath, uniqueName, 1, false, true);
- createServerRegion(vm1, 7, 31, 40, folderPath, uniqueName, 1, false, true);
-
- // make sure it's empty
- validateEmpty(vm0, numEntries, uniqueName);
- validateEmpty(vm1, numEntries, uniqueName);
-
- }
-
- /**
- * Make sure all async event queues and PRs are destroyed in a member
- */
- public void verifyNoQOrPR(VM vm) {
- vm.invoke(new SerializableRunnable() {
- @Override
- public void run() {
- GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
- assertEquals(Collections.EMPTY_SET, cache.getAsyncEventQueues());
- assertEquals(Collections.EMPTY_SET, cache.getPartitionedRegions());
- }
- });
-
- }
-
- /**
- * Make sure all of the data for a region in HDFS is destroyed
- */
- public void verifyNoHDFSData(final VM vm, final String uniqueName) {
- vm.invoke(new SerializableCallable() {
- @Override
- public Object call() throws IOException {
- HDFSStoreImpl hdfsStore = (HDFSStoreImpl) ((GemFireCacheImpl)getCache()).findHDFSStore(uniqueName);
- FileSystem fs = hdfsStore.getFileSystem();
- Path path = new Path(hdfsStore.getHomeDir(), uniqueName);
- if (fs.exists(path)) {
- dumpFiles(vm, uniqueName);
- fail("Found files in " + path);
- }
- return null;
- }
- });
- }
-
- protected AsyncInvocation doAsyncPuts(VM vm, final String regionName,
- final int start, final int end, final String suffix) throws Exception {
- return doAsyncPuts(vm, regionName, start, end, suffix, "");
- }
-
- protected AsyncInvocation doAsyncPuts(VM vm, final String regionName,
- final int start, final int end, final String suffix, final String value)
- throws Exception {
- return vm.invokeAsync(new SerializableCallable("doAsyncPuts") {
- public Object call() throws Exception {
- Region r = getRootRegion(regionName);
- String v = "V";
- if (!value.equals("")) {
- v = value;
- }
- logger.info("Putting entries ");
- for (int i = start; i < end; i++) {
- r.put("K" + i, v + i + suffix);
- }
- return null;
- }
-
- });
- }
-
- public void _testGlobalDestroyFromAccessor() {
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
- VM vm2 = host.getVM(2);
-
- final String folderPath = "./testGlobalDestroyFromAccessor";
- final String uniqueName = "testGlobalDestroyFromAccessor";
- int numEntries = 200;
-
- createServerRegion(vm0, 7, 31, 40, folderPath, uniqueName, 1, false, true);
- createServerRegion(vm1, 7, 31, 40, folderPath, uniqueName, 1, false, true);
- createServerAccessor(vm2, 7, 40, uniqueName);
-
- doPuts(vm0, uniqueName, numEntries);
-
- // Make sure some files have been written to hdfs.
- verifyDataInHDFS(vm0, uniqueName, true, true, false, numEntries);
-
- SerializableCallable globalDestroy = new SerializableCallable("destroy") {
- public Object call() throws Exception {
- Region r = getRootRegion(uniqueName);
- r.destroyRegion();
- return null;
- }
- };
-
- // Destroy the region from an accessor
- vm2.invoke(globalDestroy);
-
- // make sure data is not in HDFS
- verifyNoQOrPR(vm0);
- verifyNoQOrPR(vm1);
- verifyNoHDFSData(vm0, uniqueName);
- verifyNoHDFSData(vm1, uniqueName);
-
- closeCache(vm0);
- closeCache(vm1);
- closeCache(vm2);
-
- // Restart vm0 and make sure it's still empty
- createServerRegion(vm0, 7, 31, 40, folderPath, uniqueName, 1, false, true);
- createServerRegion(vm1, 7, 31, 40, folderPath, uniqueName, 1, false, true);
-
- // make sure it's empty
- validateEmpty(vm0, numEntries, uniqueName);
- validateEmpty(vm1, numEntries, uniqueName);
- }
-
- /**
- * Create a server with a max file size of 2 MB. Insert 4 entries of 1 MB each.
- * There should be 2 files with 2 entries each.
- *
- * @throws Throwable
- */
- public void testWOFileSizeParam() throws Throwable {
- disconnectFromDS();
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
-
- String homeDir = "./testWOFileSizeParam";
- final String uniqueName = getName();
- String value = "V";
- for (int i = 0; i < 20; i++) {
- value += value;
- }
-
- createServerRegion(vm0, 1, 1, 500, homeDir, uniqueName, 5, true, false, 2000, 2);
- createServerRegion(vm1, 1, 1, 500, homeDir, uniqueName, 5, true, false, 2000, 2);
-
- AsyncInvocation a1 = doAsyncPuts(vm0, uniqueName, 1, 3, "vm0", value);
- AsyncInvocation a2 = doAsyncPuts(vm1, uniqueName, 2, 4, "vm1", value);
-
- a1.join();
- a2.join();
-
- Thread.sleep(4000);
-
- cacheClose(vm0, false);
- cacheClose(vm1, false);
-
- // Start the VMs in parallel for the persistent version subclass
- AsyncInvocation async1 = createServerRegionAsync(vm0, 1, 1, 500, homeDir, uniqueName, 5, true, false, 2000, 2);
- AsyncInvocation async2 = createServerRegionAsync(vm1, 1, 1, 500, homeDir, uniqueName, 5, true, false, 2000, 2);
- async1.getResult();
- async2.getResult();
-
- // There should be two files in bucket 0.
- verifyTwoHDFSFilesWithTwoEntries(vm0, uniqueName, value);
-
- cacheClose(vm0, false);
- cacheClose(vm1, false);
-
- disconnectFromDS();
-
- }
-
- /**
- * Create a server with a file rollover time of 5 seconds. Insert a few entries
- * and then sleep for 7 seconds. A file should be created. Do it again. At the
- * end, two files with the inserted entries should exist.
- *
- * @throws Throwable
- */
- public void testWOTimeForRollOverParam() throws Throwable {
- disconnectFromDS();
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
-
- String homeDir = "./testWOTimeForRollOverParam";
- final String uniqueName = getName();
-
- createServerRegion(vm0, 1, 1, 500, homeDir, uniqueName, 5, true, false, 5, 1);
- createServerRegion(vm1, 1, 1, 500, homeDir, uniqueName, 5, true, false, 5, 1);
-
- AsyncInvocation a1 = doAsyncPuts(vm0, uniqueName, 1, 8, "vm0");
- AsyncInvocation a2 = doAsyncPuts(vm1, uniqueName, 4, 10, "vm1");
-
- a1.join();
- a2.join();
-
- Thread.sleep(7000);
-
- a1 = doAsyncPuts(vm0, uniqueName, 10, 18, "vm0");
- a2 = doAsyncPuts(vm1, uniqueName, 14, 20, "vm1");
-
- a1.join();
- a2.join();
-
- Thread.sleep(7000);
-
- cacheClose(vm0, false);
- cacheClose(vm1, false);
-
- AsyncInvocation async1 = createServerRegionAsync(vm0, 1, 1, 500, homeDir, uniqueName, 5, true, false, 5, 1);
- AsyncInvocation async2 = createServerRegionAsync(vm1, 1, 1, 500, homeDir, uniqueName, 5, true, false, 5, 1);
- async1.getResult();
- async2.getResult();
-
- // There should be two files in bucket 0.
- // Each should have entries 1 to 10, with duplicates from 4 to 7
- verifyTwoHDFSFiles(vm0, uniqueName);
-
- cacheClose(vm0, false);
- cacheClose(vm1, false);
-
- disconnectFromDS();
-
- }
-
- private void createServerAccessor(VM vm, final int totalnumOfBuckets,
- final int maximumEntries, final String uniqueName) {
- SerializableCallable createRegion = new SerializableCallable() {
- public Object call() throws Exception {
- AttributesFactory af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.HDFS_PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- paf.setTotalNumBuckets(totalnumOfBuckets);
- paf.setRedundantCopies(1);
- // make this member an accessor.
- paf.setLocalMaxMemory(0);
- af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(maximumEntries, EvictionAction.LOCAL_DESTROY));
- af.setPartitionAttributes(paf.create());
-
- Region r = createRootRegion(uniqueName, af.create());
- assertTrue(!((PartitionedRegion) r).isDataStore());
-
- return null;
- }
- };
-
- vm.invoke(createRegion);
- }
-
- @Override
- protected void verifyHDFSData(VM vm, String uniqueName) throws Exception {
-
- HashMap<String, HashMap<String, String>> filesToEntriesMap = createFilesAndEntriesMap(vm, uniqueName, uniqueName);
- HashMap<String, String> entriesMap = new HashMap<String, String>();
- for (HashMap<String, String> v : filesToEntriesMap.values()) {
- entriesMap.putAll(v);
- }
- verifyInEntriesMap(entriesMap, 1, 50, "vm0");
- verifyInEntriesMap(entriesMap, 40, 100, "vm1");
- verifyInEntriesMap(entriesMap, 40, 100, "vm2");
- verifyInEntriesMap(entriesMap, 90, 150, "vm3");
-
- }
-
- protected void verifyTwoHDFSFiles(VM vm, String uniqueName) throws Exception {
-
- HashMap<String, HashMap<String, String>> filesToEntriesMap = createFilesAndEntriesMap(vm, uniqueName, uniqueName);
-
- assertTrue("there should be exactly two files, but there are "
- + filesToEntriesMap.size(), filesToEntriesMap.size() == 2);
- long timestamp = Long.MAX_VALUE;
- String olderFile = null;
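- // hoplog file names embed a timestamp between the first and last '-';
- // parse it to identify the older of the two files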
- for (Map.Entry<String, HashMap<String, String>> e : filesToEntriesMap
- .entrySet()) {
- String fileName = e.getKey().substring(
- 0,
- e.getKey().length()
- - AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION.length());
- long newTimeStamp = Long.parseLong(fileName.substring(
- fileName.indexOf("-") + 1, fileName.lastIndexOf("-")));
- if (newTimeStamp < timestamp) {
- olderFile = e.getKey();
- timestamp = newTimeStamp;
- }
- }
- verifyInEntriesMap(filesToEntriesMap.get(olderFile), 1, 8, "vm0");
- verifyInEntriesMap(filesToEntriesMap.get(olderFile), 4, 10, "vm1");
- filesToEntriesMap.remove(olderFile);
- verifyInEntriesMap(filesToEntriesMap.values().iterator().next(), 10, 18, "vm0");
- verifyInEntriesMap(filesToEntriesMap.values().iterator().next(), 14, 20, "vm1");
- }
-
- protected void verifyTwoHDFSFilesWithTwoEntries(VM vm, String uniqueName,
- String value) throws Exception {
-
- HashMap<String, HashMap<String, String>> filesToEntriesMap = createFilesAndEntriesMap(vm, uniqueName, uniqueName);
-
- assertTrue( "there should be exactly two files, but there are " + filesToEntriesMap.size(), filesToEntriesMap.size() == 2);
- HashMap<String, String> entriesMap = new HashMap<String, String>();
- for (HashMap<String, String> v : filesToEntriesMap.values()) {
- entriesMap.putAll(v);
- }
- assertTrue( "Expected key K1 received " + entriesMap.get(value+ "1vm0"), entriesMap.get(value+ "1vm0").equals("K1"));
- assertTrue( "Expected key K2 received " + entriesMap.get(value+ "2vm0"), entriesMap.get(value+ "2vm0").equals("K2"));
- assertTrue( "Expected key K2 received " + entriesMap.get(value+ "2vm1"), entriesMap.get(value+ "2vm1").equals("K2"));
- assertTrue( "Expected key K3 received " + entriesMap.get(value+ "3vm1"), entriesMap.get(value+ "3vm1").equals("K3"));
- }
-
- /**
- * verify that a PR accessor can be started
- */
- public void testPRAccessor() {
- Host host = Host.getHost(0);
- VM accessor = host.getVM(0);
- VM datastore1 = host.getVM(1);
- VM datastore2 = host.getVM(2);
- VM accessor2 = host.getVM(3);
- final String regionName = getName();
- final String storeName = "store_" + regionName;
-
- SerializableCallable createRegion = new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- HDFSStoreFactory storefactory = getCache().createHDFSStoreFactory();
- homeDir = new File("../" + regionName).getCanonicalPath();
- storefactory.setHomeDir(homeDir);
- storefactory.create(storeName);
- AttributesFactory<Integer, String> af = new AttributesFactory<Integer, String>();
- af.setDataPolicy(DataPolicy.HDFS_PARTITION);
- af.setHDFSStoreName(storeName);
- Region r = getCache().createRegionFactory(af.create()).create(regionName);
- r.put("key1", "value1");
- return null;
- }
- };
-
- SerializableCallable createAccessorRegion = new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- HDFSStoreFactory storefactory = getCache().createHDFSStoreFactory();
- homeDir = new File("../" + regionName).getCanonicalPath();
- storefactory.setHomeDir(homeDir);
- storefactory.create(storeName);
- // DataPolicy PARTITION with localMaxMemory 0 cannot be created
- AttributesFactory<Integer, String> af = new AttributesFactory<Integer, String>();
- af.setDataPolicy(DataPolicy.PARTITION);
- PartitionAttributesFactory<Integer, String> paf = new PartitionAttributesFactory<Integer, String>();
- paf.setLocalMaxMemory(0);
- af.setPartitionAttributes(paf.create());
- // DataPolicy PARTITION with localMaxMemory 0 can be created if hdfsStoreName is set
- af.setHDFSStoreName(storeName);
- // No need to check with different storeNames (can never be done in GemFireXD)
- Region r = getCache().createRegionFactory(af.create()).create(regionName);
- r.localDestroyRegion();
- // DataPolicy HDFS_PARTITION with localMaxMemory 0 can be created
- af = new AttributesFactory<Integer, String>();
- af.setDataPolicy(DataPolicy.HDFS_PARTITION);
- af.setPartitionAttributes(paf.create());
- getCache().createRegionFactory(af.create()).create(regionName);
- return null;
- }
- };
-
- datastore1.invoke(createRegion);
- accessor.invoke(createAccessorRegion);
- datastore2.invoke(createRegion);
- accessor2.invoke(createAccessorRegion);
- }
-
- /**
- * verify that PUT dml does not read from hdfs
- */
- public void testPUTDMLSupport() {
- doPUTDMLWork(false);
- }
-
- public void testPUTDMLBulkSupport() {
- doPUTDMLWork(true);
- }
-
- private void doPUTDMLWork(final boolean isPutAll) {
- Host host = Host.getHost(0);
- VM vm1 = host.getVM(0);
- VM vm2 = host.getVM(1);
- final String regionName = getName();
-
- createServerRegion(vm1, 7, 1, 50, "./" + regionName, regionName, 1000);
- createServerRegion(vm2, 7, 1, 50, "./" + regionName, regionName, 1000);
-
- vm1.invoke(new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- Region r = getCache().getRegion(regionName);
- LocalRegion lr = (LocalRegion) r;
- SortedOplogStatistics stats = HDFSRegionDirector.getInstance().getHdfsRegionStats("/" + regionName);
- long readsFromHDFS = stats.getRead().getCount();
- assertEquals(0, readsFromHDFS);
- if (isPutAll) {
- Map m = new HashMap();
- // map with only one entry
- m.put("key0", "value0");
- DistributedPutAllOperation ev = lr.newPutAllOperation(m, null);
- lr.basicPutAll(m, ev, null);
- m.clear();
- // map with multiple entries
- for (int i = 1; i < 100; i++) {
- m.put("key" + i, "value" + i);
- }
- ev = lr.newPutAllOperation(m, null);
- lr.basicPutAll(m, ev, null);
- } else {
- for (int i = 0; i < 100; i++) {
- r.put("key" + i, "value" + i);
- }
- }
- return null;
- }
- });
-
- SerializableCallable getHDFSReadCount = new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- SortedOplogStatistics stats = HDFSRegionDirector.getInstance().getHdfsRegionStats("/" + regionName);
- return stats.getRead().getCount();
- }
- };
-
- long vm1Count = (Long) vm1.invoke(getHDFSReadCount);
- long vm2Count = (Long) vm2.invoke(getHDFSReadCount);
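- // each regular put fetched the previous value from HDFS, so expect one read per key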
- assertEquals(100, vm1Count + vm2Count);
-
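- // wait for the async queue to flush the puts to HDFS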
- pause(10 * 1000);
-
- vm1.invoke(new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- // do puts using the new api
- LocalRegion lr = (LocalRegion) getCache().getRegion(regionName);
- if (isPutAll) {
- Map m = new HashMap();
- // map with only one entry
- m.put("key0", "value0");
- DistributedPutAllOperation ev = lr.newPutAllForPUTDmlOperation(m, null);
- lr.basicPutAll(m, ev, null);
- m.clear();
- // map with multiple entries
- for (int i = 1; i < 200; i++) {
- m.put("key" + i, "value" + i);
- }
- ev = lr.newPutAllForPUTDmlOperation(m, null);
- lr.basicPutAll(m, ev, null);
- } else {
- for (int i = 0; i < 200; i++) {
- EntryEventImpl ev = lr.newPutEntryEvent("key" + i, "value" + i, null);
- lr.validatedPut(ev, System.currentTimeMillis());
- }
- }
- return null;
- }
- });
-
- // verify the stat for hdfs reads has not incremented
- vm1Count = (Long) vm1.invoke(getHDFSReadCount);
- vm2Count = (Long) vm2.invoke(getHDFSReadCount);
- assertEquals(100, vm1Count + vm2Count);
-
- vm1.invoke(new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- Region r = getCache().getRegion(regionName);
- for (int i = 0; i < 200; i++) {
- assertEquals("value" + i, r.get("key" + i));
- }
- return null;
- }
- });
- }
-
- /**
- * verify that get on operational data does not read from HDFS
- */
- public void testGetOperationalData() {
- Host host = Host.getHost(0);
- VM vm1 = host.getVM(0);
- VM vm2 = host.getVM(1);
- final String regionName = getName();
-
- createServerRegion(vm1, 7, 1, 50, "./"+regionName, regionName, 1000);
- createServerRegion(vm2, 7, 1, 50, "./"+regionName, regionName, 1000);
-
- vm1.invoke(new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- Region r = getCache().getRegion(regionName);
- SortedOplogStatistics stats = HDFSRegionDirector.getInstance().getHdfsRegionStats("/" + regionName);
- long readsFromHDFS = stats.getRead().getCount();
- assertEquals(0, readsFromHDFS);
- for (int i = 0; i < 100; i++) {
- logger.info("SWAP:DOING PUT:key{}", i);
- r.put("key" + i, "value" + i);
- }
- return null;
- }
- });
-
- SerializableCallable getHDFSReadCount = new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- SortedOplogStatistics stats = HDFSRegionDirector.getInstance().getHdfsRegionStats("/" + regionName);
- return stats.getRead().getCount();
- }
- };
-
- long vm1Count = (Long) vm1.invoke(getHDFSReadCount);
- long vm2Count = (Long) vm2.invoke(getHDFSReadCount);
- assertEquals(100, vm1Count + vm2Count);
-
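- // wait for the queued puts to be flushed to HDFS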
- pause(10 * 1000);
-
- // verify that get increments the read stat
- vm1.invoke(new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- Region r = getCache().getRegion(regionName);
- for (int i = 0; i < 200; i++) {
- if (i < 100) {
- logger.info("SWAP:DOING GET:key", i);
- assertEquals("value" + i, r.get("key" + i));
- } else {
- assertNull(r.get("key" + i));
- }
- }
- return null;
- }
- });
-
- vm1Count = (Long) vm1.invoke(getHDFSReadCount);
- vm2Count = (Long) vm2.invoke(getHDFSReadCount);
- // initial 100 + 150 for get (since 50 are in memory)
- assertEquals(250, vm1Count + vm2Count);
-
- // do gets with readFromHDFS set to false
- vm1.invoke(new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- Region r = getCache().getRegion(regionName);
- LocalRegion lr = (LocalRegion) r;
- int numEntries = 0;
- for (int i = 0; i < 200; i++) {
- logger.info("SWAP:DOING GET NO READ:key", i);
- Object val = lr.get("key"+i, null, true, false, false, null, null, false, false/*allowReadFromHDFS*/);
- if (val != null) {
- numEntries++;
- }
- }
- assertEquals(50, numEntries); // entries in memory
- return null;
- }
- });
-
- vm1Count = (Long) vm1.invoke(getHDFSReadCount);
- vm2Count = (Long) vm2.invoke(getHDFSReadCount);
- // get should not have incremented
- assertEquals(250, vm1Count + vm2Count);
-
- /* MergeGemXDHDFSToGFE: this API has not been merged because it is not called by any code. */
- /*
- // do gets using DataView
- SerializableCallable getUsingDataView = new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- Region r = getCache().getRegion(regionName);
- LocalRegion lr = (LocalRegion) r;
- PartitionedRegion pr = (PartitionedRegion) lr;
- long numEntries = 0;
- for (int i=0; i<200; i++) {
- InternalDataView idv = lr.getDataView();
- logger.debug("SWAP:DATAVIEW");
- Object val = idv.getLocally("key"+i, null, PartitionedRegionHelper.getHashKey(pr, "key"+i), lr, true, true, null, null, false, false);
- if (val != null) {
- numEntries++;
- }
- }
- return numEntries;
- }
- };
-
- vm1Count = (Long) vm1.invoke(getUsingDataView);
- vm2Count = (Long) vm2.invoke(getUsingDataView);
- assertEquals(50 * 2, vm1Count + vm2Count);// both VMs will find 50 entries*/
-
- vm1Count = (Long) vm1.invoke(getHDFSReadCount);
- vm2Count = (Long) vm2.invoke(getHDFSReadCount);
- // get should not have incremented
- assertEquals(250, vm1Count + vm2Count);
-
- }
-
- public void testSizeEstimate() {
- Host host = Host.getHost(0);
- VM vm1 = host.getVM(0);
- VM vm2 = host.getVM(1);
- VM vm3 = host.getVM(2);
- final String regionName = getName();
-
- createServerRegion(vm1, 7, 1, 50, "./"+regionName, regionName, 1000);
- createServerRegion(vm2, 7, 1, 50, "./"+regionName, regionName, 1000);
- createServerRegion(vm3, 7, 1, 50, "./"+regionName, regionName, 1000);
-
- final int size = 226;
-
- vm1.invoke(new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- Region r = getCache().getRegion(regionName);
- // LocalRegion lr = (LocalRegion) r;
- for (int i = 0; i < size; i++) {
- r.put("key" + i, "value" + i);
- }
- // before flush
- // assertEquals(size, lr.sizeEstimate());
- return null;
- }
- });
-
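- // wait for the data to be flushed to HDFS before estimating the size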
- pause(10 * 1000);
-
- vm2.invoke(new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- Region r = getCache().getRegion(regionName);
- LocalRegion lr = (LocalRegion) r;
- logger.debug("SWAP:callingsizeEstimate");
- long estimate = lr.sizeEstimate();
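- // the estimate is allowed to be off by at most 20% of the actual count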
- double err = Math.abs(estimate - size) / (double) size;
- System.out.println("SWAP:estimate:" + estimate);
- assertTrue(err < 0.2);
- return null;
- }
- });
- }
-
- public void testForceAsyncMajorCompaction() throws Exception {
- doForceCompactionTest(true, false);
- }
-
- public void testForceSyncMajorCompaction() throws Exception {
- doForceCompactionTest(true, true);
- }
-
- private void doForceCompactionTest(final boolean isMajor, final boolean isSynchronous) throws Exception {
- Host host = Host.getHost(0);
- VM vm1 = host.getVM(0);
- VM vm2 = host.getVM(1);
- VM vm3 = host.getVM(2);
- final String regionName = getName();
-
- createServerRegion(vm1, 7, 1, 50, "./" + regionName, regionName, 1000);
- createServerRegion(vm2, 7, 1, 50, "./" + regionName, regionName, 1000);
- createServerRegion(vm3, 7, 1, 50, "./" + regionName, regionName, 1000);
-
- SerializableCallable noCompaction = new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- SortedOplogStatistics stats = HDFSRegionDirector.getInstance().getHdfsRegionStats("/" + regionName);
- if (isMajor) {
- assertEquals(0, stats.getMajorCompaction().getCount());
- } else {
- assertEquals(0, stats.getMinorCompaction().getCount());
- }
- return null;
- }
- };
-
- vm1.invoke(noCompaction);
- vm2.invoke(noCompaction);
- vm3.invoke(noCompaction);
-
- vm1.invoke(new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- Region r = getCache().getRegion(regionName);
- for (int i = 0; i < 500; i++) {
- r.put("key" + i, "value" + i);
- if (i % 100 == 0) {
- // wait for flush
- pause(3000);
- }
- }
- pause(3000);
- PartitionedRegion pr = (PartitionedRegion) r;
- long lastCompactionTS = pr.lastMajorHDFSCompaction();
- assertEquals(0, lastCompactionTS);
- long beforeCompact = System.currentTimeMillis();
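- // passing 0 as the second argument makes the compaction synchronous in this test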
- pr.forceHDFSCompaction(true, isSynchronous ? 0 : 1);
- if (isSynchronous) {
- final SortedOplogStatistics stats = HDFSRegionDirector.getInstance().getHdfsRegionStats("/" + regionName);
- assertTrue(stats.getMajorCompaction().getCount() > 0);
- assertTrue(pr.lastMajorHDFSCompaction() >= beforeCompact);
- }
- return null;
- }
- });
-
- if (!isSynchronous) {
- SerializableCallable verifyCompactionStat = new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- final SortedOplogStatistics stats = HDFSRegionDirector.getInstance().getHdfsRegionStats("/" + regionName);
- waitForCriterion(new WaitCriterion() {
- @Override
- public boolean done() {
- return stats.getMajorCompaction().getCount() > 0;
- }
-
- @Override
- public String description() {
- return "Major compaction stat not > 0";
- }
- }, 30 * 1000, 1000, true);
- return null;
- }
- };
-
- vm1.invoke(verifyCompactionStat);
- vm2.invoke(verifyCompactionStat);
- vm3.invoke(verifyCompactionStat);
- } else {
- SerializableCallable verifyCompactionStat = new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- final SortedOplogStatistics stats = HDFSRegionDirector.getInstance().getHdfsRegionStats("/" + regionName);
- assertTrue(stats.getMajorCompaction().getCount() > 0);
- return null;
- }
- };
- vm2.invoke(verifyCompactionStat);
- vm3.invoke(verifyCompactionStat);
- }
- }
-
- public void testFlushQueue() throws Exception {
- doFlushQueue(false);
- }
-
- public void testFlushQueueWO() throws Exception {
- doFlushQueue(true);
- }
-
- private void doFlushQueue(boolean wo) throws Exception {
- Host host = Host.getHost(0);
- VM vm1 = host.getVM(0);
- VM vm2 = host.getVM(1);
- VM vm3 = host.getVM(2);
- final String regionName = getName();
-
- createServerRegion(vm1, 7, 1, 50, "./"+regionName, regionName, 300000, wo, false);
- createServerRegion(vm2, 7, 1, 50, "./"+regionName, regionName, 300000, wo, false);
- createServerRegion(vm3, 7, 1, 50, "./"+regionName, regionName, 300000, wo, false);
-
- vm1.invoke(new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(regionName);
- for (int i = 0; i < 500; i++) {
- pr.put("key" + i, "value" + i);
- }
-
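- // block until the HDFS event queue has drained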
- pr.flushHDFSQueue(0);
- return null;
- }
- });
-
- SerializableCallable verify = new SerializableCallable() {
- @Override
- public Object call() throws Exception {
- PartitionedRegion pr = (PartitionedRegion) getCache().getRegion(regionName);
- assertEquals(0, pr.getHDFSEventQueueStats().getEventQueueSize());
- return null;
- }
- };
-
- vm1.invoke(verify);
- vm2.invoke(verify);
- vm3.invoke(verify);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1b4fd2fe/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSOffHeapBasicDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSOffHeapBasicDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSOffHeapBasicDUnitTest.java
deleted file mode 100644
index ee517d2..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSOffHeapBasicDUnitTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.File;
-import java.util.Properties;
-
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.EvictionAction;
-import com.gemstone.gemfire.cache.EvictionAttributes;
-import com.gemstone.gemfire.cache.PartitionAttributesFactory;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.OffHeapTestUtil;
-
-import dunit.SerializableCallable;
-import dunit.SerializableRunnable;
-
-@SuppressWarnings({ "serial", "rawtypes", "deprecation" })
-public class RegionWithHDFSOffHeapBasicDUnitTest extends
- RegionWithHDFSBasicDUnitTest {
- static {
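- // track off-heap reference counts so tearDown2 can detect orphaned references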
- System.setProperty("gemfire.trackOffHeapRefCounts", "true");
- }
-
- public RegionWithHDFSOffHeapBasicDUnitTest(String name) {
- super(name);
- }
-
- @Override
- public void tearDown2() throws Exception {
- SerializableRunnable checkOrphans = new SerializableRunnable() {
-
- @Override
- public void run() {
- if(hasCache()) {
- OffHeapTestUtil.checkOrphans();
- }
- }
- };
- try {
- checkOrphans.run();
- invokeInEveryVM(checkOrphans);
- } finally {
- // proceed with tearDown2 anyway.
- super.tearDown2();
- }
- }
-
- public void testDelta() {
- //do nothing, deltas aren't supported with off heap.
- }
-
- @Override
- protected SerializableCallable getCreateRegionCallable(final int totalnumOfBuckets,
- final int batchSizeMB, final int maximumEntries, final String folderPath,
- final String uniqueName, final int batchInterval, final boolean queuePersistent,
- final boolean writeonly, final long timeForRollover, final long maxFileSize) {
- SerializableCallable createRegion = new SerializableCallable() {
- public Object call() throws Exception {
- AttributesFactory af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.HDFS_PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- paf.setTotalNumBuckets(totalnumOfBuckets);
- paf.setRedundantCopies(1);
-
- af.setHDFSStoreName(uniqueName);
- af.setPartitionAttributes(paf.create());
- HDFSStoreFactory hsf = getCache().createHDFSStoreFactory();
- // Going two levels up to avoid home directories being created in a
- // VM-specific directory. This avoids failures in tests where datastores
- // are restarted and bucket ownership changes between VMs.
- homeDir = new File(tmpDir + "/../../" + folderPath).getCanonicalPath();
- hsf.setHomeDir(homeDir);
- hsf.setBatchSize(batchSizeMB);
- hsf.setBufferPersistent(queuePersistent);
- hsf.setMaxMemory(3);
- hsf.setBatchInterval(batchInterval);
- if (timeForRollover != -1) {
- hsf.setWriteOnlyFileRolloverInterval((int)timeForRollover);
- System.setProperty("gemfire.HDFSRegionDirector.FILE_ROLLOVER_TASK_INTERVAL_SECONDS", "1");
- }
- if (maxFileSize != -1) {
- hsf.setWriteOnlyFileRolloverSize((int) maxFileSize);
- }
- hsf.create(uniqueName);
-
- af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(maximumEntries, EvictionAction.LOCAL_DESTROY));
-
- af.setHDFSWriteOnly(writeonly);
- af.setOffHeap(true);
- Region r = createRootRegion(uniqueName, af.create());
- ((LocalRegion)r).setIsTest();
-
- return 0;
- }
- };
- return createRegion;
- }
-
- @Override
- public Properties getDistributedSystemProperties() {
- Properties props = super.getDistributedSystemProperties();
- props.setProperty("off-heap-memory-size", "50m");
- return props;
- }
-}