You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/11/15 20:54:15 UTC
[tika] branch main updated: Make unit test for FileSystemStatusReporter multithreaded.
This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 9432a73f1 Make unit test for FileSystemStatusReporter multithreaded.
9432a73f1 is described below
commit 9432a73f1a49092304c5ece2b3e3249e38670be4
Author: tballison <ta...@apache.org>
AuthorDate: Tue Nov 15 15:37:44 2022 -0500
Make unit test for FileSystemStatusReporter multithreaded.
---
.../reporters/fs/TestFileSystemStatusReporter.java | 120 ++++++++++++++++-----
1 file changed, 91 insertions(+), 29 deletions(-)
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/test/java/org/apache/tika/pipes/reporters/fs/TestFileSystemStatusReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/test/java/org/apache/tika/pipes/reporters/fs/TestFileSystemStatusReporter.java
index 5a26be3f6..16296fa1c 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/test/java/org/apache/tika/pipes/reporters/fs/TestFileSystemStatusReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/test/java/org/apache/tika/pipes/reporters/fs/TestFileSystemStatusReporter.java
@@ -22,9 +22,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
@@ -32,6 +40,7 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.apache.tika.pipes.PipesReporter;
import org.apache.tika.pipes.PipesResult;
import org.apache.tika.pipes.async.AsyncStatus;
import org.apache.tika.pipes.pipesiterator.PipesIterator;
@@ -67,40 +76,24 @@ public class TestFileSystemStatusReporter {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
- //throw new RuntimeException(e);
+ return;
}
}
}
});
readerThread.start();
- PipesResult.STATUS[] statuses = PipesResult.STATUS.values();
- Map<PipesResult.STATUS, Long> written = new HashMap<>();
- Random random = new Random();
- for (int i = 0; i < 1000; i++) {
- PipesResult.STATUS status = statuses[random.nextInt(statuses.length)];
- PipesResult pipesResult = new PipesResult(status);
- reporter.report(PipesIterator.COMPLETED_SEMAPHORE, pipesResult, 100l);
- Long cnt = written.get(status);
- if (cnt == null) {
- written.put(status, 1l);
- } else {
- cnt++;
- written.put(status, cnt);
- }
- if (i % 100 == 0) {
- Thread.sleep(94);
- reporter.report(new TotalCountResult(Math.round((100 + (double) i / (double) 1000)),
- TotalCountResult.STATUS.NOT_COMPLETED));
- }
- }
+
+ Map<PipesResult.STATUS, Long> total = runBatch(reporter, 10, 200);
+
+
readerThread.interrupt();
readerThread.join(1000);
reporter.report(new TotalCountResult(30000, TotalCountResult.STATUS.COMPLETED));
reporter.close();
AsyncStatus asyncStatus = objectMapper.readValue(path.toFile(), AsyncStatus.class);
Map<PipesResult.STATUS, Long> map = asyncStatus.getStatusCounts();
- assertEquals(written.size(), map.size());
- for (Map.Entry<PipesResult.STATUS, Long> e : written.entrySet()) {
+ assertEquals(total.size(), map.size());
+ for (Map.Entry<PipesResult.STATUS, Long> e : total.entrySet()) {
assertTrue(map.containsKey(e.getKey()), e.getKey().toString());
assertEquals(e.getValue(), map.get(e.getKey()), e.getKey().toString());
}
@@ -109,11 +102,80 @@ public class TestFileSystemStatusReporter {
assertEquals(TotalCountResult.STATUS.COMPLETED, asyncStatus.getTotalCountResult().getStatus());
}
- /*@Test
- //need to turn this into an actual test
- public void oneOff() throws Exception {
- Path config = Paths.get("");
- AsyncProcessor.main(new String[]{ config.toAbsolutePath().toString()});
- }*/
+    /**
+     * Runs {@code numThreads} {@link ReportWorker}s concurrently against the same reporter and
+     * sums the per-status counts that each worker reported.
+     *
+     * @param reporter      reporter shared by all workers
+     * @param numThreads    number of concurrent workers (one pool thread each)
+     * @param numIterations number of results each worker reports
+     * @return total number of results reported per status, summed over all workers
+     * @throws ExecutionException   if any worker throws
+     * @throws InterruptedException if interrupted while waiting for the workers
+     */
+    private Map<PipesResult.STATUS, Long> runBatch(FileSystemStatusReporter reporter,
+                                                   int numThreads,
+                                                   int numIterations)
+            throws ExecutionException, InterruptedException {
+        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+        try {
+            ExecutorCompletionService<Integer> executorCompletionService =
+                    new ExecutorCompletionService<>(executorService);
+            List<ReportWorker> workerList = new ArrayList<>();
+            for (int i = 0; i < numThreads; i++) {
+                ReportWorker reportWorker = new ReportWorker(reporter, numIterations);
+                workerList.add(reportWorker);
+                executorCompletionService.submit(reportWorker);
+            }
+            // take() blocks until the next worker completes; calling poll() in a loop would
+            // spin a core. get() rethrows a worker's failure, and its completion makes that
+            // worker's counts visible to this thread before they are read below.
+            for (int finished = 0; finished < numThreads; finished++) {
+                executorCompletionService.take().get();
+            }
+            Map<PipesResult.STATUS, Long> total = new HashMap<>();
+            for (ReportWorker r : workerList) {
+                r.getWritten().forEach((status, count) -> total.merge(status, count, Long::sum));
+            }
+            return total;
+        } finally {
+            // Don't let pool threads outlive the test; shutdownNow also interrupts any workers
+            // still running when we exit early because another worker failed.
+            executorService.shutdownNow();
+        }
+    }
+
+    /**
+     * Reports {@code numIterations} randomly chosen {@link PipesResult.STATUS} values to a shared
+     * reporter and keeps a local tally of what it reported. The caller can then compare the
+     * reporter's persisted counts against the sum over all workers.
+     *
+     * <p>Static because it needs nothing from the enclosing test instance. The tally is written
+     * only by the thread running {@link #call()}; read it via {@link #getWritten()} only after
+     * the task's {@code Future} has completed.
+     */
+    private static class ReportWorker implements Callable<Integer> {
+        private final Map<PipesResult.STATUS, Long> written = new HashMap<>();
+        private final PipesReporter reporter;
+        private final int numIterations;
+
+        private ReportWorker(PipesReporter reporter, int numIterations) {
+            this.reporter = reporter;
+            this.numIterations = numIterations;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            PipesResult.STATUS[] statuses = PipesResult.STATUS.values();
+            Random random = new Random();
+            for (int i = 0; i < numIterations; i++) {
+                PipesResult.STATUS status = statuses[random.nextInt(statuses.length)];
+                reporter.report(PipesIterator.COMPLETED_SEMAPHORE, new PipesResult(status), 100L);
+                written.merge(status, 1L, Long::sum);
+                if (i % 100 == 0) {
+                    // Periodically pause and send an in-progress total-count update, presumably
+                    // so the test's reader thread sees intermediate status files (confirm intent).
+                    Thread.sleep(94);
+                    // NOTE(review): for i < 500 this always rounds to 100; it looks like a
+                    // placeholder rather than a real progress count.
+                    long count = Math.round((100 + (double) i / (double) 1000));
+                    reporter.report(new TotalCountResult(count,
+                            TotalCountResult.STATUS.NOT_COMPLETED));
+                }
+            }
+            return 1;
+        }
+
+        /** Per-status counts of the results this worker reported. */
+        Map<PipesResult.STATUS, Long> getWritten() {
+            return written;
+        }
+    }
}