You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2020/04/21 16:28:49 UTC

[lucene-solr] branch branch_8x updated: LUCENE-9191: make LineFileDocs random seeking more efficient by recording safe skip points in the concatenated gzip'd chunks

This is an automated email from the ASF dual-hosted git repository.

mikemccand pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new 722df4f  LUCENE-9191: make LineFileDocs random seeking more efficient by recording safe skip points in the concatenated gzip'd chunks
722df4f is described below

commit 722df4ff758f89a5961863ec805e3d3f75041a48
Author: Mike McCandless <mi...@apache.org>
AuthorDate: Tue Apr 21 12:09:17 2020 -0400

    LUCENE-9191: make LineFileDocs random seeking more efficient by recording safe skip points in the concatenated gzip'd chunks
---
 dev-tools/scripts/create_line_file_docs.py         | 247 +++++++++++++++++++++
 lucene/CHANGES.txt                                 |   3 +
 .../java/org/apache/lucene/util/LineFileDocs.java  | 103 ++++++---
 .../org/apache/lucene/util/europarl.lines.txt.gz   | Bin 5730708 -> 9695474 bytes
 .../org/apache/lucene/util/europarl.lines.txt.seek |  19 ++
 5 files changed, 338 insertions(+), 34 deletions(-)

diff --git a/dev-tools/scripts/create_line_file_docs.py b/dev-tools/scripts/create_line_file_docs.py
new file mode 100644
index 0000000..875cd65
--- /dev/null
+++ b/dev-tools/scripts/create_line_file_docs.py
@@ -0,0 +1,247 @@
+import os
+import gzip
+import time
+import random
+import re
+import urllib.request
+import subprocess
+import tempfile
+import shutil
+
+DEBUG = False  # True: skip the untar and the tmp dir removal, and use a fixed tmp dir path instead
+
+TARGET_DOC_CHARS = 1024  # mean size, in chars, that split_docs draws for each doc body
+
+def compress_with_seek_points(file_name_in, file_name_out, num_seek_points):
+
+  bytes_per_chunk = os.path.getsize(file_name_in) / num_seek_points
+
+  seek_points = []
+
+  if os.path.exists(file_name_out):
+    os.remove(file_name_out)
+
+  with open(file_name_in, 'rb') as f_in:
+
+    f_out = None
+
+    bytes_in_chunk = 0
+
+    chunk_count = 0
+
+    while True:
+      if f_out is None:
+        if os.path.exists(file_name_out):
+          seek_points.append(os.path.getsize(file_name_out))
+          print('  create chunk %s at pos=%s' % (chunk_count, seek_points[-1]))
+        else:
+          print('  create chunk %s at pos=0' % chunk_count)
+        f_out = gzip.open(file_name_out, 'ab')
+        chunk_count += 1
+
+      line = f_in.readline()
+      if len(line) == 0:
+        break
+
+      bytes_in_chunk += len(line)
+      f_out.write(line)
+
+      if bytes_in_chunk > bytes_per_chunk and chunk_count < num_seek_points:
+        f_out.close()
+        f_out = None
+        bytes_in_chunk = 0
+
+  with open(file_name_out[:-3] + '.seek', 'w') as f_out:
+    for seek_point in seek_points:
+      f_out.write('%d\n' % seek_point)
+
+re_tag = re.compile('<[^>]+?>')
+re_newlines = re.compile('\n+')
+re_space = re.compile('\s')
+
+# used to find word break, for splitting docs into ~1 KB sized smaller docs:
+re_next_non_word_character = re.compile('\W', re.U)
+
+EUROPARL_V7_URL = 'https://www.statmt.org/europarl/v7/europarl.tgz'
+
+def split_docs(all_out, title_string, date_string, body_string):
+
+  '''
+  Splits docs into smallish (~1 KB) sized docs, repeating same title and date
+  '''
+
+  doc_count = 0
+  while len(body_string) > 0:
+    char_count = int(random.gauss(TARGET_DOC_CHARS, TARGET_DOC_CHARS/4))
+    if char_count < 64:
+      # trimmed normal?
+      continue
+
+    m = re_next_non_word_character.search(body_string, char_count)
+    if m is not None:
+      char_count = m.start(0)
+    else:
+      char_count = len(body_string)
+
+    body_string_fragment = body_string[:char_count].strip()
+    
+    #print('write title %d, body %d' % (len(title_string), len(body_string_fragment)))
+    all_out.write('%s\t%s\t%s\n' % (title_string, date_string, body_string_fragment))
+    body_string = body_string[char_count:]
+    doc_count += 1
+
+  return doc_count
+
+def sample_europarl():
+
+  # download europarl.tgz v7, if not already here (in cwd):
+  file_name = 'europarl.tgz'
+  if not os.path.exists(file_name):
+    print('Download %s to %s...' % (EUROPARL_V7_URL, file_name))
+    urllib.request.urlretrieve(EUROPARL_V7_URL, file_name + '.tmp')
+    os.rename(file_name + '.tmp', file_name)
+  else:
+    print('%s already here; skipping download...' % file_name)
+
+  if not DEBUG:
+    tmp_dir_path = tempfile.mkdtemp()
+  else:
+    tmp_dir_path = '/tmp/tmp31ekzg75'
+  print('Using tmp dir "%s"...' % tmp_dir_path)
+  try:
+    if not DEBUG:
+      cmd = 'tar xzf %s -C %s' % (file_name, tmp_dir_path)
+      print('Run: %s' % cmd)
+      subprocess.run(cmd, shell=True)
+
+    doc_count = 0
+    skip_count = 0
+    file_count = 0
+
+    all_txt_file_name = '%s/all.txt' % tmp_dir_path
+
+    print('Extract text...')
+
+    start_time = time.time()
+    next_print_time = start_time + 3
+    # normalize text a bit and concatenate all lines into single file, counting total lines/bytes
+    with open(all_txt_file_name, 'w', encoding='utf-8') as all_out:
+      for dir_path, dir_names, file_names in os.walk('%s/txt' % tmp_dir_path):
+        for file_name in file_names:
+          if file_name.endswith('.txt'):
+            file_count += 1
+
+            year, month, day = (int(x) for x in file_name[3:-4].split('-')[:3])
+            if year >= 50:
+              year = 1900 + year
+            else:
+              year = 2000 + year
+
+            date_string = '%04d-%02d-%02d' % (year, month, day)
+            
+            # unfortunately we need errors='ignore' since in Europarl v7, one file (pl/ep-09-10-22-009.txt) has invalid utf-8:
+            chapter_count = 0
+            with open('%s/%s' % (dir_path, file_name), 'r', encoding='utf-8', errors='ignore') as f_in:
+              last_text = []
+              last_title = None
+              while True:
+                line = f_in.readline()
+                if line == '':
+                  break
+                line = line.strip()
+                if line.startswith('<CHAPTER '):
+                  if last_title is not None:
+                    s = ' '.join(last_text)
+                    s = re_tag.sub(' ', s)
+                    s = re_newlines.sub(' ', s)
+                    s = s.strip()
+                    if len(s) > 0:
+                      doc_count += split_docs(all_out, last_title, date_string, s)
+                    else:
+                      skip_count += 1
+                      
+                    last_text = []
+                    chapter_count += 1
+                  while True:
+                    last_title = f_in.readline()
+                    if last_title == '':
+                      last_title = None
+                      break
+                    last_title = re_tag.sub(' ', last_title).strip()
+                    if len(last_title) > 0:
+                      break
+                  continue
+                else:
+                  last_text.append(line)
+
+              if last_title is not None:
+                s = ' '.join(last_text)
+                s = re_tag.sub(' ', s)
+                s = re_newlines.sub(' ', s)
+                s = s.strip()
+                if len(s) > 0:
+                  doc_count += split_docs(all_out, last_title, date_string, s)
+                else:
+                  skip_count += 1
+                chapter_count += 1
+              else:
+                skip_count += 1
+
+              if chapter_count > 0:
+                #print('%s/%s: %d chapters' % (dir_path, file_name, chapter_count))
+                pass
+
+            now = time.time()
+            if now > next_print_time:
+              print('%4.1fs: keep %.2f K of %.2f K files (%.1f%%), %.2f M docs, %.2f GB...' % \
+                    (now - start_time, (file_count - skip_count) / 1000, file_count / 1000,
+                     100 * (file_count - skip_count) / file_count,
+                     doc_count / 1000000, all_out.tell() / 1024/1024/1024))
+              while next_print_time < now:
+                next_print_time += 3
+
+    total_mb = os.path.getsize(all_txt_file_name)/1024/1024
+    now = time.time()
+    print('%4.1fs (done): keep %.2f K of %.2f K files (%.1f%%), %.2f M docs, %.2f GB...' % \
+          (now - start_time, (file_count - skip_count) / 1000, file_count / 1000,
+           100 * (file_count - skip_count) / file_count,
+           doc_count / 1000000, os.path.getsize(all_txt_file_name) / 1024/1024/1024))
+
+    print('Shuffle...')
+    subprocess.run('shuf %s > %s.shuffled' % (all_txt_file_name, all_txt_file_name), shell=True)
+
+    for mb in (20, 200, 2000):
+      print('Sample %d MB file...' % mb)
+      file_name_out = '%dmb.txt' % mb
+      with open(file_name_out, 'w', encoding='utf-8') as f_out:
+
+        chance = mb / total_mb
+
+        with open(all_txt_file_name + '.shuffled', 'r', encoding='utf-8') as f:
+
+          while True:
+            line = f.readline()
+            if len(line) == 0:
+              break
+            if random.random() <= chance:
+              f_out.write(line)
+
+      print('  got %.2f MB' % (os.path.getsize(file_name_out)/1024/1024))
+
+      compress_with_seek_points(file_name_out,
+                                file_name_out + '.gz',
+                                mb)
+            
+  finally:
+    print('Removing tmp dir "%s"...' % tmp_dir_path)
+    if not DEBUG:
+      shutil.rmtree(tmp_dir_path)
+
+  print('\nWARNING: left ./europarl.tgz, which you should delete if you do not want it!\n')
+
+if False:  # debugging toggle: set to True to run only the chunked compression on a local file
+  compress_with_seek_points('/x/tmp/europarl.lines.txt',
+                            '/x/tmp/foo.txt.gz',
+                            16)
+else:
+  sample_europarl()
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 5502183..2c7c9f5c 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -92,6 +92,9 @@ Other
 * LUCENE-9271: ByteBufferIndexInput was refactored to work on top of the
   ByteBuffer API. (Adrien Grand)
 
