Posted to commits@geode.apache.org by up...@apache.org on 2016/04/27 22:49:52 UTC
[06/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManagerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManagerJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManagerJUnitTest.java
new file mode 100644
index 0000000..011d82b
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManagerJUnitTest.java
@@ -0,0 +1,449 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSCompactionManager.CompactionRequest;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSCompactionManagerJUnitTest extends BaseHoplogTestCase {
+ /**
+ * Tests queueing of major and minor compaction requests in their respective queues
+ */
+ public void testMinMajCompactionIsolation() throws Exception {
+ // blocking compactor: each request parks on a monitor, keeping its pool thread active
+ Compactor compactor = new AbstractCompactor() {
+ Object minor = new Object();
+ Object major = new Object();
+ public boolean compact(boolean isMajor, boolean isForced) throws IOException {
+ try {
+ if (isMajor) {
+ synchronized (major) {
+ major.wait();
+ }
+ } else {
+ synchronized (minor) {
+ minor.wait();
+ }
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return true;
+ }
+ };
+
+ // enable compaction; submitted requests are picked up by the pools but block inside compact()
+ HDFSCompactionManager instance = HDFSCompactionManager.getInstance(hdfsStore);
+ alterMinorCompaction(hdfsStore, true);
+ alterMajorCompaction(hdfsStore, true);
+
+ assertEquals(0, instance.getMinorCompactor().getActiveCount());
+ assertEquals(0, instance.getMajorCompactor().getActiveCount());
+
+ //minor request
+ CompactionRequest cr = new CompactionRequest("region", 0, compactor, false);
+ HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
+ //major request
+ cr = new CompactionRequest("region", 0, compactor, true);
+ HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
+
+ // wait for the requests to be picked up by the compaction threads
+ TimeUnit.MILLISECONDS.sleep(50);
+ assertEquals(1, instance.getMinorCompactor().getActiveCount());
+ assertEquals(1, instance.getMajorCompactor().getActiveCount());
+ }
+
+ /**
+ * Tests compaction pause. Once compaction is stopped, requests will
+ * start getting rejected
+ */
+ public void testAlterAutoMinorCompaction() throws Exception {
+ // each compaction execution increments the counter by 1; this tracks how many requests have run
+ final AtomicInteger totalExecuted = new AtomicInteger(0);
+ Compactor compactor = new AbstractCompactor() {
+ public boolean compact(boolean isMajor, boolean isForced) throws IOException {
+ totalExecuted.incrementAndGet();
+ return true;
+ }
+ };
+
+ // compaction is enabled; both submitted requests should execute, bringing the counter to 2
+ alterMinorCompaction(hdfsStore, true);
+ CompactionRequest cr = new CompactionRequest("region", 0, compactor, false);
+ HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
+ cr = new CompactionRequest("region", 1, compactor, false);
+ HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
+
+ int totalWait = 20;
+ while (totalWait > 0 && 2 != totalExecuted.get()) {
+ // wait for the operations to complete; the loop exits as soon as both have run
+ System.out.println("waiting one small cycle for dummy request to complete");
+ TimeUnit.MILLISECONDS.sleep(50);
+ totalWait--;
+ }
+ assertEquals(2, totalExecuted.get());
+
+ // compaction works. now disable compaction and submit requests until one is rejected;
+ // the execution counter should not increase
+ alterMinorCompaction(hdfsStore, false);
+ boolean success = false;
+ int i = 0;
+ do {
+ cr = new CompactionRequest("region", ++i, compactor, false);
+ success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) != null;
+ } while (success);
+
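+ // nothing submitted after compaction was disabled should execute, so the
+ // counter must still read 2 after a generous wait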
+ TimeUnit.MILLISECONDS.sleep(500);
+ assertEquals(2, totalExecuted.get());
+ }
+
+ public void testAlterAutoMajorCompaction() throws Exception {
+ // each compaction execution increments the counter by 1; this tracks how many requests have run
+ final AtomicInteger totalExecuted = new AtomicInteger(0);
+ Compactor compactor = new AbstractCompactor() {
+ public boolean compact(boolean isMajor, boolean isForced) throws IOException {
+ totalExecuted.incrementAndGet();
+ return true;
+ }
+ };
+
+ // compaction is enabled; both submitted requests should execute, bringing the counter to 2
+ alterMajorCompaction(hdfsStore, true);
+ CompactionRequest cr = new CompactionRequest("region", 0, compactor, true);
+ HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
+ cr = new CompactionRequest("region", 1, compactor, true);
+ HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
+
+ int totalWait = 20;
+ while (totalWait > 0 && 2 != totalExecuted.get()) {
+ // wait for the operations to complete; the loop exits as soon as both have run
+ System.out.println("waiting one small cycle for dummy request to complete");
+ TimeUnit.MILLISECONDS.sleep(50);
+ totalWait--;
+ }
+ assertEquals(2, totalExecuted.get());
+
+ // compaction works. now disable compaction and submit requests until one is rejected;
+ // the execution counter should not increase
+ alterMajorCompaction(hdfsStore, false);
+ boolean success = false;
+ int i = 0;
+ do {
+ cr = new CompactionRequest("region", ++i, compactor, true);
+ success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) != null;
+ System.out.println("success: " + success);
+ } while (success);
+
+ TimeUnit.MILLISECONDS.sleep(500);
+ assertEquals(2, totalExecuted.get());
+ }
+
+ /**
+ * Tests that duplicate compaction requests do not cause rejection
+ */
+ public void testDuplicateRequests() throws Exception {
+ final AtomicBoolean barrierOpen = new AtomicBoolean(false);
+ class TestCompactor extends AbstractCompactor {
+ AtomicBoolean busy = new AtomicBoolean(false);
+ public boolean compact(boolean isMajor, boolean isForced) throws IOException {
+ synchronized (barrierOpen) {
+ busy.set(true);
+ if (barrierOpen.get()) {
+ return false;
+ }
+ try {
+ barrierOpen.wait();
+ } catch (InterruptedException e) {
+ return false;
+ }
+ busy.set(false);
+ }
+ return true;
+ }
+ public boolean isBusy(boolean isMajor) {return busy.get();}
+ };
+
+ System.setProperty(HoplogConfig.COMPCATION_QUEUE_CAPACITY, "10");
+
+ alterMinorCompaction(hdfsStore, true);
+ alterMajorCompaction(hdfsStore, true);
+ // queue capacity is 10 and the major pool has 2 threads, so only the first
+ // 12 requests are accepted; the rest are rejected
+ for (int i = 0; i < 15; i++) {
+ CompactionRequest cr = new CompactionRequest("region", i, new TestCompactor(), true);
+ boolean success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) != null;
+ if (success) {
+ assertTrue("failed for " + i, i < 12);
+ } else {
+ assertTrue("failed for " + i, i >= 12);
+ }
+ }
+
+ synchronized (barrierOpen) {
+ barrierOpen.set(true);
+ barrierOpen.notifyAll();
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ barrierOpen.set(false);
+
+ HDFSCompactionManager.getInstance(hdfsStore).reset();
+ TestCompactor compactor = new TestCompactor();
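+ // resubmit the same (region, bucket 0) request with a single compactor; once
+ // it reports busy, duplicate submissions return null instead of scheduling more work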
+ for (int i = 0; i < 10; i++) {
+ TimeUnit.MILLISECONDS.sleep(20);
+ CompactionRequest cr = new CompactionRequest("region", 0, compactor, true);
+ boolean success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) != null;
+ if (success) {
+ assertTrue("failed for " + i, i < 2);
+ } else {
+ assertTrue("failed for " + i, i > 0);
+ }
+ }
+ }
+
+ public void testForceCompactionWithAutoDisabled() throws Exception {
+ HoplogOrganizer<? extends PersistedEventImpl> organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ items.add(new TestEvent(("1"), ("1-1")));
+ organizer.flush(items.iterator(), items.size());
+
+ items.clear();
+ items.add(new TestEvent(("2"), ("2-1")));
+ organizer.flush(items.iterator(), items.size());
+
+ FileStatus[] files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+ assertEquals(2, files.length);
+ files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+ assertEquals(0, files.length);
+
+ CompactionRequest cr = new CompactionRequest(getName(), 0, organizer.getCompactor(), true);
+ HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
+ TimeUnit.MILLISECONDS.sleep(500);
+
+ files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+ assertEquals(0, files.length);
+ files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+ assertEquals(0, files.length);
+
+ organizer.forceCompaction(true);
+ TimeUnit.MILLISECONDS.sleep(500);
+
+ files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+ assertEquals(1, files.length);
+ files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+ assertEquals(2, files.length);
+ }
+
+ /**
+ * Tests that a forced major compaction on version upgrade completes even when there is only one hoplog
+ */
+ public void testForceCompaction() throws Exception {
+ HoplogOrganizer<? extends PersistedEventImpl> organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ items.add(new TestEvent(("1"), ("1-1")));
+ organizer.flush(items.iterator(), items.size());
+
+ items.clear();
+ items.add(new TestEvent(("2"), ("2-1")));
+ organizer.flush(items.iterator(), items.size());
+
+ FileStatus[] files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
+ assertEquals(2, files.length);
+ files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+ assertEquals(0, files.length);
+
+ // isForced is true for user submitted compaction requests (through system procedure)
+ // we do not want to compact an already compacted file
+ CompactionRequest cr = new CompactionRequest(getName(), 0, organizer.getCompactor(), true, true/*isForced*/);
+ Future<CompactionStatus> status = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
+ status.get(); // block until the forced compaction completes
+
+ files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+ assertEquals(1, files.length);
+ files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+ assertEquals(2, files.length);
+
+ // second request to force compact does not do anything
+ status = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
+ status.get(); // block until the request is processed; it should not compact again
+
+ files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+ assertEquals(1, files.length);
+ files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+ assertEquals(2, files.length);
+
+ // upon version upgrade force compaction is allowed
+ cr = new CompactionRequest(getName(), 0, organizer.getCompactor(), true, true, true);
+ status = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
+ status.get(); // block until the upgrade-forced compaction completes
+
+ files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
+ assertEquals(2, files.length);
+ files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+ assertEquals(3, files.length); // + 1 for old major hoplog
+ }
+
+ /**
+ * Test successful sequential submission
+ */
+ public void testSameBucketSeqRequest() throws Exception {
+ final AtomicInteger counter = new AtomicInteger(0);
+ Compactor compactor = new AbstractCompactor() {
+ public boolean compact(boolean isMajor, boolean isForced) throws IOException {
+ counter.set(1);
+ return true;
+ }
+ };
+
+ HDFSCompactionManager.getInstance(hdfsStore).reset();
+ alterMinorCompaction(hdfsStore, true);
+ alterMajorCompaction(hdfsStore, true);
+ CompactionRequest cr = new CompactionRequest("region", 0, compactor, false);
+ assertEquals(0, counter.get());
+ boolean success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) != null;
+ assertEquals(true, success);
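+ // the compactor sets the counter to 1 on completion; spin until we observe
+ // it and reset it to 0 for the next round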
+ while (!counter.compareAndSet(1, 0)) {
+ TimeUnit.MILLISECONDS.sleep(20);
+ }
+
+ assertEquals(0, counter.get());
+ success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) != null;
+ assertEquals(true, success);
+ for (int i = 0; i < 10; i++) {
+ TimeUnit.MILLISECONDS.sleep(20);
+ if (counter.get() == 1) {
+ break;
+ }
+ }
+ assertEquals(1, counter.get());
+ }
+
+ public void testAlterMinorThreadsIncrease() throws Exception {
+ doAlterCompactionThreads(false, false);
+ }
+ public void testAlterMinorThreadsDecrease() throws Exception {
+ doAlterCompactionThreads(false, true);
+ }
+ public void testAlterMajorThreadsIncrease() throws Exception {
+ doAlterCompactionThreads(true, false);
+ }
+ public void testAlterMajorThreadsDecrease() throws Exception {
+ doAlterCompactionThreads(true, true);
+ }
+
+ public void doAlterCompactionThreads(final boolean testMajor, boolean decrease) throws Exception {
+ final AtomicBoolean barrierOpen = new AtomicBoolean(false);
+ final AtomicInteger counter = new AtomicInteger(0);
+ class TestCompactor extends AbstractCompactor {
+ public boolean compact(boolean isMajor, boolean isForced) throws IOException {
+ synchronized (barrierOpen) {
+ if ((testMajor && !isMajor) || (!testMajor && isMajor)) {
+ return true;
+ }
+ if (barrierOpen.get()) {
+ return false;
+ }
+ try {
+ barrierOpen.wait();
+ } catch (InterruptedException e) {
+ return false;
+ }
+ counter.incrementAndGet();
+ }
+ return true;
+ }
+ };
+
+ System.setProperty(HoplogConfig.COMPCATION_QUEUE_CAPACITY, "1");
+
+ HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+ int defaultThreadCount = 10;
+ if (testMajor) {
+ alterMajorCompaction(hdfsStore, true);
+ defaultThreadCount = 2;
+ mutator.setMajorCompactionThreads(15);
+ if (decrease) {
+ mutator.setMajorCompactionThreads(1);
+ }
+ } else {
+ alterMinorCompaction(hdfsStore, true);
+ mutator.setMinorCompactionThreads(15);
+ if (decrease) {
+ mutator.setMinorCompactionThreads(1);
+ }
+ }
+
+ // queue capacity is 1 and the pool has 10 (minor) or 2 (major) threads, so
+ // only the first 11 or 3 requests are accepted
+ cache.getLogger().info("<ExpectedException action=add>java.util.concurrent.RejectedExecutionException</ExpectedException>");
+ for (int i = 0; i < 15; i++) {
+ CompactionRequest cr = new CompactionRequest("region", i, new TestCompactor(), testMajor);
+ boolean success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) != null;
+ if (success) {
+ assertTrue("failed for " + i, i <= defaultThreadCount);
+ } else {
+ assertTrue("failed for " + i, i > defaultThreadCount);
+ }
+ }
+
+ TimeUnit.MILLISECONDS.sleep(500);
+ assertEquals(0, counter.get());
+ synchronized (barrierOpen) {
+ barrierOpen.set(true);
+ barrierOpen.notifyAll();
+ }
+ TimeUnit.MILLISECONDS.sleep(500);
+ assertEquals(defaultThreadCount, counter.get());
+
+ hdfsStore.alter(mutator);
+
+ counter.set(0);
+ barrierOpen.set(false);
+ for (int i = 0; i < 15; i++) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ CompactionRequest cr = new CompactionRequest("region", i, new TestCompactor(), testMajor);
+ boolean success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) != null;
+ if (decrease) {
+ if (i > 3) {
+ assertFalse("failed for " + i, success);
+ }
+ } else {
+ assertTrue("failed for " + i, success);
+ }
+ }
+ TimeUnit.MILLISECONDS.sleep(500);
+ synchronized (barrierOpen) {
+ barrierOpen.set(true);
+ barrierOpen.notifyAll();
+ }
+ TimeUnit.MILLISECONDS.sleep(500);
+ if (decrease) {
+ assertTrue(counter.get() < 4);
+ } else {
+ assertEquals(15, counter.get());
+ }
+
+ cache.getLogger().info("<ExpectedException action=remove>java.util.concurrent.RejectedExecutionException</ExpectedException>");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirectorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirectorJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirectorJUnitTest.java
new file mode 100644
index 0000000..dc7b987
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirectorJUnitTest.java
@@ -0,0 +1,97 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.hdfs.internal.HoplogListenerForRegion;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSRegionDirectorJUnitTest extends BaseHoplogTestCase {
+ public void testDirector() throws Exception {
+ int bucketId = 0;
+
+ HdfsRegionManager mgr = regionManager;
+
+ // no buckets have been created so far.
+ assertEquals(0, director.getBucketCount("/" + getName()));
+
+ // one bucket created
+ mgr.create(bucketId);
+ assertEquals(1, director.getBucketCount("/" + getName()));
+
+ // close the bucket
+ mgr.close(bucketId);
+
+ // all buckets have been closed.
+ assertEquals(0, director.getBucketCount("/" + getName()));
+
+ mgr.create(bucketId);
+ assertEquals(1, director.getBucketCount("/" + getName()));
+ director.clear("/" + getName());
+ try {
+ assertEquals(0, director.getBucketCount("/" + getName()));
+ fail("The region is no longer managed, hence an exception is expected");
+ } catch (IllegalStateException e) {
+ // exception expected as the region is no longer managed
+ }
+ }
+
+ public void testCompactionEvents() throws Exception {
+ final AtomicInteger counter = new AtomicInteger(0);
+ HoplogListener myListener = new HoplogListener() {
+ public void hoplogDeleted(String regionFolder, int bucketId, Hoplog... oplogs)
+ throws IOException {
+ }
+ public void hoplogCreated(String regionFolder, int bucketId, Hoplog... oplogs)
+ throws IOException {
+ }
+ public void compactionCompleted(String region, int bucket, boolean isMajor) {
+ counter.incrementAndGet();
+ }
+ };
+
+ HoplogListenerForRegion listenerManager = ((LocalRegion)region).getHoplogListener();
+ listenerManager.addListener(myListener);
+
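+ // flush four separate batches so the minor compaction below has several
+ // hoplogs to merge; exactly one compactionCompleted callback is expected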
+ HoplogOrganizer bucket = regionManager.create(0);
+ // #1
+ ArrayList<PersistedEventImpl> items = new ArrayList<PersistedEventImpl>();
+ items.add(new TestEvent("1", "1"));
+ bucket.flush(items.iterator(), items.size());
+
+ // #2
+ items.clear();
+ items.add(new TestEvent("2", "1"));
+ bucket.flush(items.iterator(), items.size());
+
+ // #3
+ items.clear();
+ items.add(new TestEvent("3", "1"));
+ bucket.flush(items.iterator(), items.size());
+
+ // #4
+ items.clear();
+ items.add(new TestEvent("4", "1"));
+ bucket.flush(items.iterator(), items.size());
+
+ bucket.getCompactor().compact(false, false);
+ assertEquals(1, counter.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStatsJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStatsJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStatsJUnitTest.java
new file mode 100644
index 0000000..1d17232
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStatsJUnitTest.java
@@ -0,0 +1,250 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSStatsJUnitTest extends BaseHoplogTestCase {
+ public void testStoreUsageStats() throws Exception {
+ HoplogOrganizer bucket = regionManager.create(0);
+
+ long oldUsage = 0;
+ assertEquals(oldUsage, stats.getStoreUsageBytes());
+
+ for (int j = 0; j < 5; j++) {
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < 100; i++) {
+ String key = ("key-" + (j * 100 + i));
+ String value = ("value-" + System.nanoTime());
+ items.add(new TestEvent(key, value));
+ }
+ bucket.flush(items.iterator(), 100);
+ }
+
+ assertTrue(0 < stats.getStoreUsageBytes());
+ oldUsage = stats.getStoreUsageBytes();
+
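+ // opening a second organizer over the same bucket registers the same files
+ // again, so the reported store usage doubles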
+ HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
+ assertEquals(2, stats.getStoreUsageBytes() / oldUsage);
+
+ organizer.close();
+ assertEquals(1, stats.getStoreUsageBytes() / oldUsage);
+ }
+
+ public void testWriteStats() throws Exception {
+ HoplogOrganizer bucket = regionManager.create(0);
+
+ // validate flush stats
+ // flush and create many hoplogs and execute one compaction cycle also
+ // 5 hoplogs, total 500 keys
+ assertEquals(0, stats.getFlush().getCount());
+ assertEquals(0, stats.getFlush().getBytes());
+ assertEquals(0, stats.getActiveFileCount());
+ int bytesSent = 0;
+ for (int j = 0; j < 5; j++) {
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < 100; i++) {
+ String key = ("key-" + (j * 100 + i));
+ String value = ("value-" + System.nanoTime());
+ items.add(new TestEvent(key, value));
+ bytesSent += (key.getBytes().length + value.getBytes().length);
+ }
+ bucket.flush(items.iterator(), 100);
+
+ // verify the flush stats reflect this batch
+ assertEquals(j + 1, stats.getFlush().getCount());
+ assertTrue(stats.getFlush().getBytes() > bytesSent);
+ assertEquals(j + 1, stats.getActiveFileCount());
+ }
+
+ // verify compaction stats
+ assertEquals(0, stats.getMinorCompaction().getCount());
+ assertEquals(0, stats.getMinorCompaction().getBytes());
+ assertEquals(0, stats.getInactiveFileCount());
+ bucket.getCompactor().compact(false, false);
+ assertEquals(1, stats.getMinorCompaction().getCount());
+ assertEquals(1, stats.getActiveFileCount());
+ assertEquals(0, stats.getInactiveFileCount());
+ assertEquals(stats.getMinorCompaction().getBytes(), stats.getFlush()
+ .getBytes());
+ }
+
+ public void testInactiveFileStats() throws Exception {
+ // steps:
+ // create files -> validate active and inactive file counts
+ // -> pin a reference with a scanner -> compact -> verify the counts again
+ HoplogOrganizer bucket = regionManager.create(0);
+ assertEquals(0, stats.getActiveFileCount());
+ assertEquals(0, stats.getInactiveFileCount());
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int j = 0; j < 5; j++) {
+ items.clear();
+ for (int i = 0; i < 100; i++) {
+ String key = ("key-" + (j * 100 + i));
+ String value = ("value-" + System.nanoTime());
+ items.add(new TestEvent(key, value));
+ }
+ bucket.flush(items.iterator(), 100);
+ }
+
+ assertEquals(5, stats.getActiveFileCount());
+ assertEquals(0, stats.getInactiveFileCount());
+
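+ // an open scanner holds file references, so major compaction marks the old
+ // hoplogs inactive instead of deleting them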
+ HoplogIterator<byte[], PersistedEventImpl> scanner = bucket.scan();
+ bucket.getCompactor().compact(true, false);
+ assertEquals(1, stats.getActiveFileCount());
+ assertEquals(5, stats.getInactiveFileCount());
+
+ scanner.close();
+ assertEquals(1, stats.getActiveFileCount());
+ assertEquals(0, stats.getInactiveFileCount());
+ }
+
+ public void testReadStats() throws Exception {
+ HoplogOrganizer<SortedHoplogPersistedEvent> bucket = regionManager.create(0);
+
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < 100; i++) {
+ items.add(new TestEvent("key-" + i, "value-" + System.nanoTime()));
+ }
+ bucket.flush(items.iterator(), 100);
+
+ // validate read stats
+ assertEquals(0, stats.getRead().getCount());
+ assertEquals(0, stats.getRead().getBytes());
+ // bytes read must exceed the combined key and value size and must keep increasing
+ int bytesRead = "key-1".getBytes().length + "value=1233232".getBytes().length;
+ for (int i = 0; i < 5; i++) {
+ long previousRead = stats.getRead().getBytes();
+ PersistedEventImpl e = bucket.read(BlobHelper.serializeToBlob("key-" + i));
+ assertNotNull(e);
+ assertEquals(i + 1, stats.getRead().getCount());
+ assertTrue( (bytesRead + previousRead) < stats.getRead().getBytes());
+ }
+
+ //Make sure the block cache stats are being updated.
+// assertTrue(storeStats.getBlockCache().getMisses() > 0);
+// assertTrue(storeStats.getBlockCache().getBytesCached() > 0);
+// assertTrue(storeStats.getBlockCache().getCached() > 0);
+
+ //Do a duplicate read to make sure we get a hit in the cache
+// bucket.read(BlobHelper.serializeToBlob("key-" + 0));
+// assertTrue(storeStats.getBlockCache().getHits() > 0);
+ }
+
+ public void testBloomStats() throws Exception {
+ HoplogOrganizer bucket = regionManager.create(0);
+
+ // create 5 hoplogs of 100 keys each
+ for (int j = 0; j < 5; j++) {
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < 100; i++) {
+ String key = ("key-" + (j * 100 + i));
+ String value = ("value-" + System.nanoTime());
+ items.add(new TestEvent(key, value));
+ }
+ bucket.flush(items.iterator(), 100);
+ }
+
+ // initially the bloom stat is zero. reading a key from the newest hoplog
+ // (keys 400-499) consults 1 bloom filter; reading a key from the oldest
+ // hoplog (keys 0-99) consults all 5, taking the count to 6
+ assertEquals(0, stats.getBloom().getCount());
+ bucket.read(BlobHelper.serializeToBlob("key-450"));
+ assertEquals(1, stats.getBloom().getCount());
+ bucket.read(BlobHelper.serializeToBlob("key-50"));
+ assertEquals(6, stats.getBloom().getCount());
+ }
+
+ public void testScanStats() throws Exception {
+ HFileSortedOplog hoplog = new HFileSortedOplog(hdfsStore, new Path(
+ testDataDir, "H-1-1.hop"), blockCache, stats, storeStats);
+ createHoplog(5, hoplog);
+
+ // initially scan stats will be zero. creating a scanner should increase
+ // scan iteration stats and bytes. On scanner close scan count should be
+ // incremented
+ assertEquals(0, stats.getScan().getCount());
+ assertEquals(0, stats.getScan().getBytes());
+ assertEquals(0, stats.getScan().getTime());
+ assertEquals(0, stats.getScan().getIterations());
+ assertEquals(0, stats.getScan().getIterationTime());
+
+ HoplogIterator<byte[], byte[]> scanner = hoplog.getReader().scan();
+ assertEquals(0, stats.getScan().getCount());
+ int count = 0;
+ for (byte[] bs = null; scanner.hasNext(); ) {
+ bs = scanner.next();
+ count += bs.length + scanner.getValue().length;
+ }
+ assertEquals(count, stats.getScan().getBytes());
+ assertEquals(5, stats.getScan().getIterations());
+ assertTrue(0 < stats.getScan().getIterationTime());
+ // getCount() stays 0 because scanner.close() has not been called yet
+ assertEquals(0, stats.getScan().getCount());
+ assertEquals(0, stats.getScan().getTime());
+ assertEquals(1, stats.getScan().getInProgress());
+
+ scanner.close();
+ assertEquals(1, stats.getScan().getCount());
+ assertTrue(0 < stats.getScan().getTime());
+ assertTrue(stats.getScan().getIterationTime() <= stats.getScan().getTime());
+ }
+
+ /**
+ * Validates that two buckets belonging to the same region update the same stats
+ */
+ public void testRegionBucketShareStats() throws Exception {
+ HoplogOrganizer bucket1 = regionManager.create(0);
+ HoplogOrganizer bucket2 = regionManager.create(1);
+
+ // validate flush stats
+ assertEquals(0, stats.getFlush().getCount());
+ assertEquals(0, stats.getActiveFileCount());
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < 100; i++) {
+ items.add(new TestEvent("key-" + i, "value-" + System.nanoTime()));
+ }
+ bucket1.flush(items.iterator(), 100);
+ assertEquals(1, stats.getFlush().getCount());
+ assertEquals(1, stats.getActiveFileCount());
+ items.clear();
+
+ for (int i = 0; i < 100; i++) {
+ items.add(new TestEvent("key-" + i, "value-" + System.nanoTime()));
+ }
+ bucket2.flush(items.iterator(), 100);
+ assertEquals(2, stats.getFlush().getCount());
+ assertEquals(2, stats.getActiveFileCount());
+ }
+
+ @Override
+ protected Cache createCache() {
+ CacheFactory cf = new CacheFactory().set("mcast-port", "0")
+ .set("log-level", "info")
+ .set("enable-time-statistics", "true")
+// .set("statistic-archive-file", "statArchive.gfs")
+ ;
+ cache = cf.create();
+
+ return cache;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizerJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizerJUnitTest.java
new file mode 100644
index 0000000..ab1ccac
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizerJUnitTest.java
@@ -0,0 +1,297 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.SequenceFileHoplog.SequenceFileIterator;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+/**
+ * Tests hoplog functionality for streaming ingest
+ *
+ * @author hemantb
+ *
+ */
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSUnsortedHoplogOrganizerJUnitTest extends BaseHoplogTestCase {
+
+ /**
+ * Tests flush operation
+ */
+ public void testFlush() throws Exception {
+ int count = 10;
+ int bucketId = (int) System.nanoTime();
+ HDFSUnsortedHoplogOrganizer organizer = new HDFSUnsortedHoplogOrganizer(regionManager, bucketId);
+
+ // flush and create hoplog
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < count; i++) {
+ items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime())));
+ }
+
+ organizer.flush(items.iterator(), count);
+ organizer.closeCurrentWriter();
+
+ // check file existence in bucket directory
+ FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + bucketId,
+ HdfsSortedOplogOrganizer.SEQ_HOPLOG_EXTENSION);
+
+ // only one hoplog should exist
+ assertEquals(1, hoplogs.length);
+ readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0);
+ }
+
+ public void testAlterRollOverInterval() throws Exception {
+ HDFSUnsortedHoplogOrganizer organizer = new HDFSUnsortedHoplogOrganizer(regionManager, 0);
+
+ // flush 3 times with ~1.1 second pauses. With the default rollover interval only one seq file is created
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int j = 0; j < 3; j++) {
+ items.clear();
+ for (int i = 0; i < 10; i++) {
+ items.add(new TestEvent(("key-" + (i + 10 * j)), ("value-" + System.nanoTime())));
+ }
+ organizer.flush(items.iterator(), 10);
+ TimeUnit.MILLISECONDS.sleep(1100);
+ }
+ organizer.closeCurrentWriter();
+
+ FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + 0,
+ HdfsSortedOplogOrganizer.SEQ_HOPLOG_EXTENSION);
+
+ // only one hoplog should exist
+ assertEquals(1, hoplogs.length);
+ readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0);
+
+ HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
+ mutator.setWriteOnlyFileRolloverInterval(1);
+ hdfsStore.alter(mutator);
+
+ TimeUnit.MILLISECONDS.sleep(1100);
+ for (int j = 0; j < 2; j++) {
+ items.clear();
+ for (int i = 0; i < 10; i++) {
+ items.add(new TestEvent(("key-" + (i + 10 * j)), ("value-" + System.nanoTime())));
+ }
+ organizer.flush(items.iterator(), 10);
+ TimeUnit.MILLISECONDS.sleep(1100);
+ }
+ organizer.closeCurrentWriter();
+ hoplogs = getBucketHoplogs(getName() + "/" + 0,
+ HdfsSortedOplogOrganizer.SEQ_HOPLOG_EXTENSION);
+ assertEquals(3, hoplogs.length);
+ }
+
+ public void testSequenceFileScan() throws Exception {
+ int count = 10000;
+ int bucketId = (int) System.nanoTime();
+ HDFSUnsortedHoplogOrganizer organizer = new HDFSUnsortedHoplogOrganizer(regionManager, bucketId);
+
+ // flush and create hoplog
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < count; i++) {
+ items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime())));
+ }
+
+ organizer.flush(items.iterator(), count);
+ organizer.closeCurrentWriter();
+
+ // check file existence in bucket directory
+ FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + bucketId,
+ HdfsSortedOplogOrganizer.SEQ_HOPLOG_EXTENSION);
+
+ // only one hoplog should exist
+ assertEquals(1, hoplogs.length);
+
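+ // locate the byte offsets around the first two sync markers; the scans below
+ // position their read windows relative to these offsets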
+ SequenceFileDetails sfd = getSequenceFileDetails(hdfsStore.getFileSystem(), hoplogs[0].getPath());
+
+ // End position is before a sync. Should read until sync.
+ readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0, sfd.indexOfKeyBeforeSecondSync ,
+ 0, sfd.posBeforeSecondSync);
+
+ // Start position is inside header. Should start from first key and go to next sync point.
+ readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0, sfd.indexOfKeyBeforeSecondSync,
+ 10, sfd.posAfterFirstSync);
+
+ // Start and end position are between two sync markers. Should not read any keys.
+ readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 29, 28,
+ sfd.posAfterFirstSync, sfd.posBeforeSecondSync - sfd.posAfterFirstSync);
+
+ // Start position is after a sync and End position is beyond the file size.
+ //Should read all the records after the next sync.
+ readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), sfd.indexOfKeyAfterFirstSync, 9999,
+ sfd.posBeforeFirstSync, 10000000);
+
+ // Should read all the records.
+ readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0, 9999, 0, -1);
+ }
+
+ class SequenceFileDetails {
+ public int posBeforeFirstSync;
+ public int indexOfKeyBeforeFirstSync;
+
+ public int posAfterFirstSync;
+ public int indexOfKeyAfterFirstSync;
+
+ public int posBeforeSecondSync;
+ public int indexOfKeyBeforeSecondSync;
+ }
+
+ public SequenceFileDetails getSequenceFileDetails(FileSystem inputFS, Path sequenceFileName) throws Exception {
+ SequenceFileDetails fd = new SequenceFileDetails();
+ SequenceFileHoplog hoplog = new SequenceFileHoplog(inputFS, sequenceFileName, null);
+
+ SequenceFileIterator iter = (SequenceFileIterator)hoplog.getReader().scan();
+ int currentkeyStartPos = 0;
+ int cursorPos = 0;
+ String currentKey = null;
+ boolean firstSyncSeen = false;
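+ // walk the file, recording the byte position and key index just before and
+ // after the first sync marker, and just before the second one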
+ try {
+ while (iter.hasNext()) {
+ iter.next();
+ currentkeyStartPos = cursorPos;
+ currentKey = ((String)CacheServerHelper.deserialize(iter.getKey()));
+ cursorPos = (int)iter.getPosition();
+ if (iter.syncSeen()){
+ if (firstSyncSeen) {
+ fd.posBeforeSecondSync = currentkeyStartPos;
+ fd.indexOfKeyBeforeSecondSync = Integer.parseInt(currentKey.substring(4));
+ break;
+ } else {
+ fd.posBeforeFirstSync = currentkeyStartPos;
+ fd.indexOfKeyBeforeFirstSync = Integer.parseInt(currentKey.substring(4));
+
+ fd.posAfterFirstSync = cursorPos;
+ fd.indexOfKeyAfterFirstSync = Integer.parseInt(currentKey.substring(4)) + 1;
+ firstSyncSeen = true;
+ }
+ }
+ }
+
+ } catch (Exception e) {
+ fail(e.toString());
+ }
+ iter.close();
+ hoplog.close();
+ return fd;
+ }
+
+ public void testClear() throws Exception {
+ int count = 10;
+ int bucketId = (int) System.nanoTime();
+ HDFSUnsortedHoplogOrganizer organizer = new HDFSUnsortedHoplogOrganizer(regionManager, bucketId);
+
+ // flush and create hoplog
+ ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+ for (int i = 0; i < count; i++) {
+ items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime())));
+ }
+ organizer.flush(items.iterator(), count);
+ organizer.closeCurrentWriter();
+ // check file existence in bucket directory
+ FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + bucketId,
+ AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION);
+ assertEquals(1, hoplogs.length);
+ readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0);
+
+
+ // write another batch but do not close the writer.
+ organizer.flush(items.iterator(), count);
+
+ organizer.clear();
+
+ hoplogs = getBucketHoplogs(getName() + "/" + bucketId,
+ AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION);
+ // check file existence in bucket directory
+ FileStatus[] expiredhoplogs = getBucketHoplogs(getName() + "/" + bucketId,
+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+
+ // two expired hoplogs should exist
+ assertEquals(2, expiredhoplogs.length);
+ assertEquals(2, hoplogs.length);
+ // the expired hoplogs' names should match the original hoplogs
+ assertTrue(expiredhoplogs[0].getPath().getName().equals(hoplogs[0].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) ||
+ expiredhoplogs[1].getPath().getName().equals(hoplogs[0].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) );
+ assertTrue(expiredhoplogs[0].getPath().getName().equals(hoplogs[1].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) ||
+ expiredhoplogs[1].getPath().getName().equals(hoplogs[1].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) );
+
+ // Test that second time clear should be harmless and should not result in extra files.
+ organizer.clear();
+ hoplogs = getBucketHoplogs(getName() + "/" + bucketId,
+ AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION);
+ // check file existence in bucket directory
+ expiredhoplogs = getBucketHoplogs(getName() + "/" + bucketId,
+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
+
+ // two expired hoplogs should exist
+ assertEquals(2, expiredhoplogs.length);
+ assertEquals(2, hoplogs.length);
+ // the expired hoplogs' names should match the original hoplogs
+ assertTrue(expiredhoplogs[0].getPath().getName().equals(hoplogs[0].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) ||
+ expiredhoplogs[1].getPath().getName().equals(hoplogs[0].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) );
+ assertTrue(expiredhoplogs[0].getPath().getName().equals(hoplogs[1].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) ||
+ expiredhoplogs[1].getPath().getName().equals(hoplogs[1].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) );
+
+
+ readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0);
+ readSequenceFile(hdfsStore.getFileSystem(), hoplogs[1].getPath(), 0);
+ }
+
+ public void readSequenceFile(FileSystem inputFS, Path sequenceFileName, int index) throws IOException {
+ readSequenceFile(inputFS, sequenceFileName, index, -1, 0, -1);
+ }
+ /**
+ * Reads the sequence file assuming that it has keys and values starting from index that
+ * is specified as parameter.
+ *
+ */
+ public void readSequenceFile(FileSystem inputFS, Path sequenceFileName, int index, int endIndex,
+ int startoffset, int length) throws IOException {
+ SequenceFileHoplog hoplog = new SequenceFileHoplog(inputFS, sequenceFileName, null);
+
+ HoplogIterator<byte[], byte[]> iter = null;
+ if (length == -1){
+ iter = hoplog.getReader().scan();
+ }
+ else {
+ iter = hoplog.getReader().scan(startoffset, length);
+ }
+
+ try {
+ while (iter.hasNext()) {
+ iter.next();
+ PersistedEventImpl te = UnsortedHoplogPersistedEvent.fromBytes(iter.getValue());
+ String stringkey = ((String)CacheServerHelper.deserialize(iter.getKey()));
+ assertTrue("Expected key: key-" + index + ". Actual key: " + stringkey , ((String)stringkey).equals("key-" + index));
+ index++;
+ }
+ if (endIndex != -1)
+ assertTrue("The keys should run until key-" + endIndex + " but they stop at key-" + (index - 1), index == endIndex + 1);
+ } catch (Exception e) {
+ fail(e.toString());
+ }
+ iter.close();
+ hoplog.close();
+ }
+
+}