You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tika.apache.org by ta...@apache.org on 2022/11/15 20:54:15 UTC

[tika] branch main updated: Make unit test for FileSystemStatusReporter multithreaded.

This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 9432a73f1 Make unit test for FileSystemStatusReporter multithreaded.
9432a73f1 is described below

commit 9432a73f1a49092304c5ece2b3e3249e38670be4
Author: tballison <ta...@apache.org>
AuthorDate: Tue Nov 15 15:37:44 2022 -0500

    Make unit test for FileSystemStatusReporter multithreaded.
---
 .../reporters/fs/TestFileSystemStatusReporter.java | 120 ++++++++++++++++-----
 1 file changed, 91 insertions(+), 29 deletions(-)

diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/test/java/org/apache/tika/pipes/reporters/fs/TestFileSystemStatusReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/test/java/org/apache/tika/pipes/reporters/fs/TestFileSystemStatusReporter.java
index 5a26be3f6..16296fa1c 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/test/java/org/apache/tika/pipes/reporters/fs/TestFileSystemStatusReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/test/java/org/apache/tika/pipes/reporters/fs/TestFileSystemStatusReporter.java
@@ -22,9 +22,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.json.JsonMapper;
@@ -32,6 +40,7 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import org.apache.tika.pipes.PipesReporter;
 import org.apache.tika.pipes.PipesResult;
 import org.apache.tika.pipes.async.AsyncStatus;
 import org.apache.tika.pipes.pipesiterator.PipesIterator;
@@ -67,40 +76,24 @@ public class TestFileSystemStatusReporter {
                     try {
                         Thread.sleep(50);
                     } catch (InterruptedException e) {
-                        //throw new RuntimeException(e);
+                        return;
                     }
                 }
             }
         });
         readerThread.start();
-        PipesResult.STATUS[] statuses = PipesResult.STATUS.values();
-        Map<PipesResult.STATUS, Long> written = new HashMap<>();
-        Random random = new Random();
-        for (int i = 0; i < 1000; i++) {
-            PipesResult.STATUS status = statuses[random.nextInt(statuses.length)];
-            PipesResult pipesResult = new PipesResult(status);
-            reporter.report(PipesIterator.COMPLETED_SEMAPHORE, pipesResult, 100l);
-            Long cnt = written.get(status);
-            if (cnt == null) {
-                written.put(status, 1l);
-            } else {
-                cnt++;
-                written.put(status, cnt);
-            }
-            if (i % 100 == 0) {
-                Thread.sleep(94);
-                reporter.report(new TotalCountResult(Math.round((100 + (double) i / (double) 1000)),
-                        TotalCountResult.STATUS.NOT_COMPLETED));
-            }
-        }
+
+        Map<PipesResult.STATUS, Long> total = runBatch(reporter, 10, 200);
+
+
         readerThread.interrupt();
         readerThread.join(1000);
         reporter.report(new TotalCountResult(30000, TotalCountResult.STATUS.COMPLETED));
         reporter.close();
         AsyncStatus asyncStatus = objectMapper.readValue(path.toFile(), AsyncStatus.class);
         Map<PipesResult.STATUS, Long> map = asyncStatus.getStatusCounts();
-        assertEquals(written.size(), map.size());
-        for (Map.Entry<PipesResult.STATUS, Long> e : written.entrySet()) {
+        assertEquals(total.size(), map.size());
+        for (Map.Entry<PipesResult.STATUS, Long> e : total.entrySet()) {
             assertTrue(map.containsKey(e.getKey()), e.getKey().toString());
             assertEquals(e.getValue(), map.get(e.getKey()), e.getKey().toString());
         }
@@ -109,11 +102,80 @@ public class TestFileSystemStatusReporter {
         assertEquals(TotalCountResult.STATUS.COMPLETED, asyncStatus.getTotalCountResult().getStatus());
     }
 
-    /*@Test
-    //need to turn this into an actual test
-    public void oneOff() throws Exception {
-        Path config = Paths.get("");
-        AsyncProcessor.main(new String[]{ config.toAbsolutePath().toString()});
-    }*/
+    /**
+     * Runs {@code numThreads} {@link ReportWorker}s concurrently against one
+     * reporter, waits for all of them and returns their per-status counts summed
+     * across workers. A worker failure is rethrown as an ExecutionException.
+     */
+    private Map<PipesResult.STATUS, Long> runBatch(FileSystemStatusReporter reporter,
+                                                   int numThreads,
+                                                   int numIterations)
+            throws ExecutionException, InterruptedException {
+        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+        try {
+            ExecutorCompletionService<Integer> completionService =
+                    new ExecutorCompletionService<>(executorService);
+            List<ReportWorker> workerList = new ArrayList<>();
+            for (int i = 0; i < numThreads; i++) {
+                ReportWorker reportWorker = new ReportWorker(reporter, numIterations);
+                workerList.add(reportWorker);
+                completionService.submit(reportWorker);
+            }
+            //take() blocks until a worker finishes (no busy-spin on poll());
+            //get() rethrows any exception thrown by the worker
+            for (int finished = 0; finished < numThreads; finished++) {
+                Future<Integer> future = completionService.take();
+                future.get();
+            }
+            Map<PipesResult.STATUS, Long> total = new HashMap<>();
+            for (ReportWorker r : workerList) {
+                for (Map.Entry<PipesResult.STATUS, Long> e : r.getWritten().entrySet()) {
+                    total.merge(e.getKey(), e.getValue(), Long::sum);
+                }
+            }
+            return total;
+        } finally {
+            //always release the pool threads, even if a worker failed
+            executorService.shutdownNow();
+        }
+    }
+
+    /**
+     * Reports {@code numIterations} randomly chosen statuses to the given reporter
+     * and tallies locally how many times it reported each status.
+     */
+    private static class ReportWorker implements Callable<Integer> {
 
+        private final Map<PipesResult.STATUS, Long> written = new HashMap<>();
+        private final PipesReporter reporter;
+        private final int numIterations;
+
+        private ReportWorker(PipesReporter reporter, int numIterations) {
+            this.reporter = reporter;
+            this.numIterations = numIterations;
+        }
+
+        @Override
+        public Integer call() throws Exception {
+            PipesResult.STATUS[] statuses = PipesResult.STATUS.values();
+            Random random = new Random();
+            for (int i = 0; i < numIterations; i++) {
+                PipesResult.STATUS status = statuses[random.nextInt(statuses.length)];
+                reporter.report(PipesIterator.COMPLETED_SEMAPHORE, new PipesResult(status), 100L);
+                written.merge(status, 1L, Long::sum);
+                if (i % 100 == 0) {
+                    //periodically interleave a not-yet-completed total count update
+                    Thread.sleep(94);
+                    reporter.report(new TotalCountResult(Math.round((100 + (double) i / (double) 1000)),
+                            TotalCountResult.STATUS.NOT_COMPLETED));
+                }
+            }
+            return 1;
+        }
+
+        /** Per-status counts from this worker; a plain HashMap, so read it only after call() completes. */
+        Map<PipesResult.STATUS, Long> getWritten() {
+            return written;
+        }
+    }
 }