You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by as...@apache.org on 2015/10/21 17:59:00 UTC
[13/15] incubator-geode git commit: GEODE-429: Remove HdfsStore Junit
and Dunits
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizerJUnitTest.java
deleted file mode 100644
index e6a1229..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizerJUnitTest.java
+++ /dev/null
@@ -1,1045 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.TreeSet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.regex.Matcher;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer.HoplogComparator;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.TieredCompactionJUnitTest.TestHoplog;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
-import com.gemstone.gemfire.internal.util.BlobHelper;
-import com.gemstone.gemfire.test.junit.categories.HoplogTest;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-
-import dunit.DistributedTestCase;
-import dunit.DistributedTestCase.ExpectedException;
-@Category({IntegrationTest.class, HoplogTest.class})
-public class HdfsSortedOplogOrganizerJUnitTest extends BaseHoplogTestCase {
- /**
-  * Flushes a single batch of events into a fresh bucket and verifies that
-  * exactly one flush hoplog is written for it.
-  */
- public void testFlush() throws Exception {
-   final int eventCount = 10;
-   final int bucket = (int) System.nanoTime();
-   HdfsSortedOplogOrganizer bucketOrganizer = new HdfsSortedOplogOrganizer(regionManager, bucket);
-
-   // build the batch that is written out by the single flush below
-   ArrayList<TestEvent> batch = new ArrayList<TestEvent>();
-   int idx = 0;
-   while (idx < eventCount) {
-     batch.add(new TestEvent("key-" + idx, "value-" + System.nanoTime()));
-     idx++;
-   }
-   bucketOrganizer.flush(batch.iterator(), eventCount);
-
-   // list the flush hoplogs now present in the bucket directory
-   String bucketDir = getName() + "/" + bucket;
-   FileStatus[] flushed = getBucketHoplogs(bucketDir, HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
-
-   // one flush must produce one file
-   assertEquals(1, flushed.length);
-
-   assertEquals(eventCount, bucketOrganizer.sizeEstimate());
-   assertEquals(0, stats.getActiveReaderCount());
- }
-
- /**
-  * Flushes one hoplog, closes it, and then repeatedly re-opens the same file
-  * through a fresh {@link HFileSortedOplog}, reading one key each time, to
-  * check that a closed hoplog file can be opened and read again.
-  */
- public void testReopen() throws Exception {
-   int bucketId = (int) System.nanoTime();
-   HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
-
-   // flush and create hoplog
-   ArrayList<TestEvent> items = new ArrayList<TestEvent>();
-   for (int i = 0; i < 100; i++) {
-     items.add(new TestEvent("" + i, ("1-1")));
-   }
-   organizer.flush(items.iterator(), items.size());
-
-   Hoplog hoplog = organizer.getSortedOplogs().iterator().next().get();
-   byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
-   hoplog.close();
-
-   // the file location is the same for every iteration, so resolve it once
-   Path path = new Path(testDataDir, getName() + "/" + bucketId + "/" + hoplog.getFileName());
-   for (int i = 0; i < 10; i++) {
-     HFileSortedOplog oplog = new HFileSortedOplog(hdfsStore, path, blockCache, stats, storeStats);
-     try {
-       oplog.getReader().read(keyBytes1);
-     } finally {
-       // always release the re-opened hoplog, even when the read fails
-       oplog.close(false);
-     }
-   }
- }
-
- /**
- * Tests reads from a set of hoplogs containing both valid and stale KVs
- */
- public void testRead() throws Exception {
- doRead(regionManager);
- }
-
-// public void testNewReaderWithNameNodeHA() throws Exception {
-// deleteMiniClusterDir();
-// int nn1port = AvailablePortHelper.getRandomAvailableTCPPort();
-// int nn2port = AvailablePortHelper.getRandomAvailableTCPPort();
-//
-// MiniDFSCluster cluster = initMiniHACluster(nn1port, nn2port);
-// initClientHAConf(nn1port, nn2port);
-//
-// HDFSStoreImpl store1 = (HDFSStoreImpl) hsf.create("Store-1");
-// regionfactory.setHDFSStoreName(store1.getName());
-// Region<Object, Object> region1 = regionfactory.create("region-1");
-// HdfsRegionManager regionManager1 = ((LocalRegion)region1).getHdfsRegionManager();
-//
-// HoplogOrganizer<SortedHoplogPersistedEvent> organizer = doRead(regionManager1);
-// organizer.close();
-//
-// dunit.DistributedTestCase.ExpectedException ex = DistributedTestCase.addExpectedException("java.io.EOFException");
-// NameNode nnode2 = cluster.getNameNode(1);
-// assertTrue(nnode2.isStandbyState());
-// cluster.shutdownNameNode(0);
-// cluster.transitionToActive(1);
-// assertFalse(nnode2.isStandbyState());
-//
-// organizer = new HdfsSortedOplogOrganizer(regionManager1, 0);
-// byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
-// byte[] keyBytes3 = BlobHelper.serializeToBlob("3");
-// byte[] keyBytes4 = BlobHelper.serializeToBlob("4");
-// assertEquals("2-1", organizer.read(keyBytes1).getValue());
-// assertEquals("3-3", organizer.read(keyBytes3).getValue());
-// assertEquals("1-4", organizer.read(keyBytes4).getValue());
-// ex.remove();
-//
-// region1.destroyRegion();
-// store1.destroy();
-// cluster.shutdown();
-// FileUtils.deleteDirectory(new File("hdfs-test-cluster"));
-// }
-
-// public void testActiveReaderWithNameNodeHA() throws Exception {
-// deleteMiniClusterDir();
-// int nn1port = AvailablePortHelper.getRandomAvailableTCPPort();
-// int nn2port = AvailablePortHelper.getRandomAvailableTCPPort();
-//
-// MiniDFSCluster cluster = initMiniHACluster(nn1port, nn2port);
-// initClientHAConf(nn1port, nn2port);
-//
-// HDFSStoreImpl store1 = (HDFSStoreImpl) hsf.create("Store-1");
-// regionfactory.setHDFSStoreName(store1.getName());
-// Region<Object, Object> region1 = regionfactory.create("region-1");
-// HdfsRegionManager regionManager1 = ((LocalRegion)region1).getHdfsRegionManager();
-//
-// HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager1, 0);
-// ArrayList<TestEvent> items = new ArrayList<TestEvent>();
-// for (int i = 100000; i < 101000; i++) {
-// items.add(new TestEvent(("" + i), (i + " some string " + i)));
-// }
-// organizer.flush(items.iterator(), items.size());
-// organizer.getSortedOplogs().get(0).get().getReader();
-//
-// dunit.DistributedTestCase.ExpectedException ex = DistributedTestCase.addExpectedException("java.io.EOFException");
-// NameNode nnode2 = cluster.getNameNode(1);
-// assertTrue(nnode2.isStandbyState());
-// cluster.shutdownNameNode(0);
-// cluster.transitionToActive(1);
-// assertFalse(nnode2.isStandbyState());
-//
-// for (int i = 100000; i < 100500; i++) {
-// byte[] keyBytes1 = BlobHelper.serializeToBlob("" + i);
-// assertEquals(i + " some string " + i, organizer.read(keyBytes1).getValue());
-// }
-// ex.remove();
-// region1.destroyRegion();
-// store1.destroy();
-// cluster.shutdown();
-// FileUtils.deleteDirectory(new File("hdfs-test-cluster"));
-// }
-
-// public void testFlushWithNameNodeHA() throws Exception {
-// deleteMiniClusterDir();
-// int nn1port = AvailablePortHelper.getRandomAvailableTCPPort();
-// int nn2port = AvailablePortHelper.getRandomAvailableTCPPort();
-//
-// MiniDFSCluster cluster = initMiniHACluster(nn1port, nn2port);
-//
-// initClientHAConf(nn1port, nn2port);
-// HDFSStoreImpl store1 = (HDFSStoreImpl) hsf.create("Store-1");
-//
-// regionfactory.setHDFSStoreName(store1.getName());
-// Region<Object, Object> region1 = regionfactory.create("region-1");
-// HdfsRegionManager regionManager1 = ((LocalRegion)region1).getHdfsRegionManager();
-//
-// HoplogOrganizer<SortedHoplogPersistedEvent> organizer = new HdfsSortedOplogOrganizer(regionManager1, 0);
-// ArrayList<TestEvent> items = new ArrayList<TestEvent>();
-// items.add(new TestEvent(("1"), ("1-1")));
-// organizer.flush(items.iterator(), items.size());
-//
-// dunit.DistributedTestCase.ExpectedException ex = DistributedTestCase.addExpectedException("java.io.EOFException");
-// NameNode nnode2 = cluster.getNameNode(1);
-// assertTrue(nnode2.isStandbyState());
-// cluster.shutdownNameNode(0);
-// cluster.transitionToActive(1);
-// assertFalse(nnode2.isStandbyState());
-//
-// items.add(new TestEvent(("4"), ("1-4")));
-// organizer.flush(items.iterator(), items.size());
-// byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
-// byte[] keyBytes4 = BlobHelper.serializeToBlob("4");
-// assertEquals("1-1", organizer.read(keyBytes1).getValue());
-// assertEquals("1-4", organizer.read(keyBytes4).getValue());
-// ex.remove();
-//
-// region1.destroyRegion();
-// store1.destroy();
-// cluster.shutdown();
-// FileUtils.deleteDirectory(new File("hdfs-test-cluster"));
-// }
-
- public HoplogOrganizer<SortedHoplogPersistedEvent> doRead(HdfsRegionManager rm) throws Exception {
- HoplogOrganizer<SortedHoplogPersistedEvent> organizer = new HdfsSortedOplogOrganizer(rm, 0);
-
- // flush and create hoplog
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- items.add(new TestEvent(("1"), ("1-1")));
- items.add(new TestEvent(("4"), ("1-4")));
- organizer.flush(items.iterator(), items.size());
-
- items.clear();
- items.add(new TestEvent(("1"), ("2-1")));
- items.add(new TestEvent(("3"), ("2-3")));
- organizer.flush(items.iterator(), items.size());
-
- items.clear();
- items.add(new TestEvent(("3"), ("3-3")));
- items.add(new TestEvent(("5"), ("3-5")));
- organizer.flush(items.iterator(), items.size());
-
- // check file existence in bucket directory
- FileStatus[] hoplogs = getBucketHoplogs(rm.getStore().getFileSystem(),
- rm.getRegionFolder() + "/" + 0,
- HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
-
- // expect 3 files are 3 flushes
- assertEquals(3, hoplogs.length);
- byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
- byte[] keyBytes3 = BlobHelper.serializeToBlob("3");
- byte[] keyBytes4 = BlobHelper.serializeToBlob("4");
- // expect key 1 from hoplog 2
- assertEquals("2-1", organizer.read(keyBytes1).getValue());
- // expect key 3 from hoplog 3
- assertEquals("3-3", organizer.read(keyBytes3).getValue());
- // expect key 4 from hoplog 1
- assertEquals("1-4", organizer.read(keyBytes4).getValue());
- return organizer;
- }
-
- /**
-  * Tests bucket organizer initialization during startup. Existing hoplogs should be identified
-  * and returned, while temporary hoplogs, unrelated files and directories in the bucket
-  * directory must not be reported.
-  */
- public void testHoplogIdentification() throws Exception {
-   // create one empty file and one directory in the bucket directory
-   Path bucketPath = new Path(testDataDir, getName() + "/0");
-   FileSystem fs = hdfsStore.getFileSystem();
-   fs.createNewFile(new Path(bucketPath, "temp_file"));
-   fs.mkdirs(new Path(bucketPath, "temp_dir"));
-
-   // create two hoplog files for each of the flush, minor and major types
-   HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
-   String[] extensions = { HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION,
-       HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION,
-       HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION,
-       HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION,
-       HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION,
-       HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION};
-   for (String extension : extensions) {
-     Hoplog oplog = organizer.getTmpSortedOplog(null, extension);
-     createHoplog(0, oplog);
-     organizer.makeLegitimate(oplog);
-   }
-
-   // create a temp hoplog that is never made legitimate
-   Hoplog oplog = organizer.getTmpSortedOplog(null, HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);
-   createHoplog(0, oplog);
-
-   // bucket directory should have 6 hoplogs, 1 temp log, 1 misc file and 1 directory
-   FileStatus[] results = fs.listStatus(bucketPath);
-   assertEquals(9, results.length);
-
-   // only the six legitimate hoplogs are identified
-   List<Hoplog> list = organizer.identifyAndLoadSortedOplogs(true);
-   assertEquals(6, list.size());
-
-   // release the organizer like the sibling tests do
-   organizer.close();
- }
-
- /**
-  * Creates five hoplogs plus expiry markers for two of them and verifies that
-  * the organizer reports exactly those two markers.
-  */
- public void testExpiryMarkerIdentification() throws Exception {
-   final String minorExt = AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION;
-   final String expiredExt = AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
-
-   // hoplogs present in the bucket; the last two receive an expiry marker below
-   String[] hoplogNames = {
-       "0-1-1231" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
-       "0-2-1232" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION,
-       "0-3-1233" + minorExt,
-       "0-4-1234" + minorExt,
-       "0-5-1235" + minorExt };
-
-   Path bucketDir = new Path(testDataDir, getName() + "/0");
-   FileSystem hdfs = hdfsStore.getFileSystem();
-   for (int i = 0; i < hoplogNames.length; i++) {
-     Path hoplogPath = new Path(bucketDir, hoplogNames[i]);
-     Hoplog created = new HFileSortedOplog(hdfsStore, hoplogPath, blockCache, stats, storeStats);
-     createHoplog(10, created);
-   }
-
-   String firstMarker = "0-4-1234" + minorExt + expiredExt;
-   hdfs.createNewFile(new Path(bucketDir, firstMarker));
-   String secondMarker = "0-5-1235" + minorExt + expiredExt;
-   hdfs.createNewFile(new Path(bucketDir, secondMarker));
-
-   // five hoplogs and two markers
-   FileStatus[] bucketFiles = getBucketHoplogs(getName() + "/0", "");
-   assertEquals(7, bucketFiles.length);
-
-   HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
-
-   // exactly the two marker files created above must be reported
-   FileStatus[] reported = organizer.getExpiryMarkers();
-   assertEquals(2, reported.length);
-   for (int i = 0; i < reported.length; i++) {
-     String reportedName = reported[i].getPath().getName();
-     boolean known = reportedName.equals(firstMarker) || reportedName.equals(secondMarker);
-     assertTrue(known);
-   }
-   organizer.close();
- }
-
- /**
-  * Populates a bucket with hoplogs and expiry markers in two batches separated by a timestamp
-  * ({@code target}), then checks which files the organizer selects as cleanup targets, how many
-  * files it deletes, and how the store usage statistic changes.
-  */
- public void testExpiredHoplogCleanup() throws Exception {
- // first batch: hoplogs and expiry markers created before the target time
- String[] files = {
- "0-1-0000" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
- "0-1-1111" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
- "0-1-1111" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION
- + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION,
-
- "0-2-0000" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION,
- "0-2-2222" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION,
-
- "0-3-0000" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
- "0-3-3333" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
- "0-3-3333" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION
- + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION,
-
- "0-4-4444" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION };
-
- Path bucketPath = new Path(testDataDir, getName() + "/0");
- FileSystem fs = hdfsStore.getFileSystem();
- for (String file : files) {
- // expiry markers are plain empty files; everything else is a real hoplog with 10 entries
- if (file.endsWith(AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION)) {
- fs.createNewFile(new Path(bucketPath, file));
- continue;
- }
- Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file),
- blockCache, stats, storeStats);
- createHoplog(10, oplog);
- }
-
- // 7 hoplogs + 2 expiry markers
- FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", "");
- assertEquals(9, hoplogs.length);
-
- // capture the target time, then wait so that the second batch is created after it
- long target = System.currentTimeMillis();
- TimeUnit.SECONDS.sleep(1);
-
- // second batch: these files are created after the target time. The assertions below expect
- // the minor compacted "0-5-5555" files to be removed nevertheless.
- files = new String[] {
- "0-4-4444" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION
- + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION,
-
- "0-5-5555" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION
- + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION,
- "0-5-5555" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
-
- "0-6-6666" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION
- };
- for (String file : files) {
- if (file.endsWith(AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION)) {
- fs.createNewFile(new Path(bucketPath, file));
- continue;
- }
- Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file),
- blockCache, stats, storeStats);
- createHoplog(10, oplog);
- }
-
- // 9 files from the first batch + 4 from the second
- hoplogs = getBucketHoplogs(getName() + "/0", "");
- assertEquals(13, hoplogs.length);
- // hopSize is the largest file length in the listing; it is used below as the per-hoplog size
- int hopSize = 0;
- for (FileStatus file : hoplogs) {
- if(file.getLen() > hopSize) {
- hopSize = (int) file.getLen();
- }
- }
-
- // behavior == 1 switches the organizer below to report one additional, synthetic marker
- final AtomicInteger behavior = new AtomicInteger(0);
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0) {
- @Override
- protected FileStatus[] getExpiryMarkers() throws IOException {
- if (behavior.get() == 1) {
- ArrayList<FileStatus> markers = new ArrayList<FileStatus>();
- for (FileStatus marker : super.getExpiryMarkers()) {
- markers.add(marker);
- }
- // inject a dummy old expiry marker for major compacted file
- // its modification time is twice the default major compaction interval in the past
- long age = 2 * HDFSStore.DEFAULT_MAJOR_COMPACTION_INTERVAL_MINS * 60 * 1000;
- String markerName = "0-2-2222" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION + EXPIRED_HOPLOG_EXTENSION;
- FileStatus marker = new FileStatus(0, false, 1, 1024, System.currentTimeMillis() - age, new Path(bucketPath, markerName));
- markers.add(marker);
- return markers.toArray(new FileStatus[markers.size()]);
- }
- return super.getExpiryMarkers();
- }
- };
-
- // with only the real markers, 6 files are expected as cleanup targets
- List<FileStatus> list = organizer.getOptimizationTargets(target);
- assertEquals(6, list.size());
-
- // with the synthetic marker injected, 8 files are expected
- behavior.set(1);
- list = organizer.getOptimizationTargets(target);
- assertEquals(8, list.size());
-
- // store usage is expected to drop from 9 to 5 hoplog sizes once the targets are deleted
- assertEquals(9 * hopSize, stats.getStoreUsageBytes());
- int count = organizer.deleteExpiredFiles(list);
- assertEquals(8, count);
- assertEquals(5 * hopSize, stats.getStoreUsageBytes());
-
- // tmp starts as the pre-deletion listing; every entry that still exists in the bucket is
- // removed from it, so what remains in tmp are the files that were deleted
- List<FileStatus> tmp = new ArrayList<FileStatus>(Arrays.asList(hoplogs));
- for (Iterator<FileStatus> iter = tmp.iterator(); iter.hasNext();) {
- // the bucket is re-listed on every iteration
- hoplogs = getBucketHoplogs(getName() + "/0", "");
- FileStatus file = iter.next();
- for (FileStatus hoplog : hoplogs) {
- if(hoplog.getPath().getName().startsWith("0-5-5555")) {
- fail("this file should have been deleted" + hoplog.getPath().getName());
- }
-
- if (hoplog.getPath().getName().equals(file.getPath().getName())) {
- iter.remove();
- break;
- }
- }
- }
-
- // 7 of the original 13 listed files are expected to be gone
- assertEquals(7, tmp.size());
- organizer.close();
- }
-
- /**
-  * Verifies that altering the purge interval of the store is honoured by the next cleanup:
-  * expiry markers that are 120 seconds old are not cleaned up under the store's initial purge
-  * interval, but are cleaned up once the interval has been altered to 1.
-  */
- public void testAlterPurgeInterval() throws Exception {
-   // hoplogs present in the bucket; expiry markers for two of them are injected below
-   String[] files = {
-       "0-1-0000" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
-       "0-1-1111" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
-       "0-2-2222" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
-       "0-4-4444" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION };
-
-   final Path bucketPath = new Path(testDataDir, getName() + "/0");
-   // NOTE(review): the return value was unused in the original too; the call is kept in case
-   // the test relies on a side effect of it - confirm and remove if not
-   hdfsStore.getFileSystem();
-   for (String file : files) {
-     Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file),
-         blockCache, stats, storeStats);
-     createHoplog(10, oplog);
-   }
-
-   // behavior == 1 makes the organizer report two synthetic, 120 seconds old expiry markers
-   final AtomicInteger behavior = new AtomicInteger(0);
-   HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0) {
-     @Override
-     protected FileStatus[] getExpiryMarkers() throws IOException {
-       if (behavior.get() == 1) {
-         ArrayList<FileStatus> markers = new ArrayList<FileStatus>();
-         // inject dummy old expiry markers
-         long age = 120 * 1000; // 120 seconds old
-         String markerName = "0-2-2222" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION + EXPIRED_HOPLOG_EXTENSION;
-         FileStatus marker = new FileStatus(0, false, 1, 1024, System.currentTimeMillis() - age, new Path(bucketPath, markerName));
-         markers.add(marker);
-         markerName = "0-4-4444" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION + EXPIRED_HOPLOG_EXTENSION;
-         marker = new FileStatus(0, false, 1, 1024, System.currentTimeMillis() - age, new Path(bucketPath, markerName));
-         markers.add(marker);
-         return markers.toArray(new FileStatus[markers.size()]);
-       }
-       return super.getExpiryMarkers();
-     }
-   };
-
-   // under the initial purge interval nothing is cleaned up
-   behavior.set(1);
-   int count = organizer.initiateCleanup();
-   assertEquals(0, count);
-
-   // after altering the purge interval the same markers trigger the cleanup of 4 files
-   HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
-   mutator.setPurgeInterval(1);
-   hdfsStore.alter(mutator);
-   count = organizer.initiateCleanup();
-   assertEquals(4, count);
-
-   // release the organizer like the sibling tests do
-   organizer.close();
- }
-
- /**
-  * Checks the cleanup targets reported for a bucket before and after its
-  * hoplogs are referenced and marked for deletion while the expiry marker
-  * file has been removed.
-  */
- public void testInUseExpiredHoplogCleanup() throws Exception {
-   Path bucketDir = new Path(testDataDir, getName() + "/0");
-   FileSystem hdfs = hdfsStore.getFileSystem();
-
-   String[] hoplogNames = new String[] {
-       "0-1-1231" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
-       "0-2-1232" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION,
-       "0-3-1233" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
-       "0-4-1234" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
-       "0-5-1235" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION };
-
-   for (int i = 0; i < hoplogNames.length; i++) {
-     Path hoplogPath = new Path(bucketDir, hoplogNames[i]);
-     Hoplog created = new HFileSortedOplog(hdfsStore, hoplogPath, blockCache, stats, storeStats);
-     createHoplog(10, created);
-   }
-
-   final HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
-   List<TrackedReference<Hoplog>> references = organizer.getSortedOplogs();
-   assertEquals(hoplogNames.length, references.size());
-
-   // expiry marker for one of the hoplogs that is referenced below; while a
-   // hoplog is in use its file must not become a deletion target
-   String[] markerNames = new String[] { "0-5-1235"
-       + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION
-       + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION };
-
-   for (int i = 0; i < markerNames.length; i++) {
-     hdfs.createNewFile(new Path(bucketDir, markerNames[i]));
-   }
-   FileStatus[] bucketFiles = getBucketHoplogs(getName() + "/0", "");
-   assertEquals(references.size() + markerNames.length, bucketFiles.length);
-
-   TimeUnit.MILLISECONDS.sleep(200);
-   long target = System.currentTimeMillis();
-   List<FileStatus> targets = organizer.getOptimizationTargets(target);
-   assertEquals(2, targets.size());
-
-   // take a reference on every hoplog so that all of them count as in use
-   for (Iterator<TrackedReference<Hoplog>> it = references.iterator(); it.hasNext();) {
-     it.next().increment("test");
-   }
-
-   hdfs.delete(new Path(bucketDir, markerNames[0]), false);
-
-   TimeUnit.MILLISECONDS.sleep(50);
-   organizer.markSortedOplogForDeletion(references, false);
-
-   targets = organizer.getOptimizationTargets(target);
-   assertEquals(0, targets.size());
-   organizer.close();
- }
-
- /**
-  * Tests that a new organizer continues the hoplog sequence after the files
-  * that already exist in the bucket instead of starting over.
-  */
- public void testSeqInitialization() throws Exception {
-   final String flushExt = HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION;
-   final String minorExt = HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION;
-   final String majorExt = HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION;
-
-   // seed the bucket with five legitimate hoplogs of mixed types
-   HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
-   String[] seedExtensions = { flushExt, flushExt, minorExt, majorExt, majorExt };
-   for (int i = 0; i < seedExtensions.length; i++) {
-     Hoplog seeded = organizer.getTmpSortedOplog(null, seedExtensions[i]);
-     createHoplog(1, seeded);
-     organizer.makeLegitimate(seeded);
-   }
-
-   // a fresh organizer must hand out sequence number 6 because five files already exist
-   organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
-   Hoplog next = organizer.getTmpSortedOplog(null, majorExt);
-   createHoplog(1, next);
-   organizer.makeLegitimate(next);
-   assertEquals(6, HdfsSortedOplogOrganizer.getSequenceNumber(next));
-   organizer.close();
- }
-
- /**
-  * Tests temp file creation and making the file legitimate: the temporary hoplog must exist
-  * before {@code makeLegitimate}, and afterwards only the renamed file carrying the flush
-  * extension must exist.
-  */
- public void testMakeLegitimate() throws Exception {
-   HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
-
-   // create empty tmp hoplog
-   Hoplog oplog = organizer.getTmpSortedOplog(null, HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
-   createHoplog(0, oplog);
-
-   FileSystem fs = hdfsStore.getFileSystem();
-   Path tmpPath = new Path(testDataDir, getName() + "/0/" + oplog.getFileName());
-   assertTrue("temporary hoplog was not created: " + tmpPath, fs.exists(tmpPath));
-
-   organizer.makeLegitimate(oplog);
-
-   // the tmp file is renamed, hence the old path must no longer exist
-   assertFalse("temporary hoplog still exists after makeLegitimate: " + tmpPath, fs.exists(tmpPath));
-
-   // the hoplog now reports its final name, and a file with that name exists
-   assertTrue(oplog.getFileName().endsWith(HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION));
-   Path finalPath = new Path(testDataDir, getName() + "/0/" + oplog.getFileName());
-   assertTrue("legitimate hoplog is missing: " + finalPath, fs.exists(finalPath));
- }
-
- /**
- * Tests hoplog file name comparator
- */
- public void testHoplogFileComparator() throws IOException {
- String name1 = "bucket1-10-3.hop";
- String name2 = "bucket1-1-20.hop";
- String name3 = "bucket1-30-201.hop";
- String name4 = "bucket1-100-201.hop";
-
- TreeSet<TrackedReference<Hoplog>> list = new TreeSet<TrackedReference<Hoplog>>(new HoplogComparator());
- // insert soplog is the list out of expected order
- hdfsStore.getFileSystem();
- list.add(new TrackedReference<Hoplog>(new HFileSortedOplog(hdfsStore, new Path(testDataDir, name2), blockCache, stats, storeStats)));
- list.add(new TrackedReference<Hoplog>(new HFileSortedOplog(hdfsStore, new Path(testDataDir, name4), blockCache, stats, storeStats)));
- list.add(new TrackedReference<Hoplog>(new HFileSortedOplog(hdfsStore, new Path(testDataDir, name1), blockCache, stats, storeStats)));
- list.add(new TrackedReference<Hoplog>(new HFileSortedOplog(hdfsStore, new Path(testDataDir, name3), blockCache, stats, storeStats)));
-
- Iterator<TrackedReference<Hoplog>> iter = list.iterator();
- assertEquals(name4, iter.next().get().getFileName());
- assertEquals(name3, iter.next().get().getFileName());
- assertEquals(name2, iter.next().get().getFileName());
- assertEquals(name1, iter.next().get().getFileName());
- }
-
- /**
-  * Tests clear on a set of hoplogs: after three flushes and a clear, no valid
-  * flush hoplog may remain and the file statistics must be back at zero.
-  */
- public void testClear() throws Exception {
-   final int bucket = (int) System.nanoTime();
-   final String bucketDir = getName() + "/" + bucket;
-   HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucket);
-
-   // one row per flush; each entry is a {key, value} pair
-   String[][][] flushes = {
-       { { "1", "1-1" }, { "4", "1-4" } },
-       { { "1", "2-1" }, { "3", "2-3" } },
-       { { "3", "3-3" }, { "5", "3-5" } } };
-   for (String[][] flush : flushes) {
-     ArrayList<TestEvent> batch = new ArrayList<TestEvent>();
-     for (String[] keyValue : flush) {
-       batch.add(new TestEvent(keyValue[0], keyValue[1]));
-     }
-     organizer.flush(batch.iterator(), batch.size());
-   }
-
-   // every flush must have left one file in the bucket directory
-   FileStatus[] flushed = getBucketHoplogs(bucketDir, HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
-   assertEquals(3, flushed.length);
-
-   organizer.clear();
-
-   // after the clear every remaining flush hoplog must be covered by an expiry marker
-   flushed = getBucketHoplogs(bucketDir, HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
-   FileStatus[] expiryMarkers = getBucketHoplogs(bucketDir, HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
-   FileStatus[] stillValid = HdfsSortedOplogOrganizer.filterValidHoplogs(flushed, expiryMarkers);
-   assertEquals(Collections.EMPTY_LIST, Arrays.asList(stillValid));
-
-   assertEquals(0, stats.getActiveFileCount());
-   assertEquals(0, stats.getInactiveFileCount());
- }
-
- /**
-  * Checks that no compaction is requested for old hoplogs until the major compaction interval
-  * of the store is altered to 1, after which compaction requests must arrive repeatedly.
-  * The test is timing based: it sleeps for roughly eight seconds in total.
-  */
- public void testFixedIntervalMajorCompaction() throws Exception {
- // counts every call to the stub compactor below
- final AtomicInteger majorCReqCount = new AtomicInteger(0);
-
- // stub compactor: performs no work, only records that a compaction was requested
- final Compactor compactor = new AbstractCompactor() {
- @Override
- public boolean compact(boolean isMajor, boolean isForced) throws IOException {
- majorCReqCount.incrementAndGet();
- return true;
- }
- };
-
- // organizer that always hands out the stub compactor
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0) {
- @Override
- public synchronized Compactor getCompactor() {
- return compactor;
- }
- };
-
- regionManager.addOrganizer(0, organizer);
-
- // NOTE(review): the system property is not restored in this method; confirm that the test
- // fixture resets it, otherwise it leaks into later tests
- System.setProperty(HoplogConfig.JANITOR_INTERVAL_SECS, "1");
- HDFSRegionDirector.resetJanitor();
-
- // presumably enables major compaction on the store; helper is defined outside this view
- alterMajorCompaction(hdfsStore, true);
-
- // create hoplog in the past, 90 seconds before current time
- organizer.hoplogCreated(getName(), 0, new TestHoplog(hdfsStore, 100, System.currentTimeMillis() - 90000));
- TimeUnit.MILLISECONDS.sleep(50);
- organizer.hoplogCreated(getName(), 0, new TestHoplog(hdfsStore, 100, System.currentTimeMillis() - 90000));
-
- List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
- assertEquals(2, hoplogs.size());
-
- // for three seconds no compaction request may be made
- for (int i = 0; i < 3; i++) {
- TimeUnit.SECONDS.sleep(1);
- assertEquals(0, majorCReqCount.get());
- }
- // after altering the interval, more than three requests are expected within five seconds
- HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
- mutator.setMajorCompactionInterval(1);
- hdfsStore.alter(mutator);
- TimeUnit.SECONDS.sleep(5);
- assertTrue(3 < majorCReqCount.get());
- }
-
-
- /**
-  * Writes a corrupt hoplog file into the bucket directories of "region-1" and verifies that a
-  * read on the region fails with an {@link HDFSIOException}.
-  */
- public void testCorruptHfileBucketFail() throws Exception {
-   // create a corrupt file in each of the 113 bucket directories
-   FileSystem fs = hdfsStore.getFileSystem();
-   for (int i = 0; i < 113; i++) {
-     FSDataOutputStream opStream = fs.create(new Path(testDataDir.getName() + "/region-1/" + i + "/1-1-1.hop"));
-     try {
-       opStream.writeBytes("Some random corrupt file");
-     } finally {
-       // close the stream even when the write fails so that no file handle leaks
-       opStream.close();
-     }
-   }
-
-   // create region with store
-//   regionfactory.setHDFSStoreName(HDFS_STORE_NAME);
-   Region<Object, Object> region1 = regionfactory.create("region-1");
-   ExpectedException ex = DistributedTestCase.addExpectedException("CorruptHFileException");
-   try {
-     region1.get("key");
-     fail("get should have failed with corrupt file error");
-   } catch (HDFSIOException e) {
-     // expected
-   } finally {
-     ex.remove();
-   }
-
-   region1.destroyRegion();
- }
-
- /**
-  * Checks hoplog reader bookkeeping with {@code hoplog.bucket.max.open.files}
-  * set to 5: how many hoplogs are closed after flushing, after a read, and
-  * after a compaction.
-  */
- public void testMaxOpenReaders() throws Exception {
-   System.setProperty("hoplog.bucket.max.open.files", "5");
-   HoplogOrganizer<? extends PersistedEventImpl> organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
-
-   // ten flushes of one event each
-   ArrayList<TestEvent> batch = new ArrayList<TestEvent>();
-   for (int key = 0; key < 10; key++) {
-     batch.clear();
-     batch.add(new TestEvent("" + key, "" + key));
-     organizer.flush(batch.iterator(), batch.size());
-   }
-
-   List<TrackedReference<Hoplog>> references = ((HdfsSortedOplogOrganizer) organizer).getSortedOplogs();
-
-   // right after flushing, every hoplog is expected to be closed
-   int closed = 0;
-   for (Iterator<TrackedReference<Hoplog>> it = references.iterator(); it.hasNext();) {
-     HFileSortedOplog hfile = (HFileSortedOplog) it.next().get();
-     closed += hfile.isClosed() ? 1 : 0;
-   }
-   assertEquals(10, closed);
-   assertEquals(10, stats.getActiveFileCount());
-   assertEquals(0, stats.getActiveReaderCount());
-
-   byte[] serializedKey = BlobHelper.serializeToBlob("1");
-   organizer.read(serializedKey).getValue();
-
-   // after the read, five hoplogs are expected to remain closed
-   closed = 0;
-   for (Iterator<TrackedReference<Hoplog>> it = references.iterator(); it.hasNext();) {
-     HFileSortedOplog hfile = (HFileSortedOplog) it.next().get();
-     closed += hfile.isClosed() ? 1 : 0;
-   }
-   assertEquals(5, closed);
-   assertEquals(10, stats.getActiveFileCount());
-   assertEquals(0, stats.getInactiveFileCount());
-   assertEquals(5, stats.getActiveReaderCount());
-
-   // a compaction is expected to leave one active file and no readers
-   organizer.getCompactor().compact(false, false);
-   assertEquals(1, stats.getActiveFileCount());
-   assertEquals(0, stats.getActiveReaderCount());
-   assertEquals(0, stats.getInactiveFileCount());
- }
-
- public void testConcurrentReadInactiveClose() throws Exception {
- final HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
- alterMinorCompaction(hdfsStore, true);
-
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- for (int i = 0; i < 4; i++) {
- items.clear();
- items.add(new TestEvent("" + i, "" + i));
- organizer.flush(items.iterator(), items.size());
- }
-
- final byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
- class ReadTask implements Runnable {
- public void run() {
- try {
- organizer.read(keyBytes1);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- ScheduledExecutorService[] readers = new ScheduledExecutorService[10];
- for (int i = 0; i < readers.length; i++) {
- readers[i] = Executors.newSingleThreadScheduledExecutor();
- readers[i].scheduleWithFixedDelay(new ReadTask(), 0, 1, TimeUnit.MILLISECONDS);
- }
-
- for (int i = 0; i < 100; i++) {
- items.clear();
- items.add(new TestEvent("" + i, "" + i));
- organizer.flush(items.iterator(), items.size());
- }
-
- for (int i = 0; i < readers.length; i++) {
- readers[i].shutdown();
- readers[i].awaitTermination(1, TimeUnit.SECONDS);
- TimeUnit.MILLISECONDS.sleep(50);
- }
-
- for (int i = 0; i < 20; i++) {
- if (stats.getActiveFileCount() < 4) {
- break;
- }
- organizer.getCompactor().compact(false, false);
- }
-
- organizer.performMaintenance();
- TimeUnit.SECONDS.sleep(1);
-
- assertTrue("" + stats.getActiveFileCount(), stats.getActiveFileCount() <= 4);
- assertEquals(stats.getActiveReaderCount(), stats.getActiveReaderCount());
- assertEquals(0, stats.getInactiveFileCount());
- }
-
- public void testEmptyBucketCleanup() throws Exception {
- HdfsSortedOplogOrganizer o = new HdfsSortedOplogOrganizer(regionManager, 0);
- long target = System.currentTimeMillis();
- o.getOptimizationTargets(target);
- // making sure empty bucket is not causing IO errors. no assertion needed
- // for this test case.
- }
-
- public void testExpiredFilterAtStartup() throws Exception {
- HdfsSortedOplogOrganizer bucket = new HdfsSortedOplogOrganizer(regionManager, 0);
-
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- items.add(new TestEvent(("1"), ("1-1")));
- items.add(new TestEvent(("4"), ("1-4")));
- bucket.flush(items.iterator(), items.size());
-
- items.clear();
- items.add(new TestEvent(("1"), ("2-1")));
- items.add(new TestEvent(("3"), ("2-3")));
- bucket.flush(items.iterator(), items.size());
-
- FileStatus[] files = getBucketHoplogs(getName() + "/" + 0,
- HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
- assertEquals(2, files.length);
-
- files = getBucketHoplogs(getName() + "/" + 0,
- HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
- assertEquals(0, files.length);
-
- HdfsSortedOplogOrganizer bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0);
- List<TrackedReference<Hoplog>> hoplogs = bucket2.getSortedOplogs();
- assertEquals(2, hoplogs.size());
-
- bucket.clear();
-
- files = getBucketHoplogs(getName() + "/" + 0,
- HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
- assertEquals(2, files.length);
-
- files = getBucketHoplogs(getName() + "/" + 0,
- HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
- assertEquals(2, files.length);
-
- bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0);
- hoplogs = bucket2.getSortedOplogs();
- assertEquals(0, hoplogs.size());
-
- items.clear();
- items.add(new TestEvent(("1"), ("2-1")));
- items.add(new TestEvent(("3"), ("2-3")));
- bucket.flush(items.iterator(), items.size());
-
- bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0);
- hoplogs = bucket2.getSortedOplogs();
- assertEquals(1, hoplogs.size());
- bucket.close();
- bucket2.close();
- }
-
- public void testExpireFilterRetartAfterClear() throws Exception {
- HdfsSortedOplogOrganizer bucket = new HdfsSortedOplogOrganizer(regionManager, 0);
-
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- items.add(new TestEvent(("1"), ("1-1")));
- items.add(new TestEvent(("4"), ("1-4")));
- bucket.flush(items.iterator(), items.size());
-
- items.clear();
- items.add(new TestEvent(("1"), ("2-1")));
- items.add(new TestEvent(("3"), ("2-3")));
- bucket.flush(items.iterator(), items.size());
-
- FileStatus[] files = getBucketHoplogs(getName() + "/" + 0,
- HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
- assertEquals(2, files.length);
-
- files = getBucketHoplogs(getName() + "/" + 0,
- HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
- assertEquals(0, files.length);
-
- HdfsSortedOplogOrganizer bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0);
- List<TrackedReference<Hoplog>> hoplogs = bucket2.getSortedOplogs();
- assertEquals(2, hoplogs.size());
-
- bucket.clear();
-
- files = getBucketHoplogs(getName() + "/" + 0,
- HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
- assertEquals(2, files.length);
-
- files = getBucketHoplogs(getName() + "/" + 0,
- HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
- assertEquals(2, files.length);
-
- bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0);
- hoplogs = bucket2.getSortedOplogs();
- assertEquals(0, hoplogs.size());
- bucket.close();
- bucket2.close();
- }
-
- /**
- * tests maintenance does not fail even if there are no hoplogs
- */
- public void testNoFileJanitor() throws Exception {
- HoplogOrganizer<? extends PersistedEventImpl> organizer;
- organizer = regionManager.create(0);
- organizer.performMaintenance();
- }
-
- public void testValidHoplogRegex() {
- String[] valid = {"1-1-1.hop", "1-1-1.ihop", "1-1-1.chop"};
- String[] invalid = {"1-1-1.khop", "1-1-1.hop.tmphop", "1-1-1.hop.ehop", "1-1-.hop", "-1-1.hop"};
-
- for (String string : valid) {
- Matcher matcher = HdfsSortedOplogOrganizer.SORTED_HOPLOG_PATTERN.matcher(string);
- assertTrue(matcher.matches());
- }
-
- for (String string : invalid) {
- Matcher matcher = HdfsSortedOplogOrganizer.SORTED_HOPLOG_PATTERN.matcher(string);
- assertFalse(matcher.matches());
- }
- }
-
- public void testOneHoplogMajorCompaction() throws Exception {
- HoplogOrganizer<? extends PersistedEventImpl> organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
- alterMajorCompaction(hdfsStore, true);
-
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- items.add(new TestEvent(("1"), ("1-1")));
- organizer.flush(items.iterator(),items.size());
-
-
- FileStatus[] files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
- assertEquals(1, files.length);
-
- //Minor compaction will not perform on 1 .hop file
- organizer.getCompactor().compact(false, false);
- files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
- assertEquals(0, files.length);
-
- //Major compaction will perform on 1 .hop file
- organizer.getCompactor().compact(true, false);
- files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);
- assertEquals(1, files.length);
- String hoplogName =files[0].getPath().getName();
- files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
- assertEquals(0, files.length);
-
- organizer.getCompactor().compact(true, false);
- files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);
- assertEquals(1, files.length);
- assertEquals(hoplogName, files[0].getPath().getName());
-
- //Minor compaction does not convert major compacted file
- organizer.getCompactor().compact(false, false);
- files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
- assertEquals(0, files.length);
-
- files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);
- assertEquals(1, files.length);
- assertEquals(hoplogName, files[0].getPath().getName());
-
- files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
- assertEquals(1, files.length);
- assertNotSame(hoplogName + HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION, files[0].getPath().getName() );
- }
-
- public void testExposeCleanupInterval() throws Exception {
- FileSystem fs = hdfsStore.getFileSystem();
- Path cleanUpIntervalPath = new Path(hdfsStore.getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
- assertTrue(fs.exists(cleanUpIntervalPath));
- long interval = HDFSStore.DEFAULT_OLD_FILE_CLEANUP_INTERVAL_MINS
- *60 * 1000;
- assertEquals(interval, HoplogUtil.readCleanUpIntervalMillis(fs,cleanUpIntervalPath));
- }
-
- @Override
- protected void setUp() throws Exception {
- System.setProperty(HoplogConfig.JANITOR_INTERVAL_SECS, "" + HoplogConfig.JANITOR_INTERVAL_SECS_DEFAULT);
- super.setUp();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HfileSortedOplogJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HfileSortedOplogJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HfileSortedOplogJUnitTest.java
deleted file mode 100644
index 7420437..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HfileSortedOplogJUnitTest.java
+++ /dev/null
@@ -1,540 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.TreeMap;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogReader;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
-import com.gemstone.gemfire.test.junit.categories.HoplogTest;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest
-;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
-import org.junit.experimental.categories.Category;
-
-@Category({IntegrationTest.class, HoplogTest.class})
-public class HfileSortedOplogJUnitTest extends BaseHoplogTestCase {
- ArrayList<Object> toBeCleaned = new ArrayList<>();
-
- /**
- * Tests hoplog creation using a writer. If this test fails, all the tests wills fail as hoplog
- * creation is the first step
- */
- public void testHoplogWriter() throws Exception {
- String hoplogName = getRandomHoplogName();
- createHoplog(hoplogName, 1);
- FileStatus hoplogStatus = hdfsStore.getFileSystem().getFileStatus(new Path(testDataDir, hoplogName));
- assertNotNull(hoplogStatus);
- }
-
- /**
- * Tests hoplog deletion.
- */
- public void testDeletion() throws Exception {
- String hoplogName = getRandomHoplogName();
- createHoplog(hoplogName, 1);
- HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
-
- testHoplog.delete();
-
- try {
- FileStatus hoplogStatus = hdfsStore.getFileSystem().getFileStatus(new Path(testDataDir, hoplogName));
- // hoplog should not exists. fail if it does
- assertNull("File deletion failed", hoplogStatus);
- } catch (FileNotFoundException e) {
- // exception expected after deletion
- }
- }
-
- /**
- * Tests hoplog reader creation and key based gets
- */
- public void testHoplogReader() throws Exception {
- String hop1 = getRandomHoplogName();
- Map<String, String> map = createHoplog(hop1, 10);
-
- HFileSortedOplog testHoplog1 = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hop1), blockCache, stats, storeStats);
- HoplogReader reader = testHoplog1.getReader();
- // verify that each entry put in the hoplog is returned by reader
- for (Entry<String, String> entry : map.entrySet()) {
- byte[] value = reader.read(entry.getKey().getBytes());
- assertNotNull(value);
- }
- }
-
- /**
- * Tests full iteration on a hoplog. Ensures all inserted keys are returned and no key is missing
- */
- public void testIterator() throws IOException {
- int count = 10;
- ByteArrayComparator bac = new ByteArrayComparator();
-
- String hoplogName = getRandomHoplogName();
- TreeMap<String, String> sortedMap = createHoplog(hoplogName, count);
-
- HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
- HoplogReader reader = testHoplog.getReader();
-
- Iterator<Entry<String, String>> mapIter = sortedMap.entrySet().iterator();
- HoplogIterator<byte[], byte[]> iter = reader.scan();
- for (; iter.hasNext();) {
- byte[] key = iter.next();
- Entry<String, String> entry = mapIter.next();
- assertEquals(0, bac.compare(key, iter.getKey()));
- assertEquals(0, bac.compare(key, entry.getKey().getBytes()));
- assertEquals(0, bac.compare(iter.getValue(), entry.getValue().getBytes()));
- count--;
- }
- assertEquals(0, count);
- }
-
- /**
- * Tests hoplog iterator. after returning first key, has next should return false and all
- * subsequent next calls should return null
- */
- public void testSingleKVIterator() throws Exception {
- String hoplogName = getRandomHoplogName();
- TreeMap<String, String> map = createHoplog(hoplogName, 1);
- HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
- HoplogReader reader = testHoplog.getReader();
-
- HoplogIterator<byte[], byte[]> iter = reader.scan();
- assertNull(iter.getKey());
- assertNull(iter.getValue());
- assertTrue(iter.hasNext());
- assertNull(iter.getKey());
- assertNull(iter.getValue());
-
- Entry<String, String> entry = map.firstEntry();
- iter.next();
- assertNotNull(iter.getKey());
- assertEquals(entry.getKey(), new String(iter.getKey()));
- assertNotNull(iter.getValue());
- assertEquals(entry.getValue(), new String(iter.getValue()));
-
- assertFalse(iter.hasNext());
- try {
- iter.next();
- fail();
- } catch (NoSuchElementException e) {
- }
- }
-
- /**
- * Tests iteration on a hoplog with no keys, using a scanner. Scanner should not return any value
- * and hasNext should return false everytime
- */
- public void testEmptyFileIterator() throws Exception {
- String hoplogName = getRandomHoplogName();
- createHoplog(hoplogName, 0);
- HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
- HoplogReader reader = testHoplog.getReader();
- HoplogIterator<byte[], byte[]> iter = reader.scan();
- assertNull(iter.getKey());
- assertNull(iter.getValue());
- assertFalse(iter.hasNext());
- assertNull(iter.getKey());
- assertNull(iter.getValue());
- try {
- iter.next();
- fail();
- } catch (NoSuchElementException e) {
- }
- }
-
- /**
- * Tests from exclusive iterator
- */
- public void testFromExclusiveIterator() throws Exception {
- fromIterator(false);
- }
-
- /**
- * Tests from inclusive iterator
- */
- public void testFromInclusiveIterator() throws Exception {
- fromIterator(true);
- }
-
- /**
- * Tests from condition based iteration. creates hoplog with 10 KVs. Creates a scanner starting at
- * a middle key and verifies the count of KVs iterated on
- */
- public void fromIterator(boolean includeFrom) throws Exception {
- int count = 10;
- ByteArrayComparator bac = new ByteArrayComparator();
-
- String hoplogName = getRandomHoplogName();
- // sorted map contains the keys inserted in the hoplog for testing
- TreeMap<String, String> sortedMap = createHoplog(hoplogName, count);
-
- HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
- HoplogReader reader = testHoplog.getReader();
-
- int middleKey = 4;
- // remove top keys from the sorted map as the hoplog scanner should not
- // return those
- Iterator<Entry<String, String>> mapIter = sortedMap.entrySet().iterator();
- for (int i = 0; i < middleKey; i++) {
- mapIter.next();
- count--;
- }
- if (!includeFrom) {
- mapIter.next();
- count--;
- }
-
- // keys are like Key-X, for X=0 till X=9. Start iterator at fifth key,
- // key-4. if excluding from key, start at sixth key, key-5.
- HoplogIterator<byte[], byte[]> iter = reader.scan(("key-" + middleKey).getBytes(), includeFrom,
- null, true);
-
- for (; iter.hasNext();) {
- byte[] key = iter.next();
- Entry<String, String> entry = mapIter.next();
- // make sure the KV returned by iterator match the inserted KV
- assertEquals(0, bac.compare(key, iter.getKey()));
- assertEquals(0, bac.compare(key, entry.getKey().getBytes()));
- assertEquals(0, bac.compare(iter.getValue(), entry.getValue().getBytes()));
- count--;
- }
- assertEquals(0, count);
- }
-
- /**
- * Tests to exclusive iterator
- */
- public void testToExclusiveIterator() throws Exception {
- toIterator(false);
- }
-
- /**
- * Tests to inclusive iterator
- */
- public void testToInclusiveIterator() throws Exception {
- toIterator(true);
- }
-
- /**
- * Tests to condition based iteration. creates hoplog with 10 KVs. Creates a scanner ending at
- * a middle key and verifies the count of KVs iterated on
- */
- public void toIterator(boolean includeTo) throws Exception {
- int count = 10;
- ByteArrayComparator bac = new ByteArrayComparator();
-
- String hoplogName = getRandomHoplogName();
- // sorted map contains the keys inserted in the hoplog for testing
- TreeMap<String, String> sortedMap = createHoplog(hoplogName, count);
- Iterator<Entry<String, String>> mapIter = sortedMap.entrySet().iterator();
-
- HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
- HoplogReader reader = testHoplog.getReader();
-
- int middleKey = 4;
- // keys are like Key-X, for X=0 till X=9. End iterator at fifth key,
- // key-4. if excluding to key, end at fourth key, key-3.
- HoplogIterator<byte[], byte[]> iter = reader.scan(null, true, ("key-" + middleKey).getBytes(), includeTo);
-
- for (; iter.hasNext();) {
- byte[] key = iter.next();
- Entry<String, String> entry = mapIter.next();
- // make sure the KV returned by iterator match the inserted KV
- assertEquals(0, bac.compare(key, iter.getKey()));
- assertEquals(0, bac.compare(key, entry.getKey().getBytes()));
- assertEquals(0, bac.compare(iter.getValue(), entry.getValue().getBytes()));
-
- count --;
- }
-
- if (includeTo) {
- count++;
- }
-
- assertEquals(10, count + middleKey);
- }
-
- /**
- * Tests whether sortedoplog supports duplicate keys, required when conflation is disabled
- */
- public void testFromToIterator() throws IOException {
- ByteArrayComparator bac = new ByteArrayComparator();
- String hoplogName = getRandomHoplogName();
- HFileSortedOplog hoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
-
- int count = 5;
- HoplogWriter writer = hoplog.createWriter(5);
- for (int i = 0; i < count; i++) {
- String value = "value-" + (i * 2);
- // even keys key-[0 2 4 6 8]
- writer.append(("key-" + (i * 2)).getBytes(), value.getBytes());
- }
- writer.close();
-
- HoplogReader reader = hoplog.getReader();
- HoplogIterator<byte[], byte[]> iter = reader.scan("key-1".getBytes(), true, "key-7".getBytes(), true);
-
- for (int i = 2; i < 7; i += 2) {
- assertTrue(iter.hasNext());
- iter.next();
- assertEquals(0, bac.compare(("key-" + i).getBytes(), iter.getKey()));
- assertEquals(0, bac.compare(("value-" + i).getBytes(), iter.getValue()));
- System.out.println(new String(iter.getKey()));
- }
- assertFalse(iter.hasNext());
- }
-
- /**
- * Tests whether sortedoplog supports duplicate keys, required when conflation is disabled
- */
- public void testDuplicateKeys() throws IOException {
- String hoplogName = getRandomHoplogName();
- HFileSortedOplog hoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
-
- // write duplicate keys
- int count = 2;
- HoplogWriter writer = hoplog.createWriter(2);
- List<String> values = new ArrayList<String>();
- for(int i = 1; i <= count; i++) {
- String value = "value" + i;
- writer.append("key-1".getBytes(), value.getBytes());
- values.add(value);
- }
- writer.close();
-
- HoplogReader reader = hoplog.getReader();
- HoplogIterator<byte[], byte[]> scanner = reader.scan();
- for (byte[] key = null; scanner.hasNext();) {
- key = scanner.next();
- count--;
- assertEquals(0, Bytes.compareTo(key, "key-1".getBytes()));
- values.remove(new String(scanner.getValue()));
- }
- assertEquals(0, count);
- assertEquals(0, values.size());
- }
-
- public void testOffsetBasedScan() throws Exception {
- // Each record is 43 bytes. each block is 256 bytes. each block will have 6
- // records
-
- int blocksize = 1 << 8;
- System.setProperty(HoplogConfig.HFILE_BLOCK_SIZE_CONF,
- String.valueOf(blocksize));
-
- int count = 50;
- String hoplogName = getRandomHoplogName();
- createHoplog(hoplogName, count);
-
- HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(
- testDataDir, hoplogName), blockCache, stats, storeStats);
-
- HoplogReader reader = testHoplog.getReader();
-
- HoplogIterator<byte[], byte[]> scanner = reader.scan(blocksize * 1, blocksize * 2);
- int range1Count = 0;
- String range1EndKey = null;
- for (byte[] key = null; scanner.hasNext();) {
- key = scanner.next();
- range1Count++;
- range1EndKey = new String(key);
- }
- int range1EndKeyNum = Integer.valueOf(range1EndKey.substring("Key-".length()));
-
- scanner = reader.scan(blocksize * 2, blocksize * 1);
- int range2Count = 0;
- String range2EndKey = null;
- for (byte[] key = null; scanner.hasNext();) {
- key = scanner.next();
- range2Count++;
- range2EndKey = new String(key);
- }
-
- assertEquals(range2EndKey, range1EndKey);
- assertEquals(2, range1Count/range2Count);
-
- scanner = reader.scan(blocksize * 3, blocksize * 1);
- String range3FirstKey = new String(scanner.next());
-
- int range3FirstKeyNum = Integer.valueOf(range3FirstKey.substring("Key-"
- .length()));
-
- // range 3 starts at the end of range 1. so the two keys must be consecutive
- assertEquals(range1EndKeyNum + 1, range3FirstKeyNum);
-
- testHoplog.close();
- }
-
- public void testOffsetScanBeyondFileSize() throws Exception {
- // Each record is 43 bytes. each block is 256 bytes. each block will have 6
- // records
-
- int blocksize = 1 << 8;
- System.setProperty(HoplogConfig.HFILE_BLOCK_SIZE_CONF,
- String.valueOf(blocksize));
-
- int count = 20;
- String hoplogName = getRandomHoplogName();
- createHoplog(hoplogName, count);
-
- HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(
- testDataDir, hoplogName), blockCache, stats, storeStats);
-
- HoplogReader reader = testHoplog.getReader();
-
- HoplogIterator<byte[], byte[]> scanner = reader.scan(blocksize * 5, blocksize * 2);
- assertFalse(scanner.hasNext());
-
- testHoplog.close();
- }
-
- public void testZeroValueOffsetScan() throws Exception {
- // Each record is 43 bytes. each block is 256 bytes. each block will have 6
- // records
-
- int blocksize = 1 << 8;
- System.setProperty(HoplogConfig.HFILE_BLOCK_SIZE_CONF,
- String.valueOf(blocksize));
-
- int count = 20;
- String hoplogName = getRandomHoplogName();
- createHoplog(hoplogName, count);
-
- HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(
- testDataDir, hoplogName), blockCache, stats, storeStats);
-
- HoplogReader reader = testHoplog.getReader();
-
- HoplogIterator<byte[], byte[]> scanner = reader.scan(0, blocksize * 2);
- assertTrue(scanner.hasNext());
- int keyNum = Integer.valueOf(new String(scanner.next()).substring("Key-"
- .length()));
- assertEquals(100000, keyNum);
-
- testHoplog.close();
- }
-
- /*
- * Tests reader succeeds to read data even if FS client is recycled without
- * this reader knowing
- */
- public void testReaderDetectAndUseRecycledFs() throws Exception {
- HDFSStoreFactoryImpl storeFactory = getCloseableLocalHdfsStoreFactory();
- HDFSStoreImpl store = (HDFSStoreImpl) storeFactory.create("Store-1");
- toBeCleaned.add(store);
-
- HFileSortedOplog hop = new HFileSortedOplog(store, new Path(getName() + "-1-1.hop"), blockCache, stats, storeStats);
- toBeCleaned.add(hop);
- TreeMap<String, String> map = createHoplog(10, hop);
-
- HoplogReader reader = hop.getReader();
- // verify that each entry put in the hoplog is returned by reader
- for (Entry<String, String> entry : map.entrySet()) {
- byte[] value = reader.read(entry.getKey().getBytes());
- assertNotNull(value);
- }
-
- cache.getLogger().info("<ExpectedException action=add>java.io.IOException</ExpectedException>");
- try {
- store.getFileSystem().close();
- store.checkAndClearFileSystem();
-
- for (Entry<String, String> entry : map.entrySet()) {
- reader = hop.getReader();
- byte[] value = reader.read(entry.getKey().getBytes());
- assertNotNull(value);
- }
- } finally {
- cache.getLogger().info("<ExpectedException action=remove>java.io.IOException</ExpectedException>");
- }
- }
-
- public void testNewScannerDetechAndUseRecycledFs() throws Exception {
- HDFSStoreFactoryImpl storeFactory = getCloseableLocalHdfsStoreFactory();
- HDFSStoreImpl store = (HDFSStoreImpl) storeFactory.create("Store-1");
- toBeCleaned.add(store);
-
- HFileSortedOplog hop = new HFileSortedOplog(store, new Path(getName() + "-1-1.hop"), blockCache, stats, storeStats);
- createHoplog(10, hop);
-
- HoplogIterator<byte[], byte[]> scanner = hop.getReader().scan();
- // verify that each entry put in the hoplog is returned by reader
- int i = 0;
- while (scanner.hasNext()) {
- byte[] key = scanner.next();
- assertNotNull(key);
- i++;
- }
- assertEquals(10, i);
- // flush block cache
- hop.close(true);
- hop.delete();
-
- hop = new HFileSortedOplog(store, new Path(getName()+"-1-1.hop"), blockCache, stats, storeStats);
- createHoplog(10, hop);
- toBeCleaned.add(hop);
- hop.getReader();
-
- cache.getLogger().info("<ExpectedException action=add>java.io.IOException</ExpectedException>");
- try {
- store.getFileSystem().close();
- store.checkAndClearFileSystem();
-
- scanner = hop.getReader().scan();
- // verify that each entry put in the hoplog is returned by reader
- i = 0;
- while (scanner.hasNext()) {
- byte[] key = scanner.next();
- assertNotNull(key);
- i++;
- }
- assertEquals(10, i);
- } finally {
- cache.getLogger().info("<ExpectedException action=remove>java.io.IOException</ExpectedException>");
- }
- }
-
- @Override
- protected void tearDown() throws Exception {
- for (Object obj : toBeCleaned) {
- try {
- if (HDFSStoreImpl.class.isInstance(obj)) {
- ((HDFSStoreImpl) obj).clearFolder();
- } else if (AbstractHoplog.class.isInstance(obj)) {
- ((AbstractHoplog) obj).close();
- ((AbstractHoplog) obj).delete();
- }
- } catch (Exception e) {
- System.out.println(e);
- }
- }
- super.tearDown();
- }
-
- private TreeMap<String, String> createHoplog(String hoplogName, int numKeys) throws IOException {
- HFileSortedOplog hoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
- TreeMap<String, String> map = createHoplog(numKeys, hoplog);
- return map;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SortedOplogListIterJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SortedOplogListIterJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SortedOplogListIterJUnitTest.java
deleted file mode 100644
index 13aa6a9..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SortedOplogListIterJUnitTest.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
-import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
-import com.gemstone.gemfire.internal.util.BlobHelper;
-import com.gemstone.gemfire.test.junit.categories.HoplogTest;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest
-;
-
-@Category({IntegrationTest.class, HoplogTest.class})
-public class SortedOplogListIterJUnitTest extends BaseHoplogTestCase {
- public void testOneIterOneKey() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
-
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- items.add(new TestEvent(("0"), ("0")));
- organizer.flush(items.iterator(), items.size());
-
- List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs();
- HoplogSetIterator iter = new HoplogSetIterator(oplogs);
- assertTrue(iter.hasNext());
- int count = 0;
- for (ByteBuffer keyBB = null; iter.hasNext();) {
- keyBB = iter.next();
- byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
- assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
- count++;
- }
- assertEquals(1, count);
- organizer.close();
- }
-
- public void testOneIterDuplicateKey() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
-
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- items.add(new TestEvent(("0"), ("V2")));
- items.add(new TestEvent(("0"), ("V1")));
- items.add(new TestEvent(("1"), ("V2")));
- items.add(new TestEvent(("1"), ("V1")));
- organizer.flush(items.iterator(), items.size());
-
- List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs();
- HoplogSetIterator iter = new HoplogSetIterator(oplogs);
- assertTrue(iter.hasNext());
- int count = 0;
- for (ByteBuffer keyBB = null; iter.hasNext();) {
- keyBB = iter.next();
- byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
- byte[] value = HFileSortedOplog.byteBufferToArray(iter.getValue());
- assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
- assertEquals("V2", ((PersistedEventImpl) SortedHoplogPersistedEvent.fromBytes(value)).getValue());
- count++;
- }
- assertEquals(2, count);
- organizer.close();
- }
-
- public void testTwoIterSameKey() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
-
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- items.add(new TestEvent(("0"), ("V1")));
- organizer.flush(items.iterator(), items.size());
- items.clear();
- items.add(new TestEvent(("0"), ("V2")));
- organizer.flush(items.iterator(), items.size());
-
- List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs();
- HoplogSetIterator iter = new HoplogSetIterator(oplogs);
- assertTrue(iter.hasNext());
- int count = 0;
- for (ByteBuffer keyBB = null; iter.hasNext();) {
- keyBB = iter.next();
- byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
- byte[] value = HFileSortedOplog.byteBufferToArray(iter.getValue());
- assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
- assertEquals("V2", ((PersistedEventImpl) SortedHoplogPersistedEvent.fromBytes(value)).getValue());
- count++;
- }
- assertEquals(1, count);
- organizer.close();
- }
-
- public void testTwoIterDiffKey() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
-
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- items.add(new TestEvent(("0"), ("V1")));
- organizer.flush(items.iterator(), items.size());
- items.clear();
- items.add(new TestEvent(("1"), ("V1")));
- organizer.flush(items.iterator(), items.size());
-
- List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs();
- HoplogSetIterator iter = new HoplogSetIterator(oplogs);
- assertTrue(iter.hasNext());
- int count = 0;
- for (ByteBuffer keyBB = null; iter.hasNext();) {
- keyBB = iter.next();
- byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
- byte[] value = HFileSortedOplog.byteBufferToArray(iter.getValue());
- assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
- assertEquals("V1", ((PersistedEventImpl) SortedHoplogPersistedEvent.fromBytes(value)).getValue());
- count++;
- }
- assertEquals(2, count);
- organizer.close();
- }
-
- public void testMergedIterator() throws Exception {
- HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
-
- // #1
- ArrayList<TestEvent> items = new ArrayList<TestEvent>();
- items.add(new TestEvent(("1"), ("1")));
- items.add(new TestEvent(("2"), ("1")));
- items.add(new TestEvent(("3"), ("1")));
- items.add(new TestEvent(("4"), ("1")));
- organizer.flush(items.iterator(), items.size());
-
- // #2
- items.clear();
- items.add(new TestEvent(("2"), ("1")));
- items.add(new TestEvent(("4"), ("1")));
- items.add(new TestEvent(("6"), ("1")));
- items.add(new TestEvent(("8"), ("1")));
- organizer.flush(items.iterator(), items.size());
-
- // #3
- items.clear();
- items.add(new TestEvent(("1"), ("1")));
- items.add(new TestEvent(("3"), ("1")));
- items.add(new TestEvent(("5"), ("1")));
- items.add(new TestEvent(("7"), ("1")));
- items.add(new TestEvent(("9"), ("1")));
- organizer.flush(items.iterator(), items.size());
-
- // #4
- items.clear();
- items.add(new TestEvent(("0"), ("1")));
- items.add(new TestEvent(("1"), ("1")));
- items.add(new TestEvent(("4"), ("1")));
- items.add(new TestEvent(("5"), ("1")));
- organizer.flush(items.iterator(), items.size());
-
- List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs();
- HoplogSetIterator iter = new HoplogSetIterator(oplogs);
- // the iteration pattern for this test should be 0-9:
- // 0 1 4 5 oplog #4
- // 1 3 5 7 9 oplog #3
- // 2 4 6 8 oplog #2
- // 1 2 3 4 oplog #1
- int count = 0;
- for (ByteBuffer keyBB = null; iter.hasNext();) {
- keyBB = iter.next();
- byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
- assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
- count++;
- }
- assertEquals(10, count);
- organizer.close();
- }
-}