You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2021/06/24 13:17:26 UTC
[lucene] branch main updated: Parallel processing (#132)
This is an automated email from the ASF dual-hosted git repository.
mikemccand pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/lucene.git
The following commit(s) were added to refs/heads/main by this push:
new f1d54f7 Parallel processing (#132)
f1d54f7 is described below
commit f1d54f7c351961a4bc00ff0880d0ab1b7a4e9890
Author: balmukundblr <ba...@intel.com>
AuthorDate: Thu Jun 24 18:47:19 2021 +0530
Parallel processing (#132)
* Added an explicit flush task to flush data at the thread level once the thread completes processing
* Included an explicit flush at the per-thread level
* Made changes for parallel processing
* Removed extra brace
* Removed unused variable
* Removed unused variable initialization
* Applied the required formatting
* Refactored the code and added the required comments and checks
---
.../org/apache/lucene/benchmark/Constants.java | 2 +
.../byTask/feeds/ReutersContentSource.java | 54 +++++++++++++++-------
.../benchmark/byTask/tasks/TaskSequence.java | 14 +++++-
.../lucene/benchmark/byTask/utils/Config.java | 9 ++++
4 files changed, 61 insertions(+), 18 deletions(-)
diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/Constants.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/Constants.java
index 53a1a25..86534ea 100644
--- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/Constants.java
+++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/Constants.java
@@ -26,4 +26,6 @@ public class Constants {
public static Boolean[] BOOLEANS = new Boolean[] {Boolean.FALSE, Boolean.TRUE};
public static final int DEFAULT_MAXIMUM_DOCUMENTS = Integer.MAX_VALUE;
+
+ public static final String PARALLEL_TASK_THREAD_NAME_PREFIX = "ParallelTaskThread";
}
diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java
index 76c229f..8e040b7 100644
--- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java
+++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java
@@ -28,6 +28,7 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Locale;
+import org.apache.lucene.benchmark.Constants;
import org.apache.lucene.benchmark.byTask.utils.Config;
/**
@@ -50,8 +51,8 @@ public class ReutersContentSource extends ContentSource {
private ThreadLocal<DateFormatInfo> dateFormat = new ThreadLocal<>();
private Path dataDir = null;
private ArrayList<Path> inputFiles = new ArrayList<>();
- private int nextFile = 0;
- private int iteration = 0;
+ private int[] docCountArr;
+ private volatile boolean docCountArrCreated;
@Override
public void setConfig(Config config) {
@@ -100,21 +101,35 @@ public class ReutersContentSource extends ContentSource {
@Override
public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOException {
- Path f = null;
- String name = null;
- synchronized (this) {
- if (nextFile >= inputFiles.size()) {
- // exhausted files, start a new round, unless forever set to false.
- if (!forever) {
- throw new NoMoreDataException();
- }
- nextFile = 0;
- iteration++;
- }
- f = inputFiles.get(nextFile++);
- name = f.toRealPath() + "_" + iteration;
+ if (docCountArrCreated == false) {
+ docCountArrInit();
}
+ int threadIndexSize = Thread.currentThread().getName().length();
+ int parallelTaskThreadSize = Constants.PARALLEL_TASK_THREAD_NAME_PREFIX.length();
+
+ // Extract ThreadIndex from unique ThreadName which is set with '"ParallelTaskThread-"+index',
+ // in TaskSequence.java's doParallelTasks()
+ int threadIndex =
+ Integer.parseInt(
+ Thread.currentThread()
+ .getName()
+ .substring(parallelTaskThreadSize + 1, threadIndexSize));
+
+ assert (threadIndex >= 0 && threadIndex < docCountArr.length)
+ : "Please check threadIndex or docCountArr length";
+ int stride = threadIndex + docCountArr[threadIndex] * docCountArr.length;
+ int inFileSize = inputFiles.size();
+
+ // Modulo Operator covers all three possible senarios i.e. 1. If inputFiles.size() < Num Of
+ // Threads 2.inputFiles.size() == Num Of Threads 3.inputFiles.size() > Num Of Threads
+ int fileIndex = stride % inFileSize;
+ int iteration = stride / inFileSize;
+ docCountArr[threadIndex]++;
+
+ Path f = inputFiles.get(fileIndex);
+ String name = f.toRealPath() + "_" + iteration;
+
try (BufferedReader reader = Files.newBufferedReader(f, StandardCharsets.UTF_8)) {
// First line is the date, 3rd is the title, rest is body
String dateStr = reader.readLine();
@@ -143,7 +158,12 @@ public class ReutersContentSource extends ContentSource {
@Override
public synchronized void resetInputs() throws IOException {
super.resetInputs();
- nextFile = 0;
- iteration = 0;
+ }
+
+ private synchronized void docCountArrInit() {
+ if (docCountArrCreated == false) {
+ docCountArr = new int[getConfig().getNumThreads()];
+ docCountArrCreated = true;
+ }
}
}
diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java
index f74aaab..bb049d4 100644
--- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java
+++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java
@@ -20,6 +20,7 @@ import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
+import org.apache.lucene.benchmark.Constants;
import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.feeds.NoMoreDataException;
import org.apache.lucene.benchmark.byTask.stats.TaskStats;
@@ -340,12 +341,23 @@ public class TaskSequence extends PerfTask {
initTasksArray();
ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()];
+ // Get number of parallel threads from algo file and set it to use in ReuersContentSource.java's
+ // docCountArrInit()
+ this.getRunData().getConfig().setNumThreads(t.length);
// prepare threads
int index = 0;
for (int k = 0; k < repetitions; k++) {
for (int i = 0; i < tasksArray.length; i++) {
final PerfTask task = tasksArray[i].clone();
- t[index++] = new ParallelTask(task);
+ t[index] = new ParallelTask(task);
+ // Setting unique ThreadName with index value which is used in ReuersContentSource.java's
+ // getNextDocData().Please make changes
+ // in ReuersContentSource.java's getNextDocData() for
+ // Integer.parseInt(Thread.currentThread().getName().substring(parallelTaskThreadSize + 1,
+ // threadIndexSize))
+ // before making any modifications here
+ t[index].setName(Constants.PARALLEL_TASK_THREAD_NAME_PREFIX + "-" + index);
+ index++;
}
}
// run threads
diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java
index 7470915..5eafb55 100644
--- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java
+++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java
@@ -54,6 +54,7 @@ public class Config {
private HashMap<String, Object> valByRound = new HashMap<>();
private HashMap<String, String> colForValByRound = new HashMap<>();
private String algorithmText;
+ private int numThreads = 1;
/**
* Read both algorithm and config properties.
@@ -113,6 +114,14 @@ public class Config {
}
}
+ public void setNumThreads(int numThreads) {
+ this.numThreads = numThreads;
+ }
+
+ public int getNumThreads() {
+ return numThreads;
+ }
+
@SuppressWarnings({"unchecked", "rawtypes"})
private void printProps() {
System.out.println("------------> config properties:");