You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2015/10/26 19:28:43 UTC
[20/50] [abbrv] incubator-geode git commit: GEODE-429: Remove
HdfsStore Junit and Dunits
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/TieredCompactionJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/TieredCompactionJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/TieredCompactionJUnitTest.java
deleted file mode 100644
index 7b45952..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/TieredCompactionJUnitTest.java
+++ /dev/null
@@ -1,904 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.Operation;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HdfsSortedOplogOrganizer.HoplogCompactor;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
-import com.gemstone.gemfire.internal.util.BlobHelper;
-import com.gemstone.gemfire.test.junit.categories.HoplogTest;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest
-;
-
-@Category({IntegrationTest.class, HoplogTest.class})
-public class TieredCompactionJUnitTest extends BaseHoplogTestCase {
- static long ONE_MB = 1024 * 1024;
- static long TEN_MB = 10 * ONE_MB;
-
- @Override
- protected void configureHdfsStoreFactory() throws Exception {
- super.configureHdfsStoreFactory();
-
- hsf.setInputFileCountMin(3);
- hsf.setMinorCompaction(false);
- hsf.setMajorCompaction(false);
- }
-
- public void testMinorCompaction() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
-
- // #1
- ArrayList<QueuedPersistentEvent> items = new ArrayList<QueuedPersistentEvent>();
- items.add(new TestEvent("1", "1"));
- items.add(new TestEvent("2", "1"));
- items.add(new TestEvent("3", "1"));
- items.add(new TestEvent("4", "1"));
- organizer.flush(items.iterator(), items.size());
-
- // #2
- items.clear();
- items.add(new TestEvent("2", "1"));
- items.add(new TestEvent("4", "1"));
- items.add(new TestEvent("6", "1"));
- items.add(new TestEvent("8", "1"));
- organizer.flush(items.iterator(), items.size());
-
- // #3
- items.clear();
- items.add(new TestEvent("1", "1"));
- items.add(new TestEvent("3", "1"));
- items.add(new TestEvent("5", "1"));
- items.add(new TestEvent("7", "1"));
- items.add(new TestEvent("9", "1"));
- organizer.flush(items.iterator(), items.size());
-
- // #4
- items.clear();
- items.add(new TestEvent("0", "1"));
- items.add(new TestEvent("1", "1"));
- items.add(new TestEvent("4", "1"));
- items.add(new TestEvent("5", "1"));
- organizer.flush(items.iterator(), items.size());
-
- // check file existence in bucket directory, expect 4 hoplgos
- FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
- assertEquals(4, hoplogs.length);
-
- // After compaction expect 1 hoplog only. It should have the same sequence number as that of the
- // youngest file compacted, which should be 4 in this case
- organizer.getCompactor().compact(false, false);
- hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
- assertEquals(1, hoplogs.length);
- assertEquals(1, organizer.getSortedOplogs().size());
- Hoplog hoplog = new HFileSortedOplog(hdfsStore, hoplogs[0].getPath(), blockCache, stats, storeStats);
- assertEquals(4, HdfsSortedOplogOrganizer.getSequenceNumber(hoplog));
-
- // iterate on oplogs to validate data in files
- HoplogSetIterator iter = new HoplogSetIterator(organizer.getSortedOplogs());
- // the iteration pattern for this test should be 0-9:
- // 0 1 4 5 oplog #4
- // 1 3 5 7 9 oplog #3
- // 2 4 6 8 oplog #2
- // 1 2 3 4 oplog #1
- int count = 0;
- for (ByteBuffer keyBB = null; iter.hasNext();) {
- keyBB = iter.next();
- byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
- assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
- count++;
- }
- assertEquals(10, count);
-
- // there must be 4 expired hoplogs now
- hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
- assertEquals(4, hoplogs.length);
- organizer.close();
- }
-
- public void testIterativeMinorCompaction() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
-
- // #1
- ArrayList<QueuedPersistentEvent> items = new ArrayList<QueuedPersistentEvent>();
- items.add(new TestEvent("1", "1"));
- items.add(new TestEvent("2", "1"));
- organizer.flush(items.iterator(), items.size());
-
- items.clear();
- items.add(new TestEvent("1", "2"));
- items.add(new TestEvent("3", "2"));
- organizer.flush(items.iterator(), items.size());
-
- items.clear();
- items.add(new TestEvent("4", "3"));
- items.add(new TestEvent("5", "3"));
- organizer.flush(items.iterator(), items.size());
-
- // check file existence in bucket directory
- FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
- assertEquals(3, hoplogs.length);
-
- organizer.getCompactor().compact(false, false);
-
- FileStatus[] expired = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
- assertEquals(3, expired.length);
- FileStatus[] valids = HdfsSortedOplogOrganizer.filterValidHoplogs(hoplogs, expired);
- assertEquals(0, valids.length);
- // After compaction expect 1 hoplog only.
- hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
- assertEquals(1, hoplogs.length);
-
- items.clear();
- items.add(new TestEvent("4", "4"));
- items.add(new TestEvent("6", "4"));
- organizer.flush(items.iterator(), items.size());
-
- items.clear();
- items.add(new TestEvent("7", "5"));
- items.add(new TestEvent("8", "5"));
- organizer.flush(items.iterator(), items.size());
-
- hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
- assertEquals(5, hoplogs.length);
-
- organizer.getCompactor().compact(false, false);
- expired = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
- assertEquals(6, expired.length);
- valids = HdfsSortedOplogOrganizer.filterValidHoplogs(hoplogs, expired);
- assertEquals(0, valids.length);
- hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
- assertEquals(2, hoplogs.length);
- valids = HdfsSortedOplogOrganizer.filterValidHoplogs(hoplogs, expired);
- assertEquals(1, valids.length);
-
- assertEquals("2", organizer.read(BlobHelper.serializeToBlob("1")).getValue());
- assertEquals("1", organizer.read(BlobHelper.serializeToBlob("2")).getValue());
- assertEquals("2", organizer.read(BlobHelper.serializeToBlob("3")).getValue());
- assertEquals("4", organizer.read(BlobHelper.serializeToBlob("4")).getValue());
- assertEquals("3", organizer.read(BlobHelper.serializeToBlob("5")).getValue());
- assertEquals("4", organizer.read(BlobHelper.serializeToBlob("6")).getValue());
- assertEquals("5", organizer.read(BlobHelper.serializeToBlob("7")).getValue());
- organizer.close();
- }
-
- public void testMajorCompactionWithDelete() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
-
- // #1
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- items.add(new TestEvent("1", "1"));
- items.add(new TestEvent("2", "1"));
- items.add(new TestEvent("3", "1"));
- items.add(new TestEvent("4", "1"));
- items.add(new TestEvent("4", "10", Operation.DESTROY));
- organizer.flush(items.iterator(), items.size());
-
- // #2
- items.clear();
- items.add(new TestEvent("2", "1", Operation.DESTROY));
- items.add(new TestEvent("4", "1", Operation.DESTROY));
- items.add(new TestEvent("6", "1", Operation.INVALIDATE));
- items.add(new TestEvent("8", "1"));
- organizer.flush(items.iterator(), items.size());
-
- // #3
- items.clear();
- items.add(new TestEvent("1", "1"));
- items.add(new TestEvent("3", "1"));
- items.add(new TestEvent("5", "1"));
- items.add(new TestEvent("7", "1"));
- items.add(new TestEvent("9", "1", Operation.DESTROY));
- organizer.flush(items.iterator(), items.size());
-
- // #4
- items.clear();
- items.add(new TestEvent("0", "1", Operation.DESTROY));
- items.add(new TestEvent("1", "1"));
- items.add(new TestEvent("4", "1"));
- items.add(new TestEvent("5", "1"));
- organizer.flush(items.iterator(), items.size());
-
- // check file existence in bucket directory, expect 4 hoplgos
- FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
- assertEquals(4, hoplogs.length);
-
- // After compaction expect 1 hoplog only. It should have the same sequence number as that of the
- // youngest file compacted, which should be 4 in this case
- organizer.getCompactor().compact(true, false);
- hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);
- assertEquals(1, hoplogs.length);
- assertEquals(1, organizer.getSortedOplogs().size());
- Hoplog hoplog = new HFileSortedOplog(hdfsStore, hoplogs[0].getPath(), blockCache, stats, storeStats);
- assertEquals(4, HdfsSortedOplogOrganizer.getSequenceNumber(hoplog));
-
- // iterate on oplogs to validate data in files
- HoplogSetIterator iter = new HoplogSetIterator(organizer.getSortedOplogs());
- int count = 0;
-
- // entries in () are destroyed or invalidated
- // 1, 2, 3, 4, (11)
- // (2), (4), (6), 8
- // 1, 3, 5, 7, (9)
- // (0), 1, 4, 5
- String[] expectedValues = { "1", "3", "4", "5", "7", "8" };
- for (ByteBuffer keyBB = null; iter.hasNext();) {
- keyBB = iter.next();
- byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
- assertEquals(expectedValues[count], BlobHelper.deserializeBlob(key));
- count++;
- }
- assertEquals(6, count);
-
- // there must be 4 expired hoplogs now
- hoplogs = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
- assertEquals(4, hoplogs.length);
- organizer.close();
- }
-
- public void testGainComputation() throws Exception {
- HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
- HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
- ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
- for (int i = 0; i < 10; i++) {
- targets.add(new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, i * TEN_MB)));
- }
-
- // each read has cost 3. Four files read cost is 3 * 4. Reduce read cost of
- // file after compaction
- float expect = (float) ((3 * 4.0 - 3) / (20 + 30 + 40 + 50));
- float result = bucket.computeGain(2, 5, targets);
- assertTrue(Math.abs(expect - result) < (expect/1000));
-
- // each read has cost 3 except 10MB file with read cost 2. 9 files read cost
- // is 3 * 9. Reduce read cost of file after compaction.
- expect = (float) ((3 * 9 - 3 - 1.0) / (10 + 20 + 30 + 40 + 50 + 60 + 70 + 80 + 90));
- result = bucket.computeGain(0, 9, targets);
- assertTrue(Math.abs(expect - result) < (expect/1000));
- }
-
- public void testGainComputeSmallFile() throws Exception {
- HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
- HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
-
- ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
- for (int i = 0; i < 10; i++) {
- targets.add(new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, i * TEN_MB / 1024)));
- }
-
- float result = bucket.computeGain(2, 5, targets);
- assertTrue(Math.abs(8.0 - result) < (1.0/1000));
- }
-
- public void testGainComputeMixedFiles() throws Exception {
- HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
- HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
-
- ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
- for (int i = 0; i < 10; i++) {
- targets.add(new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, i * TEN_MB / 1024)));
- }
- TestHoplog midHop = (TestHoplog) targets.get(4).get();
- // one more than other files
- midHop.size = 5 * TEN_MB;
-
- float expect = (float) ((4 * 2 - 3 + 1.0) / 50);
- float result = bucket.computeGain(2, 5, targets);
- System.out.println(expect);
- System.out.println(result);
- assertTrue(Math.abs(expect - result) < (expect/1000));
- }
-
- public void testGainComputeBadRatio() throws Exception {
- HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
- HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
- ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
- for (int i = 0; i < 10; i++) {
- targets.add(new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, i * TEN_MB)));
- }
-
- TestHoplog firstHop = (TestHoplog) targets.get(2).get();
- // one more than other files
- firstHop.size = (1 + 30 + 40 + 50) * TEN_MB;
- Float result = bucket.computeGain(2, 5, targets);
- assertNull(result);
- }
-
- public void testMinorCompactionTargetMaxSize() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
- HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
-
- ArrayList<TrackedReference<TestHoplog>> targets = new ArrayList<TrackedReference<TestHoplog>>();
- for (int i = 0; i < 5; i++) {
- TrackedReference<TestHoplog> hop = new TrackedReference<TestHoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
- hop.increment();
- targets.add(hop);
- }
- TrackedReference<TestHoplog> oldestHop = targets.get(targets.size() - 1);
- TestHoplog thirdHop = (TestHoplog) targets.get(2).get();
-
- // oldest is more than max size is ignored
- oldestHop.get().size = HDFSStore.DEFAULT_INPUT_FILE_SIZE_MAX_MB * ONE_MB + 100;
- List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
- compactor.getMinorCompactionTargets(list, -1);
- assertEquals(4, list.size());
- for (TrackedReference<Hoplog> ref : list) {
- assertTrue(((TestHoplog)ref.get()).size - TEN_MB < 5 );
- }
-
- // third is more than max size but is not ignored
- thirdHop.size = HDFSStore.DEFAULT_INPUT_FILE_SIZE_MAX_MB * ONE_MB + 100;
- oldestHop.increment();
- list = (List<TrackedReference<Hoplog>>) targets.clone();
- compactor.getMinorCompactionTargets(list, -1);
- assertEquals(4, list.size());
- int i = 0;
- for (TrackedReference<Hoplog> ref : list) {
- if (i != 2) {
- assertTrue(((TestHoplog) ref.get()).size - TEN_MB < 5);
- } else {
- assertTrue(((TestHoplog) ref.get()).size > HDFSStore.DEFAULT_INPUT_FILE_SIZE_MAX_MB * ONE_MB);
- }
- i++;
- }
- }
-
- public void testAlterMaxInputFileSize() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
- HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
-
- assertTrue(TEN_MB * 2 < hdfsStore.getInputFileSizeMax() * ONE_MB);
-
- ArrayList<TrackedReference<TestHoplog>> targets = new ArrayList<TrackedReference<TestHoplog>>();
- for (int i = 0; i < 5; i++) {
- TrackedReference<TestHoplog> hop = new TrackedReference<TestHoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
- hop.increment();
- targets.add(hop);
- }
-
- List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
- compactor.getMinorCompactionTargets(list, -1);
- assertEquals(targets.size(), list.size());
-
- HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
- mutator.setInputFileSizeMax(1);
- hdfsStore.alter(mutator);
-
- compactor.getMinorCompactionTargets(list, -1);
- assertEquals(0, list.size());
- }
-
- public void testAlterInputFileCount() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
- HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
-
- assertTrue(2 < hdfsStore.getInputFileCountMax());
-
- ArrayList<TrackedReference<TestHoplog>> targets = new ArrayList<TrackedReference<TestHoplog>>();
- for (int i = 0; i < 5; i++) {
- TrackedReference<TestHoplog> hop = new TrackedReference<TestHoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
- hop.increment();
- targets.add(hop);
- }
-
- List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
- compactor.getMinorCompactionTargets(list, -1);
- assertEquals(targets.size(), list.size());
-
- HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
- mutator.setInputFileCountMax(2);
- mutator.setInputFileCountMin(2);
- hdfsStore.alter(mutator);
-
- compactor.getMinorCompactionTargets(list, -1);
- assertEquals(2, list.size());
- }
-
- public void testAlterMajorCompactionInterval() throws Exception {
- final AtomicInteger majorCReqCount = new AtomicInteger(0);
-
- final Compactor compactor = new AbstractCompactor() {
- @Override
- public boolean compact(boolean isMajor, boolean isForced) throws IOException {
- majorCReqCount.incrementAndGet();
- return true;
- }
- };
-
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0) {
- @Override
- public synchronized Compactor getCompactor() {
- return compactor;
- }
- };
-
- // create hoplog in the past, 90 seconds before current time
- organizer.hoplogCreated(getName(), 0, new TestHoplog(hdfsStore, ONE_MB, System.currentTimeMillis() - 90000));
- TimeUnit.MILLISECONDS.sleep(50);
- organizer.hoplogCreated(getName(), 0, new TestHoplog(hdfsStore, ONE_MB, System.currentTimeMillis() - 90000));
-
- alterMajorCompaction(hdfsStore, true);
-
- List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
- assertEquals(2, hoplogs.size());
-
- organizer.performMaintenance();
- TimeUnit.MILLISECONDS.sleep(100);
- assertEquals(0, majorCReqCount.get());
-
- HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
- mutator.setMajorCompactionInterval(1);
- hdfsStore.alter(mutator);
-
- organizer.performMaintenance();
- TimeUnit.MILLISECONDS.sleep(100);
- assertEquals(1, majorCReqCount.get());
- }
-
- public void testMinorCompactionTargetMinCount() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
- HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
-
- ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
- for (int i = 0; i < 2; i++) {
- TrackedReference<Hoplog> hop = new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
- hop.increment();
- targets.add(hop);
- }
- compactor.getMinorCompactionTargets(targets, -1);
- assertEquals(0, targets.size());
- }
-
- public void testMinorCompactionLessTargetsStatsUpdate() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- items.add(new TestEvent("1", "1"));
- organizer.flush(items.iterator(), items.size());
-
- items.clear();
- items.add(new TestEvent("2", "2", Operation.DESTROY));
- organizer.flush(items.iterator(), items.size());
-
- TimeUnit.SECONDS.sleep(1);
- List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
- assertEquals(2, hoplogs.size());
-
- organizer.performMaintenance();
- hoplogs = organizer.getSortedOplogs();
- assertEquals(2, hoplogs.size());
- }
-
- public void testMinorCompactionTargetsOptimizer() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
- HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
-
- ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
- for (int i = 0; i < 6; i++) {
- TrackedReference<Hoplog> hop = new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
- hop.increment();
- targets.add(hop);
- }
- List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
- compactor.getMinorCompactionTargets(list, -1);
- assertEquals(6, list.size());
-
- TestHoplog fifthHop = (TestHoplog) targets.get(4).get();
- // fifth hop needs additional block read as it has more than max keys size
- fifthHop.size = (HdfsSortedOplogOrganizer.AVG_NUM_KEYS_PER_INDEX_BLOCK * 5 + 1) * 64 * 1024;
- list = (List<TrackedReference<Hoplog>>) targets.clone();
- compactor.getMinorCompactionTargets(list, -1);
- assertEquals(4, list.size());
- for (TrackedReference<Hoplog> ref : list) {
- assertTrue(((TestHoplog)ref.get()).size - TEN_MB < 4 );
- }
- }
-
- public void testTargetsReleasedBadRatio() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
- HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
-
- ArrayList<TrackedReference<Hoplog>> targets = new ArrayList<TrackedReference<Hoplog>>();
- for (int i = 0; i < 3; i++) {
- TrackedReference<Hoplog> hop = new TrackedReference<Hoplog>(new TestHoplog(hdfsStore, TEN_MB + i));
- hop.increment();
- targets.add(hop);
- }
- TestHoplog oldestHop = (TestHoplog) targets.get(2).get();
- oldestHop.size = (1 + 30) * TEN_MB;
-
- List<TrackedReference<Hoplog>> list = (List<TrackedReference<Hoplog>>) targets.clone();
- compactor.getMinorCompactionTargets(list, -1);
- assertEquals(0, list.size());
- assertEquals(3, targets.size());
- for (TrackedReference<Hoplog> ref : targets) {
- assertEquals(0, ref.uses());
- }
- }
-
- public void testMinorCTargetsIgnoreMajorC() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- for (int i = 0; i < 7; i++) {
- items.clear();
- items.add(new TestEvent("1" + i, "1" + i));
- organizer.flush(items.iterator(), items.size());
- }
-
- HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
- List<TrackedReference<Hoplog>> targets = organizer.getSortedOplogs();
- compactor.getMinorCompactionTargets(targets, -1);
- assertEquals(7, targets.size());
-
- targets = organizer.getSortedOplogs();
- for (TrackedReference<Hoplog> ref : targets) {
- ref.increment();
- }
- compactor.getMinorCompactionTargets(targets, 2);
- assertEquals((7 - 2), targets.size());
- targets = organizer.getSortedOplogs();
- for (int i = 0; i < targets.size(); i++) {
- if (i + 1 <= (7 - 2)) {
- assertEquals(1, targets.get(i).uses());
- } else {
- assertEquals(0, targets.get(i).uses());
- }
- }
-
- targets = organizer.getSortedOplogs();
- for (TrackedReference<Hoplog> ref : targets) {
- if (ref.uses() == 0) {
- ref.increment();
- }
- assertEquals(1, ref.uses());
- }
- compactor.getMinorCompactionTargets(targets, 7);
- assertEquals(0, targets.size());
-
- targets = organizer.getSortedOplogs();
- for (int i = 0; i < targets.size(); i++) {
- assertEquals(0, targets.get(i).uses());
- }
- }
-
- public void testTargetOverlap() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- for (int i = 0; i < 7; i++) {
- items.clear();
- items.add(new TestEvent("1" + i, "1" + i));
- organizer.flush(items.iterator(), items.size());
- }
-
- HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
- List<TrackedReference<Hoplog>> targets = organizer.getSortedOplogs();
- assertTrue(compactor.isMinorMajorOverlap(targets, 8));
- assertTrue(compactor.isMinorMajorOverlap(targets, 7));
- assertTrue(compactor.isMinorMajorOverlap(targets, 6));
- assertTrue(compactor.isMinorMajorOverlap(targets, 1));
- assertFalse(compactor.isMinorMajorOverlap(targets, 0));
- assertFalse(compactor.isMinorMajorOverlap(targets, -1));
-
- targets.remove(targets.size() -1); // remove the last one
- targets.remove(targets.size() -1); // remove the last one again
- assertFalse(compactor.isMinorMajorOverlap(targets, 1));
- assertFalse(compactor.isMinorMajorOverlap(targets, 2));
- assertTrue(compactor.isMinorMajorOverlap(targets, 3));
-
- targets.remove(3); // remove from the middle, seq num 4
- assertTrue(compactor.isMinorMajorOverlap(targets, 4));
- assertTrue(compactor.isMinorMajorOverlap(targets, 3));
- }
-
- public void testSuspendMinorByMajor() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- for (int i = 0; i < 5; i++) {
- items.clear();
- items.add(new TestEvent("1" + i, "1" + i));
- organizer.flush(items.iterator(), items.size());
- }
-
- HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
-
- Hoplog hoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir + "/"
- + getName() + "-" + System.currentTimeMillis() + "-1.ihop.tmp"), blockCache, stats, storeStats);
- compactor.fillCompactionHoplog(false, organizer.getSortedOplogs(), hoplog, -1);
-
- cache.getLogger().info("<ExpectedException action=add>java.lang.InterruptedException</ExpectedException>");
- try {
- compactor.maxMajorCSeqNum.set(3);
- compactor.fillCompactionHoplog(false, organizer.getSortedOplogs(), hoplog, -1);
- fail();
- } catch (InterruptedException e) {
- // expected
- }
- cache.getLogger().info("<ExpectedException action=remove>java.lang.InterruptedException</ExpectedException>");
- organizer.close();
- }
-
- public void testMajorCompactionSetsSeqNum() throws Exception {
- final CountDownLatch compactionStartedLatch = new CountDownLatch(1);
- final CountDownLatch waitLatch = new CountDownLatch(1);
- class MyOrganizer extends HdfsSortedOplogOrganizer {
- final HoplogCompactor compactor = new MyCompactor();
- public MyOrganizer(HdfsRegionManager region, int bucketId) throws IOException {
- super(region, bucketId);
- }
- public synchronized Compactor getCompactor() {
- return compactor;
- }
- class MyCompactor extends HoplogCompactor {
- @Override
- public long fillCompactionHoplog(boolean isMajor,
- List<TrackedReference<Hoplog>> targets, Hoplog output,
- int majorCSeqNum) throws IOException, InterruptedException {
- compactionStartedLatch.countDown();
- waitLatch.await();
- long byteCount = 0;
- try {
- byteCount = super.fillCompactionHoplog(isMajor, targets, output, majorCSeqNum);
- } catch (ForceReattemptException e) {
- // we do not expect this in a unit test.
- }
- return byteCount;
- }
- }
- }
-
- final HdfsSortedOplogOrganizer organizer = new MyOrganizer(regionManager, 0);
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- for (int i = 0; i < 3; i++) {
- items.clear();
- items.add(new TestEvent("1" + i, "1" + i));
- organizer.flush(items.iterator(), items.size());
- }
-
- Thread t = new Thread(new Runnable() {
- public void run() {
- try {
- organizer.getCompactor().compact(true, false);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- });
- t.start();
- compactionStartedLatch.await();
- assertEquals(3, ((HoplogCompactor)organizer.getCompactor()).maxMajorCSeqNum.get());
- waitLatch.countDown();
- t.join();
- }
-
- public void testMinorWatchesMajorsSeqNum() throws Exception {
- final CountDownLatch majorCStartedLatch = new CountDownLatch(1);
- final CountDownLatch majorCWaitLatch = new CountDownLatch(1);
-
- final CountDownLatch minorCStartedLatch = new CountDownLatch(1);
- final List<TrackedReference<Hoplog>> minorTargets = new ArrayList<TrackedReference<Hoplog>>();
-
- class MyOrganizer extends HdfsSortedOplogOrganizer {
- final HoplogCompactor compactor = new MyCompactor();
- public MyOrganizer(HdfsRegionManager region, int bucketId) throws IOException {
- super(region, bucketId);
- }
- public synchronized Compactor getCompactor() {
- return compactor;
- }
- class MyCompactor extends HoplogCompactor {
- @Override
- public long fillCompactionHoplog(boolean isMajor,
- List<TrackedReference<Hoplog>> targets, Hoplog output,
- int majorCSeqNum) throws IOException, InterruptedException {
- if (isMajor) {
- majorCStartedLatch.countDown();
- majorCWaitLatch.await();
- } else {
- minorCStartedLatch.countDown();
- minorTargets.addAll(targets);
- }
- long byteCount =0;
- try {
- byteCount = super.fillCompactionHoplog(isMajor, targets, output, majorCSeqNum);
- } catch (ForceReattemptException e) {
- // we do not expect this in a unit test.
- }
- return byteCount;
- }
- }
- }
-
- final HdfsSortedOplogOrganizer organizer = new MyOrganizer(regionManager, 0);
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- for (int i = 0; i < 3; i++) {
- items.clear();
- items.add(new TestEvent("1" + i, "1" + i));
- organizer.flush(items.iterator(), items.size());
- }
-
- Thread majorCThread = new Thread(new Runnable() {
- public void run() {
- try {
- organizer.getCompactor().compact(true, false);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- });
- majorCThread.start();
- majorCStartedLatch.await();
- assertEquals(3, ((HoplogCompactor)organizer.getCompactor()).maxMajorCSeqNum.get());
-
- // create more files for minor C
- for (int i = 0; i < 4; i++) {
- items.clear();
- items.add(new TestEvent("1" + i, "1" + i));
- organizer.flush(items.iterator(), items.size());
- }
-
- Thread minorCThread = new Thread(new Runnable() {
- public void run() {
- try {
- organizer.getCompactor().compact(false, false);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- });
- minorCThread.start();
- minorCThread.join();
- assertEquals(4, minorTargets.size());
- for (TrackedReference<Hoplog> ref : minorTargets) {
- assertTrue(organizer.getSequenceNumber(ref.get()) >= 4);
- }
-
- majorCWaitLatch.countDown();
- majorCThread.join();
- }
-
- public void testTimeBoundedSuspend() throws Exception {
- final AtomicBoolean barrier = new AtomicBoolean(true);
-
- class MyOrganizer extends HdfsSortedOplogOrganizer {
- public MyOrganizer(HdfsRegionManager region, int bucketId) throws IOException {
- super(region, bucketId);
- }
- public synchronized Compactor getCompactor() {
- return new MyCompactor();
- }
- class MyCompactor extends HoplogCompactor {
- public long fillCompactionHoplog(boolean isMajor, List<TrackedReference<Hoplog>> targets, Hoplog output)
- throws IOException, InterruptedException {
- barrier.set(false);
- TimeUnit.SECONDS.sleep(5 * HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT);
- long byteCount =0;
- try {
- byteCount = super.fillCompactionHoplog(isMajor, targets, output, -1);
- } catch (ForceReattemptException e) {
- // we do not expect this in a unit test.
- }
- return byteCount;
- }
- }
- }
-
- HdfsSortedOplogOrganizer organizer = new MyOrganizer(regionManager, 0);
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- for (int i = 0; i < 4; i++) {
- items.clear();
- items.add(new TestEvent("1" + i, "1" + i));
- organizer.flush(items.iterator(), items.size());
- }
-
- final HoplogCompactor compactor = (HoplogCompactor) organizer.getCompactor();
- ExecutorService service = Executors.newCachedThreadPool();
- service.execute(new Runnable() {
- public void run() {
- try {
- compactor.compact(false, false);
- } catch (Exception e) {
- }
- }
- });
-
- final AtomicLong start = new AtomicLong(0);
- final AtomicLong end = new AtomicLong(0);
- service.execute(new Runnable() {
- public void run() {
- while (barrier.get()) {
- try {
- TimeUnit.MILLISECONDS.sleep(50);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- start.set(System.currentTimeMillis());
- compactor.suspend();
- end.set(System.currentTimeMillis());
- }
- });
-
- for (long i = 0; i < 5; i++) {
- if (end.get() == 0) {
- TimeUnit.MILLISECONDS.sleep(HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT / 2);
- } else {
- break;
- }
- }
-
- assertTrue(end.get() - start.get() < 100 + HoplogConfig.SUSPEND_MAX_WAIT_MS_DEFAULT);
- }
-
- public static class TestHoplog extends AbstractHoplog {
- long size;
- long creationTime;
- TestHoplog(HDFSStoreImpl store, long size) throws IOException {
- this(store, size, 0);
- }
-
- TestHoplog(HDFSStoreImpl store, long size, long creationTime) throws IOException {
- super(store, new Path("1-" + creationTime + "-1.hop"), null);
- this.size = size;
- this.creationTime = creationTime;
- }
-
- @Override
- public long getSize() {
- return size;
- }
- @Override
- public long getModificationTimeStamp() {
- if (creationTime > 0) {
- return creationTime;
- }
- return super.getModificationTimeStamp();
- }
- @Override
- public String toString() {
- long name = size - TEN_MB;
- if (name < 0) name = size - (TEN_MB / 1024);
- return name + "";
- }
- public boolean isClosed() {
- return false;
- }
- public void close() throws IOException {
- }
- public HoplogReader getReader() throws IOException {
- return null;
- }
- public HoplogWriter createWriter(int keys) throws IOException {
- return null;
- }
- public void close(boolean clearCache) throws IOException {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKeyJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKeyJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKeyJUnitTest.java
deleted file mode 100644
index fe15305..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKeyJUnitTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-
-import junit.framework.TestCase;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.test.junit.categories.HoplogTest;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-
-@Category({IntegrationTest.class, HoplogTest.class})
-public class GFKeyJUnitTest extends TestCase {
- public void testSerde() throws Exception {
- String str = "str";
- GFKey key = new GFKey();
- key.setKey(str);
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- key.write(dos);
-
- ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
- DataInputStream dis = new DataInputStream(bais);
- key.readFields(dis);
-
- assertEquals(str, key.getKey());
- }
-
- public void testCompare() {
- GFKey keya = new GFKey();
- keya.setKey("a");
-
- GFKey keyb = new GFKey();
- keyb.setKey("b");
-
- assertEquals(-1, keya.compareTo(keyb));
- assertEquals(1, keyb.compareTo(keya));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIteratorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIteratorJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIteratorJUnitTest.java
deleted file mode 100644
index 5ebb00e..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIteratorJUnitTest.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
-import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.BaseHoplogTestCase;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HFileSortedOplog;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
-import com.gemstone.gemfire.test.junit.categories.HoplogTest;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-
-@Category({IntegrationTest.class, HoplogTest.class})
-public class HDFSSplitIteratorJUnitTest extends BaseHoplogTestCase {
- public void test1Hop1BlockIter() throws Exception {
- Path path = new Path(testDataDir, "region/0/1-1-1.hop");
- Hoplog oplog = new HFileSortedOplog(hdfsStore, path, blockCache, stats,
- storeStats);
- createHoplog(10, oplog);
-
- Path[] paths = {path};
- long[] starts = {0};
- long[] lengths = {oplog.getSize()};
- HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
- hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
-
- int count = 0;
- while (iter.hasNext()) {
- boolean success = iter.next();
- assertTrue(success);
- assertEquals("key-" + count, new String((byte[])iter.getKey()));
- count++;
- }
- assertEquals(10, count);
- }
-
- public void test1HopNBlockIter() throws Exception {
- Path path = new Path(testDataDir, "region/0/1-1-1.hop");
- Hoplog oplog = new HFileSortedOplog(hdfsStore, path,
- blockCache, stats, storeStats);
- createHoplog(2000, oplog);
-
- FileSystem fs = hdfsStore.getFileSystem();
- Reader reader = HFile.createReader(fs, path, new CacheConfig(fs.getConf()));
- BlockIndexReader bir = reader.getDataBlockIndexReader();
- int blockCount = bir.getRootBlockCount();
- reader.close();
-
- // make sure there are more than 1 hfile blocks in the hoplog
- assertTrue(1 < blockCount);
-
- Path[] paths = {path};
- long half = oplog.getSize()/2;
- long[] starts = {0};
- long[] lengths = {half};
- HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
- hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
-
- int count = 0;
- while (iter.hasNext()) {
- boolean success = iter.next();
- assertTrue(success);
- assertEquals("key-" + (count + 100000), new String((byte[])iter.getKey()));
- count++;
- }
- // the number of iterations should be less than number of keys inserted in
- // the hoplog
- assertTrue(count < 2000 && count > 0);
-
- paths = new Path[] {path};
- starts = new long[] {half + 1};
- lengths = new long[] {oplog.getSize()};
- iter = HDFSSplitIterator.newInstance(
- hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
-
- while (iter.hasNext()) {
- boolean success = iter.next();
- assertTrue(success);
- assertEquals("key-" + (count + 100000), new String((byte[])iter.getKey()));
- count++;
- }
- assertEquals(2000, count);
-
- paths = new Path[] {path, path};
- starts = new long[] {0, half + 1};
- lengths = new long[] {half, oplog.getSize()};
- iter = HDFSSplitIterator.newInstance(
- hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
-
- count = 0;
- while (iter.hasNext()) {
- boolean success = iter.next();
- assertTrue(success);
- assertEquals("key-" + (count + 100000), new String((byte[])iter.getKey()));
- count++;
- }
- assertEquals(2000, count);
- }
-
- /*
- * This tests iterates over 3 hoplog files. The three hoplog files have the
- * same content. Duplicate keys should not get discarded
- */
- public void testNHoplogNBlockIter() throws Exception {
- Path path1 = new Path(testDataDir, "region/0/1-1-1.hop");
- Hoplog oplog = new HFileSortedOplog(hdfsStore, path1,
- blockCache, stats, storeStats);
- createHoplog(2000, oplog);
-
- FileSystem fs = hdfsStore.getFileSystem();
- Reader reader = HFile.createReader(fs, path1, new CacheConfig(fs.getConf()));
- BlockIndexReader bir = reader.getDataBlockIndexReader();
- int blockCount = bir.getRootBlockCount();
- reader.close();
-
- // make sure there are more than 1 hfile blocks in the hoplog
- assertTrue(1 < blockCount);
-
- Path path2 = new Path(testDataDir, "region/0/1-2-1.hop");
- oplog = new HFileSortedOplog(hdfsStore, path2,
- blockCache, stats, storeStats);
- createHoplog(2000, oplog);
-
- Path path3 = new Path(testDataDir, "region/0/1-3-1.hop");
- oplog = new HFileSortedOplog(hdfsStore, path3,
- blockCache, stats, storeStats);
- createHoplog(2000, oplog);
-
- Path[] paths = {path1, path2, path3, path1, path2, path3};
- long half = oplog.getSize()/2;
- long[] starts = {0, 0, 0, half + 1, half + 1, half + 1};
- long[] lengths = {half, half, half, oplog.getSize(), oplog.getSize(), oplog.getSize()};
- HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
- hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
-
- int[] keyCounts = new int[2000];
- while (iter.hasNext()) {
- boolean success = iter.next();
- assertTrue(success);
- String key = new String((byte[])iter.getKey()).substring("key-".length());
- keyCounts[Integer.valueOf(key) - 100000] ++;
- }
-
- for (int i : keyCounts) {
- assertEquals(3, i);
- }
- }
-
- public void testMRLikeNHopIter() throws Exception {
- Path path1 = new Path(testDataDir, "region/0/1-1-1.hop");
- Hoplog oplog = new HFileSortedOplog(hdfsStore, path1,
- blockCache, stats, storeStats);
- createHoplog(10, oplog);
-
- Path path2 = new Path(testDataDir, "region/0/1-2-1.hop");
- oplog = new HFileSortedOplog(hdfsStore, path2,
- blockCache, stats, storeStats);
- createHoplog(10, oplog);
-
- Path path3 = new Path(testDataDir, "region/0/1-3-1.hop");
- oplog = new HFileSortedOplog(hdfsStore, path3,
- blockCache, stats, storeStats);
- createHoplog(10, oplog);
-
- Path[] paths = {path1, path2, path3};
- long[] starts = {0, 0, 0};
- long[] lengths = {oplog.getSize(), oplog.getSize(), oplog.getSize()};
- HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
- hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
-
- int[] keyCounts = new int[10];
- while (iter.hasNext()) {
- boolean success = iter.next();
- assertTrue(success);
- // extra has next before key read
- iter.hasNext();
- String key = new String((byte[])iter.getKey()).substring("key-".length());
- System.out.println(key);
- keyCounts[Integer.valueOf(key)] ++;
- }
-
- for (int i : keyCounts) {
- assertEquals(3, i);
- }
- }
-
- public void test1Hop1BlockIterSkipDeletedHoplogs() throws Exception {
- FileSystem fs = hdfsStore.getFileSystem();
- Path path = new Path(testDataDir, "region/0/1-1-1.hop");
- Hoplog oplog = new HFileSortedOplog(hdfsStore, path,
- blockCache, stats, storeStats);
- createHoplog(10, oplog);
-
- Path[] paths = {path};
- long[] starts = {0};
- long[] lengths = {oplog.getSize()};
-
- //Delete the Hoplog file
- fs.delete(path, true);
-
- HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
- hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
- assertFalse(iter.hasNext());
-
- }
-
- public void testMRLikeNHopIterSkipDeletedHoplogs() throws Exception {
- FileSystem fs = hdfsStore.getFileSystem();
- //Create Hoplogs
- Path path1 = new Path(testDataDir, "region/0/1-1-1.hop");
- Hoplog oplog = new HFileSortedOplog(hdfsStore, path1,
- blockCache, stats, storeStats);
- createHoplog(10, oplog);
-
- Path path2 = new Path(testDataDir, "region/0/1-2-1.hop");
- oplog = new HFileSortedOplog(hdfsStore, path2,
- blockCache, stats, storeStats);
- createHoplog(10, oplog);
-
- Path path3 = new Path(testDataDir, "region/0/1-3-1.hop");
- oplog = new HFileSortedOplog(hdfsStore, path3,
- blockCache, stats, storeStats);
- createHoplog(10, oplog);
-
- Path[] paths = {path1, path2, path3};
- long[] starts = {0, 0, 0};
- long[] lengths = {oplog.getSize(), oplog.getSize(), oplog.getSize()};
- HDFSSplitIterator iter = HDFSSplitIterator.newInstance(
- hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
- int count = 0;
- while (iter.hasNext()) {
- boolean success = iter.next();
- assertTrue(success);
- count++;
- }
- assertEquals(30, count);
-
- for(int i = 0; i < 3; ++i){
- fs.delete(paths[i], true);
- iter = HDFSSplitIterator.newInstance(
- hdfsStore.getFileSystem(), paths, starts, lengths, 0, 0);
- count = 0;
- while (iter.hasNext()) {
- boolean success = iter.next();
- assertTrue(success);
- count++;
- }
- assertEquals(20, count);
- oplog = new HFileSortedOplog(hdfsStore, paths[i],
- blockCache, stats, storeStats);
- createHoplog(10, oplog);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtilJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtilJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtilJUnitTest.java
deleted file mode 100644
index a209b6e..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtilJUnitTest.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.BaseHoplogTestCase;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HdfsSortedOplogOrganizer;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
-import com.gemstone.gemfire.test.junit.categories.HoplogTest;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-
-@Category({IntegrationTest.class, HoplogTest.class})
-public class HoplogUtilJUnitTest extends BaseHoplogTestCase {
- Path regionPath = null;
-
- @Override
- protected void configureHdfsStoreFactory() throws Exception {
- super.configureHdfsStoreFactory();
-
- hsf.setInputFileCountMin(3);
- hsf.setMinorCompaction(false);
- hsf.setMajorCompaction(false);
- }
-
- public void testHoplogListingMultiBucket() throws Exception {
- createHoplogs();
-
- Collection<FileStatus> hoplogs = HoplogUtil.getAllRegionHoplogs(
- regionPath, hdfsStore.getFileSystem(),
- AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
-
- assertEquals(5, hdfsStore.getFileSystem().listStatus(regionPath).length);
- assertEquals(15, hoplogs.size());
- }
-
- public void testHoplogListingMixFileTypes() throws Exception {
- createHoplogs();
-
- HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
- organizer.getCompactor().compact(false, false);
-
- Collection<FileStatus> hoplogs = HoplogUtil.getAllRegionHoplogs(
- regionPath, hdfsStore.getFileSystem(),
- AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
-
- assertEquals(7,
- hdfsStore.getFileSystem().listStatus(new Path(regionPath, "0")).length);
- assertEquals(15, hoplogs.size());
- }
-
- public void testHoplogListingEmptyBucket() throws Exception {
- createHoplogs();
-
- hdfsStore.getFileSystem().mkdirs(new Path(regionPath, "100"));
-
- Collection<FileStatus> hoplogs = HoplogUtil.getAllRegionHoplogs(
- regionPath, hdfsStore.getFileSystem(),
- AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
-
- assertEquals(6, hdfsStore.getFileSystem().listStatus(regionPath).length);
- assertEquals(15, hoplogs.size());
- }
-
- public void testHoplogListingInvalidBucket() throws Exception {
- createHoplogs();
-
- hdfsStore.getFileSystem().rename(new Path(regionPath, "0"),
- new Path(regionPath, "not_a_bucket"));
-
- Collection<FileStatus> hoplogs = HoplogUtil.getAllRegionHoplogs(
- regionPath, hdfsStore.getFileSystem(),
- AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
-
- assertEquals(5, hdfsStore.getFileSystem().listStatus(regionPath).length);
- assertEquals(12, hoplogs.size());
- }
-
- public void testHoplogListingInvalidFiles() throws Exception {
- createHoplogs();
-
- Path bucketPath = new Path(regionPath, "0");
- FSDataOutputStream stream = hdfsStore.getFileSystem().create(
- new Path(bucketPath, "not_a_hoplog"));
- stream.close();
-
- Collection<FileStatus> hoplogs = HoplogUtil.getAllRegionHoplogs(
- regionPath, hdfsStore.getFileSystem(),
- AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
-
- assertEquals(4, hdfsStore.getFileSystem().listStatus(bucketPath).length);
- assertEquals(15, hoplogs.size());
- }
-
- public void testTimeRange() throws Exception {
- createHoplogs();
- // rename hoplogs for testing purpose
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(
- regionManager, 0);
- List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
- assertEquals(3, hoplogs.size());
- hoplogs.get(0).get().rename("0-300-1.hop");
- hoplogs.get(1).get().rename("0-310-1.hop");
- hoplogs.get(2).get().rename("0-320-1.hop");
- organizer.close();
-
- organizer = new HdfsSortedOplogOrganizer(regionManager, 3);
- hoplogs = organizer.getSortedOplogs();
- assertEquals(3, hoplogs.size());
- hoplogs.get(0).get().rename("0-600-1.hop");
- hoplogs.get(1).get().rename("0-610-1.hop");
- hoplogs.get(2).get().rename("0-620-1.hop");
- organizer.close();
-
- organizer = new HdfsSortedOplogOrganizer(regionManager, 6);
- hoplogs = organizer.getSortedOplogs();
- assertEquals(3, hoplogs.size());
- hoplogs.get(0).get().rename("0-100-1.hop");
- hoplogs.get(1).get().rename("0-110-1.hop");
- hoplogs.get(2).get().rename("0-120-1.hop");
-
- Collection<FileStatus> filtered = HoplogUtil.getRegionHoplogs(
- regionPath, hdfsStore.getFileSystem(),
- AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, 300, 305);
- assertEquals(5, filtered.size());
- assertTrue(containsHoplogWithName(filtered, "0-300-1.hop"));
- assertTrue(containsHoplogWithName(filtered, "0-310-1.hop"));
- assertTrue(containsHoplogWithName(filtered, "0-600-1.hop"));
-
- filtered = HoplogUtil.getRegionHoplogs(regionPath,
- hdfsStore.getFileSystem(),
- AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, 250, 310);
- assertEquals(6, filtered.size());
- assertTrue(containsHoplogWithName(filtered, "0-300-1.hop"));
- assertTrue(containsHoplogWithName(filtered, "0-310-1.hop"));
- assertTrue(containsHoplogWithName(filtered, "0-320-1.hop"));
-
- filtered = HoplogUtil.getRegionHoplogs(regionPath,
- hdfsStore.getFileSystem(),
- AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, 301, 311);
- assertEquals(5, filtered.size());
- assertTrue(containsHoplogWithName(filtered, "0-310-1.hop"));
- assertTrue(containsHoplogWithName(filtered, "0-320-1.hop"));
-
- filtered = HoplogUtil.getRegionHoplogs(regionPath,
- hdfsStore.getFileSystem(),
- AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, 301, 309);
- assertEquals(4, filtered.size());
- assertTrue(containsHoplogWithName(filtered, "0-310-1.hop"));
- organizer.close();
- }
-
- public void testExcludeSoonCleanedHoplogs() throws Exception {
- FileSystem fs = hdfsStore.getFileSystem();
- Path cleanUpIntervalPath = new Path(hdfsStore.getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(
- regionManager, 0);
- //delete the auto generated clean up interval file
- if (fs.exists(cleanUpIntervalPath)){
- fs.delete(cleanUpIntervalPath, true);
- }
-
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- int count = 10;
- for (int fileCount = 0; fileCount < 3; fileCount++) {
- items.clear();
- for (int itemCount = 0; itemCount < count; itemCount++) {
- items.add(new TestEvent(("key-" + itemCount), "value"));
- }
- organizer.flush(items.iterator(), count);
- }
- List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
-
- for(TrackedReference<Hoplog> hoplog : hoplogs) {
- Path p = new Path(testDataDir, getName() + "/0/" +
- hoplog.get().getFileName() + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
- fs.createNewFile(p);
- }
- Collection<FileStatus> files = HoplogUtil.getAllRegionHoplogs(
- regionPath, hdfsStore.getFileSystem(),
- AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
- assertEquals(3, files.size());
-
- TimeUnit.MINUTES.sleep(2);
- //No clean up interval file, all expired files will be included
- files = HoplogUtil.getAllRegionHoplogs(
- regionPath, hdfsStore.getFileSystem(),
- AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
- assertEquals(3, files.size());
-
-
- long interval = 1 * 60 * 1000;
- HoplogUtil.exposeCleanupIntervalMillis(fs,cleanUpIntervalPath,interval);
-
- files = HoplogUtil.getAllRegionHoplogs(
- regionPath, hdfsStore.getFileSystem(),
- AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
- assertEquals(0, files.size());
- organizer.close();
- }
-
-
- public void testCheckpointSelection() throws Exception {
- createHoplogs();
- // rename hoplogs for testing purpose
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(
- regionManager, 0);
- List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
- assertEquals(3, hoplogs.size());
- hoplogs.get(0).get().rename("0-300-1.chop");
- hoplogs.get(1).get().rename("0-310-1.hop");
- hoplogs.get(2).get().rename("0-320-1.hop"); // checkpoint file
- organizer.close();
-
- organizer = new HdfsSortedOplogOrganizer(regionManager, 3);
- hoplogs = organizer.getSortedOplogs();
- assertEquals(3, hoplogs.size());
- hoplogs.get(0).get().rename("0-600-1.hop");
- hoplogs.get(1).get().rename("0-610-1.chop");
- hoplogs.get(2).get().rename("0-620-1.hop");
- organizer.close();
-
- organizer = new HdfsSortedOplogOrganizer(regionManager, 6);
- hoplogs = organizer.getSortedOplogs();
- assertEquals(3, hoplogs.size());
- hoplogs.get(0).get().rename("0-100-1.hop");
- hoplogs.get(1).get().rename("0-110-1.hop");
- hoplogs.get(2).get().rename("0-120-1.chop");
-
- Collection<FileStatus> filtered = HoplogUtil.filterHoplogs(
- hdfsStore.getFileSystem(), regionPath, 290, 305, false);
- assertEquals(4, filtered.size());
- assertTrue(containsHoplogWithName(filtered, "0-310-1.hop"));
- assertTrue(containsHoplogWithName(filtered, "0-600-1.hop"));
-
- filtered = HoplogUtil.filterHoplogs(hdfsStore.getFileSystem(),
- regionPath, 290, 305, true);
- assertEquals(3, filtered.size());
- assertTrue(containsHoplogWithName(filtered, "0-300-1.chop"));
- assertTrue(containsHoplogWithName(filtered, "0-610-1.chop"));
- assertTrue(containsHoplogWithName(filtered, "0-120-1.chop"));
- organizer.close();
- }
-
- private boolean containsHoplogWithName(Collection<FileStatus> filtered,
- String name) {
- for (FileStatus file : filtered) {
- if (file.getPath().getName().equals(name)) {
- return true;
- }
- }
- return false;
- }
-
- private void createHoplogs() throws IOException, Exception {
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- int count = 10;
- for (int bucketId = 0; bucketId < 15; bucketId += 3) {
- HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager,
- bucketId);
- for (int fileCount = 0; fileCount < 3; fileCount++) {
- items.clear();
- for (int itemCount = 0; itemCount < count; itemCount++) {
- items.add(new TestEvent(("key-" + itemCount), "value"));
- }
- organizer.flush(items.iterator(), count);
- }
- }
- }
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- regionPath = new Path(testDataDir, getName());
- }
-
- @Override
- protected void tearDown() throws Exception{
- FileSystem fs = hdfsStore.getFileSystem();
- Path cleanUpIntervalPath = new Path(hdfsStore.getHomeDir(),HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
- if (fs.exists(cleanUpIntervalPath)){
- fs.delete(cleanUpIntervalPath, true);
- }
- super.tearDown();
- }
-}