You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@jackrabbit.apache.org by GitBox <gi...@apache.org> on 2022/09/21 10:04:33 UTC

[GitHub] [jackrabbit-oak] amit-jain commented on a diff in pull request #587: OAK-9790 - Implement parallel indexing for speeding up oak run indexing command

amit-jain commented on code in PR #587:
URL: https://github.com/apache/jackrabbit-oak/pull/587#discussion_r969237928


##########
oak-run-commons/pom.xml:
##########
@@ -147,6 +147,11 @@
             <artifactId>h2</artifactId>
             <version>${h2.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.lz4</groupId>
+            <artifactId>lz4-java</artifactId>
+            <version>1.8.0</version>
+        </dependency>

Review Comment:
   Should this dependency be marked as optional?



##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java:
##########
@@ -79,6 +84,8 @@ public abstract class DocumentStoreIndexerBase implements Closeable{
     protected final IndexerSupport indexerSupport;
     private final Set<String> indexerPaths = new HashSet<>();
     private static final int MAX_DOWNLOAD_ATTEMPTS = Integer.parseInt(System.getProperty("oak.indexer.maxDownloadRetries", "5")) + 1;
+    private final boolean parallelIndex = Boolean.parseBoolean(System.getProperty(FlatFileNodeStoreBuilder.OAK_INDEXER_PARALLEL_INDEX, "false"));

Review Comment:
   There are too many system properties scattered around the implementation. It would help to have some kind of Configurator object where these are defined, with default values set to maintain the status quo.



##########
oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/IndexWriterUtils.java:
##########
@@ -33,12 +35,13 @@
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
 import org.apache.lucene.analysis.shingle.ShingleAnalyzerWrapper;
+import org.apache.lucene.index.ConcurrentMergeScheduler;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.SerialMergeScheduler;
 
-import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.VERSION;
-
 public class IndexWriterUtils {
+    private static final int INDEX_WRITER_MAX_MERGE = 8;
+    private static final int INDEX_WRITER_MAX_THREAD = 8;

Review Comment:
   With default set to 1



##########
oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/DefaultIndexWriter.java:
##########
@@ -180,25 +187,26 @@ IndexWriter getWriter() throws IOException {
      * @param analyzer the analyzer used to update the suggester
      */
     private boolean updateSuggester(Analyzer analyzer, Calendar currentTime) throws IOException {
-        final Closer closer = Closer.create();
-
-        NodeBuilder suggesterStatus = definitionBuilder.child(suggestDirName);
-        DirectoryReader reader = closer.register(DirectoryReader.open(writer, false));
-        final Directory suggestDirectory =
-            directoryFactory.newInstance(definition, definitionBuilder, suggestDirName, false);
-        // updateSuggester would close the directory (directly or via lookup)
-        // closer.register(suggestDirectory);
-        boolean updated = false;
-        try {
-            SuggestHelper.updateSuggester(suggestDirectory, analyzer, reader, closer);
-            suggesterStatus.setProperty("lastUpdated", ISO8601.format(currentTime), Type.DATE);
-            updated = true;
-        } catch (Throwable e) {
-            log.warn("could not update suggester", e);
-        } finally {
-            closer.close();
+        synchronized (this) {
+            final Closer closer = Closer.create();
+
+            NodeBuilder suggesterStatus = definitionBuilder.child(suggestDirName);
+            DirectoryReader reader = closer.register(DirectoryReader.open(writer, false));

Review Comment:
   I don't understand the need for synchronization here, as this is called from close(), which in turn is called from indexer.close().



##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileSplitter.java:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.api.Type;
+import org.apache.jackrabbit.oak.commons.Compression;
+import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntry;
+import org.apache.jackrabbit.oak.plugins.index.search.Aggregate;
+import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
+import org.apache.jackrabbit.oak.query.ast.NodeTypeInfo;
+import org.apache.jackrabbit.oak.query.ast.NodeTypeInfoProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.Stack;
+import java.util.stream.Collectors;
+
+import static org.apache.jackrabbit.JcrConstants.JCR_PRIMARYTYPE;
+import static org.apache.jackrabbit.JcrConstants.NT_BASE;
+import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.DEFAULT_NUMBER_OF_SPLIT_STORE_SIZE;
+import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.OAK_INDEXER_USE_LZ4;
+import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.OAK_INDEXER_USE_ZIP;
+import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.PROP_SPLIT_STORE_SIZE;
+import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.createReader;
+import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.createWriter;
+import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.getSortedStoreFileName;
+
+/**
+ * This class is being used when {@link FlatFileNodeStoreBuilder.OAK_INDEXER_PARALLEL_INDEX} is set to true.
+ * It will split a flat file safely by checking the index definitions. An entry is considered safe to split if only
+ * none of the parent directories contains nodes in indexRule and aggregate fields of the provided index definitions.
+ */
+public class FlatFileSplitter {
+    private static final Logger log = LoggerFactory.getLogger(FlatFileSplitter.class);
+
+    private static final String SPLIT_DIR_NAME = "split";
+    private static final long MINIMUM_SPLIT_THRESHOLD = 10 * FileUtils.ONE_MB;
+
+    private final File workDir;
+    private final NodeTypeInfoProvider infoProvider;
+    private final File flatFile;
+    private final NodeStateEntryReader entryReader;
+    private final Compression.Algorithm algorithm;
+    private Set<IndexDefinition> indexDefinitions;
+    private Set<String> splitNodeTypeNames;
+    private long minimumSplitThreshold = MINIMUM_SPLIT_THRESHOLD;
+    private int splitSize = Integer.getInteger(PROP_SPLIT_STORE_SIZE, DEFAULT_NUMBER_OF_SPLIT_STORE_SIZE);
+    private boolean useCompression = Boolean.parseBoolean(System.getProperty(OAK_INDEXER_USE_ZIP, "true"));
+    private boolean useLZ4 = Boolean.parseBoolean(System.getProperty(OAK_INDEXER_USE_LZ4, "false"));
+
+    public FlatFileSplitter(File flatFile, File workdir, NodeTypeInfoProvider infoProvider, NodeStateEntryReader entryReader,
+            Set<IndexDefinition> indexDefinitions) {
+        this.flatFile = flatFile;
+        this.indexDefinitions = indexDefinitions;
+        this.workDir = new File(workdir, SPLIT_DIR_NAME);
+
+        this.infoProvider = infoProvider;
+        this.entryReader = entryReader;
+
+        Compression.Algorithm algorithm = Compression.Algorithm.GZIP;
+        if (!useCompression) {
+            algorithm = Compression.Algorithm.NONE;
+        } else if (useLZ4) {
+            algorithm = Compression.Algorithm.LZ4;
+        }
+        this.algorithm = algorithm;
+    }
+
+    private List<File> returnOriginalFlatFile() {
+        return Collections.singletonList(flatFile);
+    }
+
+    public List<File> split() throws IOException {
+        return split(true);
+    }
+
+    public List<File> split(boolean deleteOriginal) throws IOException {
+        List<File> splitFlatFiles = new ArrayList<>();
+        try {
+            FileUtils.forceMkdir(workDir);
+        } catch (IOException e) {
+            log.error("failed to create split directory {}", workDir.getAbsolutePath());
+            return returnOriginalFlatFile();
+        }
+
+        long fileSizeInBytes = flatFile.length();
+        long splitThreshold = Math.round((double) (fileSizeInBytes / splitSize));
+        log.info("original flat file size: ~{}",  FileUtils.byteCountToDisplaySize(fileSizeInBytes));
+        log.info("split threshold is ~{} bytes, estimate split size >={} files",  FileUtils.byteCountToDisplaySize(splitThreshold), splitSize);
+
+        // return original if file too small or split size equals 1

Review Comment:
   The log statements do not list the files; they only log the conditions for the split, which are then checked subsequently. I think it's better to have these logged before returning.



##########
oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/Compression.java:
##########
@@ -0,0 +1,68 @@
+package org.apache.jackrabbit.oak.commons;
+
+import net.jpountz.lz4.LZ4FrameInputStream;
+import net.jpountz.lz4.LZ4FrameOutputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.zip.Deflater;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * This class provides a common list of support compression algorithms and some utility functions.
+ * It is mainly used by intermediate stored files in {@link org.apache.jackrabbit.oak.commons.sort.ExternalSort} and
+ * sort/index utilities in {@link org.apache.jackrabbit.oak.index.indexer.document.flatfile}.
+ */
+public class Compression {

Review Comment:
   It is not required to enclose the enum within a class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@jackrabbit.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org