Posted to commits@geode.apache.org by as...@apache.org on 2015/10/21 17:59:02 UTC
[15/15] incubator-geode git commit: GEODE-429: Remove HdfsStore Junit and Dunits
GEODE-429: Remove HdfsStore Junit and Dunits
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/74c3156a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/74c3156a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/74c3156a
Branch: refs/heads/feature/GEODE-409
Commit: 74c3156aaa0d29ccc4ec0b4c9a53659d2c9eb003
Parents: 1b4fd2f
Author: Ashvin Agrawal <as...@apache.org>
Authored: Mon Oct 19 14:58:00 2015 -0700
Committer: Ashvin Agrawal <as...@apache.org>
Committed: Wed Oct 21 08:55:23 2015 -0700
----------------------------------------------------------------------
.../ColocatedRegionWithHDFSDUnitTest.java | 189 ----
.../hdfs/internal/HDFSEntriesSetJUnitTest.java | 228 ----
.../internal/HdfsStoreMutatorJUnitTest.java | 191 ----
.../hdfs/internal/RegionWithHDFSTestBase.java | 715 ------------
.../internal/hoplog/BaseHoplogTestCase.java | 389 -------
.../hoplog/CardinalityEstimatorJUnitTest.java | 188 ----
.../hoplog/HDFSCacheLoaderJUnitTest.java | 106 --
.../hoplog/HDFSCompactionManagerJUnitTest.java | 449 --------
.../hoplog/HDFSRegionDirectorJUnitTest.java | 97 --
.../internal/hoplog/HDFSStatsJUnitTest.java | 250 -----
.../HDFSUnsortedHoplogOrganizerJUnitTest.java | 297 -----
.../HdfsSortedOplogOrganizerJUnitTest.java | 1045 ------------------
.../hoplog/HfileSortedOplogJUnitTest.java | 540 ---------
.../hoplog/SortedOplogListIterJUnitTest.java | 178 ---
.../hoplog/TieredCompactionJUnitTest.java | 904 ---------------
.../hoplog/mapreduce/GFKeyJUnitTest.java | 50 -
.../mapreduce/HDFSSplitIteratorJUnitTest.java | 265 -----
.../hoplog/mapreduce/HoplogUtilJUnitTest.java | 305 -----
18 files changed, 6386 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/ColocatedRegionWithHDFSDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/ColocatedRegionWithHDFSDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/ColocatedRegionWithHDFSDUnitTest.java
deleted file mode 100644
index 44206dc..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/ColocatedRegionWithHDFSDUnitTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.EvictionAction;
-import com.gemstone.gemfire.cache.EvictionAttributes;
-import com.gemstone.gemfire.cache.PartitionAttributesFactory;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-
-import dunit.AsyncInvocation;
-import dunit.SerializableCallable;
-import dunit.VM;
-
-/**
- * A class for testing the basic HDFS functionality
- *
- * @author Hemant Bhanawat
- */
-@SuppressWarnings({"serial", "rawtypes", "unchecked", "deprecation"})
-public class ColocatedRegionWithHDFSDUnitTest extends RegionWithHDFSTestBase {
-
- public ColocatedRegionWithHDFSDUnitTest(String name) {
- super(name);
- }
-
- @Override
- protected SerializableCallable getCreateRegionCallable(
- final int totalnumOfBuckets, final int batchSizeMB,
- final int maximumEntries, final String folderPath,
- final String uniqueName, final int batchInterval,
- final boolean queuePersistent, final boolean writeonly,
- final long timeForRollover, final long maxFileSize) {
- SerializableCallable createRegion = new SerializableCallable() {
- public Object call() throws Exception {
- HDFSStoreFactory hsf = getCache().createHDFSStoreFactory();
- hsf.setBatchSize(batchSizeMB);
- hsf.setBufferPersistent(queuePersistent);
- hsf.setMaxMemory(3);
- hsf.setBatchInterval(batchInterval);
- hsf.setHomeDir(tmpDir + "/" + folderPath);
- homeDir = new File(tmpDir + "/" + folderPath).getCanonicalPath();
- hsf.setHomeDir(homeDir);
- hsf.create(uniqueName);
-
- AttributesFactory af = new AttributesFactory();
- af.setDataPolicy(DataPolicy.PARTITION);
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- paf.setTotalNumBuckets(totalnumOfBuckets);
- paf.setRedundantCopies(1);
-
- af.setHDFSStoreName(uniqueName);
- af.setPartitionAttributes(paf.create());
- af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(
- maximumEntries, EvictionAction.LOCAL_DESTROY));
-
- af.setHDFSWriteOnly(writeonly);
- Region r1 = createRootRegion(uniqueName + "-r1", af.create());
-
- paf.setColocatedWith(uniqueName + "-r1");
- af.setPartitionAttributes(paf.create());
- af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(
- maximumEntries, EvictionAction.LOCAL_DESTROY));
- Region r2 = createRootRegion(uniqueName + "-r2", af.create());
-
- ((LocalRegion) r1).setIsTest();
- ((LocalRegion) r2).setIsTest();
-
- return 0;
- }
- };
- return createRegion;
- }
-
- @Override
- protected void doPuts(String uniqueName, int start, int end) {
- Region r1 = getRootRegion(uniqueName + "-r1");
- Region r2 = getRootRegion(uniqueName + "-r2");
-
- for (int i = start; i < end; i++) {
- r1.put("K" + i, "V" + i);
- r2.put("K" + i, "V" + i);
- }
- }
-
- protected AsyncInvocation doAsyncPuts(VM vm, final String regionName,
- final int start, final int end, final String suffix) throws Exception {
- return vm.invokeAsync(new SerializableCallable() {
- public Object call() throws Exception {
- Region r1 = getRootRegion(regionName + "-r1");
- Region r2 = getRootRegion(regionName + "-r2");
-
- getCache().getLogger().info("Putting entries ");
- for (int i = start; i < end; i++) {
- r1.put("K" + i, "V" + i + suffix);
- r2.put("K" + i, "V" + i + suffix);
- }
- return null;
- }
-
- });
- }
-
- protected void doPutAll(final String uniqueName, Map map) {
- Region r1 = getRootRegion(uniqueName + "-r1");
- Region r2 = getRootRegion(uniqueName + "-r2");
- r1.putAll(map);
- r2.putAll(map);
- }
-
- @Override
- protected void doDestroys(String uniqueName, int start, int end) {
- Region r1 = getRootRegion(uniqueName + "-r1");
- Region r2 = getRootRegion(uniqueName + "-r2");
-
- for (int i = start; i < end; i++) {
- r1.destroy("K" + i);
- r2.destroy("K" + i);
- }
- }
-
- @Override
- protected void checkWithGet(String uniqueName, int start, int end,
- boolean expectValue) {
- Region r1 = getRootRegion(uniqueName + "-r1");
- Region r2 = getRootRegion(uniqueName + "-r2");
- for (int i = start; i < end; i++) {
- String expected = expectValue ? "V" + i : null;
- assertEquals("Mismatch on key " + i, expected, r1.get("K" + i));
- assertEquals("Mismatch on key " + i, expected, r2.get("K" + i));
- }
- }
-
- protected void checkWithGetAll(String uniqueName, ArrayList arrayl) {
- Region r1 = getRootRegion(uniqueName + "-r1");
- Region r2 = getRootRegion(uniqueName + "-r2");
- Map map1 = r1.getAll(arrayl);
- Map map2 = r2.getAll(arrayl);
- for (Object e : map1.keySet()) {
- String v = e.toString().replaceFirst("K", "V");
- assertTrue("Reading entries failed for key " + e + " where value = "
- + map1.get(e), v.equals(map1.get(e)));
- assertTrue("Reading entries failed for key " + e + " where value = "
- + map2.get(e), v.equals(map2.get(e)));
- }
- }
-
- @Override
- protected void verifyHDFSData(VM vm, String uniqueName) throws Exception {
- HashMap<String, HashMap<String, String>> filesToEntriesMap = createFilesAndEntriesMap(
- vm, uniqueName, uniqueName + "-r1");
- HashMap<String, String> entriesMap = new HashMap<String, String>();
- for (Map.Entry<String, HashMap<String, String>> e : filesToEntriesMap
- .entrySet()) {
- entriesMap.putAll(e.getValue());
- }
-
- verifyInEntriesMap(entriesMap, 1, 50, "vm0");
- verifyInEntriesMap(entriesMap, 40, 100, "vm1");
- verifyInEntriesMap(entriesMap, 40, 100, "vm2");
- verifyInEntriesMap(entriesMap, 90, 150, "vm3");
-
- filesToEntriesMap = createFilesAndEntriesMap(vm, uniqueName, uniqueName
- + "-r2");
- entriesMap = new HashMap<String, String>();
- for (Map.Entry<String, HashMap<String, String>> e : filesToEntriesMap
- .entrySet()) {
- entriesMap.putAll(e.getValue());
- }
-
- verifyInEntriesMap(entriesMap, 1, 50, "vm0");
- verifyInEntriesMap(entriesMap, 40, 100, "vm1");
- verifyInEntriesMap(entriesMap, 40, 100, "vm2");
- verifyInEntriesMap(entriesMap, 90, 150, "vm3");
- }
-}
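
For reference, the colocation wiring this removed test exercised follows the standard GemFire pattern: the second partitioned region names the first in setColocatedWith(), so matching keys hash to the same member and bucket in both regions. A minimal sketch, assuming the AttributesFactory API used in the file above and the createRootRegion() helper from the removed test base; the region names and bucket count here are illustrative only:

    AttributesFactory af = new AttributesFactory();
    af.setDataPolicy(DataPolicy.PARTITION);

    PartitionAttributesFactory paf = new PartitionAttributesFactory();
    paf.setTotalNumBuckets(7);   // illustrative bucket count
    paf.setRedundantCopies(1);
    af.setPartitionAttributes(paf.create());

    // First region: a plain partitioned region.
    Region r1 = createRootRegion("demo-r1", af.create());

    // Second region: colocated with the first, so a given key lands on
    // the same member and bucket in both regions.
    paf.setColocatedWith("demo-r1");
    af.setPartitionAttributes(paf.create());
    Region r2 = createRootRegion("demo-r2", af.create());
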
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSetJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSetJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSetJUnitTest.java
deleted file mode 100644
index 3085a66..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSetJUnitTest.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.fs.Path;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.Operation;
-import com.gemstone.gemfire.cache.PartitionAttributesFactory;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionFactory;
-import com.gemstone.gemfire.cache.RegionShortcut;
-import com.gemstone.gemfire.cache.asyncqueue.internal.ParallelAsyncEventQueueImpl;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
-import com.gemstone.gemfire.cache.hdfs.internal.SortedListForAsyncQueueJUnitTest.KeyValue;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
-import com.gemstone.gemfire.distributed.DistributedMember;
-import com.gemstone.gemfire.internal.cache.BucketRegion;
-import com.gemstone.gemfire.internal.cache.CachedDeserializable;
-import com.gemstone.gemfire.internal.cache.EntryEventImpl;
-import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
-import com.gemstone.gemfire.internal.cache.EventID;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.LocalRegion.IteratorType;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
-import com.gemstone.gemfire.test.junit.categories.HoplogTest;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-
-@SuppressWarnings("rawtypes")
-@Category({IntegrationTest.class, HoplogTest.class})
-public class HDFSEntriesSetJUnitTest extends TestCase {
- private GemFireCacheImpl cache;
- private HDFSStoreImpl store;
- private PartitionedRegion region;
- private BucketRegion bucket;
- private HDFSParallelGatewaySenderQueue queue;
-
- private HDFSBucketRegionQueue brq;
- private HoplogOrganizer hdfs;
-
- public void setUp() throws Exception {
- System.setProperty(HoplogConfig.ALLOW_LOCAL_HDFS_PROP, "true");
- cache = (GemFireCacheImpl) new CacheFactory()
- .set("mcast-port", "0")
- .set("log-level", "info")
- .create();
-
- HDFSStoreFactory hsf = this.cache.createHDFSStoreFactory();
- hsf.setHomeDir("hoplogs");
- store = (HDFSStoreImpl) hsf.create("test");
-
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
- paf.setTotalNumBuckets(1);
-
- RegionFactory rf = cache.createRegionFactory(RegionShortcut.PARTITION);
-// rf.setHDFSStoreName("test");
- region = (PartitionedRegion) rf.setPartitionAttributes(paf.create()).create("test");
-
- // prime the region so buckets get created
- region.put("test", "test");
- GatewaySenderAttributes g = new GatewaySenderAttributes();
- g.isHDFSQueue = true;
- g.id = "HDFSEntriesSetJUnitTest_Queue";
- ParallelAsyncEventQueueImpl gatewaySender = new ParallelAsyncEventQueueImpl(cache, g);
- Set<Region> set = new HashSet<Region>();
- set.add(region);
-
- queue = new HDFSParallelGatewaySenderQueue(gatewaySender, set, 0, 1);
- brq = (HDFSBucketRegionQueue)((PartitionedRegion) queue.getRegion()).getDataStore().getLocalBucketById(0);
- bucket = region.getDataStore().getLocalBucketById(0);
-
- HdfsRegionManager mgr = HDFSRegionDirector.getInstance().manageRegion(region, "test", null);
- hdfs = mgr.<SortedHoplogPersistedEvent>create(0);
- AbstractHoplogOrganizer.JUNIT_TEST_RUN = true;
- }
-
- public void tearDown() throws Exception {
- store.getFileSystem().delete(new Path("hoplogs"), true);
- hdfs.close();
-
- cache.close();
- }
-
- public void testEmptyIterator() throws Exception {
- checkIteration(Collections.<String>emptyList(), new KeyValue[] { }, new KeyValue[] { });
- }
-
- public void testQueueOnlyIterator() throws Exception {
- KeyValue[] qvals = new KeyValue[] {
- new KeyValue("K0", "0"),
- new KeyValue("K1", "1"),
- new KeyValue("K2", "2"),
- new KeyValue("K3", "3"),
- new KeyValue("K4", "4")
- };
- checkIteration(getExpected(), qvals, new KeyValue[] { });
- }
-
- public void testHdfsOnlyIterator() throws Exception {
- KeyValue[] hvals = new KeyValue[] {
- new KeyValue("K0", "0"),
- new KeyValue("K1", "1"),
- new KeyValue("K2", "2"),
- new KeyValue("K3", "3"),
- new KeyValue("K4", "4")
- };
- checkIteration(getExpected(), new KeyValue[] { }, hvals);
- }
-
- public void testUnevenIterator() throws Exception {
- KeyValue[] qvals = new KeyValue[] {
- new KeyValue("K0", "0"),
- new KeyValue("K2", "2"),
- };
-
- KeyValue[] hvals = new KeyValue[] {
- new KeyValue("K1", "1"),
- new KeyValue("K3", "3"),
- new KeyValue("K4", "4")
- };
-
- checkIteration(getExpected(), qvals, hvals);
- }
-
- public void testEitherOrIterator() throws Exception {
- KeyValue[] qvals = new KeyValue[] {
- new KeyValue("K0", "0"),
- new KeyValue("K2", "2"),
- new KeyValue("K4", "4")
- };
-
- KeyValue[] hvals = new KeyValue[] {
- new KeyValue("K1", "1"),
- new KeyValue("K3", "3")
- };
-
- checkIteration(getExpected(), qvals, hvals);
- }
-
- public void testDuplicateIterator() throws Exception {
- KeyValue[] qvals = new KeyValue[] {
- new KeyValue("K0", "0"),
- new KeyValue("K1", "1"),
- new KeyValue("K2", "2"),
- new KeyValue("K3", "3"),
- new KeyValue("K4", "4"),
- new KeyValue("K4", "4")
- };
-
- KeyValue[] hvals = new KeyValue[] {
- new KeyValue("K0", "0"),
- new KeyValue("K1", "1"),
- new KeyValue("K2", "2"),
- new KeyValue("K3", "3"),
- new KeyValue("K4", "4"),
- new KeyValue("K4", "4")
- };
-
- checkIteration(getExpected(), qvals, hvals);
- }
-
- private List<String> getExpected() {
- List<String> expected = new ArrayList<String>();
- expected.add("0");
- expected.add("1");
- expected.add("2");
- expected.add("3");
- expected.add("4");
- return expected;
- }
-
- private void checkIteration(List<String> expected, KeyValue[] qvals, KeyValue[] hvals)
- throws Exception {
- int seq = 0;
- List<PersistedEventImpl> evts = new ArrayList<PersistedEventImpl>();
- for (KeyValue kv : hvals) {
- evts.add(new SortedHDFSQueuePersistedEvent(getNewEvent(kv.key, kv.value, seq++)));
- }
- hdfs.flush(evts.iterator(), evts.size());
-
- for (KeyValue kv : qvals) {
- queue.put(getNewEvent(kv.key, kv.value, seq++));
- }
-
- List<String> actual = new ArrayList<String>();
- Iterator vals = new HDFSEntriesSet(bucket, brq, hdfs, IteratorType.VALUES, null).iterator();
- while (vals.hasNext()) {
- Object val = vals.next();
- if(val instanceof CachedDeserializable) {
- val = ((CachedDeserializable) val).getDeserializedForReading();
- }
- actual.add((String) val);
- }
-
- assertEquals(expected, actual);
- }
-
- private HDFSGatewayEventImpl getNewEvent(Object key, Object value, long seq) throws Exception {
- EntryEventImpl evt = EntryEventImpl.create(region, Operation.CREATE,
- key, value, null, false, (DistributedMember) cache.getMyId());
-
- evt.setEventId(new EventID(cache.getDistributedSystem()));
- HDFSGatewayEventImpl event = new HDFSGatewayEventImpl(EnumListenerEvent.AFTER_CREATE, evt, null, true, 0);
- event.setShadowKey(seq);
-
- return event;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HdfsStoreMutatorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HdfsStoreMutatorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HdfsStoreMutatorJUnitTest.java
deleted file mode 100644
index b8cbb0d..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/HdfsStoreMutatorJUnitTest.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.BaseHoplogTestCase;
-import com.gemstone.gemfire.test.junit.categories.HoplogTest;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-
-@Category({IntegrationTest.class, HoplogTest.class})
-public class HdfsStoreMutatorJUnitTest extends BaseHoplogTestCase {
- public void testMutatorInitialState() {
- HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
- assertEquals(-1, mutator.getWriteOnlyFileRolloverInterval());
- assertEquals(-1, mutator.getWriteOnlyFileRolloverSize());
-
- assertEquals(-1, mutator.getInputFileCountMax());
- assertEquals(-1, mutator.getInputFileSizeMax());
- assertEquals(-1, mutator.getInputFileCountMin());
- assertEquals(-1, mutator.getMinorCompactionThreads());
- assertNull(mutator.getMinorCompaction());
-
- assertEquals(-1, mutator.getMajorCompactionInterval());
- assertEquals(-1, mutator.getMajorCompactionThreads());
- assertNull(mutator.getMajorCompaction());
-
- assertEquals(-1, mutator.getPurgeInterval());
-
- assertEquals(-1, mutator.getBatchSize());
- assertEquals(-1, mutator.getBatchInterval());
- }
-
- public void testMutatorSetInvalidValue() {
- HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
-
- try {
- mutator.setWriteOnlyFileRolloverInterval(-3);
- fail();
- } catch (IllegalArgumentException e) {
- // expected
- }
- try {
- mutator.setWriteOnlyFileRolloverSize(-5);
- fail();
- } catch (IllegalArgumentException e) {
- // expected
- }
-
- try {
- mutator.setInputFileCountMin(-1);
- fail();
- } catch (IllegalArgumentException e) {
- // expected
- }
- try {
- mutator.setInputFileCountMax(-1);
- fail();
- } catch (IllegalArgumentException e) {
- // expected
- }
- try {
- mutator.setInputFileSizeMax(-1);
- fail();
- } catch (IllegalArgumentException e) {
- // expected
- }
- try {
- mutator.setMinorCompactionThreads(-9);
- fail();
- } catch (IllegalArgumentException e) {
- // expected
- }
- try {
- mutator.setMajorCompactionInterval(-6);
- fail();
- } catch (IllegalArgumentException e) {
- // expected
- }
- try {
- mutator.setMajorCompactionThreads(-1);
- fail();
- } catch (IllegalArgumentException e) {
- // expected
- }
- try {
- mutator.setPurgeInterval(-4);
- fail();
- } catch (IllegalArgumentException e) {
- // expected
- }
-/* try {
- qMutator.setBatchSizeMB(-985);
- fail();
- } catch (IllegalArgumentException e) {
- // expected
- }
- try {
- qMutator.setBatchTimeInterval(-695);
- fail();
- } catch (IllegalArgumentException e) {
- // expected
- }
-*/
- try {
- mutator.setInputFileCountMin(10);
- mutator.setInputFileCountMax(5);
- hdfsStore.alter(mutator);
- fail();
- } catch (IllegalArgumentException e) {
- // expected
- }
- }
-
- public void testMutatorReturnsUpdatedValues() {
- HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
-
- mutator.setWriteOnlyFileRolloverInterval(121);
- mutator.setWriteOnlyFileRolloverSize(234);
-
- mutator.setInputFileCountMax(87);
- mutator.setInputFileSizeMax(45);
- mutator.setInputFileCountMin(34);
- mutator.setMinorCompactionThreads(843);
- mutator.setMinorCompaction(false);
-
- mutator.setMajorCompactionInterval(26);
- mutator.setMajorCompactionThreads(92);
- mutator.setMajorCompaction(false);
-
- mutator.setPurgeInterval(328);
-
- mutator.setBatchSize(985);
- mutator.setBatchInterval(695);
-
- assertEquals(121, mutator.getWriteOnlyFileRolloverInterval());
- assertEquals(234, mutator.getWriteOnlyFileRolloverSize());
-
- assertEquals(87, mutator.getInputFileCountMax());
- assertEquals(45, mutator.getInputFileSizeMax());
- assertEquals(34, mutator.getInputFileCountMin());
- assertEquals(843, mutator.getMinorCompactionThreads());
- assertFalse(mutator.getMinorCompaction());
-
- assertEquals(26, mutator.getMajorCompactionInterval());
- assertEquals(92, mutator.getMajorCompactionThreads());
- assertFalse(mutator.getMajorCompaction());
-
- assertEquals(328, mutator.getPurgeInterval());
-
- assertEquals(985, mutator.getBatchSize());
- assertEquals(695, mutator.getBatchInterval());
-
- // repeat the cycle once more
- mutator.setWriteOnlyFileRolloverInterval(14);
- mutator.setWriteOnlyFileRolloverSize(56);
-
- mutator.setInputFileCountMax(93);
- mutator.setInputFileSizeMax(85);
- mutator.setInputFileCountMin(64);
- mutator.setMinorCompactionThreads(59);
- mutator.setMinorCompaction(true);
-
- mutator.setMajorCompactionInterval(26);
- mutator.setMajorCompactionThreads(92);
- mutator.setMajorCompaction(false);
-
- mutator.setPurgeInterval(328);
-
- assertEquals(14, mutator.getWriteOnlyFileRolloverInterval());
- assertEquals(56, mutator.getWriteOnlyFileRolloverSize());
-
- assertEquals(93, mutator.getInputFileCountMax());
- assertEquals(85, mutator.getInputFileSizeMax());
- assertEquals(64, mutator.getInputFileCountMin());
- assertEquals(59, mutator.getMinorCompactionThreads());
- assertTrue(mutator.getMinorCompaction());
-
- assertEquals(26, mutator.getMajorCompactionInterval());
- assertEquals(92, mutator.getMajorCompactionThreads());
- assertFalse(mutator.getMajorCompaction());
-
- assertEquals(328, mutator.getPurgeInterval());
- }
-}
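
The negative-value checks in the file above all use the JUnit 3 expected-exception idiom (these tests extend TestCase, so @Test(expected = ...) is not available): call the setter with an invalid value, fail() if it returns, and swallow the IllegalArgumentException that signals success. The shape of each check, taken from the removed test:

    try {
      mutator.setPurgeInterval(-4);  // negative intervals must be rejected
      fail("expected IllegalArgumentException for a negative purge interval");
    } catch (IllegalArgumentException e) {
      // expected
    }
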
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
deleted file mode 100644
index 3330574..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
+++ /dev/null
@@ -1,715 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.SequenceFileHoplog;
-import com.gemstone.gemfire.cache30.CacheTestCase;
-import com.gemstone.gemfire.internal.FileUtil;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
-import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
-
-import dunit.AsyncInvocation;
-import dunit.Host;
-import dunit.SerializableCallable;
-import dunit.SerializableRunnable;
-import dunit.VM;
-
-@SuppressWarnings({"serial", "rawtypes", "unchecked"})
-public abstract class RegionWithHDFSTestBase extends CacheTestCase {
-
- protected String tmpDir;
-
- public static String homeDir = null;
-
- protected abstract void checkWithGetAll(String uniqueName, ArrayList arrayl);
-
- protected abstract void checkWithGet(String uniqueName, int start,
- int end, boolean expectValue);
-
- protected abstract void doDestroys(final String uniqueName, int start, int end);
-
- protected abstract void doPutAll(final String uniqueName, Map map);
-
- protected abstract void doPuts(final String uniqueName, int start, int end);
-
- protected abstract SerializableCallable getCreateRegionCallable(final int totalnumOfBuckets, final int batchSizeMB,
- final int maximumEntries, final String folderPath, final String uniqueName, final int batchInterval, final boolean queuePersistent,
- final boolean writeonly, final long timeForRollover, final long maxFileSize);
-
- protected abstract void verifyHDFSData(VM vm, String uniqueName) throws Exception ;
-
- protected abstract AsyncInvocation doAsyncPuts(VM vm, final String regionName,
- final int start, final int end, final String suffix) throws Exception;
-
- public RegionWithHDFSTestBase(String name) {
- super(name);
- }
-
- @Override
- public void tearDown2() throws Exception {
- super.tearDown2();
- for (int h = 0; h < Host.getHostCount(); h++) {
- Host host = Host.getHost(h);
- SerializableCallable cleanUp = cleanUpStoresAndDisconnect();
- for (int v = 0; v < host.getVMCount(); v++) {
- VM vm = host.getVM(v);
- // This store will be deleted by the first VM itself. Invocations from
- // subsequent VMs will be no-op.
- vm.invoke(cleanUp);
- }
- }
- }
-
- public SerializableCallable cleanUpStoresAndDisconnect() throws Exception {
- SerializableCallable cleanUp = new SerializableCallable("cleanUpStoresAndDisconnect") {
- public Object call() throws Exception {
- disconnectFromDS();
- File file;
- if (homeDir != null) {
- file = new File(homeDir);
- FileUtil.delete(file);
- homeDir = null;
- }
- file = new File(tmpDir);
- FileUtil.delete(file);
- return 0;
- }
- };
- return cleanUp;
- }
-
- @Override
- public void setUp() throws Exception {
- super.setUp();
- tmpDir = /*System.getProperty("java.io.tmpdir") + "/" +*/ "RegionWithHDFSBasicDUnitTest_" + System.nanoTime();
- }
-
- int createServerRegion(VM vm, final int totalnumOfBuckets,
- final int batchSize, final int maximumEntries, final String folderPath,
- final String uniqueName, final int batchInterval) {
- return createServerRegion(vm, totalnumOfBuckets,
- batchSize, maximumEntries, folderPath,
- uniqueName, batchInterval, false, false);
- }
-
- protected int createServerRegion(VM vm, final int totalnumOfBuckets,
- final int batchSizeMB, final int maximumEntries, final String folderPath,
- final String uniqueName, final int batchInterval, final boolean writeonly,
- final boolean queuePersistent) {
- return createServerRegion(vm, totalnumOfBuckets,
- batchSizeMB, maximumEntries, folderPath,
- uniqueName, batchInterval, writeonly, queuePersistent, -1, -1);
- }
- protected int createServerRegion(VM vm, final int totalnumOfBuckets,
- final int batchSizeMB, final int maximumEntries, final String folderPath,
- final String uniqueName, final int batchInterval, final boolean writeonly,
- final boolean queuePersistent, final long timeForRollover, final long maxFileSize) {
- SerializableCallable createRegion = getCreateRegionCallable(
- totalnumOfBuckets, batchSizeMB, maximumEntries, folderPath, uniqueName,
- batchInterval, queuePersistent, writeonly, timeForRollover, maxFileSize);
-
- return (Integer) vm.invoke(createRegion);
- }
- protected AsyncInvocation createServerRegionAsync(VM vm, final int totalnumOfBuckets,
- final int batchSizeMB, final int maximumEntries, final String folderPath,
- final String uniqueName, final int batchInterval, final boolean writeonly,
- final boolean queuePersistent) {
- SerializableCallable createRegion = getCreateRegionCallable(
- totalnumOfBuckets, batchSizeMB, maximumEntries, folderPath, uniqueName,
- batchInterval, queuePersistent, writeonly, -1, -1);
-
- return vm.invokeAsync(createRegion);
- }
- protected AsyncInvocation createServerRegionAsync(VM vm, final int totalnumOfBuckets,
- final int batchSizeMB, final int maximumEntries, final String folderPath,
- final String uniqueName, final int batchInterval, final boolean writeonly,
- final boolean queuePersistent, final long timeForRollover, final long maxFileSize) {
- SerializableCallable createRegion = getCreateRegionCallable(
- totalnumOfBuckets, batchSizeMB, maximumEntries, folderPath, uniqueName,
- batchInterval, queuePersistent, writeonly, timeForRollover, maxFileSize);
-
- return vm.invokeAsync(createRegion);
- }
-
- /**
- * Does puts, gets, destroy and getAll. Since there are many updates
- * most of the time the data is not found in memory and queue and
- * is fetched from HDFS
- * @throws Throwable
- */
- public void testGetFromHDFS() throws Throwable {
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
- final String uniqueName = getName();
- final String homeDir = "../../testGetFromHDFS";
-
- createServerRegion(vm0, 7, 1, 50, homeDir, uniqueName, 50, false, true);
- createServerRegion(vm1, 7, 1, 50, homeDir, uniqueName, 50, false, true);
-
- // Do some puts
- vm0.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- doPuts(uniqueName, 0, 40);
- return null;
- }
- });
-
- // Do some puts and destroys
- // some order manipulation has been done because of an issue:
- // " a higher version update on a key can be batched and
- // sent to HDFS before a lower version update on the same key
- // is batched and sent to HDFS. This will cause the latest
- // update on a key in an older file. Hence, a fetch from HDFS
- // will return an older update from a newer file."
-
- vm1.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- doPuts(uniqueName, 40, 50);
- doDestroys(uniqueName, 40, 50);
- doPuts(uniqueName, 50, 100);
- doPuts(uniqueName, 30, 40);
- return null;
- }
- });
-
- // do some more puts and destroy
- // some order manipulation has been done because of an issue:
- // " a higher version update on a key can be batched and
- // sent to HDFS before a lower version update on the same key
- // is batched and sent to HDFS. This will cause the latest
- // update on a key in an older file. Hence, a fetch from HDFS
- // will return an older update from a newer file."
- vm1.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- doPuts(uniqueName, 80, 90);
- doDestroys(uniqueName, 80, 90);
- doPuts(uniqueName, 110, 200);
- doPuts(uniqueName, 90, 110);
- return null;
- }
-
- });
-
- // get and getall the values and compare them.
- SerializableCallable checkData = new SerializableCallable() {
- public Object call() throws Exception {
- checkWithGet(uniqueName, 0, 40, true);
- checkWithGet(uniqueName, 40, 50, false);
- checkWithGet(uniqueName, 50, 80, true);
- checkWithGet(uniqueName, 80, 90, false);
- checkWithGet(uniqueName, 90, 200, true);
- checkWithGet(uniqueName, 200, 201, false);
-
- ArrayList arrayl = new ArrayList();
- for (int i =0; i< 200; i++) {
- String k = "K" + i;
- if ( !((40 <= i && i < 50) || (80 <= i && i < 90)))
- arrayl.add(k);
- }
- checkWithGetAll(uniqueName, arrayl);
-
- return null;
- }
- };
- vm1.invoke(checkData);
-
- //Restart the members and verify that we can still get the data
- closeCache(vm0);
- closeCache(vm1);
- AsyncInvocation async0 = createServerRegionAsync(vm0, 7, 1, 50, homeDir, uniqueName, 50, false, true);
- AsyncInvocation async1 = createServerRegionAsync(vm1, 7, 1, 50, homeDir, uniqueName, 50, false, true);
-
- async0.getResult();
- async1.getResult();
-
-
- // get and getall the values and compare them.
- vm1.invoke(checkData);
-
- //TODO:HDFS we are just reading the files here. Need to verify
- // once the folder structure is finalized.
- dumpFiles(vm1, uniqueName);
-
- }
-
- /**
- * puts a few entries (keys with multiple updates ). Gets them immediately.
- * High probability that it gets it from async queue.
- */
- public void testGetForAsyncQueue() {
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
-
- final String uniqueName = getName();
- final String homeDir = "../../testGetForAsyncQueue";
-
- createServerRegion(vm0, 2, 5, 1, homeDir, uniqueName, 10000);
- createServerRegion(vm1, 2, 5, 1, homeDir, uniqueName, 10000);
-
- vm0.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- doPuts(uniqueName, 0, 4);
- return null;
- }
- });
- vm1.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- doPuts(uniqueName, 0, 2);
- doDestroys(uniqueName, 2, 3);
- doPuts(uniqueName, 3, 7);
-
- checkWithGet(uniqueName, 0, 2, true);
- checkWithGet(uniqueName, 2, 3, false);
- checkWithGet(uniqueName, 3, 7, true);
- return null;
- }
- });
- }
-
- /**
- * puts a few entries (keys with multiple updates ). Calls getAll immediately.
- * High probability that it gets it from async queue.
- */
- public void testGetAllForAsyncQueue() {
-
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
-
- final String uniqueName = getName();
- createServerRegion(vm0, 2, 5, 2, uniqueName, uniqueName, 10000);
- createServerRegion(vm1, 2, 5, 2, uniqueName, uniqueName, 10000);
-
- vm0.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- doPuts(uniqueName, 0, 4);
- return null;
- }
- });
- vm1.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- doPuts(uniqueName, 1, 5);
-
- ArrayList arrayl = new ArrayList();
- for (int i =0; i< 5; i++) {
- String k = "K" + i;
- arrayl.add(k);
- }
- checkWithGetAll(uniqueName, arrayl);
- return null;
- }
- });
- }
-
- /**
- * puts a few entries (keys with multiple updates ). Calls getAll immediately.
- * High probability that it gets it from async queue.
- */
- public void testPutAllForAsyncQueue() {
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
-
- final String uniqueName = getName();
- final String homeDir = "../../testPutAllForAsyncQueue";
- createServerRegion(vm0, 2, 5, 2, homeDir, uniqueName, 10000);
- createServerRegion(vm1, 2, 5, 2, homeDir, uniqueName, 10000);
-
- vm0.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- HashMap putAllmap = new HashMap();
- for (int i =0; i< 4; i++)
- putAllmap.put("K" + i, "V"+ i );
- doPutAll(uniqueName, putAllmap);
- return null;
- }
- });
- vm1.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- HashMap putAllmap = new HashMap();
- for (int i =1; i< 5; i++)
- putAllmap.put("K" + i, "V"+ i );
- doPutAll(uniqueName, putAllmap);
- checkWithGet(uniqueName, 0, 5, true);
- return null;
- }
- });
- }
-
- /**
- * Does putAll and get. Since there are many updates
- * most of the time the data is not found in memory and queue and
- * is fetched from HDFS
- */
- public void _testPutAllAndGetFromHDFS() {
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
-
- final String uniqueName = getName();
- final String homeDir = "../../testPutAllAndGetFromHDFS";
- createServerRegion(vm0, 7, 1, 500, homeDir, uniqueName, 500);
- createServerRegion(vm1, 7, 1, 500, homeDir, uniqueName, 500);
-
- // Do some puts
- vm0.invoke(new SerializableCallable() {
- public Object call() throws Exception {
-
- HashMap putAllmap = new HashMap();
-
- for (int i =0; i< 500; i++)
- putAllmap.put("K" + i, "V"+ i );
- doPutAll(uniqueName, putAllmap);
- return null;
- }
- });
-
- // Do putAll and some destroys
- vm1.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- HashMap putAllmap = new HashMap();
- for (int i = 500; i< 1000; i++)
- putAllmap.put("K" + i, "V"+ i );
- doPutAll(uniqueName, putAllmap);
- return null;
- }
- });
-
- // do some more puts
- // some order manipulation has been done because of an issue:
- // " a higher version update on a key can be batched and
- // sent to HDFS before a lower version update on the same key
- // is batched and sent to HDFS. This will cause the latest
- // update on a key in an older file. Hence, a fetch from HDFS
- // will return an older update from a newer file."
- vm1.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- HashMap putAllmap = new HashMap();
- for (int i =1100; i< 2000; i++)
- putAllmap.put("K" + i, "V"+ i );
- doPutAll(uniqueName, putAllmap);
- putAllmap = new HashMap();
- for (int i = 900; i< 1100; i++)
- putAllmap.put("K" + i, "V"+ i );
- doPutAll(uniqueName, putAllmap);
- return null;
- }
-
- });
-
- // get and getall the values and compare them.
- vm1.invoke(new SerializableCallable() {
- public Object call() throws Exception {
- checkWithGet(uniqueName, 0, 2000, true);
- checkWithGet(uniqueName, 2000, 2001, false);
-
- ArrayList arrayl = new ArrayList();
- for (int i =0; i< 2000; i++) {
- String k = "K" + i;
- arrayl.add(k);
- }
- checkWithGetAll(uniqueName, arrayl);
- return null;
- }
- });
-
- }
-
- public void _testWObasicClose() throws Throwable{
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
- VM vm2 = host.getVM(2);
- VM vm3 = host.getVM(3);
-
- String homeDir = "../../testWObasicClose";
- final String uniqueName = getName();
-
- createServerRegion(vm0, 11, 1, 500, homeDir, uniqueName, 500, true, false);
- createServerRegion(vm1, 11, 1, 500, homeDir, uniqueName, 500, true, false);
- createServerRegion(vm2, 11, 1, 500, homeDir, uniqueName, 500, true, false);
- createServerRegion(vm3, 11, 1, 500, homeDir, uniqueName, 500, true, false);
-
- AsyncInvocation a1 = doAsyncPuts(vm0, uniqueName, 1, 50, "vm0");
- AsyncInvocation a2 = doAsyncPuts(vm1, uniqueName, 40, 100, "vm1");
- AsyncInvocation a3 = doAsyncPuts(vm2, uniqueName, 40, 100, "vm2");
- AsyncInvocation a4 = doAsyncPuts(vm3, uniqueName, 90, 150, "vm3");
-
- a1.join();
- a2.join();
- a3.join();
- a4.join();
-
- Thread.sleep(5000);
- cacheClose (vm0, false);
- cacheClose (vm1, false);
- cacheClose (vm2, false);
- cacheClose (vm3, false);
-
- AsyncInvocation async1 = createServerRegionAsync(vm0, 11, 1, 500, homeDir, uniqueName, 500, true, false);
- AsyncInvocation async2 = createServerRegionAsync(vm1, 11, 1, 500, homeDir, uniqueName, 500, true, false);
- AsyncInvocation async3 = createServerRegionAsync(vm2, 11, 1, 500, homeDir, uniqueName, 500, true, false);
- AsyncInvocation async4 = createServerRegionAsync(vm3, 11, 1, 500, homeDir, uniqueName, 500, true, false);
- async1.getResult();
- async2.getResult();
- async3.getResult();
- async4.getResult();
-
- verifyHDFSData(vm0, uniqueName);
-
- cacheClose (vm0, false);
- cacheClose (vm1, false);
- cacheClose (vm2, false);
- cacheClose (vm3, false);
- }
-
-
- protected void cacheClose(VM vm, final boolean sleep){
- vm.invoke( new SerializableCallable() {
- public Object call() throws Exception {
- if (sleep)
- Thread.sleep(2000);
- getCache().getLogger().info("Cache close in progress ");
- getCache().close();
- getCache().getLogger().info("Cache closed");
- return null;
- }
- });
-
- }
-
- protected void verifyInEntriesMap (HashMap<String, String> entriesMap, int start, int end, String suffix) {
- for (int i =start; i< end; i++) {
- String k = "K" + i;
- String v = "V"+ i + suffix;
- Object s = entriesMap.get(v);
- assertTrue( "The expected key " + k+ " didn't match the received value " + s + ". value: " + v, k.equals(s));
- }
- }
-
- /**
- * Reads all the sequence files and returns the list of key value pairs persisted.
- * Returns the key value pair as <value, key> tuple as there can be multiple values
- * for a key
- * @throws Exception
- */
- protected HashMap<String, HashMap<String, String>> createFilesAndEntriesMap(VM vm0, final String uniqueName, final String regionName) throws Exception {
- HashMap<String, HashMap<String, String>> entriesToFileMap = (HashMap<String, HashMap<String, String>>)
- vm0.invoke( new SerializableCallable() {
- public Object call() throws Exception {
- HashMap<String, HashMap<String, String>> entriesToFileMap = new HashMap<String, HashMap<String, String>>();
- HDFSStoreImpl hdfsStore = (HDFSStoreImpl) ((GemFireCacheImpl)getCache()).findHDFSStore(uniqueName);
- FileSystem fs = hdfsStore.getFileSystem();
- System.err.println("dumping file names in HDFS directory: " + hdfsStore.getHomeDir());
- try {
- Path basePath = new Path(hdfsStore.getHomeDir());
- Path regionPath = new Path(basePath, regionName);
- RemoteIterator<LocatedFileStatus> files = fs.listFiles(regionPath, true);
-
- while(files.hasNext()) {
- HashMap<String, String> entriesMap = new HashMap<String, String>();
- LocatedFileStatus next = files.next();
- /* MergeGemXDHDFSToGFE - Disabled as I am not pulling in DunitEnv */
- // System.err.println(DUnitEnv.get().getPid() + " - " + next.getPath());
- System.err.println(" - " + next.getPath());
- readSequenceFile(fs, next.getPath(), entriesMap);
- entriesToFileMap.put(next.getPath().getName(), entriesMap);
- }
- } catch (FileNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- return entriesToFileMap;
- }
- @SuppressWarnings("deprecation")
- public void readSequenceFile(FileSystem inputFS, Path sequenceFileName,
- HashMap<String, String> entriesMap) throws IOException {
- SequenceFileHoplog hoplog = new SequenceFileHoplog(inputFS, sequenceFileName, null);
- HoplogIterator<byte[], byte[]> iter = hoplog.getReader().scan();
- try {
- while (iter.hasNext()) {
- iter.next();
- PersistedEventImpl te = UnsortedHoplogPersistedEvent.fromBytes(iter.getValue());
- String stringkey = ((String)CacheServerHelper.deserialize(iter.getKey()));
- String value = (String) te.getDeserializedValue();
- entriesMap.put(value, stringkey);
- if (getCache().getLoggerI18n().fineEnabled())
- getCache().getLoggerI18n().fine("Key: " + stringkey + " value: " + value + " path " + sequenceFileName.getName());
- }
- } catch (Exception e) {
- assertTrue(e.toString(), false);
- }
- iter.close();
- hoplog.close();
- }
- });
- return entriesToFileMap;
- }
- protected SerializableCallable validateEmpty(VM vm0, final int numEntries, final String uniqueName) {
- SerializableCallable validateEmpty = new SerializableCallable("validateEmpty") {
- public Object call() throws Exception {
- Region r = getRootRegion(uniqueName);
-
- assertTrue(r.isEmpty());
-
- //validate region is empty on peer as well
- assertFalse(r.entrySet().iterator().hasNext());
- //Make sure the region is empty
- for (int i =0; i< numEntries; i++) {
- assertEquals("failure on key K" + i , null, r.get("K" + i));
- }
-
- return null;
- }
- };
-
- vm0.invoke(validateEmpty);
- return validateEmpty;
- }
-
- protected void closeCache(VM vm0) {
- //Restart and validate still empty.
- SerializableRunnable closeCache = new SerializableRunnable("close cache") {
- @Override
- public void run() {
- getCache().close();
- disconnectFromDS();
- }
- };
-
- vm0.invoke(closeCache);
- }
-
- protected void verifyDataInHDFS(VM vm0, final String uniqueName, final boolean shouldHaveData,
- final boolean wait, final boolean waitForQueueToDrain, final int numEntries) {
- vm0.invoke(new SerializableCallable("check for data in hdfs") {
- @Override
- public Object call() throws Exception {
-
- HDFSRegionDirector director = HDFSRegionDirector.getInstance();
- final SortedOplogStatistics stats = director.getHdfsRegionStats("/" + uniqueName);
- waitForCriterion(new WaitCriterion() {
- @Override
- public boolean done() {
- return stats.getActiveFileCount() > 0 == shouldHaveData;
- }
-
- @Override
- public String description() {
- return "Waiting for active file count to be greater than 0: " + stats.getActiveFileCount() + " stats=" + System.identityHashCode(stats);
- }
- }, 30000, 100, true);
-
- if(waitForQueueToDrain) {
- PartitionedRegion region = (PartitionedRegion) getCache().getRegion(uniqueName);
- final AsyncEventQueueStats queueStats = region.getHDFSEventQueueStats();
- waitForCriterion(new WaitCriterion() {
- @Override
- public boolean done() {
- return queueStats.getEventQueueSize() <= 0;
- }
-
- @Override
- public String description() {
- return "Waiting for queue stats to reach 0: " + queueStats.getEventQueueSize();
- }
- }, 30000, 100, true);
- }
- return null;
- }
- });
- }
-
- protected void doPuts(VM vm0, final String uniqueName, final int numEntries) {
- // Do some puts
- vm0.invoke(new SerializableCallable("do puts") {
- public Object call() throws Exception {
- Region r = getRootRegion(uniqueName);
- for (int i =0; i< numEntries; i++)
- r.put("K" + i, "V"+ i );
- return null;
- }
- });
- }
-
- protected void validate(VM vm1, final String uniqueName, final int numEntries) {
- SerializableCallable validate = new SerializableCallable("validate") {
- public Object call() throws Exception {
- Region r = getRootRegion(uniqueName);
-
- for (int i =0; i< numEntries; i++) {
- assertEquals("failure on key K" + i , "V"+ i, r.get("K" + i));
- }
-
- return null;
- }
- };
- vm1.invoke(validate);
- }
-
- protected void dumpFiles(VM vm0, final String uniqueName) {
- vm0.invoke(new SerializableRunnable() {
-
- @Override
- public void run() {
- HDFSStoreImpl hdfsStore = (HDFSStoreImpl) ((GemFireCacheImpl)getCache()).findHDFSStore(uniqueName);
- FileSystem fs;
- try {
- fs = hdfsStore.getFileSystem();
- } catch (IOException e1) {
- throw new HDFSIOException(e1.getMessage(), e1);
- }
- System.err.println("dumping file names in HDFS directory: " + hdfsStore.getHomeDir());
- try {
- RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path(hdfsStore.getHomeDir()), true);
-
- while(files.hasNext()) {
- LocatedFileStatus next = files.next();
- /* MergeGemXDHDFSToGFE - Disabled as I am not pulling in DunitEnv */
- // System.err.println(DUnitEnv.get().getPid() + " - " + next.getPath());
- System.err.println(" - " + next.getPath());
- }
- } catch (FileNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
-
- });
- }
-
-}
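
Instead of fixed sleeps, the verification helpers in this base class poll with DUnit's WaitCriterion: waitForCriterion() re-evaluates done() on an interval until it returns true or the timeout expires. The pattern as used in verifyDataInHDFS() above, with the trailing arguments giving the timeout in milliseconds, the poll interval, and whether to fail the test on timeout:

    waitForCriterion(new WaitCriterion() {
      @Override
      public boolean done() {
        // re-checked on every poll
        return stats.getActiveFileCount() > 0;
      }

      @Override
      public String description() {
        // reported if the wait times out
        return "waiting for an active hoplog file to appear";
      }
    }, 30000, 100, true);  // 30 s timeout, 100 ms poll, throw on timeout
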
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BaseHoplogTestCase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BaseHoplogTestCase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BaseHoplogTestCase.java
deleted file mode 100644
index 07d9f77..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BaseHoplogTestCase.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.net.URI;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
-
-import junit.framework.TestCase;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.Operation;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionFactory;
-import com.gemstone.gemfire.cache.RegionShortcut;
-import com.gemstone.gemfire.cache.SerializedCacheValue;
-import com.gemstone.gemfire.cache.TransactionId;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.SortedHDFSQueuePersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl.FileSystemFactory;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
-import com.gemstone.gemfire.distributed.DistributedMember;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
-import com.gemstone.gemfire.internal.cache.versions.DiskVersionTag;
-import com.gemstone.gemfire.internal.util.BlobHelper;
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-
-import dunit.DistributedTestCase;
-import dunit.DistributedTestCase.ExpectedException;
-
-public abstract class BaseHoplogTestCase extends TestCase {
- public static final String HDFS_STORE_NAME = "hdfs";
- public static final Random rand = new Random(System.currentTimeMillis());
- protected Path testDataDir;
- protected Cache cache;
-
- protected HDFSRegionDirector director;
- protected HdfsRegionManager regionManager;
- protected HDFSStoreFactory hsf;
- protected HDFSStoreImpl hdfsStore;
- protected RegionFactory<Object, Object> regionfactory;
- protected Region<Object, Object> region;
- protected SortedOplogStatistics stats;
- protected HFileStoreStatistics storeStats;
- protected BlockCache blockCache;
-
- Set<ExpectedException> exceptions = new HashSet<ExpectedException>();
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- System.setProperty(HoplogConfig.ALLOW_LOCAL_HDFS_PROP, "true");
-
- //This is logged by HDFS when it is stopped.
- exceptions.add(DistributedTestCase.addExpectedException("sleep interrupted"));
- exceptions.add(DistributedTestCase.addExpectedException("java.io.InterruptedIOException"));
-
- testDataDir = new Path("test-case");
-
- cache = createCache();
-
- configureHdfsStoreFactory();
- hdfsStore = (HDFSStoreImpl) hsf.create(HDFS_STORE_NAME);
-
- regionfactory = cache.createRegionFactory(RegionShortcut.PARTITION);
-// regionfactory.setHDFSStoreName(HDFS_STORE_NAME);
- region = regionfactory.create(getName());
-
- // disable compaction by default and clear existing queues
- HDFSCompactionManager compactionManager = HDFSCompactionManager.getInstance(hdfsStore);
- compactionManager.reset();
-
- director = HDFSRegionDirector.getInstance();
- director.setCache(cache);
- regionManager = ((LocalRegion)region).getHdfsRegionManager();
- stats = director.getHdfsRegionStats("/" + getName());
- storeStats = hdfsStore.getStats();
- blockCache = hdfsStore.getBlockCache();
- AbstractHoplogOrganizer.JUNIT_TEST_RUN = true;
- }
-
- protected void configureHdfsStoreFactory() throws Exception {
- hsf = this.cache.createHDFSStoreFactory();
- hsf.setHomeDir(testDataDir.toString());
- hsf.setMinorCompaction(false);
- hsf.setMajorCompaction(false);
- }
-
- protected Cache createCache() {
- CacheFactory cf = new CacheFactory().set("mcast-port", "0")
- .set("log-level", "info")
- ;
- cache = cf.create();
- return cache;
- }
-
- @Override
- protected void tearDown() throws Exception {
- if (region != null) {
- region.destroyRegion();
- }
-
- if (hdfsStore != null) {
- hdfsStore.getFileSystem().delete(testDataDir, true);
- hdfsStore.destroy();
- }
-
- if (cache != null) {
- cache.close();
- }
- super.tearDown();
- for (ExpectedException ex: exceptions) {
- ex.remove();
- }
- }
-
- /**
- * creates a hoplog file with numKeys records. Keys follow key-X pattern and values follow value-X
- * pattern where X=0 to X is = numKeys -1
- *
- * @return the sorted map of inserted KVs
- */
- protected TreeMap<String, String> createHoplog(int numKeys, Hoplog oplog) throws IOException {
- int offset = (numKeys > 10 ? 100000 : 0);
-
- HoplogWriter writer = oplog.createWriter(numKeys);
- TreeMap<String, String> map = new TreeMap<String, String>();
- for (int i = offset; i < (numKeys + offset); i++) {
- String key = ("key-" + i);
- String value = ("value-" + System.nanoTime());
- writer.append(key.getBytes(), value.getBytes());
- map.put(key, value);
- }
- writer.close();
- return map;
- }
-
- protected FileStatus[] getBucketHoplogs(String regionAndBucket, final String type)
- throws IOException {
- return getBucketHoplogs(hdfsStore.getFileSystem(), regionAndBucket, type);
- }
-
- protected FileStatus[] getBucketHoplogs(FileSystem fs, String regionAndBucket, final String type)
- throws IOException {
- FileStatus[] hoplogs = fs.listStatus(
- new Path(testDataDir, regionAndBucket), new PathFilter() {
- @Override
- public boolean accept(Path file) {
- return file.getName().endsWith(type);
- }
- });
- return hoplogs;
- }
-
- protected String getRandomHoplogName() {
- String hoplogName = "hoplog-" + System.nanoTime() + "-" + rand.nextInt(10000) + ".hop";
- return hoplogName;
- }
-
-// public static MiniDFSCluster initMiniCluster(int port, int numDN) throws Exception {
-// HashMap<String, String> map = new HashMap<String, String>();
-// map.put(DFSConfigKeys.DFS_REPLICATION_KEY, "1");
-// return initMiniCluster(port, numDN, map);
-// }
-//
-// public static MiniDFSCluster initMiniCluster(int port, int numDN, HashMap<String, String> map) throws Exception {
-// System.setProperty("test.build.data", "hdfs-test-cluster");
-// Configuration hconf = new HdfsConfiguration();
-// for (Entry<String, String> entry : map.entrySet()) {
-// hconf.set(entry.getKey(), entry.getValue());
-// }
-//
-// hconf.set("dfs.namenode.fs-limits.min-block-size", "1024");
-//
-// Builder builder = new MiniDFSCluster.Builder(hconf);
-// builder.numDataNodes(numDN);
-// builder.nameNodePort(port);
-// MiniDFSCluster cluster = builder.build();
-// return cluster;
-// }
-
- public static void setConfigFile(HDFSStoreFactory factory, File configFile, String config)
- throws Exception {
- BufferedWriter bw = new BufferedWriter(new FileWriter(configFile));
- bw.write(config);
- bw.close();
- factory.setHDFSClientConfigFile(configFile.getName());
- }
-
- public static void alterMajorCompaction(HDFSStoreImpl store, boolean enable) {
- HDFSStoreMutator mutator = store.createHdfsStoreMutator();
- mutator.setMajorCompaction(enable);
- store.alter(mutator);
- }
-
- public static void alterMinorCompaction(HDFSStoreImpl store, boolean enable) {
- HDFSStoreMutator mutator = store.createHdfsStoreMutator();
- mutator.setMinorCompaction(enable);
- store.alter(mutator);
- }
-
- public void deleteMiniClusterDir() throws Exception {
- File clusterDir = new File("hdfs-test-cluster");
- if (clusterDir.exists()) {
- FileUtils.deleteDirectory(clusterDir);
- }
- }
-
- public static class TestEvent extends SortedHDFSQueuePersistedEvent {
- Object key;
-
- public TestEvent(String k, String v) throws Exception {
- this(k, v, Operation.PUT_IF_ABSENT);
- }
-
- public TestEvent(String k, String v, Operation op) throws Exception {
- super(v, op, (byte) 0x02, false, new DiskVersionTag(), BlobHelper.serializeToBlob(k), 0);
- this.key = k;
- }
-
- public Object getKey() {
- return key;
- }
-
- public Object getNewValue() {
- return valueObject;
- }
-
- public Operation getOperation() {
- return op;
- }
-
- public Region<Object, Object> getRegion() {
- return null;
- }
-
- public Object getCallbackArgument() {
- return null;
- }
-
- public boolean isCallbackArgumentAvailable() {
- return false;
- }
-
- public boolean isOriginRemote() {
- return false;
- }
-
- public DistributedMember getDistributedMember() {
- return null;
- }
-
- public boolean isExpiration() {
- return false;
- }
-
- public boolean isDistributed() {
- return false;
- }
-
- public Object getOldValue() {
- return null;
- }
-
- public SerializedCacheValue<Object> getSerializedOldValue() {
- return null;
- }
-
- public SerializedCacheValue<Object> getSerializedNewValue() {
- return null;
- }
-
- public boolean isLocalLoad() {
- return false;
- }
-
- public boolean isNetLoad() {
- return false;
- }
-
- public boolean isLoad() {
- return false;
- }
-
- public boolean isNetSearch() {
- return false;
- }
-
- public TransactionId getTransactionId() {
- return null;
- }
-
- public boolean isBridgeEvent() {
- return false;
- }
-
- public boolean hasClientOrigin() {
- return false;
- }
-
- public boolean isOldValueAvailable() {
- return false;
- }
- }
-
- public abstract class AbstractCompactor implements Compactor {
- @Override
- public HDFSStore getHdfsStore() {
- return hdfsStore;
- }
-
- public void suspend() {
- }
-
- public void resume() {
- }
-
- public boolean isBusy(boolean isMajor) {
- return false;
- }
- }
-
- public HDFSStoreFactoryImpl getCloseableLocalHdfsStoreFactory() {
- final FileSystemFactory fsFactory = new FileSystemFactory() {
- // by default, a local FS instance is not disabled by close(); hence
- // this customization
- class CustomFileSystem extends LocalFileSystem {
- boolean isClosed = false;
-
- public void close() throws IOException {
- isClosed = true;
- super.close();
- }
-
- public FileStatus getFileStatus(Path f) throws IOException {
- if (isClosed) {
- throw new IOException();
- }
- return super.getFileStatus(f);
- }
- }
-
- public FileSystem create(URI namenode, Configuration conf, boolean forceNew) throws IOException {
- CustomFileSystem fs = new CustomFileSystem();
- fs.initialize(namenode, conf);
- return fs;
- }
- };
-
- HDFSStoreFactoryImpl storeFactory = new HDFSStoreFactoryImpl(cache) {
- public HDFSStore create(String name) {
- return new HDFSStoreImpl(name, this.configHolder) {
- public FileSystemFactory getFileSystemFactory() {
- return fsFactory;
- }
- };
- }
- };
- return storeFactory;
- }
-}
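
For reference, subclasses of BaseHoplogTestCase drove these helpers roughly as in
the sketch below (illustrative only, not part of this diff; the bucket path
getName() + "/0" and the ".hop" suffix are assumptions inferred from
getBucketHoplogs() and getRandomHoplogName() above, and imports match the deleted
file's):

    public class ExampleHoplogJUnitTest extends BaseHoplogTestCase {
      public void testCompactionToggle() throws Exception {
        // compaction is disabled in configureHdfsStoreFactory(); re-enable it
        alterMinorCompaction(hdfsStore, true);
        alterMajorCompaction(hdfsStore, true);

        // list whatever hoplogs have been flushed for bucket 0 of the test region
        FileStatus[] files = getBucketHoplogs(getName() + "/0", ".hop");
        assertNotNull(files);
      }
    }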
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CardinalityEstimatorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CardinalityEstimatorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CardinalityEstimatorJUnitTest.java
deleted file mode 100644
index db050b3..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CardinalityEstimatorJUnitTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.Operation;
-import com.gemstone.gemfire.test.junit.categories.HoplogTest;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-
-
-@Category({IntegrationTest.class, HoplogTest.class})
-public class CardinalityEstimatorJUnitTest extends BaseHoplogTestCase {
-
- public void testSingleHoplogCardinality() throws Exception {
- int count = 10;
- int bucketId = (int) System.nanoTime();
- HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
-
- // flush and create hoplog
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- for (int i = 0; i < count; i++) {
- items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime())));
- }
- // assert that size is 0 before flush begins
- assertEquals(0, organizer.sizeEstimate());
- organizer.flush(items.iterator(), count);
-
- assertEquals(count, organizer.sizeEstimate());
- assertEquals(0, stats.getActiveReaderCount());
-
- organizer.close();
- organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
- assertEquals(count, organizer.sizeEstimate());
- assertEquals(1, stats.getActiveReaderCount());
- }
-
- public void testSingleHoplogCardinalityWithDuplicates() throws Exception {
- int bucketId = (int) System.nanoTime();
- HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
-
- List<TestEvent> items = new ArrayList<TestEvent>();
- items.add(new TestEvent("key-0", "value-0"));
- items.add(new TestEvent("key-0", "value-0"));
- items.add(new TestEvent("key-1", "value-1"));
- items.add(new TestEvent("key-2", "value-2"));
- items.add(new TestEvent("key-3", "value-3"));
- items.add(new TestEvent("key-3", "value-3"));
- items.add(new TestEvent("key-4", "value-4"));
-
- organizer.flush(items.iterator(), 7);
- assertEquals(5, organizer.sizeEstimate());
- }
-
- public void testMultipleHoplogCardinality() throws Exception {
- int bucketId = (int) System.nanoTime();
- HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
-
- List<TestEvent> items = new ArrayList<TestEvent>();
- items.add(new TestEvent("key-0", "value-0"));
- items.add(new TestEvent("key-1", "value-1"));
- items.add(new TestEvent("key-2", "value-2"));
- items.add(new TestEvent("key-3", "value-3"));
- items.add(new TestEvent("key-4", "value-4"));
-
- organizer.flush(items.iterator(), 5);
- assertEquals(5, organizer.sizeEstimate());
-
- items.clear();
- items.add(new TestEvent("key-1", "value-0"));
- items.add(new TestEvent("key-5", "value-5"));
- items.add(new TestEvent("key-6", "value-6"));
- items.add(new TestEvent("key-7", "value-7"));
- items.add(new TestEvent("key-8", "value-8"));
- items.add(new TestEvent("key-9", "value-9"));
-
- organizer.flush(items.iterator(), 6);
- assertEquals(10, organizer.sizeEstimate());
- }
-
- public void testCardinalityAfterRestart() throws Exception {
- int bucketId = (int) System.nanoTime();
- HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
-
- List<TestEvent> items = new ArrayList<TestEvent>();
- items.add(new TestEvent("key-0", "value-0"));
- items.add(new TestEvent("key-1", "value-1"));
- items.add(new TestEvent("key-2", "value-2"));
- items.add(new TestEvent("key-3", "value-3"));
- items.add(new TestEvent("key-4", "value-4"));
-
- assertEquals(0, organizer.sizeEstimate());
- organizer.flush(items.iterator(), 5);
- assertEquals(5, organizer.sizeEstimate());
-
- // restart
- organizer.close();
- organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
- assertEquals(5, organizer.sizeEstimate());
-
- items.clear();
- items.add(new TestEvent("key-1", "value-0"));
- items.add(new TestEvent("key-5", "value-5"));
- items.add(new TestEvent("key-6", "value-6"));
- items.add(new TestEvent("key-7", "value-7"));
- items.add(new TestEvent("key-8", "value-8"));
- items.add(new TestEvent("key-9", "value-9"));
-
- organizer.flush(items.iterator(), 6);
- assertEquals(10, organizer.sizeEstimate());
-
- // restart - make sure that HLL from the youngest file is read
- organizer.close();
- organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
- assertEquals(10, organizer.sizeEstimate());
-
- items.clear();
- items.add(new TestEvent("key-1", "value-1"));
- items.add(new TestEvent("key-5", "value-5"));
- items.add(new TestEvent("key-10", "value-10"));
- items.add(new TestEvent("key-11", "value-11"));
- items.add(new TestEvent("key-12", "value-12"));
- items.add(new TestEvent("key-13", "value-13"));
- items.add(new TestEvent("key-14", "value-14"));
-
- organizer.flush(items.iterator(), 7);
- assertEquals(15, organizer.sizeEstimate());
- }
-
- public void testCardinalityAfterMajorCompaction() throws Exception {
- doCardinalityAfterCompactionWork(true);
- }
-
- public void testCardinalityAfterMinorCompaction() throws Exception {
- doCardinalityAfterCompactionWork(false);
- }
-
- private void doCardinalityAfterCompactionWork(boolean isMajor) throws Exception {
- int bucketId = (int) System.nanoTime();
- HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
-
- List<TestEvent> items = new ArrayList<TestEvent>();
- items.add(new TestEvent("key-0", "value-0"));
- items.add(new TestEvent("key-1", "value-1"));
- items.add(new TestEvent("key-2", "value-2"));
- items.add(new TestEvent("key-3", "value-3"));
- items.add(new TestEvent("key-4", "value-4"));
-
- organizer.flush(items.iterator(), 5);
- assertEquals(5, organizer.sizeEstimate());
-
- items.clear();
- items.add(new TestEvent("key-0", "value-0"));
- items.add(new TestEvent("key-1", "value-5", Operation.DESTROY));
- items.add(new TestEvent("key-2", "value-6", Operation.INVALIDATE));
- items.add(new TestEvent("key-5", "value-5"));
-
- organizer.flush(items.iterator(), 4);
- assertEquals(6, organizer.sizeEstimate());
-
- items.clear();
- items.add(new TestEvent("key-3", "value-5", Operation.DESTROY));
- items.add(new TestEvent("key-4", "value-6", Operation.INVALIDATE));
- items.add(new TestEvent("key-5", "value-0"));
- items.add(new TestEvent("key-6", "value-5"));
-
- organizer.flush(items.iterator(), 4);
-
- items.add(new TestEvent("key-5", "value-0"));
- items.add(new TestEvent("key-6", "value-5"));
-
- items.clear();
- organizer.flush(items.iterator(), items.size());
- assertEquals(7, organizer.sizeEstimate());
-
- organizer.getCompactor().compact(isMajor, false);
- assertEquals(3, organizer.sizeEstimate());
- }
-}
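
The pattern these cases repeat — flush a batch of TestEvents, then check the
organizer's estimate — condenses to the sketch below (illustrative, not part of
this diff; the bucket id is arbitrary, as in the tests, and imports match the
deleted file's):

    HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 42);
    List<TestEvent> items = new ArrayList<TestEvent>();
    items.add(new TestEvent("key-0", "value-0"));
    items.add(new TestEvent("key-0", "value-0")); // duplicate key
    items.add(new TestEvent("key-1", "value-1"));
    organizer.flush(items.iterator(), items.size());
    // sizeEstimate() tracks distinct keys, so the duplicate collapses to one
    assertEquals(2, organizer.sizeEstimate());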
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCacheLoaderJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCacheLoaderJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCacheLoaderJUnitTest.java
deleted file mode 100644
index 67dcddf..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCacheLoaderJUnitTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.util.List;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.AttributesMutator;
-import com.gemstone.gemfire.cache.CacheLoader;
-import com.gemstone.gemfire.cache.CacheLoaderException;
-import com.gemstone.gemfire.cache.LoaderHelper;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
-import com.gemstone.gemfire.test.junit.categories.HoplogTest;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-
-/**
- * Tests that entries loaded from a cache loader are inserted in the HDFS queue
- *
- * @author hemantb
- */
-@Category({IntegrationTest.class, HoplogTest.class})
-public class HDFSCacheLoaderJUnitTest extends BaseHoplogTestCase {
-
- private static int totalEventsReceived = 0;
-
- protected void configureHdfsStoreFactory() throws Exception {
- hsf = this.cache.createHDFSStoreFactory();
- hsf.setHomeDir(testDataDir.toString());
- hsf.setBatchInterval(100000000);
- hsf.setBatchSize(10000);
- }
-
- /**
- * Tests that entries loaded from a cache loader are inserted in the HDFS queue
- * but are not inserted in async queues.
- * @throws Exception
- */
- public void testCacheLoaderForAsyncQAndHDFS() throws Exception {
-
- final AsyncEventQueueStats hdfsQueuestatistics = ((AsyncEventQueueImpl)cache.
- getAsyncEventQueues().toArray()[0]).getStatistics();
-
- AttributesMutator am = this.region.getAttributesMutator();
- am.setCacheLoader(new CacheLoader() {
- private int i = 0;
- public Object load(LoaderHelper helper)
- throws CacheLoaderException {
- return new Integer(i++);
- }
-
- public void close() { }
- });
-
- String asyncQueueName = "myQueue";
- new AsyncEventQueueFactoryImpl(cache).setBatchTimeInterval(1).
- create(asyncQueueName, new AsyncEventListener() {
-
- @Override
- public void close() {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public boolean processEvents(List events) {
- totalEventsReceived += events.size();
- return true;
- }
- });
- am.addAsyncEventQueueId(asyncQueueName);
-
- region.put(1, new Integer(100));
- region.destroy(1);
- region.get(1);
- region.destroy(1);
-
- assertTrue("HDFS queue should have received four events. But it received " +
- hdfsQueuestatistics.getEventQueueSize(), 4 == hdfsQueuestatistics.getEventQueueSize());
- assertTrue("HDFS queue should have received four events. But it received " +
- hdfsQueuestatistics.getEventsReceived(), 4 == hdfsQueuestatistics.getEventsReceived());
-
- region.get(1);
- Thread.sleep(2000);
-
- assertTrue("Async queue should have received only 5 events. But it received " +
- totalEventsReceived, totalEventsReceived == 5);
- assertTrue("HDFS queue should have received 5 events. But it received " +
- hdfsQueuestatistics.getEventQueueSize(), 5 == hdfsQueuestatistics.getEventQueueSize());
- assertTrue("HDFS queue should have received 5 events. But it received " +
- hdfsQueuestatistics.getEventsReceived(), 5 == hdfsQueuestatistics.getEventsReceived());
- }
-
-}
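
The loader wiring this test depends on reduces to the sketch below (illustrative,
not part of this diff; the key is a placeholder and the async queue setup is as in
the test above):

    AttributesMutator am = region.getAttributesMutator();
    am.setCacheLoader(new CacheLoader() {
      private int next = 0;
      public Object load(LoaderHelper helper) throws CacheLoaderException {
        return Integer.valueOf(next++); // fabricate a value on a cache miss
      }
      public void close() { }
    });
    // a get() on a missing key now invokes load(); the loaded value is queued
    // to the HDFS event queue, which the assertions above count
    region.get(1);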