You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2021/06/24 13:17:26 UTC

[lucene] branch main updated: Parallel processing (#132)

This is an automated email from the ASF dual-hosted git repository.

mikemccand pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/lucene.git


The following commit(s) were added to refs/heads/main by this push:
     new f1d54f7  Parallel processing (#132)
f1d54f7 is described below

commit f1d54f7c351961a4bc00ff0880d0ab1b7a4e9890
Author: balmukundblr <ba...@intel.com>
AuthorDate: Thu Jun 24 18:47:19 2021 +0530

    Parallel processing (#132)
    
    * Added an explicit Flush Task to flush data at the thread level once it completes processing
    
    * Included explicit flush per Thread level
    
    * Made changes for parallel processing
    
    * Removed extra brace
    
    * Removed unused variable
    
    * Removed unused variable initialization
    
    * Did the required formatting
    
    * Refactored the code and added required comments & checks
---
 .../org/apache/lucene/benchmark/Constants.java     |  2 +
 .../byTask/feeds/ReutersContentSource.java         | 54 +++++++++++++++-------
 .../benchmark/byTask/tasks/TaskSequence.java       | 14 +++++-
 .../lucene/benchmark/byTask/utils/Config.java      |  9 ++++
 4 files changed, 61 insertions(+), 18 deletions(-)

diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/Constants.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/Constants.java
index 53a1a25..86534ea 100644
--- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/Constants.java
+++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/Constants.java
@@ -26,4 +26,6 @@ public class Constants {
   public static Boolean[] BOOLEANS = new Boolean[] {Boolean.FALSE, Boolean.TRUE};
 
   public static final int DEFAULT_MAXIMUM_DOCUMENTS = Integer.MAX_VALUE;
+  /** Prefix of the thread names TaskSequence assigns to its parallel task threads. */
+  public static final String PARALLEL_TASK_THREAD_NAME_PREFIX = "ParallelTaskThread";
 }
diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java
index 76c229f..8e040b7 100644
--- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java
+++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java
@@ -28,6 +28,7 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Locale;
+import org.apache.lucene.benchmark.Constants;
 import org.apache.lucene.benchmark.byTask.utils.Config;
 
 /**
@@ -50,8 +51,8 @@ public class ReutersContentSource extends ContentSource {
   private ThreadLocal<DateFormatInfo> dateFormat = new ThreadLocal<>();
   private Path dataDir = null;
   private ArrayList<Path> inputFiles = new ArrayList<>();
-  private int nextFile = 0;
-  private int iteration = 0;
+  private int[] docCountArr; // per-thread-index call count; not cleared by resetInputs()
+  private volatile boolean docCountArrCreated; // set once docCountArr is allocated
 
   @Override
   public void setConfig(Config config) {
@@ -100,21 +101,35 @@ public class ReutersContentSource extends ContentSource {
 
   @Override
   public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOException {
-    Path f = null;
-    String name = null;
-    synchronized (this) {
-      if (nextFile >= inputFiles.size()) {
-        // exhausted files, start a new round, unless forever set to false.
-        if (!forever) {
-          throw new NoMoreDataException();
-        }
-        nextFile = 0;
-        iteration++;
-      }
-      f = inputFiles.get(nextFile++);
-      name = f.toRealPath() + "_" + iteration;
+    if (docCountArrCreated == false) {
+      docCountArrInit();
     }
 
+    int threadIndexSize = Thread.currentThread().getName().length(); // full thread-name length
+    int parallelTaskThreadSize = Constants.PARALLEL_TASK_THREAD_NAME_PREFIX.length();
+
+    // Parse <index> out of the thread name "ParallelTaskThread-<index>" assigned in
+    // TaskSequence.doParallelTasks(); NOTE(review): a thread named e.g. "main" throws here.
+    int threadIndex =
+        Integer.parseInt(
+            Thread.currentThread()
+                .getName()
+                .substring(parallelTaskThreadSize + 1, threadIndexSize));
+
+    assert (threadIndex >= 0 && threadIndex < docCountArr.length)
+        : "Please check threadIndex or docCountArr length";
+    int stride = threadIndex + docCountArr[threadIndex] * docCountArr.length;
+    int inFileSize = inputFiles.size();
+
+    // Thread i takes files i, i + T, i + 2T, ... (T = docCountArr.length), wrapping modulo the
+    // file count, which works whether there are fewer, as many, or more files than threads.
+    int fileIndex = stride % inFileSize;
+    int iteration = stride / inFileSize; // NOTE(review): no 'forever' check before wrapping
+    docCountArr[threadIndex]++;
+
+    Path f = inputFiles.get(fileIndex);
+    String name = f.toRealPath() + "_" + iteration;
+
     try (BufferedReader reader = Files.newBufferedReader(f, StandardCharsets.UTF_8)) {
       // First line is the date, 3rd is the title, rest is body
       String dateStr = reader.readLine();
@@ -143,7 +158,12 @@ public class ReutersContentSource extends ContentSource {
   @Override
   public synchronized void resetInputs() throws IOException {
     super.resetInputs();
-    nextFile = 0;
-    iteration = 0;
+  }
+
+  private synchronized void docCountArrInit() { // lazily sizes docCountArr from Config
+    if (docCountArrCreated == false) {
+      docCountArr = new int[getConfig().getNumThreads()];
+      docCountArrCreated = true;
+    }
   }
 }
diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java
index f74aaab..bb049d4 100644
--- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java
+++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java
@@ -20,6 +20,7 @@ import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
+import org.apache.lucene.benchmark.Constants;
 import org.apache.lucene.benchmark.byTask.PerfRunData;
 import org.apache.lucene.benchmark.byTask.feeds.NoMoreDataException;
 import org.apache.lucene.benchmark.byTask.stats.TaskStats;
@@ -340,12 +341,23 @@ public class TaskSequence extends PerfTask {
 
     initTasksArray();
     ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()];
+    // Publish the thread count via Config.setNumThreads(); ReutersContentSource's
+    // docCountArrInit() sizes its per-thread doc counters from Config.getNumThreads().
+    this.getRunData().getConfig().setNumThreads(t.length);
     // prepare threads
     int index = 0;
     for (int k = 0; k < repetitions; k++) {
       for (int i = 0; i < tasksArray.length; i++) {
         final PerfTask task = tasksArray[i].clone();
-        t[index++] = new ParallelTask(task);
+        t[index] = new ParallelTask(task);
+        // Give each thread a unique name "ParallelTaskThread-<index>". ReutersContentSource's
+        // getNextDocData() recovers <index> with
+        // Integer.parseInt(Thread.currentThread().getName().substring(parallelTaskThreadSize + 1,
+        // threadIndexSize)), so update that parsing before changing the naming scheme here.
+        // NOTE(review): <index> restarts at 0 on each call of this method, so names are unique
+        // only within a single parallel run.
+        t[index].setName(Constants.PARALLEL_TASK_THREAD_NAME_PREFIX + "-" + index);
+        index++;
       }
     }
     // run threads
diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java
index 7470915..5eafb55 100644
--- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java
+++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java
@@ -54,6 +54,7 @@ public class Config {
   private HashMap<String, Object> valByRound = new HashMap<>();
   private HashMap<String, String> colForValByRound = new HashMap<>();
   private String algorithmText;
+  private int numThreads = 1; // number of parallel task threads; see setNumThreads()
 
   /**
    * Read both algorithm and config properties.
@@ -113,6 +114,14 @@ public class Config {
     }
   }
 
+  public void setNumThreads(int numThreads) {
+    this.numThreads = numThreads; // e.g. TaskSequence passes its parallel thread count
+  }
+
+  public int getNumThreads() {
+    return numThreads; // 1 unless setNumThreads() was called
+  }
+
   @SuppressWarnings({"unchecked", "rawtypes"})
   private void printProps() {
     System.out.println("------------> config properties:");