Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:49:51 UTC
[05/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizerJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizerJUnitTest.java
new file mode 100644
index 0000000..8746a0b
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizerJUnitTest.java
@@ -0,0 +1,1044 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer.HoplogComparator;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.TieredCompactionJUnitTest.TestHoplog;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HdfsSortedOplogOrganizerJUnitTest extends BaseHoplogTestCase {
+ /**
+ * Tests flush operation
+ */
+ public void testFlush() throws Exception {
+ int count = 10;
+ int bucketId = (int) System.nanoTime();
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
+
+ // flush and create hoplog
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < count; i++) {
+ items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime())));
+ }
+ organizer.flush(items.iterator(), count);
+
+ // check file existence in bucket directory
+ FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + bucketId,
+ HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+
+ // only one hoplog should exist
+ assertEquals(1, hoplogs.length);
+
+ assertEquals(count, organizer.sizeEstimate());
+ assertEquals(0, stats.getActiveReaderCount());
+ }
+
+ /**
+ * Tests that a hoplog can be reopened and read after its reader is closed
+ */
+ public void testReopen() throws Exception {
+ int bucketId = (int) System.nanoTime();
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
+
+ // flush and create hoplog
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < 100; i++) {
+ items.add(new TestEvent("" + i, ("1-1")));
+ }
+ organizer.flush(items.iterator(), items.size());
+
+ Hoplog hoplog = organizer.getSortedOplogs().iterator().next().get();
+ byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
+ hoplog.close();
+
+ for (int i = 0; i < 10; i++) {
+ Path path = new Path(testDataDir, getName() + "/" + bucketId + "/" + hoplog.getFileName());
+ HFileSortedOplog oplog = new HFileSortedOplog(hdfsStore, path, blockCache, stats, storeStats);
+ oplog.getReader().read(keyBytes1);
+ oplog.close(false);
+ }
+ }
+
+ /**
+ * Tests reads from a set of hoplogs containing both valid and stale KVs
+ */
+ public void testRead() throws Exception {
+ doRead(regionManager);
+ }
+
+// public void testNewReaderWithNameNodeHA() throws Exception {
+// deleteMiniClusterDir();
+// int nn1port = AvailablePortHelper.getRandomAvailableTCPPort();
+// int nn2port = AvailablePortHelper.getRandomAvailableTCPPort();
+//
+// MiniDFSCluster cluster = initMiniHACluster(nn1port, nn2port);
+// initClientHAConf(nn1port, nn2port);
+//
+// HDFSStoreImpl store1 = (HDFSStoreImpl) hsf.create("Store-1");
+// regionfactory.setHDFSStoreName(store1.getName());
+// Region<Object, Object> region1 = regionfactory.create("region-1");
+// HdfsRegionManager regionManager1 = ((LocalRegion)region1).getHdfsRegionManager();
+//
+// HoplogOrganizer<SortedHoplogPersistedEvent> organizer = doRead(regionManager1);
+// organizer.close();
+//
+// dunit.DistributedTestCase.IgnoredException ex = DistributedTestCase.addExpectedException("java.io.EOFException");
+// NameNode nnode2 = cluster.getNameNode(1);
+// assertTrue(nnode2.isStandbyState());
+// cluster.shutdownNameNode(0);
+// cluster.transitionToActive(1);
+// assertFalse(nnode2.isStandbyState());
+//
+// organizer = new HdfsSortedOplogOrganizer(regionManager1, 0);
+// byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
+// byte[] keyBytes3 = BlobHelper.serializeToBlob("3");
+// byte[] keyBytes4 = BlobHelper.serializeToBlob("4");
+// assertEquals("2-1", organizer.read(keyBytes1).getValue());
+// assertEquals("3-3", organizer.read(keyBytes3).getValue());
+// assertEquals("1-4", organizer.read(keyBytes4).getValue());
+// ex.remove();
+//
+// region1.destroyRegion();
+// store1.destroy();
+// cluster.shutdown();
+// FileUtils.deleteDirectory(new File("hdfs-test-cluster"));
+// }
+
+// public void testActiveReaderWithNameNodeHA() throws Exception {
+// deleteMiniClusterDir();
+// int nn1port = AvailablePortHelper.getRandomAvailableTCPPort();
+// int nn2port = AvailablePortHelper.getRandomAvailableTCPPort();
+//
+// MiniDFSCluster cluster = initMiniHACluster(nn1port, nn2port);
+// initClientHAConf(nn1port, nn2port);
+//
+// HDFSStoreImpl store1 = (HDFSStoreImpl) hsf.create("Store-1");
+// regionfactory.setHDFSStoreName(store1.getName());
+// Region<Object, Object> region1 = regionfactory.create("region-1");
+// HdfsRegionManager regionManager1 = ((LocalRegion)region1).getHdfsRegionManager();
+//
+// HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager1, 0);
+// ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+// for (int i = 100000; i < 101000; i++) {
+// items.add(new TestEvent(("" + i), (i + " some string " + i)));
+// }
+// organizer.flush(items.iterator(), items.size());
+// organizer.getSortedOplogs().get(0).get().getReader();
+//
+// dunit.DistributedTestCase.IgnoredException ex = DistributedTestCase.addExpectedException("java.io.EOFException");
+// NameNode nnode2 = cluster.getNameNode(1);
+// assertTrue(nnode2.isStandbyState());
+// cluster.shutdownNameNode(0);
+// cluster.transitionToActive(1);
+// assertFalse(nnode2.isStandbyState());
+//
+// for (int i = 100000; i < 100500; i++) {
+// byte[] keyBytes1 = BlobHelper.serializeToBlob("" + i);
+// assertEquals(i + " some string " + i, organizer.read(keyBytes1).getValue());
+// }
+// ex.remove();
+// region1.destroyRegion();
+// store1.destroy();
+// cluster.shutdown();
+// FileUtils.deleteDirectory(new File("hdfs-test-cluster"));
+// }
+
+// public void testFlushWithNameNodeHA() throws Exception {
+// deleteMiniClusterDir();
+// int nn1port = AvailablePortHelper.getRandomAvailableTCPPort();
+// int nn2port = AvailablePortHelper.getRandomAvailableTCPPort();
+//
+// MiniDFSCluster cluster = initMiniHACluster(nn1port, nn2port);
+//
+// initClientHAConf(nn1port, nn2port);
+// HDFSStoreImpl store1 = (HDFSStoreImpl) hsf.create("Store-1");
+//
+// regionfactory.setHDFSStoreName(store1.getName());
+// Region<Object, Object> region1 = regionfactory.create("region-1");
+// HdfsRegionManager regionManager1 = ((LocalRegion)region1).getHdfsRegionManager();
+//
+// HoplogOrganizer<SortedHoplogPersistedEvent> organizer = new HdfsSortedOplogOrganizer(regionManager1, 0);
+// ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+// items.add(new TestEvent(("1"), ("1-1")));
+// organizer.flush(items.iterator(), items.size());
+//
+// dunit.DistributedTestCase.IgnoredException ex = DistributedTestCase.addExpectedException("java.io.EOFException");
+// NameNode nnode2 = cluster.getNameNode(1);
+// assertTrue(nnode2.isStandbyState());
+// cluster.shutdownNameNode(0);
+// cluster.transitionToActive(1);
+// assertFalse(nnode2.isStandbyState());
+//
+// items.add(new TestEvent(("4"), ("1-4")));
+// organizer.flush(items.iterator(), items.size());
+// byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
+// byte[] keyBytes4 = BlobHelper.serializeToBlob("4");
+// assertEquals("1-1", organizer.read(keyBytes1).getValue());
+// assertEquals("1-4", organizer.read(keyBytes4).getValue());
+// ex.remove();
+//
+// region1.destroyRegion();
+// store1.destroy();
+// cluster.shutdown();
+// FileUtils.deleteDirectory(new File("hdfs-test-cluster"));
+// }
+
+ public HoplogOrganizer<SortedHoplogPersistedEvent> doRead(HdfsRegionManager rm) throws Exception {
+ HoplogOrganizer<SortedHoplogPersistedEvent> organizer = new HdfsSortedOplogOrganizer(rm, 0);
+
+ // flush and create hoplog
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ items.add(new TestEvent(("1"), ("1-1")));
+ items.add(new TestEvent(("4"), ("1-4")));
+ organizer.flush(items.iterator(), items.size());
+
+ items.clear();
+ items.add(new TestEvent(("1"), ("2-1")));
+ items.add(new TestEvent(("3"), ("2-3")));
+ organizer.flush(items.iterator(), items.size());
+
+ items.clear();
+ items.add(new TestEvent(("3"), ("3-3")));
+ items.add(new TestEvent(("5"), ("3-5")));
+ organizer.flush(items.iterator(), items.size());
+
+ // check file existence in bucket directory
+ FileStatus[] hoplogs = getBucketHoplogs(rm.getStore().getFileSystem(),
+ rm.getRegionFolder() + "/" + 0,
+ HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+
+ // expect 3 files for the 3 flushes
+ assertEquals(3, hoplogs.length);
+ byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
+ byte[] keyBytes3 = BlobHelper.serializeToBlob("3");
+ byte[] keyBytes4 = BlobHelper.serializeToBlob("4");
+ // expect key 1 from hoplog 2
+ assertEquals("2-1", organizer.read(keyBytes1).getValue());
+ // expect key 3 from hoplog 3
+ assertEquals("3-3", organizer.read(keyBytes3).getValue());
+ // expect key 4 from hoplog 1
+ assertEquals("1-4", organizer.read(keyBytes4).getValue());
+ return organizer;
+ }
+
+ /**
+ * Tests bucket organizer initialization during startup. Existing hoplogs should be
+ * identified and returned
+ */
+ public void testHoplogIdentification() throws Exception {
+ // create one empty file and one directory in the bucket directory
+ Path bucketPath = new Path(testDataDir, getName() + "/0");
+ FileSystem fs = hdfsStore.getFileSystem();
+ fs.createNewFile(new Path(bucketPath, "temp_file"));
+ fs.mkdirs(new Path(bucketPath, "temp_dir"));
+
+ // create 2 hoplog files of each type: flush, minor and major
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+ String[] extensions = { HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+ HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+ HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION,
+ HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION,
+ HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION,
+ HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION};
+ for (String string : extensions) {
+ Hoplog oplog = organizer.getTmpSortedOplog(null, string);
+ createHoplog(0, oplog);
+ organizer.makeLegitimate(oplog);
+ }
+
+ // create a temp hoplog
+ Hoplog oplog = organizer.getTmpSortedOplog(null, HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+ createHoplog(0, oplog);
+
+ // bucket directory should have 6 hoplogs, 1 temp log, 1 misc file and 1 directory
+ FileStatus[] results = fs.listStatus(bucketPath);
+ assertEquals(9, results.length);
+
+ // only the six legitimate hoplogs should be identified
+ List<Hoplog> list = organizer.identifyAndLoadSortedOplogs(true);
+ assertEquals(6, list.size());
+ }
+
+ public void testExpiryMarkerIdentification() throws Exception {
+ // expired hoplogs from the list below should be deleted
+ String[] files = {
+ "0-1-1231" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+ "0-2-1232" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION,
+ "0-3-1233" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
+ "0-4-1234" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
+ "0-5-1235" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION };
+
+ Path bucketPath = new Path(testDataDir, getName() + "/0");
+ FileSystem fs = hdfsStore.getFileSystem();
+ for (String file : files) {
+ Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file),
+ blockCache, stats, storeStats);
+ createHoplog(10, oplog);
+ }
+
+ String marker1 = "0-4-1234"
+ + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION
+ + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
+ fs.createNewFile(new Path(bucketPath, marker1));
+ String marker2 = "0-5-1235"
+ + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION
+ + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
+ fs.createNewFile(new Path(bucketPath, marker2));
+
+ FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", "");
+ assertEquals(7, hoplogs.length);
+
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(
+ regionManager, 0);
+
+ FileStatus[] markers = organizer.getExpiryMarkers();
+ // the two expiry markers created above should be returned
+ assertEquals(2, markers.length);
+ for (FileStatus marker : markers) {
+ String name = marker.getPath().getName();
+ assertTrue(name.equals(marker1) || name.equals(marker2));
+ }
+ organizer.close();
+ }
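+
+ /**
+ * Naming sketch for the expiry markers used above (an illustrative helper,
+ * not used by the production code): a marker is simply the hoplog file name
+ * with the expired extension appended.
+ */
+ private static String expiryMarkerFor(String hoplogName) {
+ // e.g. "0-4-1234.ihop" -> "0-4-1234.ihop" + EXPIRED_HOPLOG_EXTENSION
+ return hoplogName + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
+ }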
+
+ public void testExpiredHoplogCleanup() throws Exception {
+ // expired hoplogs from the list below should be deleted
+ String[] files = {
+ "0-1-0000" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+ "0-1-1111" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+ "0-1-1111" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION
+ + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION,
+
+ "0-2-0000" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION,
+ "0-2-2222" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION,
+
+ "0-3-0000" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
+ "0-3-3333" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
+ "0-3-3333" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION
+ + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION,
+
+ "0-4-4444" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION };
+
+ Path bucketPath = new Path(testDataDir, getName() + "/0");
+ FileSystem fs = hdfsStore.getFileSystem();
+ for (String file : files) {
+ if (file.endsWith(AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION)) {
+ fs.createNewFile(new Path(bucketPath, file));
+ continue;
+ }
+ Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file),
+ blockCache, stats, storeStats);
+ createHoplog(10, oplog);
+ }
+
+ FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", "");
+ assertEquals(9, hoplogs.length);
+
+ long target = System.currentTimeMillis();
+ TimeUnit.SECONDS.sleep(1);
+
+ // files created below this point are newer than the target delete time, so
+ // none of them except the minor compacted ones will be deleted
+ files = new String[] {
+ "0-4-4444" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION
+ + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION,
+
+ "0-5-5555" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION
+ + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION,
+ "0-5-5555" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
+
+ "0-6-6666" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION
+ };
+ for (String file : files) {
+ if (file.endsWith(AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION)) {
+ fs.createNewFile(new Path(bucketPath, file));
+ continue;
+ }
+ Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file),
+ blockCache, stats, storeStats);
+ createHoplog(10, oplog);
+ }
+
+ hoplogs = getBucketHoplogs(getName() + "/0", "");
+ assertEquals(13, hoplogs.length);
+ int hopSize = 0;
+ for (FileStatus file : hoplogs) {
+ if(file.getLen() > hopSize) {
+ hopSize = (int) file.getLen();
+ }
+ }
+
+ final AtomicInteger behavior = new AtomicInteger(0);
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0) {
+ @Override
+ protected FileStatus[] getExpiryMarkers() throws IOException {
+ if (behavior.get() == 1) {
+ ArrayList<FileStatus> markers = new ArrayList<FileStatus>();
+ for (FileStatus marker : super.getExpiryMarkers()) {
+ markers.add(marker);
+ }
+ // inject a dummy old expiry marker for major compacted file
+ long age = 2 * HDFSStore.DEFAULT_MAJOR_COMPACTION_INTERVAL_MINS * 60 * 1000;
+ String markerName = "0-2-2222" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION + EXPIRED_HOPLOG_EXTENSION;
+ FileStatus marker = new FileStatus(0, false, 1, 1024, System.currentTimeMillis() - age, new Path(bucketPath, markerName));
+ markers.add(marker);
+ return markers.toArray(new FileStatus[markers.size()]);
+ }
+ return super.getExpiryMarkers();
+ }
+ };
+
+ List<FileStatus> list = organizer.getOptimizationTargets(target);
+ assertEquals(6, list.size());
+
+ behavior.set(1);
+ list = organizer.getOptimizationTargets(target);
+ assertEquals(8, list.size());
+
+ assertEquals(9 * hopSize, stats.getStoreUsageBytes());
+ int count = organizer.deleteExpiredFiles(list);
+ assertEquals(8, count);
+ assertEquals(5 * hopSize, stats.getStoreUsageBytes());
+
+ List<FileStatus> tmp = new ArrayList<FileStatus>(Arrays.asList(hoplogs));
+ for (Iterator<FileStatus> iter = tmp.iterator(); iter.hasNext();) {
+ hoplogs = getBucketHoplogs(getName() + "/0", "");
+ FileStatus file = iter.next();
+ for (FileStatus hoplog : hoplogs) {
+ if(hoplog.getPath().getName().startsWith("0-5-5555")) {
+ fail("this file should have been deleted" + hoplog.getPath().getName());
+ }
+
+ if (hoplog.getPath().getName().equals(file.getPath().getName())) {
+ iter.remove();
+ break;
+ }
+ }
+ }
+
+ assertEquals(7, tmp.size());
+ organizer.close();
+ }
+
+ public void testAlterPurgeInterval() throws Exception {
+ // expired hoplogs from the list below should be deleted
+ String[] files = {
+ "0-1-0000" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+ "0-1-1111" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+ "0-2-2222" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+ "0-4-4444" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION };
+
+ Path bucketPath = new Path(testDataDir, getName() + "/0");
+ hdfsStore.getFileSystem();
+ for (String file : files) {
+ Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file),
+ blockCache, stats, storeStats);
+ createHoplog(10, oplog);
+ }
+
+ FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", "");
+ int hopSize = 0;
+ for (FileStatus file : hoplogs) {
+ if(file.getLen() > hopSize) {
+ hopSize = (int) file.getLen();
+ }
+ }
+
+ final AtomicInteger behavior = new AtomicInteger(0);
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0) {
+ @Override
+ protected FileStatus[] getExpiryMarkers() throws IOException {
+ if (behavior.get() == 1) {
+ ArrayList<FileStatus> markers = new ArrayList<FileStatus>();
+ // inject dummy old expiry markers
+ long age = 120 * 1000; // 120 seconds old
+ String markerName = "0-2-2222" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION + EXPIRED_HOPLOG_EXTENSION;
+ FileStatus marker = new FileStatus(0, false, 1, 1024, System.currentTimeMillis() - age, new Path(bucketPath, markerName));
+ markers.add(marker);
+ markerName = "0-4-4444" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION + EXPIRED_HOPLOG_EXTENSION;
+ marker = new FileStatus(0, false, 1, 1024, System.currentTimeMillis() - age, new Path(bucketPath, markerName));
+ markers.add(marker);
+ return markers.toArray(new FileStatus[markers.size()]);
+ }
+ return super.getExpiryMarkers();
+ }
+ };
+
+ behavior.set(1);
+ int count = organizer.initiateCleanup();
+ assertEquals(0, count);
+
+ HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+ mutator.setPurgeInterval(1);
+ hdfsStore.alter(mutator);
+ count = organizer.initiateCleanup();
+ assertEquals(4, count);
+ }
+
+ public void testInUseExpiredHoplogCleanup() throws Exception {
+ Path bucketPath = new Path(testDataDir, getName() + "/0");
+ FileSystem fs = hdfsStore.getFileSystem();
+
+ String[] files = new String[] {
+ "0-1-1231" + AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+ "0-2-1232" + AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION,
+ "0-3-1233" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
+ "0-4-1234" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION,
+ "0-5-1235" + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION };
+
+ for (String file : files) {
+ Hoplog oplog = new HFileSortedOplog(hdfsStore, new Path(bucketPath, file),
+ blockCache, stats, storeStats);
+ createHoplog(10, oplog);
+ }
+
+ final HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(
+ regionManager, 0);
+ List<TrackedReference<Hoplog>> hopRefs = organizer.getSortedOplogs();
+ assertEquals(files.length, hopRefs.size());
+
+ // this is the expiry marker for one of the files that will be compacted below.
+ // While compaction is in progress, file deletion should not happen
+ files = new String[] { "0-5-1235"
+ + AbstractHoplogOrganizer.MINOR_HOPLOG_EXTENSION
+ + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION };
+
+ for (String file : files) {
+ fs.createNewFile(new Path(bucketPath, file));
+ }
+ FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", "");
+ assertEquals(hopRefs.size() + files.length, hoplogs.length);
+
+ TimeUnit.MILLISECONDS.sleep(200);
+ long target = System.currentTimeMillis();
+ List<FileStatus> list = organizer.getOptimizationTargets(target);
+ assertEquals(2, list.size());
+
+ for (TrackedReference<Hoplog> ref : hopRefs) {
+ ref.increment("test");
+ }
+
+ fs.delete(new Path(bucketPath, files[0]), false);
+
+ TimeUnit.MILLISECONDS.sleep(50);
+ organizer.markSortedOplogForDeletion(hopRefs, false);
+
+ list = organizer.getOptimizationTargets(target);
+ assertEquals(0, list.size());
+ organizer.close();
+ }
+
+ /**
+ * Tests max sequence initialization when files already exist and the server restarts
+ */
+ public void testSeqInitialization() throws Exception {
+ // create several hoplog files
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+ String[] extensions = { HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+ HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION,
+ HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION,
+ HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION,
+ HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION};
+ for (String string : extensions) {
+ Hoplog oplog = organizer.getTmpSortedOplog(null, string);
+ createHoplog(1, oplog);
+ organizer.makeLegitimate(oplog);
+ }
+
+ // a new organizer should start creating files at sequence 6, as five files already existed
+ organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+ Hoplog oplog = organizer.getTmpSortedOplog(null, HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+ createHoplog(1, oplog);
+ organizer.makeLegitimate(oplog);
+ assertEquals(6, HdfsSortedOplogOrganizer.getSequenceNumber(oplog));
+ organizer.close();
+ }
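+
+ /**
+ * Illustrative parser (an inference from the file names used in these tests,
+ * not production code): hoplog names appear to follow a
+ * bucket-sequence-timestamp pattern, so the sequence number asserted above is
+ * the middle dash-separated field.
+ */
+ private static long parseSequence(String hoplogName) {
+ // e.g. "0-6-1236.hop" -> 6
+ return Long.parseLong(hoplogName.split("-")[1]);
+ }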
+
+ /**
+ * Tests temp file creation and making file legitimate
+ */
+ public void testMakeLegitimate() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+ // create empty tmp hoplog
+ Hoplog oplog = organizer.getTmpSortedOplog(null, HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+ createHoplog(0, oplog);
+
+ Path hoplogPath = new Path(testDataDir, getName() + "/0/" + oplog.getFileName());
+ FileSystem fs = hdfsStore.getFileSystem();
+ FileStatus hoplogStatus = fs.getFileStatus(hoplogPath);
+ assertNotNull(hoplogStatus);
+
+ organizer.makeLegitimate(oplog);
+
+ try {
+ hoplogStatus = fs.getFileStatus(hoplogPath);
+ assertNull(hoplogStatus);
+ } catch (FileNotFoundException e) {
+ // tmp file is renamed hence should not exist, exception expected
+ }
+
+ assertTrue(oplog.getFileName().endsWith(HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION));
+ hoplogPath = new Path(testDataDir, getName() + "/0/" + oplog.getFileName());
+ hoplogStatus = fs.getFileStatus(hoplogPath);
+ assertNotNull(hoplogStatus);
+ }
+
+ /**
+ * Tests hoplog file name comparator
+ */
+ public void testHoplogFileComparator() throws IOException {
+ String name1 = "bucket1-10-3.hop";
+ String name2 = "bucket1-1-20.hop";
+ String name3 = "bucket1-30-201.hop";
+ String name4 = "bucket1-100-201.hop";
+
+ TreeSet<TrackedReference<Hoplog>> list = new TreeSet<TrackedReference<Hoplog>>(new HoplogComparator());
+ // insert hoplogs into the list out of the expected order
+ hdfsStore.getFileSystem();
+ list.add(new TrackedReference<Hoplog>(new HFileSortedOplog(hdfsStore, new Path(testDataDir, name2), blockCache, stats, storeStats)));
+ list.add(new TrackedReference<Hoplog>(new HFileSortedOplog(hdfsStore, new Path(testDataDir, name4), blockCache, stats, storeStats)));
+ list.add(new TrackedReference<Hoplog>(new HFileSortedOplog(hdfsStore, new Path(testDataDir, name1), blockCache, stats, storeStats)));
+ list.add(new TrackedReference<Hoplog>(new HFileSortedOplog(hdfsStore, new Path(testDataDir, name3), blockCache, stats, storeStats)));
+
+ Iterator<TrackedReference<Hoplog>> iter = list.iterator();
+ assertEquals(name4, iter.next().get().getFileName());
+ assertEquals(name3, iter.next().get().getFileName());
+ assertEquals(name2, iter.next().get().getFileName());
+ assertEquals(name1, iter.next().get().getFileName());
+ }
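+
+ /**
+ * Sketch of the ordering the assertions above imply (an inference, not the
+ * production HoplogComparator): hoplogs sort newest first, by timestamp and
+ * then by sequence number, both descending.
+ */
+ private static int compareDescending(long seq1, long ts1, long seq2, long ts2) {
+ if (ts1 != ts2) {
+ return ts1 > ts2 ? -1 : 1; // newer timestamp first
+ }
+ if (seq1 != seq2) {
+ return seq1 > seq2 ? -1 : 1; // higher sequence number breaks the tie
+ }
+ return 0;
+ }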
+
+ /**
+ * Tests clear on a set of hoplogs.
+ */
+ public void testClear() throws Exception {
+ int bucketId = (int) System.nanoTime();
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
+
+ // flush and create hoplog
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ items.add(new TestEvent(("1"), ("1-1")));
+ items.add(new TestEvent(("4"), ("1-4")));
+ organizer.flush(items.iterator(), items.size());
+
+ items.clear();
+ items.add(new TestEvent(("1"), ("2-1")));
+ items.add(new TestEvent(("3"), ("2-3")));
+ organizer.flush(items.iterator(), items.size());
+
+ items.clear();
+ items.add(new TestEvent(("3"), ("3-3")));
+ items.add(new TestEvent(("5"), ("3-5")));
+ organizer.flush(items.iterator(), items.size());
+
+ // check file existence in bucket directory
+ FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + bucketId, HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+
+ // expect 3 files for the 3 flushes
+ assertEquals(3, hoplogs.length);
+
+ organizer.clear();
+
+ // check that all files are now expired
+ hoplogs = getBucketHoplogs(getName() + "/" + bucketId, HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+ FileStatus[] exs = getBucketHoplogs(getName() + "/" + bucketId, HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+ FileStatus[] valids = HdfsSortedOplogOrganizer.filterValidHoplogs(hoplogs, exs);
+ assertEquals(Collections.EMPTY_LIST, Arrays.asList(valids));
+
+ assertEquals(0, stats.getActiveFileCount());
+ assertEquals(0, stats.getInactiveFileCount());
+ }
+
+ public void testFixedIntervalMajorCompaction() throws Exception {
+ final AtomicInteger majorCReqCount = new AtomicInteger(0);
+
+ final Compactor compactor = new AbstractCompactor() {
+ @Override
+ public boolean compact(boolean isMajor, boolean isForced) throws IOException {
+ majorCReqCount.incrementAndGet();
+ return true;
+ }
+ };
+
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0) {
+ @Override
+ public synchronized Compactor getCompactor() {
+ return compactor;
+ }
+ };
+
+ regionManager.addOrganizer(0, organizer);
+
+ System.setProperty(HoplogConfig.JANITOR_INTERVAL_SECS, "1");
+ HDFSRegionDirector.resetJanitor();
+
+ alterMajorCompaction(hdfsStore, true);
+
+ // create hoplog in the past, 90 seconds before current time
+ organizer.hoplogCreated(getName(), 0, new TestHoplog(hdfsStore, 100, System.currentTimeMillis() - 90000));
+ TimeUnit.MILLISECONDS.sleep(50);
+ organizer.hoplogCreated(getName(), 0, new TestHoplog(hdfsStore, 100, System.currentTimeMillis() - 90000));
+
+ List<TrackedReference<Hoplog>> hoplogs = organizer.getSortedOplogs();
+ assertEquals(2, hoplogs.size());
+
+ for (int i = 0; i < 3; i++) {
+ TimeUnit.SECONDS.sleep(1);
+ assertEquals(0, majorCReqCount.get());
+ }
+ HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+ mutator.setMajorCompactionInterval(1);
+ hdfsStore.alter(mutator);
+ TimeUnit.SECONDS.sleep(5);
+ assertTrue(3 < majorCReqCount.get());
+ }
+
+
+ public void testCorruptHfileBucketFail() throws Exception {
+ // create a corrupt file in every bucket
+ FileSystem fs = hdfsStore.getFileSystem();
+ for (int i = 0; i < 113; i++) {
+ FSDataOutputStream opStream = fs.create(new Path(testDataDir.getName() + "/region-1/" + i + "/1-1-1.hop"));
+ opStream.writeBytes("Some random corrupt file");
+ opStream.close();
+ }
+
+ // create region with store
+ regionfactory.setHDFSStoreName(HDFS_STORE_NAME);
+ Region<Object, Object> region1 = regionfactory.create("region-1");
+ IgnoredException ex = IgnoredException.addIgnoredException("CorruptHFileException");
+ try {
+ region1.get("key");
+ fail("get should have failed with corrupt file error");
+ } catch (HDFSIOException e) {
+ // expected
+ } finally {
+ ex.remove();
+ }
+
+ region1.destroyRegion();
+ }
+
+ public void testMaxOpenReaders() throws Exception {
+ System.setProperty("hoplog.bucket.max.open.files", "5");
+ HoplogOrganizer<? extends PersistedEventImpl> organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < 10; i++) {
+ items.clear();
+ items.add(new TestEvent("" + i, "" + i));
+ organizer.flush(items.iterator(), items.size());
+ }
+
+ HdfsSortedOplogOrganizer bucket = (HdfsSortedOplogOrganizer) organizer;
+ List<TrackedReference<Hoplog>> hoplogs = bucket.getSortedOplogs();
+ int closedCount = 0 ;
+ for (TrackedReference<Hoplog> hoplog : hoplogs) {
+ HFileSortedOplog hfile = (HFileSortedOplog) hoplog.get();
+ if (hfile.isClosed()) {
+ closedCount++;
+ }
+ }
+ assertEquals(10, closedCount);
+ assertEquals(10, stats.getActiveFileCount());
+ assertEquals(0, stats.getActiveReaderCount());
+
+ byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
+ organizer.read(keyBytes1).getValue();
+
+ closedCount = 0 ;
+ for (TrackedReference<Hoplog> hoplog : hoplogs) {
+ HFileSortedOplog hfile = (HFileSortedOplog) hoplog.get();
+ if (hfile.isClosed()) {
+ closedCount++;
+ }
+ }
+ assertEquals(5, closedCount);
+ assertEquals(10, stats.getActiveFileCount());
+ assertEquals(0, stats.getInactiveFileCount());
+ assertEquals(5, stats.getActiveReaderCount());
+
+ organizer.getCompactor().compact(false, false);
+ assertEquals(1, stats.getActiveFileCount());
+ assertEquals(0, stats.getActiveReaderCount());
+ assertEquals(0, stats.getInactiveFileCount());
+ }
+
+ public void testConcurrentReadInactiveClose() throws Exception {
+ final HoplogOrganizer<? extends PersistedEventImpl> organizer = regionManager.create(0);
+ alterMinorCompaction(hdfsStore, true);
+
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < 4; i++) {
+ items.clear();
+ items.add(new TestEvent("" + i, "" + i));
+ organizer.flush(items.iterator(), items.size());
+ }
+
+ final byte[] keyBytes1 = BlobHelper.serializeToBlob("1");
+ class ReadTask implements Runnable {
+ public void run() {
+ try {
+ organizer.read(keyBytes1);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ ScheduledExecutorService[] readers = new ScheduledExecutorService[10];
+ for (int i = 0; i < readers.length; i++) {
+ readers[i] = Executors.newSingleThreadScheduledExecutor();
+ readers[i].scheduleWithFixedDelay(new ReadTask(), 0, 1, TimeUnit.MILLISECONDS);
+ }
+
+ for (int i = 0; i < 100; i++) {
+ items.clear();
+ items.add(new TestEvent("" + i, "" + i));
+ organizer.flush(items.iterator(), items.size());
+ }
+
+ for (int i = 0; i < readers.length; i++) {
+ readers[i].shutdown();
+ readers[i].awaitTermination(1, TimeUnit.SECONDS);
+ TimeUnit.MILLISECONDS.sleep(50);
+ }
+
+ for (int i = 0; i < 20; i++) {
+ if (stats.getActiveFileCount() < 4) {
+ break;
+ }
+ organizer.getCompactor().compact(false, false);
+ }
+
+ organizer.performMaintenance();
+ TimeUnit.SECONDS.sleep(1);
+
+ assertTrue("" + stats.getActiveFileCount(), stats.getActiveFileCount() <= 4);
+ assertEquals(stats.getActiveReaderCount(), stats.getActiveReaderCount());
+ assertEquals(0, stats.getInactiveFileCount());
+ }
+
+ public void testEmptyBucketCleanup() throws Exception {
+ HdfsSortedOplogOrganizer o = new HdfsSortedOplogOrganizer(regionManager, 0);
+ long target = System.currentTimeMillis();
+ o.getOptimizationTargets(target);
+ // make sure an empty bucket does not cause IO errors; no assertion is needed
+ // for this test case
+ }
+
+ public void testExpiredFilterAtStartup() throws Exception {
+ HdfsSortedOplogOrganizer bucket = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ items.add(new TestEvent(("1"), ("1-1")));
+ items.add(new TestEvent(("4"), ("1-4")));
+ bucket.flush(items.iterator(), items.size());
+
+ items.clear();
+ items.add(new TestEvent(("1"), ("2-1")));
+ items.add(new TestEvent(("3"), ("2-3")));
+ bucket.flush(items.iterator(), items.size());
+
+ FileStatus[] files = getBucketHoplogs(getName() + "/" + 0,
+ HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+ assertEquals(2, files.length);
+
+ files = getBucketHoplogs(getName() + "/" + 0,
+ HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+ assertEquals(0, files.length);
+
+ HdfsSortedOplogOrganizer bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0);
+ List<TrackedReference<Hoplog>> hoplogs = bucket2.getSortedOplogs();
+ assertEquals(2, hoplogs.size());
+
+ bucket.clear();
+
+ files = getBucketHoplogs(getName() + "/" + 0,
+ HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+ assertEquals(2, files.length);
+
+ files = getBucketHoplogs(getName() + "/" + 0,
+ HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+ assertEquals(2, files.length);
+
+ bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0);
+ hoplogs = bucket2.getSortedOplogs();
+ assertEquals(0, hoplogs.size());
+
+ items.clear();
+ items.add(new TestEvent(("1"), ("2-1")));
+ items.add(new TestEvent(("3"), ("2-3")));
+ bucket.flush(items.iterator(), items.size());
+
+ bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0);
+ hoplogs = bucket2.getSortedOplogs();
+ assertEquals(1, hoplogs.size());
+ bucket.close();
+ bucket2.close();
+ }
+
+ public void testExpireFilterRetartAfterClear() throws Exception {
+ HdfsSortedOplogOrganizer bucket = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ items.add(new TestEvent(("1"), ("1-1")));
+ items.add(new TestEvent(("4"), ("1-4")));
+ bucket.flush(items.iterator(), items.size());
+
+ items.clear();
+ items.add(new TestEvent(("1"), ("2-1")));
+ items.add(new TestEvent(("3"), ("2-3")));
+ bucket.flush(items.iterator(), items.size());
+
+ FileStatus[] files = getBucketHoplogs(getName() + "/" + 0,
+ HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+ assertEquals(2, files.length);
+
+ files = getBucketHoplogs(getName() + "/" + 0,
+ HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+ assertEquals(0, files.length);
+
+ HdfsSortedOplogOrganizer bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0);
+ List<TrackedReference<Hoplog>> hoplogs = bucket2.getSortedOplogs();
+ assertEquals(2, hoplogs.size());
+
+ bucket.clear();
+
+ files = getBucketHoplogs(getName() + "/" + 0,
+ HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+ assertEquals(2, files.length);
+
+ files = getBucketHoplogs(getName() + "/" + 0,
+ HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+ assertEquals(2, files.length);
+
+ bucket2 = new HdfsSortedOplogOrganizer(regionManager, 0);
+ hoplogs = bucket2.getSortedOplogs();
+ assertEquals(0, hoplogs.size());
+ bucket.close();
+ bucket2.close();
+ }
+
+ /**
+ * Tests that maintenance does not fail even if there are no hoplogs
+ */
+ public void testNoFileJanitor() throws Exception {
+ HoplogOrganizer<? extends PersistedEventImpl> organizer;
+ organizer = regionManager.create(0);
+ organizer.performMaintenance();
+ }
+
+ public void testValidHoplogRegex() {
+ String[] valid = {"1-1-1.hop", "1-1-1.ihop", "1-1-1.chop"};
+ String[] invalid = {"1-1-1.khop", "1-1-1.hop.tmphop", "1-1-1.hop.ehop", "1-1-.hop", "-1-1.hop"};
+
+ for (String string : valid) {
+ Matcher matcher = HdfsSortedOplogOrganizer.SORTED_HOPLOG_PATTERN.matcher(string);
+ assertTrue(matcher.matches());
+ }
+
+ for (String string : invalid) {
+ Matcher matcher = HdfsSortedOplogOrganizer.SORTED_HOPLOG_PATTERN.matcher(string);
+ assertFalse(matcher.matches());
+ }
+ }
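+
+ /**
+ * A pattern consistent with the valid and invalid samples above (illustrative
+ * only; the production pattern is SORTED_HOPLOG_PATTERN):
+ * bucket-sequence-timestamp followed by a .hop, .ihop or .chop extension.
+ */
+ private static final java.util.regex.Pattern SAMPLE_HOPLOG_PATTERN =
+ java.util.regex.Pattern.compile("[^-]+-\\d+-\\d+\\.(hop|ihop|chop)");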
+
+ public void testOneHoplogMajorCompaction() throws Exception {
+ HoplogOrganizer<? extends PersistedEventImpl> organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+ alterMajorCompaction(hdfsStore, true);
+
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ items.add(new TestEvent(("1"), ("1-1")));
+ organizer.flush(items.iterator(),items.size());
+
+
+ FileStatus[] files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+ assertEquals(1, files.length);
+
+ // Minor compaction will not run on a single .hop file
+ organizer.getCompactor().compact(false, false);
+ files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
+ assertEquals(0, files.length);
+
+ // Major compaction will run on a single .hop file
+ organizer.getCompactor().compact(true, false);
+ files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+ assertEquals(1, files.length);
+ String hoplogName = files[0].getPath().getName();
+ files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
+ assertEquals(0, files.length);
+
+ organizer.getCompactor().compact(true, false);
+ files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+ assertEquals(1, files.length);
+ assertEquals(hoplogName, files[0].getPath().getName());
+
+ // Minor compaction does not convert a major compacted file
+ organizer.getCompactor().compact(false, false);
+ files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MINOR_HOPLOG_EXTENSION);
+ assertEquals(0, files.length);
+
+ files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+ assertEquals(1, files.length);
+ assertEquals(hoplogName, files[0].getPath().getName());
+
+ files = getBucketHoplogs(getName() + "/0", HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+ assertEquals(1, files.length);
+ assertNotSame(hoplogName + HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION, files[0].getPath().getName() );
+ }
+
+ public void testExposeCleanupInterval() throws Exception {
+ FileSystem fs = hdfsStore.getFileSystem();
+ Path cleanUpIntervalPath = new Path(hdfsStore.getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
+ assertTrue(fs.exists(cleanUpIntervalPath));
+ long interval = HDFSStore.DEFAULT_OLD_FILE_CLEANUP_INTERVAL_MINS * 60 * 1000;
+ assertEquals(interval, HoplogUtil.readCleanUpIntervalMillis(fs,cleanUpIntervalPath));
+ }
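+
+ /**
+ * Minimal sketch of the flush/read cycle the tests above exercise (an
+ * illustration reusing this class's fixtures, not itself a test): each flush
+ * creates one hoplog, and a read returns the newest value for the key.
+ */
+ private void flushAndReadOnce(HdfsSortedOplogOrganizer organizer) throws Exception {
+ ArrayList<TestEvent> batch = new ArrayList<TestEvent>();
+ batch.add(new TestEvent("k", "v"));
+ organizer.flush(batch.iterator(), batch.size());
+ byte[] key = BlobHelper.serializeToBlob("k");
+ assertEquals("v", organizer.read(key).getValue());
+ }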
+
+ @Override
+ protected void setUp() throws Exception {
+ System.setProperty(HoplogConfig.JANITOR_INTERVAL_SECS, "" + HoplogConfig.JANITOR_INTERVAL_SECS_DEFAULT);
+ super.setUp();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HfileSortedOplogJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HfileSortedOplogJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HfileSortedOplogJUnitTest.java
new file mode 100644
index 0000000..7420437
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HfileSortedOplogJUnitTest.java
@@ -0,0 +1,540 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogReader;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
+import org.junit.experimental.categories.Category;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HfileSortedOplogJUnitTest extends BaseHoplogTestCase {
+ ArrayList<Object> toBeCleaned = new ArrayList<>();
+
+ /**
+ * Tests hoplog creation using a writer. If this test fails, all the tests will fail, as hoplog
+ * creation is the first step
+ */
+ public void testHoplogWriter() throws Exception {
+ String hoplogName = getRandomHoplogName();
+ createHoplog(hoplogName, 1);
+ FileStatus hoplogStatus = hdfsStore.getFileSystem().getFileStatus(new Path(testDataDir, hoplogName));
+ assertNotNull(hoplogStatus);
+ }
+
+ /**
+ * Tests hoplog deletion.
+ */
+ public void testDeletion() throws Exception {
+ String hoplogName = getRandomHoplogName();
+ createHoplog(hoplogName, 1);
+ HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+
+ testHoplog.delete();
+
+ try {
+ FileStatus hoplogStatus = hdfsStore.getFileSystem().getFileStatus(new Path(testDataDir, hoplogName));
+ // the hoplog should not exist; fail if it does
+ assertNull("File deletion failed", hoplogStatus);
+ } catch (FileNotFoundException e) {
+ // exception expected after deletion
+ }
+ }
+
+ /**
+ * Tests hoplog reader creation and key based gets
+ */
+ public void testHoplogReader() throws Exception {
+ String hop1 = getRandomHoplogName();
+ Map<String, String> map = createHoplog(hop1, 10);
+
+ HFileSortedOplog testHoplog1 = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hop1), blockCache, stats, storeStats);
+ HoplogReader reader = testHoplog1.getReader();
+ // verify that each entry put in the hoplog is returned by reader
+ for (Entry<String, String> entry : map.entrySet()) {
+ byte[] value = reader.read(entry.getKey().getBytes());
+ assertNotNull(value);
+ }
+ }
+
+ /**
+ * Tests full iteration on a hoplog. Ensures all inserted keys are returned and no key is missing
+ */
+ public void testIterator() throws IOException {
+ int count = 10;
+ ByteArrayComparator bac = new ByteArrayComparator();
+
+ String hoplogName = getRandomHoplogName();
+ TreeMap<String, String> sortedMap = createHoplog(hoplogName, count);
+
+ HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+ HoplogReader reader = testHoplog.getReader();
+
+ Iterator<Entry<String, String>> mapIter = sortedMap.entrySet().iterator();
+ HoplogIterator<byte[], byte[]> iter = reader.scan();
+ for (; iter.hasNext();) {
+ byte[] key = iter.next();
+ Entry<String, String> entry = mapIter.next();
+ assertEquals(0, bac.compare(key, iter.getKey()));
+ assertEquals(0, bac.compare(key, entry.getKey().getBytes()));
+ assertEquals(0, bac.compare(iter.getValue(), entry.getValue().getBytes()));
+ count--;
+ }
+ assertEquals(0, count);
+ }
+
+ /**
+ * Tests the hoplog iterator on a single-KV file. After the first key is returned, hasNext
+ * should return false and subsequent next calls should throw NoSuchElementException
+ */
+ public void testSingleKVIterator() throws Exception {
+ String hoplogName = getRandomHoplogName();
+ TreeMap<String, String> map = createHoplog(hoplogName, 1);
+ HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+ HoplogReader reader = testHoplog.getReader();
+
+ HoplogIterator<byte[], byte[]> iter = reader.scan();
+ assertNull(iter.getKey());
+ assertNull(iter.getValue());
+ assertTrue(iter.hasNext());
+ assertNull(iter.getKey());
+ assertNull(iter.getValue());
+
+ Entry<String, String> entry = map.firstEntry();
+ iter.next();
+ assertNotNull(iter.getKey());
+ assertEquals(entry.getKey(), new String(iter.getKey()));
+ assertNotNull(iter.getValue());
+ assertEquals(entry.getValue(), new String(iter.getValue()));
+
+ assertFalse(iter.hasNext());
+ try {
+ iter.next();
+ fail();
+ } catch (NoSuchElementException e) {
+ }
+ }
+
+ /**
+ * Tests iteration on a hoplog with no keys, using a scanner. Scanner should not return any value
+ * and hasNext should return false every time
+ */
+ public void testEmptyFileIterator() throws Exception {
+ String hoplogName = getRandomHoplogName();
+ createHoplog(hoplogName, 0);
+ HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+ HoplogReader reader = testHoplog.getReader();
+ HoplogIterator<byte[], byte[]> iter = reader.scan();
+ assertNull(iter.getKey());
+ assertNull(iter.getValue());
+ assertFalse(iter.hasNext());
+ assertNull(iter.getKey());
+ assertNull(iter.getValue());
+ try {
+ iter.next();
+ fail();
+ } catch (NoSuchElementException e) {
+ }
+ }
+
+ /**
+ * Tests from exclusive iterator
+ */
+ public void testFromExclusiveIterator() throws Exception {
+ fromIterator(false);
+ }
+
+ /**
+ * Tests from inclusive iterator
+ */
+ public void testFromInclusiveIterator() throws Exception {
+ fromIterator(true);
+ }
+
+ /**
+ * Tests 'from'-bounded iteration. Creates a hoplog with 10 KVs, then creates a scanner
+ * starting at a middle key and verifies the count of KVs iterated over
+ */
+ public void fromIterator(boolean includeFrom) throws Exception {
+ int count = 10;
+ ByteArrayComparator bac = new ByteArrayComparator();
+
+ String hoplogName = getRandomHoplogName();
+ // sorted map contains the keys inserted in the hoplog for testing
+ TreeMap<String, String> sortedMap = createHoplog(hoplogName, count);
+
+ HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+ HoplogReader reader = testHoplog.getReader();
+
+ int middleKey = 4;
+ // remove top keys from the sorted map as the hoplog scanner should not
+ // return those
+ Iterator<Entry<String, String>> mapIter = sortedMap.entrySet().iterator();
+ for (int i = 0; i < middleKey; i++) {
+ mapIter.next();
+ count--;
+ }
+ if (!includeFrom) {
+ mapIter.next();
+ count--;
+ }
+
+ // keys are like Key-X, for X=0 till X=9. Start iterator at fifth key,
+ // key-4. if excluding from key, start at sixth key, key-5.
+ HoplogIterator<byte[], byte[]> iter = reader.scan(("key-" + middleKey).getBytes(), includeFrom,
+ null, true);
+
+ for (; iter.hasNext();) {
+ byte[] key = iter.next();
+ Entry<String, String> entry = mapIter.next();
+ // make sure the KV returned by iterator match the inserted KV
+ assertEquals(0, bac.compare(key, iter.getKey()));
+ assertEquals(0, bac.compare(key, entry.getKey().getBytes()));
+ assertEquals(0, bac.compare(iter.getValue(), entry.getValue().getBytes()));
+ count--;
+ }
+ assertEquals(0, count);
+ }
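+
+ /**
+ * Helper sketch (not used by the tests above) showing the bounded-scan API:
+ * counts the entries a scan returns between the given from/to keys.
+ */
+ private int countScanned(HoplogReader reader, byte[] from, boolean includeFrom,
+ byte[] to, boolean includeTo) throws IOException {
+ HoplogIterator<byte[], byte[]> iter = reader.scan(from, includeFrom, to, includeTo);
+ int count = 0;
+ while (iter.hasNext()) {
+ iter.next();
+ count++;
+ }
+ return count;
+ }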
+
+ /**
+ * Tests to exclusive iterator
+ */
+ public void testToExclusiveIterator() throws Exception {
+ toIterator(false);
+ }
+
+ /**
+ * Tests to inclusive iterator
+ */
+ public void testToInclusiveIterator() throws Exception {
+ toIterator(true);
+ }
+
+ /**
+ * Tests 'to'-bounded iteration. Creates a hoplog with 10 KVs, then creates a scanner
+ * ending at a middle key and verifies the count of KVs iterated over
+ */
+ public void toIterator(boolean includeTo) throws Exception {
+ int count = 10;
+ ByteArrayComparator bac = new ByteArrayComparator();
+
+ String hoplogName = getRandomHoplogName();
+ // sorted map contains the keys inserted in the hoplog for testing
+ TreeMap<String, String> sortedMap = createHoplog(hoplogName, count);
+ Iterator<Entry<String, String>> mapIter = sortedMap.entrySet().iterator();
+
+ HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+ HoplogReader reader = testHoplog.getReader();
+
+ int middleKey = 4;
+ // keys are like Key-X, for X=0 till X=9. End iterator at fifth key,
+ // key-4. if excluding to key, end at fourth key, key-3.
+ HoplogIterator<byte[], byte[]> iter = reader.scan(null, true, ("key-" + middleKey).getBytes(), includeTo);
+
+ for (; iter.hasNext();) {
+ byte[] key = iter.next();
+ Entry<String, String> entry = mapIter.next();
+ // make sure the KV returned by iterator match the inserted KV
+ assertEquals(0, bac.compare(key, iter.getKey()));
+ assertEquals(0, bac.compare(key, entry.getKey().getBytes()));
+ assertEquals(0, bac.compare(iter.getValue(), entry.getValue().getBytes()));
+
+ count --;
+ }
+
+ if (includeTo) {
+ count++;
+ }
+
+ assertEquals(10, count + middleKey);
+ }
+
+ /**
+ * Tests a scan bounded on both ends; bounds that fall between existing keys snap inward
+ * to the nearest keys in the range
+ */
+ public void testFromToIterator() throws IOException {
+ ByteArrayComparator bac = new ByteArrayComparator();
+ String hoplogName = getRandomHoplogName();
+ HFileSortedOplog hoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+
+ int count = 5;
+ HoplogWriter writer = hoplog.createWriter(5);
+ for (int i = 0; i < count; i++) {
+ String value = "value-" + (i * 2);
+ // even keys key-[0 2 4 6 8]
+ writer.append(("key-" + (i * 2)).getBytes(), value.getBytes());
+ }
+ writer.close();
+
+ HoplogReader reader = hoplog.getReader();
+ HoplogIterator<byte[], byte[]> iter = reader.scan("key-1".getBytes(), true, "key-7".getBytes(), true);
+
+ for (int i = 2; i < 7; i += 2) {
+ assertTrue(iter.hasNext());
+ iter.next();
+ assertEquals(0, bac.compare(("key-" + i).getBytes(), iter.getKey()));
+ assertEquals(0, bac.compare(("value-" + i).getBytes(), iter.getValue()));
+ System.out.println(new String(iter.getKey()));
+ }
+ assertFalse(iter.hasNext());
+ }
+
+ /**
+ * Tests whether sortedoplog supports duplicate keys, required when conflation is disabled
+ */
+ public void testDuplicateKeys() throws IOException {
+ String hoplogName = getRandomHoplogName();
+ HFileSortedOplog hoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+
+ // write duplicate keys
+ int count = 2;
+ HoplogWriter writer = hoplog.createWriter(2);
+ List<String> values = new ArrayList<String>();
+ for(int i = 1; i <= count; i++) {
+ String value = "value" + i;
+ writer.append("key-1".getBytes(), value.getBytes());
+ values.add(value);
+ }
+ writer.close();
+
+ HoplogReader reader = hoplog.getReader();
+ HoplogIterator<byte[], byte[]> scanner = reader.scan();
+ for (byte[] key = null; scanner.hasNext();) {
+ key = scanner.next();
+ count--;
+ assertEquals(0, Bytes.compareTo(key, "key-1".getBytes()));
+ values.remove(new String(scanner.getValue()));
+ }
+ assertEquals(0, count);
+ assertEquals(0, values.size());
+ }
+
+ public void testOffsetBasedScan() throws Exception {
+ // Each record is 43 bytes and each block is 256 bytes, so each block holds
+ // about 6 records (ceil(256 / 43) = 6)
+
+ int blocksize = 1 << 8;
+ System.setProperty(HoplogConfig.HFILE_BLOCK_SIZE_CONF,
+ String.valueOf(blocksize));
+
+ int count = 50;
+ String hoplogName = getRandomHoplogName();
+ createHoplog(hoplogName, count);
+
+ HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(
+ testDataDir, hoplogName), blockCache, stats, storeStats);
+
+ HoplogReader reader = testHoplog.getReader();
+
+ HoplogIterator<byte[], byte[]> scanner = reader.scan(blocksize * 1, blocksize * 2);
+ int range1Count = 0;
+ String range1EndKey = null;
+ for (byte[] key = null; scanner.hasNext();) {
+ key = scanner.next();
+ range1Count++;
+ range1EndKey = new String(key);
+ }
+ int range1EndKeyNum = Integer.valueOf(range1EndKey.substring("Key-".length()));
+
+ scanner = reader.scan(blocksize * 2, blocksize * 1);
+ int range2Count = 0;
+ String range2EndKey = null;
+ for (byte[] key = null; scanner.hasNext();) {
+ key = scanner.next();
+ range2Count++;
+ range2EndKey = new String(key);
+ }
+
+ assertEquals(range2EndKey, range1EndKey);
+ assertEquals(2, range1Count/range2Count);
+
+ scanner = reader.scan(blocksize * 3, blocksize * 1);
+ String range3FirstKey = new String(scanner.next());
+
+ int range3FirstKeyNum = Integer.valueOf(range3FirstKey.substring("Key-"
+ .length()));
+
+ // range 3 starts at the end of range 1. so the two keys must be consecutive
+ assertEquals(range1EndKeyNum + 1, range3FirstKeyNum);
+
+ testHoplog.close();
+ }
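+
+ /**
+ * Worked arithmetic behind the block comments in the offset-scan tests: with
+ * 43-byte records and 256-byte blocks, ceil(256 / 43) = 6 records per block
+ * (a sketch, not used by the scans themselves).
+ */
+ private static int recordsPerBlock(int blockSize, int recordSize) {
+ return (int) Math.ceil((double) blockSize / recordSize);
+ }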
+
+ public void testOffsetScanBeyondFileSize() throws Exception {
+ // Each record is 43 bytes and each block is 256 bytes, so each block holds
+ // about 6 records (ceil(256 / 43) = 6)
+
+ int blocksize = 1 << 8;
+ System.setProperty(HoplogConfig.HFILE_BLOCK_SIZE_CONF,
+ String.valueOf(blocksize));
+
+ int count = 20;
+ String hoplogName = getRandomHoplogName();
+ createHoplog(hoplogName, count);
+
+ HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(
+ testDataDir, hoplogName), blockCache, stats, storeStats);
+
+ HoplogReader reader = testHoplog.getReader();
+
+ HoplogIterator<byte[], byte[]> scanner = reader.scan(blocksize * 5, blocksize * 2);
+ assertFalse(scanner.hasNext());
+
+ testHoplog.close();
+ }
+
+ public void testZeroValueOffsetScan() throws Exception {
+ // Each record is 43 bytes and each block is 256 bytes, so each block holds
+ // about 6 records (ceil(256 / 43) = 6)
+
+ int blocksize = 1 << 8;
+ System.setProperty(HoplogConfig.HFILE_BLOCK_SIZE_CONF,
+ String.valueOf(blocksize));
+
+ int count = 20;
+ String hoplogName = getRandomHoplogName();
+ createHoplog(hoplogName, count);
+
+ HFileSortedOplog testHoplog = new HFileSortedOplog(hdfsStore, new Path(
+ testDataDir, hoplogName), blockCache, stats, storeStats);
+
+ HoplogReader reader = testHoplog.getReader();
+
+ HoplogIterator<byte[], byte[]> scanner = reader.scan(0, blocksize * 2);
+ assertTrue(scanner.hasNext());
+ int keyNum = Integer.valueOf(new String(scanner.next()).substring("Key-"
+ .length()));
+ assertEquals(100000, keyNum);
+
+ testHoplog.close();
+ }
+
+ /*
+ * Tests that the reader still succeeds in reading data even if the FS client is
+ * recycled without the reader knowing
+ */
+ public void testReaderDetectAndUseRecycledFs() throws Exception {
+ HDFSStoreFactoryImpl storeFactory = getCloseableLocalHdfsStoreFactory();
+ HDFSStoreImpl store = (HDFSStoreImpl) storeFactory.create("Store-1");
+ toBeCleaned.add(store);
+
+ HFileSortedOplog hop = new HFileSortedOplog(store, new Path(getName() + "-1-1.hop"), blockCache, stats, storeStats);
+ toBeCleaned.add(hop);
+ TreeMap<String, String> map = createHoplog(10, hop);
+
+ HoplogReader reader = hop.getReader();
+    // verify that each entry put in the hoplog is returned by the reader
+ for (Entry<String, String> entry : map.entrySet()) {
+ byte[] value = reader.read(entry.getKey().getBytes());
+ assertNotNull(value);
+ }
+
+ cache.getLogger().info("<ExpectedException action=add>java.io.IOException</ExpectedException>");
+ try {
+ store.getFileSystem().close();
+ store.checkAndClearFileSystem();
+
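+      // the reader fetched after the recycle must detect the stale FS client
+      // and transparently use the new one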
+ for (Entry<String, String> entry : map.entrySet()) {
+ reader = hop.getReader();
+ byte[] value = reader.read(entry.getKey().getBytes());
+ assertNotNull(value);
+ }
+ } finally {
+ cache.getLogger().info("<ExpectedException action=remove>java.io.IOException</ExpectedException>");
+ }
+ }
+
+  public void testNewScannerDetectAndUseRecycledFs() throws Exception {
+ HDFSStoreFactoryImpl storeFactory = getCloseableLocalHdfsStoreFactory();
+ HDFSStoreImpl store = (HDFSStoreImpl) storeFactory.create("Store-1");
+ toBeCleaned.add(store);
+
+ HFileSortedOplog hop = new HFileSortedOplog(store, new Path(getName() + "-1-1.hop"), blockCache, stats, storeStats);
+ createHoplog(10, hop);
+
+ HoplogIterator<byte[], byte[]> scanner = hop.getReader().scan();
+    // verify that each entry put in the hoplog is returned by the reader
+ int i = 0;
+ while (scanner.hasNext()) {
+ byte[] key = scanner.next();
+ assertNotNull(key);
+ i++;
+ }
+ assertEquals(10, i);
+ // flush block cache
+ hop.close(true);
+ hop.delete();
+
+ hop = new HFileSortedOplog(store, new Path(getName()+"-1-1.hop"), blockCache, stats, storeStats);
+ createHoplog(10, hop);
+ toBeCleaned.add(hop);
+ hop.getReader();
+
+ cache.getLogger().info("<ExpectedException action=add>java.io.IOException</ExpectedException>");
+ try {
+ store.getFileSystem().close();
+ store.checkAndClearFileSystem();
+
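+      // a scanner created after the recycle must iterate using the new client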
+ scanner = hop.getReader().scan();
+      // verify that each entry put in the hoplog is returned by the reader
+ i = 0;
+ while (scanner.hasNext()) {
+ byte[] key = scanner.next();
+ assertNotNull(key);
+ i++;
+ }
+ assertEquals(10, i);
+ } finally {
+ cache.getLogger().info("<ExpectedException action=remove>java.io.IOException</ExpectedException>");
+ }
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ for (Object obj : toBeCleaned) {
+ try {
+ if (HDFSStoreImpl.class.isInstance(obj)) {
+ ((HDFSStoreImpl) obj).clearFolder();
+ } else if (AbstractHoplog.class.isInstance(obj)) {
+ ((AbstractHoplog) obj).close();
+ ((AbstractHoplog) obj).delete();
+ }
+ } catch (Exception e) {
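+        // best-effort cleanup: log and continue so remaining resources are freed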
+ System.out.println(e);
+ }
+ }
+ super.tearDown();
+ }
+
+ private TreeMap<String, String> createHoplog(String hoplogName, int numKeys) throws IOException {
+ HFileSortedOplog hoplog = new HFileSortedOplog(hdfsStore, new Path(testDataDir, hoplogName), blockCache, stats, storeStats);
+ TreeMap<String, String> map = createHoplog(numKeys, hoplog);
+ return map;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SortedOplogListIterJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SortedOplogListIterJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SortedOplogListIterJUnitTest.java
new file mode 100644
index 0000000..13aa6a9
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SortedOplogListIterJUnitTest.java
@@ -0,0 +1,178 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class SortedOplogListIterJUnitTest extends BaseHoplogTestCase {
+ public void testOneIterOneKey() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ items.add(new TestEvent(("0"), ("0")));
+ organizer.flush(items.iterator(), items.size());
+
+ List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs();
+ HoplogSetIterator iter = new HoplogSetIterator(oplogs);
+ assertTrue(iter.hasNext());
+ int count = 0;
+ for (ByteBuffer keyBB = null; iter.hasNext();) {
+ keyBB = iter.next();
+ byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
+ assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
+ count++;
+ }
+ assertEquals(1, count);
+ organizer.close();
+ }
+
+ public void testOneIterDuplicateKey() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ items.add(new TestEvent(("0"), ("V2")));
+ items.add(new TestEvent(("0"), ("V1")));
+ items.add(new TestEvent(("1"), ("V2")));
+ items.add(new TestEvent(("1"), ("V1")));
+ organizer.flush(items.iterator(), items.size());
+
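+    // duplicate keys within a single flush collapse to one entry carrying
+    // the latest value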
+ List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs();
+ HoplogSetIterator iter = new HoplogSetIterator(oplogs);
+ assertTrue(iter.hasNext());
+ int count = 0;
+ for (ByteBuffer keyBB = null; iter.hasNext();) {
+ keyBB = iter.next();
+ byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
+ byte[] value = HFileSortedOplog.byteBufferToArray(iter.getValue());
+ assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
+ assertEquals("V2", ((PersistedEventImpl) SortedHoplogPersistedEvent.fromBytes(value)).getValue());
+ count++;
+ }
+ assertEquals(2, count);
+ organizer.close();
+ }
+
+ public void testTwoIterSameKey() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ items.add(new TestEvent(("0"), ("V1")));
+ organizer.flush(items.iterator(), items.size());
+ items.clear();
+ items.add(new TestEvent(("0"), ("V2")));
+ organizer.flush(items.iterator(), items.size());
+
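+    // the same key flushed into two oplogs merges into a single entry with
+    // the value from the most recent flush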
+ List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs();
+ HoplogSetIterator iter = new HoplogSetIterator(oplogs);
+ assertTrue(iter.hasNext());
+ int count = 0;
+ for (ByteBuffer keyBB = null; iter.hasNext();) {
+ keyBB = iter.next();
+ byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
+ byte[] value = HFileSortedOplog.byteBufferToArray(iter.getValue());
+ assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
+ assertEquals("V2", ((PersistedEventImpl) SortedHoplogPersistedEvent.fromBytes(value)).getValue());
+ count++;
+ }
+ assertEquals(1, count);
+ organizer.close();
+ }
+
+ public void testTwoIterDiffKey() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ items.add(new TestEvent(("0"), ("V1")));
+ organizer.flush(items.iterator(), items.size());
+ items.clear();
+ items.add(new TestEvent(("1"), ("V1")));
+ organizer.flush(items.iterator(), items.size());
+
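+    // distinct keys from different oplogs are interleaved in sorted order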
+ List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs();
+ HoplogSetIterator iter = new HoplogSetIterator(oplogs);
+ assertTrue(iter.hasNext());
+ int count = 0;
+ for (ByteBuffer keyBB = null; iter.hasNext();) {
+ keyBB = iter.next();
+ byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
+ byte[] value = HFileSortedOplog.byteBufferToArray(iter.getValue());
+ assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
+ assertEquals("V1", ((PersistedEventImpl) SortedHoplogPersistedEvent.fromBytes(value)).getValue());
+ count++;
+ }
+ assertEquals(2, count);
+ organizer.close();
+ }
+
+ public void testMergedIterator() throws Exception {
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+ // #1
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ items.add(new TestEvent(("1"), ("1")));
+ items.add(new TestEvent(("2"), ("1")));
+ items.add(new TestEvent(("3"), ("1")));
+ items.add(new TestEvent(("4"), ("1")));
+ organizer.flush(items.iterator(), items.size());
+
+ // #2
+ items.clear();
+ items.add(new TestEvent(("2"), ("1")));
+ items.add(new TestEvent(("4"), ("1")));
+ items.add(new TestEvent(("6"), ("1")));
+ items.add(new TestEvent(("8"), ("1")));
+ organizer.flush(items.iterator(), items.size());
+
+ // #3
+ items.clear();
+ items.add(new TestEvent(("1"), ("1")));
+ items.add(new TestEvent(("3"), ("1")));
+ items.add(new TestEvent(("5"), ("1")));
+ items.add(new TestEvent(("7"), ("1")));
+ items.add(new TestEvent(("9"), ("1")));
+ organizer.flush(items.iterator(), items.size());
+
+ // #4
+ items.clear();
+ items.add(new TestEvent(("0"), ("1")));
+ items.add(new TestEvent(("1"), ("1")));
+ items.add(new TestEvent(("4"), ("1")));
+ items.add(new TestEvent(("5"), ("1")));
+ organizer.flush(items.iterator(), items.size());
+
+ List<TrackedReference<Hoplog>> oplogs = organizer.getSortedOplogs();
+ HoplogSetIterator iter = new HoplogSetIterator(oplogs);
+ // the iteration pattern for this test should be 0-9:
+ // 0 1 4 5 oplog #4
+ // 1 3 5 7 9 oplog #3
+ // 2 4 6 8 oplog #2
+ // 1 2 3 4 oplog #1
+ int count = 0;
+ for (ByteBuffer keyBB = null; iter.hasNext();) {
+ keyBB = iter.next();
+ byte[] key = HFileSortedOplog.byteBufferToArray(keyBB);
+ assertEquals(String.valueOf(count), BlobHelper.deserializeBlob(key));
+ count++;
+ }
+ assertEquals(10, count);
+ organizer.close();
+ }
+}