You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:49:50 UTC
[04/25] incubator-geode git commit: GEODE-10: Reinstating HDFS
persistence code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/TieredCompactionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/TieredCompactionJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/TieredCompactionJUnitTest.java
new file mode 100644
index 0000000..7b45952
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/TieredCompactionJUnitTest.java
@@ -0,0 +1,904 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HdfsSortedOplogOrganizer.HoplogCompactor;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest
+;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class TieredCompactionJUnitTest extends BaseHoplogTestCase {
+ static long ONE_MB = 1024 * 1024;
+ static long TEN_MB = 10 * ONE_MB;
+
+ @Override
+ protected void configureHdfsStoreFactory() throws Exception {
+ super.configureHdfsStoreFactory();
+
+ hsf.setInputFileCountMin(3);
+ hsf.setMinorCompaction(false);
+ hsf.setMajorCompaction(false);
+ }
+
+ public void testMinorCompaction() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+ // #1
+ ArrayList<QueuedPersistentEvent> items = new ArrayList<QueuedPersistentEvent>();
+ items.add(new TestEvent("1", "1"));
+ items.add(new TestEvent("2", "1"));
+ items.add(new TestEvent("3", "1"));
+ items.add(new TestEvent("4", "1"));
+ organizer.flush(items.iterator(), items.size());
+
+ // #2
+ items.clear();
+ items.add(new TestEvent("2", "1"));
+ items.add(new TestEvent("4", "1"));
+ items.add(new TestEvent("6", "1"));
+ items.add(new TestEvent("8", "1"));
+ organizer.flush(items.iterator(), items.size());
+
+ // #3
+ items.clear();
+ items.add(new TestEvent("1", "1"));
+ items.add(new TestEvent("3", "1"));
+ items.add(new TestEvent("5", "1"));
+ items.add(new TestEvent("7", "1"));
+ items.add(new TestEvent("9", "1"));
+ organizer.flush(items.iterator(), items.size());
+
+ // #4
+ items.clear();
+ items.add(new TestEvent("0", "1"));
+ items.add(new TestEvent("1", "1"));
+ items.add(new TestEvent("4", "1"));
+ items.add(new TestEvent("5", "1"));
+ organizer.flush(items.iterator(), items.size());
+
+ // check file existence in bucket directory, expect 4 hoplgos
+ FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+ assertEquals(4, hoplogs.length);
+
+ // After compaction expect 1 hoplog only. It should have the same sequence number as that of the
+ // youngest file compacted, which should be 4 in this case
+ organizer.getCompactor().compact(false, false);
+ hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
+ assertEquals(1, hoplogs.length);
+ assertEquals(1, organizer.getSortedOplogs().size());
+ Hoplog hoplog = new HFileSortedOplog(hdfsStore, hoplogs[0].getPath(), blockCache, stats, storeStats);
+ assertEquals(4, HdfsSortedOplogOrganizer.getSequenceNumber(hoplog));
+
+ // iterate on oplogs to validate data in files
+ HoplogSetIterator iter = new HoplogSetIterator(organizer.getSortedOplogs());
+ // the iteration pattern for this test should be 0-9:
+ // 0 1 4 5 oplog #4
+ // 1 3 5 7 9 oplog #3
+ // 2 4 6 8 oplog #2
+ // 1 2 3 4 oplog #1
+ int count = 0;
+ for (ByteBuffer keyBB = null; iter.hasNext();) {
+ keyBB = iter.next();
+ byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
+ assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
+ count++;
+ }
+ assertEquals(10, count);
+
+ // there must be 4 expired hoplogs now
+ hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+ assertEquals(4, hoplogs.length);
+ organizer.close();
+ }
+
+ public void testIterativeMinorCompaction() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+ // #1
+ ArrayList<QueuedPersistentEvent> items = new ArrayList<QueuedPersistentEvent>();
+ items.add(new TestEvent("1", "1"));
+ items.add(new TestEvent("2", "1"));
+ organizer.flush(items.iterator(), items.size());
+
+ items.clear();
+ items.add(new TestEvent("1", "2"));
+ items.add(new TestEvent("3", "2"));
+ organizer.flush(items.iterator(), items.size());
+
+ items.clear();
+ items.add(new TestEvent("4", "3"));
+ items.add(new TestEvent("5", "3"));
+ organizer.flush(items.iterator(), items.size());
+
+ // check file existence in bucket directory
+ FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+ assertEquals(3, hoplogs.length);
+
+ organizer.getCompactor().compact(false, false);
+
+ FileStatus[] expired = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+ assertEquals(3, expired.length);
+ FileStatus[] valids = HdfsSortedOplogOrganizer.filterValidHoplogs(hoplogs, expired);
+ assertEquals(0, valids.length);
+ // After compaction expect 1 hoplog only.
+ hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
+ assertEquals(1, hoplogs.length);
+
+ items.clear();
+ items.add(new TestEvent("4", "4"));
+ items.add(new TestEvent("6", "4"));
+ organizer.flush(items.iterator(), items.size());
+
+ items.clear();
+ items.add(new TestEvent("7", "5"));
+ items.add(new TestEvent("8", "5"));
+ organizer.flush(items.iterator(), items.size());
+
+ hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+ assertEquals(5, hoplogs.length);
+
+ organizer.getCompactor().compact(false, false);
+ expired = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+ assertEquals(6, expired.length);
+ valids = HdfsSortedOplogOrganizer.filterValidHoplogs(hoplogs, expired);
+ assertEquals(0, valids.length);
+ hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
+ assertEquals(2, hoplogs.length);
+ valids = HdfsSortedOplogOrganizer.filterValidHoplogs(hoplogs, expired);
+ assertEquals(1, valids.length);
+
+ assertEquals("2", organizer.read(BlobHelper.serializeToBlob("1")).getValue());
+ assertEquals("1", organizer.read(BlobHelper.serializeToBlob("2")).getValue());
+ assertEquals("2", organizer.read(BlobHelper.serializeToBlob("3")).getValue());
+ assertEquals("4", organizer.read(BlobHelper.serializeToBlob("4")).getValue());
+ assertEquals("3", organizer.read(BlobHelper.serializeToBlob("5")).getValue());
+ assertEquals("4", organizer.read(BlobHelper.serializeToBlob("6")).getValue());
+ assertEquals("5", organizer.read(BlobHelper.serializeToBlob("7")).getValue());
+ organizer.close();
+ }
+
+ public void testMajorCompactionWithDelete() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+ // #1
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ items.add(new TestEvent("1", "1"));
+ items.add(new TestEvent("2", "1"));
+ items.add(new TestEvent("3", "1"));
+ items.add(new TestEvent("4", "1"));
+ items.add(new TestEvent("4", "10", Operation.DESTROY));
+ organizer.flush(items.iterator(), items.size());
+
+ // #2
+ items.clear();
+ items.add(new TestEvent("2", "1", Operation.DESTROY));
+ items.add(new TestEvent("4", "1", Operation.DESTROY));
+ items.add(new TestEvent("6", "1", Operation.INVALIDATE));
+ items.add(new TestEvent("8", "1"));
+ organizer.flush(items.iterator(), items.size());
+
+ // #3
+ items.clear();
+ items.add(new TestEvent("1", "1"));
+ items.add(new TestEvent("3", "1"));
+ items.add(new TestEvent("5", "1"));
+ items.add(new TestEvent("7", "1"));
+ items.add(new TestEvent("9", "1", Operation.DESTROY));
+ organizer.flush(items.iterator(), items.size());
+
+ // #4
+ items.clear();
+ items.add(new TestEvent("0", "1", Operation.DESTROY));
+ items.add(new TestEvent("1", "1"));
+ items.add(new TestEvent("4", "1"));
+ items.add(new TestEvent("5", "1"));
+ organizer.flush(items.iterator(), items.size());
+
+ // check file existence in bucket directory, expect 4 hoplgos
+ FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+ assertEquals(4, hoplogs.length);
+
+ // After compaction expect 1 hoplog only. It should have the same sequence number as that of the
+ // youngest file compacted, which should be 4 in this case
+ organizer.getCompactor().compact(true, false);
+ hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+ assertEquals(1, hoplogs.length);
+ assertEquals(1, organizer.getSortedOplogs().size());
+ Hoplog hoplog = new HFileSortedOplog(hdfsStore, hoplogs[0].getPath(), blockCache, stats, storeStats);
+ assertEquals(4, HdfsSortedOplogOrganizer.getSequenceNumber(hoplog));
+
+ // iterate on oplogs to validate data in files
+ HoplogSetIterator iter = new HoplogSetIterator(organizer.getSortedOplogs());
+ int count = 0;
+
+ // entries in () are destroyed or invalidated
+ // 1, 2, 3, 4, (11)
+ // (2), (4), (6), 8
+ // 1, 3, 5, 7, (9)
+ // (0), 1, 4, 5
+ String[] expectedValues = { "1", "3", "4", "5", "7", "8" };
+ for (ByteBuffer keyBB = null; iter.hasNext();) {
+ keyBB = iter.next();
+ byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
+ assertEquals(expectedValues[count], BlobHelper.deserializeBlob(key));
+ count++;
+ }
+ assertEquals(6, count);
+
+ // there must be 4 expired hoplogs now
+ hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+ assertEquals(4, hoplogs.length);
+ organizer.close();
+ }
+
+ public void testGainComputation() throws Exception {
+ HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
+ HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
+ ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
+ for (int i = 0; i < 10; i++) {
+ targets.add(new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, i * TEN_MB)));
+ }
+
+ // each read has cost 3. Four files read cost is 3 * 4. Reduce read cost of
+ // file after compaction
+ float expect = (float) ((3 * 4.0 - 3) / (20 + 30 + 40 + 50));
+ float result = bucket.computeGain(2, 5, targets);
+ assertTrue(Math.abs(expect - result) < (expect/1000));
+
+ // each read has cost 3 except 10MB file with read cost 2. 9 files read cost
+ // is 3 * 9. Reduce read cost of file after compaction.
+ expect = (float) ((3 * 9 - 3 - 1.0) / (10 + 20 + 30 + 40 + 50 + 60 + 70 + 80 + 90));
+ result = bucket.computeGain(0, 9, targets);
+ assertTrue(Math.abs(expect - result) < (expect/1000));
+ }
+
+ public void testGainComputeSmallFile() throws Exception {
+ HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
+ HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
+
+ ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
+ for (int i = 0; i < 10; i++) {
+ targets.add(new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, i * TEN_MB / 1024)));
+ }
+
+ float result = bucket.computeGain(2, 5, targets);
+ assertTrue(Math.abs(8.0 - result) < (1.0/1000));
+ }
+
+ public void testGainComputeMixedFiles() throws Exception {
+ HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
+ HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
+
+ ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
+ for (int i = 0; i < 10; i++) {
+ targets.add(new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, i * TEN_MB / 1024)));
+ }
+ TestHoplog midHop = (TestHoplog) targets.get(4).get();
+ // one more than other files
+ midHop.size = 5 * TEN_MB;
+
+ float expect = (float) ((4 * 2 - 3 + 1.0) / 50);
+ float result = bucket.computeGain(2, 5, targets);
+ System.out.println(expect);
+ System.out.println(result);
+ assertTrue(Math.abs(expect - result) < (expect/1000));
+ }
+
+ public void testGainComputeBadRatio() throws Exception {
+ HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
+ HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
+ ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
+ for (int i = 0; i < 10; i++) {
+ targets.add(new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, i * TEN_MB)));
+ }
+
+ TestHoplog firstHop = (TestHoplog) targets.get(2).get();
+ // one more than other files
+ firstHop.size = (1 + 30 + 40 + 50) * TEN_MB;
+ Float result = bucket.computeGain(2, 5, targets);
+ assertNull(result);
+ }
+
+ public void testMinorCompactionTargetMaxSize() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+ HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+
+ ArrayList<TrackedReference<TestHoplog>> targets = new ArrayList<TrackedReference<TestHoplog>>();
+ for (int i = 0; i < 5; i++) {
+ TrackedReference<TestHoplog> hop = new TrackedReference<TestHoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
+ hop.increment();
+ targets.add(hop);
+ }
+ TrackedReference<TestHoplog> oldestHop = targets.get(targets.size() - 1);
+ TestHoplog thirdHop = (TestHoplog) targets.get(2).get();
+
+ // oldest is more than max size is ignored
+ oldestHop.get().size = HDFSStore.DEFAULT_INPUT_FILE_SIZE_MAX_MB * ONE_MB + 100;
+ List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
+ compactor.getMinorCompactionTargets(list, -1);
+ assertEquals(4, list.size());
+ for (TrackedReference<Hoplog> ref : list) {
+ assertTrue(((TestHoplog)ref.get()).size - TEN_MB < 5 );
+ }
+
+ // third is more than max size but is not ignored
+ thirdHop.size = HDFSStore.DEFAULT_INPUT_FILE_SIZE_MAX_MB * ONE_MB + 100;
+ oldestHop.increment();
+ list = (List<TrackedReference<Hoplog>>) targets.clone();
+ compactor.getMinorCompactionTargets(list, -1);
+ assertEquals(4, list.size());
+ int i = 0;
+ for (TrackedReference<Hoplog> ref : list) {
+ if (i != 2) {
+ assertTrue(((TestHoplog) ref.get()).size - TEN_MB < 5);
+ } else {
+ assertTrue(((TestHoplog) ref.get()).size > HDFSStore.DEFAULT_INPUT_FILE_SIZE_MAX_MB * ONE_MB);
+ }
+ i++;
+ }
+ }
+
+ public void testAlterMaxInputFileSize() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+ HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+
+ assertTrue(TEN_MB * 2 < hdfsStore.getInputFileSizeMax() * ONE_MB);
+
+ ArrayList<TrackedReference<TestHoplog>> targets = new ArrayList<TrackedReference<TestHoplog>>();
+ for (int i = 0; i < 5; i++) {
+ TrackedReference<TestHoplog> hop = new TrackedReference<TestHoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
+ hop.increment();
+ targets.add(hop);
+ }
+
+ List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
+ compactor.getMinorCompactionTargets(list, -1);
+ assertEquals(targets.size(), list.size());
+
+ HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+ mutator.setInputFileSizeMax(1);
+ hdfsStore.alter(mutator);
+
+ compactor.getMinorCompactionTargets(list, -1);
+ assertEquals(0, list.size());
+ }
+
+ public void testAlterInputFileCount() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+ HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+
+ assertTrue(2 < hdfsStore.getInputFileCountMax());
+
+ ArrayList<TrackedReference<TestHoplog>> targets = new ArrayList<TrackedReference<TestHoplog>>();
+ for (int i = 0; i < 5; i++) {
+ TrackedReference<TestHoplog> hop = new TrackedReference<TestHoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
+ hop.increment();
+ targets.add(hop);
+ }
+
+ List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
+ compactor.getMinorCompactionTargets(list, -1);
+ assertEquals(targets.size(), list.size());
+
+ HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+ mutator.setInputFileCountMax(2);
+ mutator.setInputFileCountMin(2);
+ hdfsStore.alter(mutator);
+
+ compactor.getMinorCompactionTargets(list, -1);
+ assertEquals(2, list.size());
+ }
+
+ public void testAlterMajorCompactionInterval() throws Exception {
+ final AtomicInteger majorCReqCount = new AtomicInteger(0);
+
+ final Compactor compactor = new AbstractCompactor() {
+ @Override
+ public boolean compact(boolean isMajor, boolean isForced) throws IOException {
+ majorCReqCount.incrementAndGet();
+ return true;
+ }
+ };
+
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0) {
+ @Override
+ public synchronized Compactor getCompactor() {
+ return compactor;
+ }
+ };
+
+ // create hoplog in the past, 90 seconds before current time
+ organizer.hoplogCreated(getName(), 0, new TestHoplog(hdfsStore, ONE_MB, System.currentTimeMillis() - 90000));
+ TimeUnit.MILLISECONDS.sleep(50);
+ organizer.hoplogCreated(getName(), 0, new TestHoplog(hdfsStore, ONE_MB, System.currentTimeMillis() - 90000));
+
+ alterMajorCompaction(hdfsStore, true);
+
+ List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
+ assertEquals(2, hoplogs.size());
+
+ organizer.performMaintenance();
+ TimeUnit.MILLISECONDS.sleep(100);
+ assertEquals(0, majorCReqCount.get());
+
+ HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+ mutator.setMajorCompactionInterval(1);
+ hdfsStore.alter(mutator);
+
+ organizer.performMaintenance();
+ TimeUnit.MILLISECONDS.sleep(100);
+ assertEquals(1, majorCReqCount.get());
+ }
+
+ public void testMinorCompactionTargetMinCount() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+ HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+
+ ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
+ for (int i = 0; i < 2; i++) {
+ TrackedReference<Hoplog> hop = new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
+ hop.increment();
+ targets.add(hop);
+ }
+ compactor.getMinorCompactionTargets(targets, -1);
+ assertEquals(0, targets.size());
+ }
+
+ public void testMinorCompactionLessTargetsStatsUpdate() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ items.add(new TestEvent("1", "1"));
+ organizer.flush(items.iterator(), items.size());
+
+ items.clear();
+ items.add(new TestEvent("2", "2", Operation.DESTROY));
+ organizer.flush(items.iterator(), items.size());
+
+ TimeUnit.SECONDS.sleep(1);
+ List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
+ assertEquals(2, hoplogs.size());
+
+ organizer.performMaintenance();
+ hoplogs = organizer.getSortedOplogs();
+ assertEquals(2, hoplogs.size());
+ }
+
+ public void testMinorCompactionTargetsOptimizer() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+ HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+
+ ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
+ for (int i = 0; i < 6; i++) {
+ TrackedReference<Hoplog> hop = new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
+ hop.increment();
+ targets.add(hop);
+ }
+ List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
+ compactor.getMinorCompactionTargets(list, -1);
+ assertEquals(6, list.size());
+
+ TestHoplog fifthHop = (TestHoplog) targets.get(4).get();
+ // fifth hop needs additional block read as it has more than max keys size
+ fifthHop.size = (HdfsSortedOplogOrganizer.AVG_NUM_KEYS_PER_INDEX_BLOCK * 5 + 1) * 64 * 1024;
+ list = (List<TrackedReference<Hoplog>>) targets.clone();
+ compactor.getMinorCompactionTargets(list, -1);
+ assertEquals(4, list.size());
+ for (TrackedReference<Hoplog> ref : list) {
+ assertTrue(((TestHoplog)ref.get()).size - TEN_MB < 4 );
+ }
+ }
+
+ public void testTargetsReleasedBadRatio() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+ HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+
+ ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
+ for (int i = 0; i < 3; i++) {
+ TrackedReference<Hoplog> hop = new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
+ hop.increment();
+ targets.add(hop);
+ }
+ TestHoplog oldestHop = (TestHoplog) targets.get(2).get();
+ oldestHop.size = (1 + 30) * TEN_MB;
+
+ List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
+ compactor.getMinorCompactionTargets(list, -1);
+ assertEquals(0, list.size());
+ assertEquals(3, targets.size());
+ for (TrackedReference<Hoplog> ref : targets) {
+ assertEquals(0, ref.uses());
+ }
+ }
+
+ public void testMinorCTargetsIgnoreMajorC() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < 7; i++) {
+ items.clear();
+ items.add(new TestEvent("1" + i, "1" + i));
+ organizer.flush(items.iterator(), items.size());
+ }
+
+ HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+ List<TrackedReference<Hoplog>> targets = organizer.getSortedOplogs();
+ compactor.getMinorCompactionTargets(targets, -1);
+ assertEquals(7, targets.size());
+
+ targets = organizer.getSortedOplogs();
+ for (TrackedReference<Hoplog> ref : targets) {
+ ref.increment();
+ }
+ compactor.getMinorCompactionTargets(targets, 2);
+ assertEquals((7 - 2), targets.size());
+ targets = organizer.getSortedOplogs();
+ for (int i = 0; i < targets.size(); i++) {
+ if (i + 1 <= (7 - 2)) {
+ assertEquals(1, targets.get(i).uses());
+ } else {
+ assertEquals(0, targets.get(i).uses());
+ }
+ }
+
+ targets = organizer.getSortedOplogs();
+ for (TrackedReference<Hoplog> ref : targets) {
+ if (ref.uses() == 0) {
+ ref.increment();
+ }
+ assertEquals(1, ref.uses());
+ }
+ compactor.getMinorCompactionTargets(targets, 7);
+ assertEquals(0, targets.size());
+
+ targets = organizer.getSortedOplogs();
+ for (int i = 0; i < targets.size(); i++) {
+ assertEquals(0, targets.get(i).uses());
+ }
+ }
+
+ public void testTargetOverlap() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < 7; i++) {
+ items.clear();
+ items.add(new TestEvent("1" + i, "1" + i));
+ organizer.flush(items.iterator(), items.size());
+ }
+
+ HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+ List<TrackedReference<Hoplog>> targets = organizer.getSortedOplogs();
+ assertTrue(compactor.isMinorMajorOverlap(targets, 8));
+ assertTrue(compactor.isMinorMajorOverlap(targets, 7));
+ assertTrue(compactor.isMinorMajorOverlap(targets, 6));
+ assertTrue(compactor.isMinorMajorOverlap(targets, 1));
+ assertFalse(compactor.isMinorMajorOverlap(targets, 0));
+ assertFalse(compactor.isMinorMajorOverlap(targets, -1));
+
+ targets.remove(targets.size() -1); // remove the last one
+ targets.remove(targets.size() -1); // remove the last one again
+ assertFalse(compactor.isMinorMajorOverlap(targets, 1));
+ assertFalse(compactor.isMinorMajorOverlap(targets, 2));
+ assertTrue(compactor.isMinorMajorOverlap(targets, 3));
+
+ targets.remove(3); // remove from the middle, seq num 4
+ assertTrue(compactor.isMinorMajorOverlap(targets, 4));
+ assertTrue(compactor.isMinorMajorOverlap(targets, 3));
+ }
+
+ public void testSuspendMinorByMajor() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < 5; i++) {
+ items.clear();
+ items.add(new TestEvent("1" + i, "1" + i));
+ organizer.flush(items.iterator(), items.size());
+ }
+
+ HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+
+ Hoplog hoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir + "/"
+ + getName() + "-" + System.currentTimeMillis() + "-1.ihop.tmp"), blockCache, stats, storeStats);
+ compactor.fillCompactionHoplog(false, organizer.getSortedOplogs(), hoplog, -1);
+
+ cache.getLogger().info("<ExpectedException action=add>java.lang.InterruptedException</ExpectedException>");
+ try {
+ compactor.maxMajorCSeqNum.set(3);
+ compactor.fillCompactionHoplog(false, organizer.getSortedOplogs(), hoplog, -1);
+ fail();
+ } catch (InterruptedException e) {
+ // expected
+ }
+ cache.getLogger().info("<ExpectedException action=remove>java.lang.InterruptedException</ExpectedException>");
+ organizer.close();
+ }
+
+ public void testMajorCompactionSetsSeqNum() throws Exception {
+ final CountDownLatch compactionStartedLatch = new CountDownLatch(1);
+ final CountDownLatch waitLatch = new CountDownLatch(1);
+ class MyOrganizer extends HdfsSortedOplogOrganizer {
+ final HoplogCompactor compactor = new MyCompactor();
+ public MyOrganizer(HdfsRegionManager region, int bucketId) throws IOException {
+ super(region, bucketId);
+ }
+ public synchronized Compactor getCompactor() {
+ return compactor;
+ }
+ class MyCompactor extends HoplogCompactor {
+ @Override
+ public long fillCompactionHoplog(boolean isMajor,
+ List<TrackedReference<Hoplog>> targets, Hoplog output,
+ int majorCSeqNum) throws IOException, InterruptedException {
+ compactionStartedLatch.countDown();
+ waitLatch.await();
+ long byteCount = 0;
+ try {
+ byteCount = super.fillCompactionHoplog(isMajor, targets, output, majorCSeqNum);
+ } catch (ForceReattemptException e) {
+ // we do not expect this in a unit test.
+ }
+ return byteCount;
+ }
+ }
+ }
+
+ final HdfsSortedOplogOrganizer organizer = new MyOrganizer(regionManager, 0);
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < 3; i++) {
+ items.clear();
+ items.add(new TestEvent("1" + i, "1" + i));
+ organizer.flush(items.iterator(), items.size());
+ }
+
+ Thread t = new Thread(new Runnable() {
+ public void run() {
+ try {
+ organizer.getCompactor().compact(true, false);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ t.start();
+ compactionStartedLatch.await();
+ assertEquals(3, ((HoplogCompactor)organizer.getCompactor()).maxMajorCSeqNum.get());
+ waitLatch.countDown();
+ t.join();
+ }
+
+ public void testMinorWatchesMajorsSeqNum() throws Exception {
+ final CountDownLatch majorCStartedLatch = new CountDownLatch(1);
+ final CountDownLatch majorCWaitLatch = new CountDownLatch(1);
+
+ final CountDownLatch minorCStartedLatch = new CountDownLatch(1);
+ final List<TrackedReference<Hoplog>> minorTargets = new ArrayList<TrackedReference<Hoplog>>();
+
+ class MyOrganizer extends HdfsSortedOplogOrganizer {
+ final HoplogCompactor compactor = new MyCompactor();
+ public MyOrganizer(HdfsRegionManager region, int bucketId) throws IOException {
+ super(region, bucketId);
+ }
+ public synchronized Compactor getCompactor() {
+ return compactor;
+ }
+ class MyCompactor extends HoplogCompactor {
+ @Override
+ public long fillCompactionHoplog(boolean isMajor,
+ List<TrackedReference<Hoplog>> targets, Hoplog output,
+ int majorCSeqNum) throws IOException, InterruptedException {
+ if (isMajor) {
+ majorCStartedLatch.countDown();
+ majorCWaitLatch.await();
+ } else {
+ minorCStartedLatch.countDown();
+ minorTargets.addAll(targets);
+ }
+ long byteCount =0;
+ try {
+ byteCount = super.fillCompactionHoplog(isMajor, targets, output, majorCSeqNum);
+ } catch (ForceReattemptException e) {
+ // we do not expect this in a unit test.
+ }
+ return byteCount;
+ }
+ }
+ }
+
+ final HdfsSortedOplogOrganizer organizer = new MyOrganizer(regionManager, 0);
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < 3; i++) {
+ items.clear();
+ items.add(new TestEvent("1" + i, "1" + i));
+ organizer.flush(items.iterator(), items.size());
+ }
+
+ Thread majorCThread = new Thread(new Runnable() {
+ public void run() {
+ try {
+ organizer.getCompactor().compact(true, false);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ majorCThread.start();
+ majorCStartedLatch.await();
+ assertEquals(3, ((HoplogCompactor)organizer.getCompactor()).maxMajorCSeqNum.get());
+
+ // create more files for minor C
+ for (int i = 0; i < 4; i++) {
+ items.clear();
+ items.add(new TestEvent("1" + i, "1" + i));
+ organizer.flush(items.iterator(), items.size());
+ }
+
+ Thread minorCThread = new Thread(new Runnable() {
+ public void run() {
+ try {
+ organizer.getCompactor().compact(false, false);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ minorCThread.start();
+ minorCThread.join();
+ assertEquals(4, minorTargets.size());
+ for (TrackedReference<Hoplog> ref : minorTargets) {
+ assertTrue(organizer.getSequenceNumber(ref.get()) >= 4);
+ }
+
+ majorCWaitLatch.countDown();
+ majorCThread.join();
+ }
+
+ public void testTimeBoundedSuspend() throws Exception {
+ final AtomicBoolean barrier = new AtomicBoolean(true);
+
+ class MyOrganizer extends HdfsSortedOplogOrganizer {
+ public MyOrganizer(HdfsRegionManager region, int bucketId) throws IOException {
+ super(region, bucketId);
+ }
+ public synchronized Compactor getCompactor() {
+ return new MyCompactor();
+ }
+ class MyCompactor extends HoplogCompactor {
+ public long fillCompactionHoplog(boolean isMajor, List<TrackedReference<Hoplog>> targets, Hoplog output)
+ throws IOException, InterruptedException {
+ barrier.set(false);
+ TimeUnit.SECONDS.sleep(5 * HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT);
+ long byteCount =0;
+ try {
+ byteCount = super.fillCompactionHoplog(isMajor, targets, output, -1);
+ } catch (ForceReattemptException e) {
+ // we do not expect this in a unit test.
+ }
+ return byteCount;
+ }
+ }
+ }
+
+ HdfsSortedOplogOrganizer organizer = new MyOrganizer(regionManager, 0);
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < 4; i++) {
+ items.clear();
+ items.add(new TestEvent("1" + i, "1" + i));
+ organizer.flush(items.iterator(), items.size());
+ }
+
+ final HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
+ ExecutorService service = Executors.newCachedThreadPool();
+ service.execute(new Runnable() {
+ public void run() {
+ try {
+ compactor.compact(false, false);
+ } catch (Exception e) {
+ }
+ }
+ });
+
+ final AtomicLong start = new AtomicLong(0);
+ final AtomicLong end = new AtomicLong(0);
+ service.execute(new Runnable() {
+ public void run() {
+ while (barrier.get()) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(50);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ start.set(System.currentTimeMillis());
+ compactor.suspend();
+ end.set(System.currentTimeMillis());
+ }
+ });
+
+ for (long i = 0; i < 5; i++) {
+ if (end.get() == 0) {
+ TimeUnit.MILLISECONDS.sleep(HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT / 2);
+ } else {
+ break;
+ }
+ }
+
+ assertTrue(end.get() - start.get() < 100 + HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT);
+ }
+
+ public static class TestHoplog extends AbstractHoplog {
+ long size;
+ long creationTime;
+ TestHoplog(HDFSStoreImpl store, long size) throws IOException {
+ this(store, size, 0);
+ }
+
+ TestHoplog(HDFSStoreImpl store, long size, long creationTime) throws IOException {
+ super(store, new Path("1-" + creationTime + "-1.hop"), null);
+ this.size = size;
+ this.creationTime = creationTime;
+ }
+
+ @Override
+ public long getSize() {
+ return size;
+ }
+ @Override
+ public long getModificationTimeStamp() {
+ if (creationTime > 0) {
+ return creationTime;
+ }
+ return super.getModificationTimeStamp();
+ }
+ @Override
+ public String toString() {
+ long name = size - TEN_MB;
+ if (name < 0) name = size - (TEN_MB / 1024);
+ return name + "";
+ }
+ public boolean isClosed() {
+ return false;
+ }
+ public void close() throws IOException {
+ }
+ public HoplogReader getReader() throws IOException {
+ return null;
+ }
+ public HoplogWriter createWriter(int keys) throws IOException {
+ return null;
+ }
+ public void close(boolean clearCache) throws IOException {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKeyJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKeyJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKeyJUnitTest.java
new file mode 100644
index 0000000..fe15305
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKeyJUnitTest.java
@@ -0,0 +1,50 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import junit.framework.TestCase;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class GFKeyJUnitTest extends TestCase {
+ public void testSerde() throws Exception {
+ String str = "str";
+ GFKey key = new GFKey();
+ key.setKey(str);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ key.write(dos);
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ DataInputStream dis = new DataInputStream(bais);
+ key.readFields(dis);
+
+ assertEquals(str, key.getKey());
+ }
+
+ public void testCompare() {
+ GFKey keya = new GFKey();
+ keya.setKey("a");
+
+ GFKey keyb = new GFKey();
+ keyb.setKey("b");
+
+ assertEquals(-1, keya.compareTo(keyb));
+ assertEquals(1, keyb.compareTo(keya));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIteratorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIteratorJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIteratorJUnitTest.java
new file mode 100644
index 0000000..5ebb00e
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIteratorJUnitTest.java
@@ -0,0 +1,265 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.BaseHoplogTestCase;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HFileSortedOplog;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSSplitIteratorJUnitTest extends BaseHoplogTestCase {
+ public void test1Hop1BlockIter() throws Exception {
+ Path path = new Path(testDataDir, "region/0/1-1-1.hop");
+ Hoplog oplog = new HFileSortedOplog(hdfsStore, path, blockCache, stats,
+ storeStats);
+ createHoplog(10, oplog);
+
+ Path[] paths = {path};
+ long[] starts = {0};
+ long[] lengths = {oplog.getSize()};
+ HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
+ hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+
+ int count = 0;
+ while (iter.hasNext()) {
+ boolean success = iter.next();
+ assertTrue(success);
+ assertEquals("key-" + count, new String((byte[])iter.getKey()));
+ count++;
+ }
+ assertEquals(10, count);
+ }
+
+ public void test1HopNBlockIter() throws Exception {
+ Path path = new Path(testDataDir, "region/0/1-1-1.hop");
+ Hoplog oplog = new HFileSortedOplog(hdfsStore, path,
+ blockCache, stats, storeStats);
+ createHoplog(2000, oplog);
+
+ FileSystem fs = hdfsStore.getFileSystem();
+ Reader reader = HFile.createReader(fs, path, new CacheConfig(fs.getConf()));
+ BlockIndexReader bir = reader.getDataBlockIndexReader();
+ int blockCount = bir.getRootBlockCount();
+ reader.close();
+
+ // make sure there are more than 1 hfile blocks in the hoplog
+ assertTrue(1 < blockCount);
+
+ Path[] paths = {path};
+ long half = oplog.getSize()/2;
+ long[] starts = {0};
+ long[] lengths = {half};
+ HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
+ hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+
+ int count = 0;
+ while (iter.hasNext()) {
+ boolean success = iter.next();
+ assertTrue(success);
+ assertEquals("key-" + (count + 100000), new String((byte[])iter.getKey()));
+ count++;
+ }
+ // the number of iterations should be less than number of keys inserted in
+ // the hoplog
+ assertTrue(count < 2000 && count > 0);
+
+ paths = new Path[] {path};
+ starts = new long[] {half + 1};
+ lengths = new long[] {oplog.getSize()};
+ iter = HDFSSplitIterator.newInstance(
+ hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+
+ while (iter.hasNext()) {
+ boolean success = iter.next();
+ assertTrue(success);
+ assertEquals("key-" + (count + 100000), new String((byte[])iter.getKey()));
+ count++;
+ }
+ assertEquals(2000, count);
+
+ paths = new Path[] {path, path};
+ starts = new long[] {0, half + 1};
+ lengths = new long[] {half, oplog.getSize()};
+ iter = HDFSSplitIterator.newInstance(
+ hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+
+ count = 0;
+ while (iter.hasNext()) {
+ boolean success = iter.next();
+ assertTrue(success);
+ assertEquals("key-" + (count + 100000), new String((byte[])iter.getKey()));
+ count++;
+ }
+ assertEquals(2000, count);
+ }
+
+ /*
+ * This tests iterates over 3 hoplog files. The three hoplog files have the
+ * same content. Duplicate keys should not get discarded
+ */
+ public void testNHoplogNBlockIter() throws Exception {
+ Path path1 = new Path(testDataDir, "region/0/1-1-1.hop");
+ Hoplog oplog = new HFileSortedOplog(hdfsStore, path1,
+ blockCache, stats, storeStats);
+ createHoplog(2000, oplog);
+
+ FileSystem fs = hdfsStore.getFileSystem();
+ Reader reader = HFile.createReader(fs, path1, new CacheConfig(fs.getConf()));
+ BlockIndexReader bir = reader.getDataBlockIndexReader();
+ int blockCount = bir.getRootBlockCount();
+ reader.close();
+
+ // make sure there are more than 1 hfile blocks in the hoplog
+ assertTrue(1 < blockCount);
+
+ Path path2 = new Path(testDataDir, "region/0/1-2-1.hop");
+ oplog = new HFileSortedOplog(hdfsStore, path2,
+ blockCache, stats, storeStats);
+ createHoplog(2000, oplog);
+
+ Path path3 = new Path(testDataDir, "region/0/1-3-1.hop");
+ oplog = new HFileSortedOplog(hdfsStore, path3,
+ blockCache, stats, storeStats);
+ createHoplog(2000, oplog);
+
+ Path[] paths = {path1, path2, path3, path1, path2, path3};
+ long half = oplog.getSize()/2;
+ long[] starts = {0, 0, 0, half + 1, half + 1, half + 1};
+ long[] lengths = {half, half, half, oplog.getSize(), oplog.getSize(), oplog.getSize()};
+ HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
+ hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+
+ int[] keyCounts = new int[2000];
+ while (iter.hasNext()) {
+ boolean success = iter.next();
+ assertTrue(success);
+ String key = new String((byte[])iter.getKey()).substring("key-".length());
+ keyCounts[Integer.valueOf(key) - 100000] ++;
+ }
+
+ for (int i : keyCounts) {
+ assertEquals(3, i);
+ }
+ }
+
+ public void testMRLikeNHopIter() throws Exception {
+ Path path1 = new Path(testDataDir, "region/0/1-1-1.hop");
+ Hoplog oplog = new HFileSortedOplog(hdfsStore, path1,
+ blockCache, stats, storeStats);
+ createHoplog(10, oplog);
+
+ Path path2 = new Path(testDataDir, "region/0/1-2-1.hop");
+ oplog = new HFileSortedOplog(hdfsStore, path2,
+ blockCache, stats, storeStats);
+ createHoplog(10, oplog);
+
+ Path path3 = new Path(testDataDir, "region/0/1-3-1.hop");
+ oplog = new HFileSortedOplog(hdfsStore, path3,
+ blockCache, stats, storeStats);
+ createHoplog(10, oplog);
+
+ Path[] paths = {path1, path2, path3};
+ long[] starts = {0, 0, 0};
+ long[] lengths = {oplog.getSize(), oplog.getSize(), oplog.getSize()};
+ HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
+ hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+
+ int[] keyCounts = new int[10];
+ while (iter.hasNext()) {
+ boolean success = iter.next();
+ assertTrue(success);
+ // extra has next before key read
+ iter.hasNext();
+ String key = new String((byte[])iter.getKey()).substring("key-".length());
+ System.out.println(key);
+ keyCounts[Integer.valueOf(key)] ++;
+ }
+
+ for (int i : keyCounts) {
+ assertEquals(3, i);
+ }
+ }
+
+ public void test1Hop1BlockIterSkipDeletedHoplogs() throws Exception {
+ FileSystem fs = hdfsStore.getFileSystem();
+ Path path = new Path(testDataDir, "region/0/1-1-1.hop");
+ Hoplog oplog = new HFileSortedOplog(hdfsStore, path,
+ blockCache, stats, storeStats);
+ createHoplog(10, oplog);
+
+ Path[] paths = {path};
+ long[] starts = {0};
+ long[] lengths = {oplog.getSize()};
+
+ //Delete the Hoplog file
+ fs.delete(path, true);
+
+ HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
+ hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+ assertFalse(iter.hasNext());
+
+ }
+
+ public void testMRLikeNHopIterSkipDeletedHoplogs() throws Exception {
+ FileSystem fs = hdfsStore.getFileSystem();
+ //Create Hoplogs
+ Path path1 = new Path(testDataDir, "region/0/1-1-1.hop");
+ Hoplog oplog = new HFileSortedOplog(hdfsStore, path1,
+ blockCache, stats, storeStats);
+ createHoplog(10, oplog);
+
+ Path path2 = new Path(testDataDir, "region/0/1-2-1.hop");
+ oplog = new HFileSortedOplog(hdfsStore, path2,
+ blockCache, stats, storeStats);
+ createHoplog(10, oplog);
+
+ Path path3 = new Path(testDataDir, "region/0/1-3-1.hop");
+ oplog = new HFileSortedOplog(hdfsStore, path3,
+ blockCache, stats, storeStats);
+ createHoplog(10, oplog);
+
+ Path[] paths = {path1, path2, path3};
+ long[] starts = {0, 0, 0};
+ long[] lengths = {oplog.getSize(), oplog.getSize(), oplog.getSize()};
+ HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
+ hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+ int count = 0;
+ while (iter.hasNext()) {
+ boolean success = iter.next();
+ assertTrue(success);
+ count++;
+ }
+ assertEquals(30, count);
+
+ for(int i = 0; i < 3; ++i){
+ fs.delete(paths[i], true);
+ iter = HDFSSplitIterator.newInstance(
+ hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
+ count = 0;
+ while (iter.hasNext()) {
+ boolean success = iter.next();
+ assertTrue(success);
+ count++;
+ }
+ assertEquals(20, count);
+ oplog = new HFileSortedOplog(hdfsStore, paths[i],
+ blockCache, stats, storeStats);
+ createHoplog(10, oplog);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtilJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtilJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtilJUnitTest.java
new file mode 100644
index 0000000..a209b6e
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtilJUnitTest.java
@@ -0,0 +1,305 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.BaseHoplogTestCase;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HdfsSortedOplogOrganizer;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HoplogUtilJUnitTest extends BaseHoplogTestCase {
+ Path regionPath = null;
+
+ @Override
+ protected void configureHdfsStoreFactory() throws Exception {
+ super.configureHdfsStoreFactory();
+
+ hsf.setInputFileCountMin(3);
+ hsf.setMinorCompaction(false);
+ hsf.setMajorCompaction(false);
+ }
+
+ public void testHoplogListingMultiBucket() throws Exception {
+ createHoplogs();
+
+ Collection<FileStatus> hoplogs = HoplogUtil.getAllRegionHoplogs(
+ regionPath, hdfsStore.getFileSystem(),
+ AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+
+ assertEquals(5, hdfsStore.getFileSystem().listStatus(regionPath).length);
+ assertEquals(15, hoplogs.size());
+ }
+
+ public void testHoplogListingMixFileTypes() throws Exception {
+ createHoplogs();
+
+ HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+ organizer.getCompactor().compact(false, false);
+
+ Collection<FileStatus> hoplogs = HoplogUtil.getAllRegionHoplogs(
+ regionPath, hdfsStore.getFileSystem(),
+ AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+
+ assertEquals(7,
+ hdfsStore.getFileSystem().listStatus(new Path(regionPath, "0")).length);
+ assertEquals(15, hoplogs.size());
+ }
+
+ public void testHoplogListingEmptyBucket() throws Exception {
+ createHoplogs();
+
+ hdfsStore.getFileSystem().mkdirs(new Path(regionPath, "100"));
+
+ Collection<FileStatus> hoplogs = HoplogUtil.getAllRegionHoplogs(
+ regionPath, hdfsStore.getFileSystem(),
+ AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+
+ assertEquals(6, hdfsStore.getFileSystem().listStatus(regionPath).length);
+ assertEquals(15, hoplogs.size());
+ }
+
+ public void testHoplogListingInvalidBucket() throws Exception {
+ createHoplogs();
+
+ hdfsStore.getFileSystem().rename(new Path(regionPath, "0"),
+ new Path(regionPath, "not_a_bucket"));
+
+ Collection<FileStatus> hoplogs = HoplogUtil.getAllRegionHoplogs(
+ regionPath, hdfsStore.getFileSystem(),
+ AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+
+ assertEquals(5, hdfsStore.getFileSystem().listStatus(regionPath).length);
+ assertEquals(12, hoplogs.size());
+ }
+
+ public void testHoplogListingInvalidFiles() throws Exception {
+ createHoplogs();
+
+ Path bucketPath = new Path(regionPath, "0");
+ FSDataOutputStream stream = hdfsStore.getFileSystem().create(
+ new Path(bucketPath, "not_a_hoplog"));
+ stream.close();
+
+ Collection<FileStatus> hoplogs = HoplogUtil.getAllRegionHoplogs(
+ regionPath, hdfsStore.getFileSystem(),
+ AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+
+ assertEquals(4, hdfsStore.getFileSystem().listStatus(bucketPath).length);
+ assertEquals(15, hoplogs.size());
+ }
+
+ public void testTimeRange() throws Exception {
+ createHoplogs();
+ // rename hoplogs for testing purpose
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(
+ regionManager, 0);
+ List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
+ assertEquals(3, hoplogs.size());
+ hoplogs.get(0).get().rename("0-300-1.hop");
+ hoplogs.get(1).get().rename("0-310-1.hop");
+ hoplogs.get(2).get().rename("0-320-1.hop");
+ organizer.close();
+
+ organizer = new HdfsSortedOplogOrganizer(regionManager, 3);
+ hoplogs = organizer.getSortedOplogs();
+ assertEquals(3, hoplogs.size());
+ hoplogs.get(0).get().rename("0-600-1.hop");
+ hoplogs.get(1).get().rename("0-610-1.hop");
+ hoplogs.get(2).get().rename("0-620-1.hop");
+ organizer.close();
+
+ organizer = new HdfsSortedOplogOrganizer(regionManager, 6);
+ hoplogs = organizer.getSortedOplogs();
+ assertEquals(3, hoplogs.size());
+ hoplogs.get(0).get().rename("0-100-1.hop");
+ hoplogs.get(1).get().rename("0-110-1.hop");
+ hoplogs.get(2).get().rename("0-120-1.hop");
+
+ Collection<FileStatus> filtered = HoplogUtil.getRegionHoplogs(
+ regionPath, hdfsStore.getFileSystem(),
+ AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, 300, 305);
+ assertEquals(5, filtered.size());
+ assertTrue(containsHoplogWithName(filtered, "0-300-1.hop"));
+ assertTrue(containsHoplogWithName(filtered, "0-310-1.hop"));
+ assertTrue(containsHoplogWithName(filtered, "0-600-1.hop"));
+
+ filtered = HoplogUtil.getRegionHoplogs(regionPath,
+ hdfsStore.getFileSystem(),
+ AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, 250, 310);
+ assertEquals(6, filtered.size());
+ assertTrue(containsHoplogWithName(filtered, "0-300-1.hop"));
+ assertTrue(containsHoplogWithName(filtered, "0-310-1.hop"));
+ assertTrue(containsHoplogWithName(filtered, "0-320-1.hop"));
+
+ filtered = HoplogUtil.getRegionHoplogs(regionPath,
+ hdfsStore.getFileSystem(),
+ AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, 301, 311);
+ assertEquals(5, filtered.size());
+ assertTrue(containsHoplogWithName(filtered, "0-310-1.hop"));
+ assertTrue(containsHoplogWithName(filtered, "0-320-1.hop"));
+
+ filtered = HoplogUtil.getRegionHoplogs(regionPath,
+ hdfsStore.getFileSystem(),
+ AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, 301, 309);
+ assertEquals(4, filtered.size());
+ assertTrue(containsHoplogWithName(filtered, "0-310-1.hop"));
+ organizer.close();
+ }
+
+ public void testExcludeSoonCleanedHoplogs() throws Exception {
+ FileSystem fs = hdfsStore.getFileSystem();
+ Path cleanUpIntervalPath = new Path(hdfsStore.getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(
+ regionManager, 0);
+ //delete the auto generated clean up interval file
+ if (fs.exists(cleanUpIntervalPath)){
+ fs.delete(cleanUpIntervalPath, true);
+ }
+
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ int count = 10;
+ for (int fileCount = 0; fileCount < 3; fileCount++) {
+ items.clear();
+ for (int itemCount = 0; itemCount < count; itemCount++) {
+ items.add(new TestEvent(("key-" + itemCount), "value"));
+ }
+ organizer.flush(items.iterator(), count);
+ }
+ List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
+
+ for(TrackedReference<Hoplog> hoplog : hoplogs) {
+ Path p = new Path(testDataDir, getName() + "/0/" +
+ hoplog.get().getFileName() + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+ fs.createNewFile(p);
+ }
+ Collection<FileStatus> files = HoplogUtil.getAllRegionHoplogs(
+ regionPath, hdfsStore.getFileSystem(),
+ AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+ assertEquals(3, files.size());
+
+ TimeUnit.MINUTES.sleep(2);
+ //No clean up interval file, all expired files will be included
+ files = HoplogUtil.getAllRegionHoplogs(
+ regionPath, hdfsStore.getFileSystem(),
+ AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+ assertEquals(3, files.size());
+
+
+ long interval = 1 * 60 * 1000;
+ HoplogUtil.exposeCleanupIntervalMillis(fs,cleanUpIntervalPath,interval);
+
+ files = HoplogUtil.getAllRegionHoplogs(
+ regionPath, hdfsStore.getFileSystem(),
+ AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+ assertEquals(0, files.size());
+ organizer.close();
+ }
+
+
+ public void testCheckpointSelection() throws Exception {
+ createHoplogs();
+ // rename hoplogs for testing purpose
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(
+ regionManager, 0);
+ List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
+ assertEquals(3, hoplogs.size());
+ hoplogs.get(0).get().rename("0-300-1.chop");
+ hoplogs.get(1).get().rename("0-310-1.hop");
+ hoplogs.get(2).get().rename("0-320-1.hop"); // checkpoint file
+ organizer.close();
+
+ organizer = new HdfsSortedOplogOrganizer(regionManager, 3);
+ hoplogs = organizer.getSortedOplogs();
+ assertEquals(3, hoplogs.size());
+ hoplogs.get(0).get().rename("0-600-1.hop");
+ hoplogs.get(1).get().rename("0-610-1.chop");
+ hoplogs.get(2).get().rename("0-620-1.hop");
+ organizer.close();
+
+ organizer = new HdfsSortedOplogOrganizer(regionManager, 6);
+ hoplogs = organizer.getSortedOplogs();
+ assertEquals(3, hoplogs.size());
+ hoplogs.get(0).get().rename("0-100-1.hop");
+ hoplogs.get(1).get().rename("0-110-1.hop");
+ hoplogs.get(2).get().rename("0-120-1.chop");
+
+ Collection<FileStatus> filtered = HoplogUtil.filterHoplogs(
+ hdfsStore.getFileSystem(), regionPath, 290, 305, false);
+ assertEquals(4, filtered.size());
+ assertTrue(containsHoplogWithName(filtered, "0-310-1.hop"));
+ assertTrue(containsHoplogWithName(filtered, "0-600-1.hop"));
+
+ filtered = HoplogUtil.filterHoplogs(hdfsStore.getFileSystem(),
+ regionPath, 290, 305, true);
+ assertEquals(3, filtered.size());
+ assertTrue(containsHoplogWithName(filtered, "0-300-1.chop"));
+ assertTrue(containsHoplogWithName(filtered, "0-610-1.chop"));
+ assertTrue(containsHoplogWithName(filtered, "0-120-1.chop"));
+ organizer.close();
+ }
+
+ private boolean containsHoplogWithName(Collection<FileStatus> filtered,
+ String name) {
+ for (FileStatus file : filtered) {
+ if (file.getPath().getName().equals(name)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void createHoplogs() throws IOException, Exception {
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ int count = 10;
+ for (int bucketId = 0; bucketId < 15; bucketId += 3) {
+ HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager,
+ bucketId);
+ for (int fileCount = 0; fileCount < 3; fileCount++) {
+ items.clear();
+ for (int itemCount = 0; itemCount < count; itemCount++) {
+ items.add(new TestEvent(("key-" + itemCount), "value"));
+ }
+ organizer.flush(items.iterator(), count);
+ }
+ }
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ regionPath = new Path(testDataDir, getName());
+ }
+
+ @Override
+ protected void tearDown() throws Exception{
+ FileSystem fs = hdfsStore.getFileSystem();
+ Path cleanUpIntervalPath = new Path(hdfsStore.getHomeDir(),HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
+ if (fs.exists(cleanUpIntervalPath)){
+ fs.delete(cleanUpIntervalPath, true);
+ }
+ super.tearDown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache30/Bug38741DUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/Bug38741DUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/Bug38741DUnitTest.java
index 7e4acbf..f1b9746 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/Bug38741DUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/Bug38741DUnitTest.java
@@ -308,7 +308,7 @@ public class Bug38741DUnitTest extends ClientServerTestCase {
BucketRegion br = (BucketRegion) r;
try {
KeyInfo keyInfo = new KeyInfo(k1, null, bucketId);
- RawValue rv = br.getSerialized(keyInfo, false, false, null, null, false);
+ RawValue rv = br.getSerialized(keyInfo, false, false, null, null, false, false);
Object val = rv.getRawValue();
assertTrue(val instanceof CachedDeserializable);
CachedDeserializable cd = (CachedDeserializable)val;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSQueueRegionOperationsJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSQueueRegionOperationsJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSQueueRegionOperationsJUnitTest.java
new file mode 100644
index 0000000..5e2ba4f
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSQueueRegionOperationsJUnitTest.java
@@ -0,0 +1,33 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest
+;
+
+/**
+ * Tests regions operations when entries are not yet persisted
+ * in HDFS but are in HDFSAsyncQueue
+ * @author sbawaska
+ */
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSQueueRegionOperationsJUnitTest extends
+ HDFSRegionOperationsJUnitTest {
+
+ @Override
+ protected int getBatchTimeInterval() {
+ return 50*1000;
+ }
+
+ @Override
+ protected void sleep(String regionPath) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSQueueRegionOperationsOffHeapJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSQueueRegionOperationsOffHeapJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSQueueRegionOperationsOffHeapJUnitTest.java
new file mode 100644
index 0000000..f28c138
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSQueueRegionOperationsOffHeapJUnitTest.java
@@ -0,0 +1,54 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.Properties;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.PartitionAttributes;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest
+;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSQueueRegionOperationsOffHeapJUnitTest extends HDFSQueueRegionOperationsJUnitTest {
+ static {
+ System.setProperty("gemfire.trackOffHeapRefCounts", "true");
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ OffHeapTestUtil.checkOrphans();
+ }
+ @Override
+ protected Region<Integer, String> createRegion(String regionName) {
+ RegionFactory<Integer, String> rf = cache.createRegionFactory(RegionShortcut.PARTITION_HDFS);
+ PartitionAttributes prAttr = new PartitionAttributesFactory().setTotalNumBuckets(10).create();
+ rf.setPartitionAttributes(prAttr);
+ rf.setOffHeap(true);
+ rf.setHDFSStoreName(hdfsStore.getName());
+ Region<Integer, String> r = rf.create(regionName);
+// addListener(r);
+
+ ((PartitionedRegion) r).setQueryHDFS(true);
+ return r;
+ }
+ @Override
+ protected Properties getDSProps() {
+ Properties props = super.getDSProps();
+ props.setProperty("off-heap-memory-size", "50m");
+ return props;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSRegionOperationsJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSRegionOperationsJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSRegionOperationsJUnitTest.java
new file mode 100644
index 0000000..6cf9c6a
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/HDFSRegionOperationsJUnitTest.java
@@ -0,0 +1,542 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.FixMethodOrder;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.MethodSorters;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.EvictionAlgorithm;
+import com.gemstone.gemfire.cache.PartitionAttributes;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
+import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
+import com.gemstone.gemfire.internal.hll.HyperLogLog;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest
+;
+
+/**
+ * Tests that region operations work as expected when data is in HDFS.
+ * This test explicitly clears in-memory ConcurrentHashMap that back
+ * AbstractRegionMap before validating region operations.
+ *
+ * @author sbawaska
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSRegionOperationsJUnitTest extends TestCase {
+
+ protected Cache cache;
+ protected HDFSStore hdfsStore;
+
+ public void setUp() throws Exception {
+ Properties props = getDSProps();
+ cache = new CacheFactory(props).create();
+ System.setProperty(HoplogConfig.ALLOW_LOCAL_HDFS_PROP, "true");
+ String storeName = getName()+"-store";
+ HDFSStoreFactory hsf = cache.createHDFSStoreFactory();
+ hsf.setHomeDir(getName()+"-test");
+ hsf.setBatchInterval(getBatchTimeInterval());
+ hdfsStore = hsf.create(storeName);
+ }
+
+ protected Properties getDSProps() {
+ Properties props = new Properties();
+ props.put("mcast-port", "0");
+ props.put("locators", "");
+ props.put("log-level", "config");
+ return props;
+ }
+
+ public void tearDown() throws Exception {
+ for (Region r : cache.rootRegions()) {
+ if (r != null) {
+ r.close();
+ }
+ }
+
+ if (cache.getRegion(getName()) != null) {
+ cache.getRegion(getName()).destroyRegion();
+ }
+ DiskStore ds = cache.findDiskStore(null);
+ if (ds != null) {
+ ds.destroy();
+ }
+
+ ((HDFSStoreImpl)hdfsStore).getFileSystem().delete(new Path(hdfsStore.getHomeDir()), true);
+ }
+
+ protected int getBatchTimeInterval() {
+ return 1000;
+ }
+
+ protected Region<Integer, String> createRegion(String regionName) {
+ RegionFactory<Integer, String> rf = cache.createRegionFactory(RegionShortcut.PARTITION_HDFS);
+ PartitionAttributes prAttr = new PartitionAttributesFactory().setTotalNumBuckets(10).create();
+ rf.setPartitionAttributes(prAttr);
+ rf.setHDFSStoreName(hdfsStore.getName());
+ Region<Integer, String> r = rf.create(regionName);
+
+ ((PartitionedRegion) r).setQueryHDFS(true);
+ return r;
+ }
+
+ protected void clearBackingCHM(Region<Integer, String> r) {
+ PartitionedRegion pr = (PartitionedRegion)r;
+ for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) {
+ assertTrue(br.getRegionMap() instanceof HDFSRegionMap);
+ ((AbstractRegionMap)br.getRegionMap())._getMap().clear();
+ // wait here to make sure that the queue has been flushed
+ }
+ sleep(pr.getFullPath());
+ }
+
+ protected void sleep(String regionPath) {
+ String qname = HDFSStoreFactoryImpl.getEventQueueName(regionPath);
+ GemFireCacheImpl.getExisting().waitForSenderQueueFlush(qname, true, 30);
+ }
+
+ public void test010PUTDMLSupport() {
+ Region<Integer, String> r = createRegion(getName());
+ SortedOplogStatistics stats = HDFSRegionDirector.getInstance().getHdfsRegionStats("/" + getName());
+ assertEquals(0, stats.getRead().getCount());
+ for (int i=0; i<100; i++) {
+ r.put(i, "value"+i);
+ }
+ assertEquals(100, stats.getRead().getCount());
+ sleep(r.getFullPath());
+ clearBackingCHM(r);
+ LocalRegion lr = (LocalRegion) r;
+ for (int i=0; i<200; i++) {
+ EntryEventImpl ev = lr.newPutEntryEvent(i, "value"+i, null);
+ lr.validatedPut(ev, System.currentTimeMillis());
+ }
+ // verify that read count on HDFS does not change
+ assertEquals(100, stats.getRead().getCount());
+ sleep(r.getFullPath());
+ clearBackingCHM(r);
+ for (int i=0; i<200; i++) {
+ assertEquals("value"+i, r.get(i));
+ }
+ if (getBatchTimeInterval() > 1000) {
+ // reads from async queue
+ assertEquals(100, stats.getRead().getCount());
+ } else {
+ assertEquals(300, stats.getRead().getCount());
+ }
+ }
+
+ public void test020GetOperationalData() throws Exception {
+ Region<Integer, String> r = createRegion(getName());
+ SortedOplogStatistics stats = HDFSRegionDirector.getInstance().getHdfsRegionStats("/" + getName());
+ assertEquals(0, stats.getRead().getCount());
+ for (int i=0; i<100; i++) {
+ r.put(i, "value"+i);
+ }
+ int expectedReadsFromHDFS = 100;
+ assertEquals(expectedReadsFromHDFS, stats.getRead().getCount());
+ sleep(r.getFullPath());
+ clearBackingCHM(r);
+ LocalRegion lr = (LocalRegion) r;
+ for (int i=0; i<200; i++) {
+ if (i < 100) {
+ assertEquals("value"+i, r.get(i));
+ } else {
+ assertNull(r.get(i));
+ }
+ }
+ if (getBatchTimeInterval() > 1000) {
+ // reads from async queue
+ expectedReadsFromHDFS = 200; // initial 100 + 100 for misses
+ } else {
+ expectedReadsFromHDFS = 300; // initial 100 + 200 for reads
+ }
+ assertEquals(expectedReadsFromHDFS, stats.getRead().getCount());
+ for (int i=0; i<200; i++) {
+ assertNull(lr.get(i, null, true, false, false, null, null, false, false/*allowReadFromHDFS*/));
+ }
+ // no increase in HDFS reads
+ assertEquals(expectedReadsFromHDFS, stats.getRead().getCount());
+
+ /**MergeGemXDHDFSToGFE Have not merged this API as this api is not called by any code*/
+ // test the dataView API
+ //for (int i=0; i<200; i++) {
+ // assertNull(lr.getDataView().getLocally(i, null, i%10, lr, true, true, null, null, false, false/*allowReadFromHDFS*/));
+ //}
+ // no increase in HDFS reads
+ assertEquals(expectedReadsFromHDFS, stats.getRead().getCount());
+ }
+
+ public void test030RemoveOperationalData() throws Exception {
+ Region<Integer, String> r = createRegion(getName());
+ SortedOplogStatistics stats = HDFSRegionDirector.getInstance().getHdfsRegionStats("/" + getName());
+ assertEquals(0, stats.getRead().getCount());
+ for (int i=0; i<100; i++) {
+ r.put(i, "value"+i);
+ }
+ int expectedReadsFromHDFS = 100;
+ assertEquals(expectedReadsFromHDFS, stats.getRead().getCount());
+ sleep(r.getFullPath());
+ PartitionedRegion lr = (PartitionedRegion) r;
+ for(int i =0; i < 50; i++) {
+ lr.getBucketRegion(i).customEvictDestroy(i);
+ }
+ for (int i=0; i<200; i++) {
+ if (i < 100) {
+ assertEquals("value"+i, r.get(i));
+ } else {
+ assertNull(r.get(i));
+ }
+ }
+ if (getBatchTimeInterval() > 1000) {
+ // reads from async queue
+ expectedReadsFromHDFS = 200; // initial 100 + 100 for misses
+ } else {
+ expectedReadsFromHDFS = 250; // initial 100 + 200 for reads + 50 for
+ }
+ assertEquals(expectedReadsFromHDFS, stats.getRead().getCount());
+ for (int i=0; i<50; i++) {
+ assertNull(lr.get(i, null, true, false, false, null, null, false, false/*allowReadFromHDFS*/));
+ }
+ for (int i=50; i<100; i++) {
+ assertEquals("value"+i, lr.get(i, null, true, false, false, null,null, false, false/*allowReadFromHDFS*/));
+ }
+ for (int i=100; i<200; i++) {
+ assertNull(lr.get(i, null, true, false, false, null, null, false, false/*allowReadFromHDFS*/));
+ }
+ // no increase in HDFS reads
+ assertEquals(expectedReadsFromHDFS, stats.getRead().getCount());
+ }
+
+ public void _test040NoAutoEviction() throws Exception {
+ if (!cache.isClosed()) {
+ tearDown();
+ cache.close();
+ System.setProperty("gemfire.disableAutoEviction", "true");
+ setUp();
+ }
+ Region<Integer, String> r = createRegion(getName());
+ System.setProperty("gemfire.disableAutoEviction", "false");
+ for (int i =0; i<5; i++) {
+ r.put(i, "value"+i);
+ }
+ PartitionedRegion pr = (PartitionedRegion) r;
+ BucketRegion br = pr.getBucketRegion(1);
+ assertNotNull(br.getAttributes().getEvictionAttributes());
+ assertEquals(EvictionAlgorithm.NONE, br.getAttributes().getEvictionAttributes().getAlgorithm());
+
+ GemFireCacheImpl cache = (GemFireCacheImpl) r.getCache();
+ assertEquals(0.0f, cache.getResourceManager().getEvictionHeapPercentage());
+ }
+
+ public void test050LRURegionAttributesForPR() {
+ RegionFactory<Integer, String> rf = cache.createRegionFactory();
+ rf.setHDFSStoreName(hdfsStore.getName());
+ rf.setDataPolicy(DataPolicy.HDFS_PARTITION);
+ verifyLRURegionAttributesForPR(rf.create(getName()));
+ }
+
+ public void test060LRURegionAttributesForRegionShortcutPR() {
+ verifyLRURegionAttributesForPR(createRegion(getName()));
+ }
+
+ private void verifyLRURegionAttributesForPR(Region r) {
+ for (int i =0; i<200; i++) {
+ r.put(i, "value"+i);
+ }
+ RegionAttributes<Integer, String> ra = r.getAttributes();
+ assertNotNull(ra.getEvictionAttributes());
+ // default eviction action for region shortcut
+ assertEquals(EvictionAction.OVERFLOW_TO_DISK, ra.getEvictionAttributes().getAction());
+
+ GemFireCacheImpl cache = (GemFireCacheImpl) r.getCache();
+ assertEquals(80.0f, cache.getResourceManager().getEvictionHeapPercentage());
+ DiskStore ds = cache.findDiskStore(null);
+ assertNotNull(ds);
+ Set s = cache.getResourceManager().getResourceListeners(ResourceType.HEAP_MEMORY);
+ Iterator it = s.iterator();
+ boolean regionFound = false;
+ while (it.hasNext()) {
+ Object o = it.next();
+ if (o instanceof PartitionedRegion) {
+ PartitionedRegion pr = (PartitionedRegion) o;
+ if (getName().equals(pr.getName())) {
+ regionFound = true;
+ } else {
+ continue;
+ }
+ for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) {
+ assertNotNull(br.getAttributes().getEvictionAttributes());
+ assertEquals(EvictionAlgorithm.LRU_HEAP, br.getAttributes().getEvictionAttributes().getAlgorithm());
+ assertEquals(EvictionAction.OVERFLOW_TO_DISK, br.getAttributes().getEvictionAttributes().getAction());
+ }
+ }
+ }
+ assertTrue(regionFound);
+
+ }
+
+ public void test070SizeEstimate() {
+ Region<Integer, String> r = createRegion(getName());
+ int size = 226;
+ Random rand = new Random();
+ for (int i=0; i<size; i++) {
+ r.put(rand.nextInt(), "value"+i);
+ }
+ // size before flush
+ LocalRegion lr = (LocalRegion) r;
+ long estimate = lr.sizeEstimate();
+ double err = Math.abs(estimate - size) / (double) size;
+ // on a busy system flush might start before we call estimateSize, so rather than equality,
+ // test for error margin. fixes bug 49381
+ assertTrue("size:"+size+" estimate:"+estimate, err < 0.02 * 10); // each bucket can have an error of 0.02
+
+ // size after flush
+ sleep(r.getFullPath());
+ estimate = lr.sizeEstimate();
+ err = Math.abs(estimate - size) / (double) size;
+ assertTrue("size:"+size+" estimate:"+estimate, err < 0.02 * 10); // each bucket can have an error of 0.02
+ }
+
+ public void test080PutGet() throws InterruptedException {
+ Region<Integer, String> r = createRegion(getName());
+ for (int i=0; i<100; i++) {
+ r.put(i, "value"+i);
+ }
+ clearBackingCHM(r);
+ for (int i=0; i<100; i++) {
+ assertEquals("value"+i, r.get(i));
+ }
+
+ //Do a put while there are entries in the map
+ r.put(0, "value"+0);
+
+ r.destroy(1, "value"+1);
+ }
+
+ public void test090Delete() {
+ Region<Integer, String> r = createRegion(getName());
+ for (int i=0; i<11; i++) {
+ r.put(i, "value"+i);
+ }
+ clearBackingCHM(r);
+ int delKey = 9;
+ r.destroy(delKey);
+ assertNull(r.get(delKey));
+ assertFalse(r.containsKey(delKey));
+ }
+
+ public void test100Invalidate() {
+ Region<Integer, String> r = createRegion(getName());
+ for (int i=0; i<100; i++) {
+ r.put(i, "value"+i);
+ }
+ clearBackingCHM(r);
+ int invKey = 9;
+ r.invalidate(invKey);
+ assertNull(r.get(invKey));
+ assertTrue(r.containsKey(invKey));
+ }
+
+ public void test110Size() {
+ Region<Integer, String> r = createRegion(getName());
+ for (int i=0; i<100; i++) {
+ r.put(i, "value"+i);
+ }
+ clearBackingCHM(r);
+ assertEquals(100, r.size());
+ r.destroy(45);
+ assertEquals(99, r.size());
+ r.invalidate(55);
+ r.invalidate(65);
+ assertEquals(99, r.size());
+ }
+
+ public void test120KeyIterator() {
+ Region<Integer, String> r = createRegion(getName());
+ for (int i=0; i<100; i++) {
+ r.put(i, "value"+i);
+ }
+ clearBackingCHM(r);
+ Set<Integer> keys = r.keySet();
+ int c = 0;
+ for (int i : keys) {
+// assertEquals(c, i);
+ c++;
+ }
+ assertEquals(100, c);
+ assertEquals(100, keys.size());
+ int delKey = 88;
+ r.destroy(delKey);
+ r.invalidate(39);
+ keys = r.keySet();
+ c = 0;
+ for (int i : keys) {
+ if (c == delKey) {
+ c++;
+ }
+// assertEquals(c, i);
+ c++;
+ }
+ assertEquals(99, keys.size());
+ }
+
+ public void test130EntriesIterator() {
+ Region<Integer, String> r = createRegion(getName());
+ for (int i=0; i<100; i++) {
+ r.put(i, "value"+i);
+ }
+ clearBackingCHM(r);
+ Set<Entry<Integer, String>> entries = r.entrySet();
+ int c = 0;
+ for (Entry<Integer, String> e : entries) {
+// assertEquals(c, (int) e.getKey());
+ assertEquals("value"+e.getKey(), e.getValue());
+ c++;
+ }
+ assertEquals(100, c);
+ assertEquals(100, entries.size());
+ int delKey = 88;
+ r.destroy(delKey);
+ int invKey = 39;
+ r.invalidate(invKey);
+ entries = r.entrySet();
+ c = 0;
+ for (Entry<Integer, String> e : entries) {
+ if (c == delKey) {
+ c++;
+ } else if (e.getKey() == invKey) {
+// assertEquals(c, (int) e.getKey());
+ assertNull(e.getValue());
+ } else {
+// assertEquals(c, (int) e.getKey());
+ assertEquals("value"+e.getKey(), e.getValue());
+ }
+ c++;
+ }
+ assertEquals(99, entries.size());
+ }
+
+ public void test140ContainsKey() {
+ Region<Integer, String> r = createRegion(getName());
+ for (int i=0; i<100; i++) {
+ r.put(i, "value"+i);
+ }
+ clearBackingCHM(r);
+ assertTrue(r.containsKey(80));
+ r.destroy(80);
+ assertFalse(r.containsKey(80));
+ r.invalidate(64);
+ assertTrue(r.containsKey(64));
+ }
+
+ public void test150ContainsValue() {
+ Region<Integer, String> r = createRegion(getName());
+ for (int i=0; i<100; i++) {
+ r.put(i, "value"+i);
+ }
+ clearBackingCHM(r);
+ assertTrue(r.containsValue("value45"));
+ r.destroy(45);
+ assertFalse(r.containsValue("value45"));
+ r.invalidate(64);
+ assertFalse(r.containsValue("value64"));
+ }
+
+ public void test160DestroyRegion() {
+ Region<Integer, String> r = createRegion(getName());
+ for (int i=0; i<100; i++) {
+ r.put(i, "value"+i);
+ }
+ clearBackingCHM(r);
+ r.destroyRegion();
+ try {
+ r.get(3);
+ fail("expected exception not thrown");
+ } catch (RegionDestroyedException expected) {
+ }
+ }
+
+ public void test170PutIfAbsent() {
+ Region<Integer, String> r = createRegion(getName());
+ r.put(1, "value1");
+ clearBackingCHM(r);
+ assertEquals("value1", r.putIfAbsent(1, "value2"));
+ }
+
+ public void test180Replace() {
+ Region<Integer, String> r = createRegion(getName());
+ assertNull(r.replace(1, "value"));
+ r.put(1, "value1");
+ clearBackingCHM(r);
+ assertEquals("value1", r.replace(1, "value2"));
+ }
+
+ public void test190ReplaceKVV() {
+ Region<Integer, String> r = createRegion(getName());
+ assertFalse(r.replace(1, "oldValue", "newValue"));
+ r.put(1, "value1");
+ clearBackingCHM(r);
+ assertTrue(r.replace(1, "value1", "value2"));
+ }
+
+ public void test200Accuracy() throws IOException {
+ double sum=0.0;
+ int iter = 10;
+ for (int t=0; t<iter; t++) {
+ Random r = new Random();
+ HashSet<Integer> vals = new HashSet<Integer>();
+ HyperLogLog hll = new HyperLogLog(0.03);
+ //HyperLogLog hll = new HyperLogLog(0.1);
+ double accuracy = 0.0;
+ for (int i = 0; i < 2 * 1000000; i++) {
+ int val = r.nextInt();
+ vals.add(val);
+ hll.offer(val);
+ }
+ long size = vals.size();
+ long est = hll.cardinality();
+
+ accuracy = 100.0 * (size - est) / est;
+ System.out.printf("Accuracy is %f hll size is %d\n", accuracy, hll.getBytes().length);
+ sum+=Math.abs(accuracy);
+ }
+ double avgAccuracy = sum/(iter*1.0);
+ System.out.println("Avg accuracy is:"+avgAccuracy);
+ assertTrue(avgAccuracy < 6);
+ }
+}