+* LUCENE-9191: Make LineFileDocs's random seeking more efficient, making tests using LineFileDocs faster (Robert Muir,
+  Mike McCandless)
+
 ======================= Lucene 8.5.1 =======================
 
 Bug Fixes
diff --git a/lucene/test-framework/src/java/org/apache/lucene/util/LineFileDocs.java b/lucene/test-framework/src/java/org/apache/lucene/util/LineFileDocs.java
index 2f5dc74..fa409d1 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/util/LineFileDocs.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/util/LineFileDocs.java
@@ -29,6 +29,8 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.GZIPInputStream;
@@ -60,7 +62,7 @@ public class LineFileDocs implements Closeable {
   public LineFileDocs(Random random, String path) throws IOException {
     this.path = path;
     this.random = new Random(random.nextLong());
-    open(random);
+    open();
   }
 
   public LineFileDocs(Random random) throws IOException {
@@ -74,24 +76,31 @@ public class LineFileDocs implements Closeable {
   }
   
   private long randomSeekPos(Random random, long size) {
-    if (random == null || size <= 3L)
+    if (random == null || size <= 3L) {
       return 0L;
-    return (random.nextLong()&Long.MAX_VALUE) % (size/3);
+    } else {
+      return (random.nextLong()&Long.MAX_VALUE) % (size/3);
+    }
   }
 
-  private synchronized void open(Random random) throws IOException {
+  private synchronized void open() throws IOException {
     InputStream is = getClass().getResourceAsStream(path);
-    boolean needSkip = true;
+
+    // true if the InputStream is not already randomly seek'd after the if/else block below:
+    boolean needSkip;
+    boolean skipFirstLineFragment = false;
+    
     long size = 0L, seekTo = 0L;
     if (is == null) {
-      // if it's not in classpath, we load it as absolute filesystem path (e.g. Hudson's home dir)
+      // if it's not in classpath, we load it as absolute filesystem path (e.g. Jenkins' home dir)
       Path file = Paths.get(path);
       size = Files.size(file);
       if (path.endsWith(".gz")) {
-        // if it is a gzip file, we need to use InputStream and slowly skipTo:
+        // if it is a gzip file, we need to use InputStream and seek to one of the pre-computed skip points:
         is = Files.newInputStream(file);
+        needSkip = true;
       } else {
-        // optimized seek using SeekableByteChannel
+        // file is not compressed: optimized seek using SeekableByteChannel
         seekTo = randomSeekPos(random, size);
         final SeekableByteChannel channel = Files.newByteChannel(file);
         if (LuceneTestCase.VERBOSE) {
@@ -99,35 +108,61 @@ public class LineFileDocs implements Closeable {
         }
         channel.position(seekTo);
         is = Channels.newInputStream(channel);
+
+        // we (likely) seeked to the middle of a line:
+        skipFirstLineFragment = true;
+
         needSkip = false;
       }
     } else {
       // if the file comes from Classpath:
       size = is.available();
+      needSkip = true;
     }
-    
-    if (path.endsWith(".gz")) {
-      is = new GZIPInputStream(is);
-      // guestimate:
-      size *= 2.8;
-    }
-    
-    // If we only have an InputStream, we need to seek now,
-    // but this seek is a scan, so very inefficient!!!
+
     if (needSkip) {
-      seekTo = randomSeekPos(random, size);
-      if (LuceneTestCase.VERBOSE) {
-        System.out.println("TEST: LineFileDocs: stream skip to fp=" + seekTo + " on open");
+
+      // LUCENE-9191: use the optimized (pre-computed, using dev-tools/scripts/create_line_file_docs.py)
+      // seek file, so we can seek in a gzip'd file
+
+      int index = path.lastIndexOf('.');
+      if (index == -1) {
+        throw new IllegalArgumentException("could not determine extension for path \"" + path + "\"");
+      }
+
+      // e.g. foo.txt --> foo.seek, foo.txt.gz --> foo.txt.seek
+      String seekFilePath = path.substring(0, index) + ".seek";
+      InputStream seekIS = getClass().getResourceAsStream(seekFilePath);
+      if (seekIS == null) {
+        seekIS = Files.newInputStream(Paths.get(seekFilePath));
+      }
+
+      try (BufferedReader reader = new BufferedReader(new InputStreamReader(seekIS,
+                                                                    StandardCharsets.UTF_8))) {
+        List<Long> skipPoints = new ArrayList<>();
+
+        // explicitly insert implicit 0 as the first skip point:
+        skipPoints.add(0L);
+        
+        while (true) {
+          String line = reader.readLine();
+          if (line == null) {
+            break;
+          }
+          skipPoints.add(Long.parseLong(line.trim()));
+        }
+
+        seekTo = skipPoints.get(random.nextInt(skipPoints.size()));
+
+        // dev-tools/scripts/create_line_file_docs.py ensures this is a "safe" skip point, and we
+        // can begin gunzipping from here:
+        is.skip(seekTo);
+        is = new GZIPInputStream(is);
+
+        if (LuceneTestCase.VERBOSE) {
+          System.out.println("TEST: LineFileDocs: stream skip to fp=" + seekTo + " on open");
+        }
       }
-      is.skip(seekTo);
-    }
-    
-    // if we seeked somewhere, read until newline char
-    if (seekTo > 0L) {
-      int b;
-      do {
-        b = is.read();
-      } while (b >= 0 && b != 13 && b != 10);
     }
     
     CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
@@ -135,16 +170,16 @@ public class LineFileDocs implements Closeable {
         .onUnmappableCharacter(CodingErrorAction.REPORT);
     reader = new BufferedReader(new InputStreamReader(is, decoder), BUFFER_SIZE);
     
-    if (seekTo > 0L) {
-      // read one more line, to make sure we are not inside a Windows linebreak (\r\n):
+    if (skipFirstLineFragment) {
+      // read until end of line:
       reader.readLine();
     }
   }
 
-  public synchronized void reset(Random random) throws IOException {
+  public synchronized void reset() throws IOException {
     reader.close();
     reader = null;
-    open(random);
+    open();
     id.set(0);
   }
 
@@ -209,7 +244,7 @@ public class LineFileDocs implements Closeable {
         }
         reader.close();
         reader = null;
-        open(null);
+        open();
         line = reader.readLine();
       }
     }
diff --git a/lucene/test-framework/src/resources/org/apache/lucene/util/europarl.lines.txt.gz b/lucene/test-framework/src/resources/org/apache/lucene/util/europarl.lines.txt.gz
index e0366f1..90839ee 100644
Binary files a/lucene/test-framework/src/resources/org/apache/lucene/util/europarl.lines.txt.gz and b/lucene/test-framework/src/resources/org/apache/lucene/util/europarl.lines.txt.gz differ
diff --git a/lucene/test-framework/src/resources/org/apache/lucene/util/europarl.lines.txt.seek b/lucene/test-framework/src/resources/org/apache/lucene/util/europarl.lines.txt.seek
new file mode 100644
index 0000000..525ecfe
--- /dev/null
+++ b/lucene/test-framework/src/resources/org/apache/lucene/util/europarl.lines.txt.seek
@@ -0,0 +1,19 @@
+485680
+971105
+1452723
+1940496
+2424299
+2909775
+3397858
+3881659
+4365085
+4849651
+5334212
+5817709
+6306315
+6790697
+7276765
+7760619
+8245188
+8733672
+9220523