You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@jackrabbit.apache.org by GitBox <gi...@apache.org> on 2021/01/19 15:43:09 UTC

[GitHub] [jackrabbit-oak] nit0906 opened a new pull request #263: OAK-9312 | Add reindex support for elastic in oak-run (new module added)

nit0906 opened a new pull request #263:
URL: https://github.com/apache/jackrabbit-oak/pull/263


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [jackrabbit-oak] thomasmueller commented on a change in pull request #263: OAK-9312 | Add reindex support for elastic in oak-run (new module added)

Posted by GitBox <gi...@apache.org>.
thomasmueller commented on a change in pull request #263:
URL: https://github.com/apache/jackrabbit-oak/pull/263#discussion_r560764575



##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java
##########
@@ -0,0 +1,204 @@
+package org.apache.jackrabbit.oak.index;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import joptsimple.OptionParser;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
+import org.apache.jackrabbit.oak.run.cli.CommonOptions;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixture;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixtureProvider;
+import org.apache.jackrabbit.oak.run.cli.Options;
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.apache.jackrabbit.oak.run.commons.LoggingInitializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class ElasticIndexCommand implements Command {
+    private static final Logger log = LoggerFactory.getLogger(ElasticIndexCommand.class);
+    private Options opts;
+    private ElasticIndexOptions indexOpts;
+    public static final String NAME = "index";
+
+    private final String summary = "Provides elastic index management related operations";
+    private static boolean disableExitOnError;
+
+
+    @Override
+    public void execute(String... args) throws Exception {
+        OptionParser parser = new OptionParser();
+
+        opts = new Options();
+        opts.setCommandName(NAME);
+        opts.setSummary(summary);
+        opts.setConnectionString(CommonOptions.DEFAULT_CONNECTION_STRING);
+        opts.registerOptionsFactory(ElasticIndexOptions.FACTORY);
+        opts.parseAndConfigure(parser, args);
+
+        indexOpts = opts.getOptionBean(ElasticIndexOptions.class);
+
+        //Clean up before setting up NodeStore as the temp
+        //directory might be used by NodeStore for cache stuff like persistentCache
+        //setupDirectories(indexOpts);
+        //setupLogging(indexOpts);
+
+        logCliArgs(args);
+
+        boolean success = false;
+        try {
+            try (Closer closer = Closer.create()) {
+                //configureCustomizer(opts, closer, true);
+                NodeStoreFixture fixture = NodeStoreFixtureProvider.create(opts);
+                closer.register(fixture);
+                execute(fixture, indexOpts, closer);
+                //tellReportPaths();
+            }
+
+            success = true;
+        } catch (Throwable e) {
+            log.error("Error occurred while performing index tasks", e);
+            e.printStackTrace(System.err);

Review comment:
       Calling `e.printStackTrace()` is a bit strange if you also log the error... is it needed?
   

##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java
##########
@@ -0,0 +1,204 @@
+package org.apache.jackrabbit.oak.index;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import joptsimple.OptionParser;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
+import org.apache.jackrabbit.oak.run.cli.CommonOptions;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixture;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixtureProvider;
+import org.apache.jackrabbit.oak.run.cli.Options;
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.apache.jackrabbit.oak.run.commons.LoggingInitializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class ElasticIndexCommand implements Command {
+    private static final Logger log = LoggerFactory.getLogger(ElasticIndexCommand.class);
+    private Options opts;
+    private ElasticIndexOptions indexOpts;
+    public static final String NAME = "index";
+
+    private final String summary = "Provides elastic index management related operations";
+    private static boolean disableExitOnError;
+
+
+    @Override
+    public void execute(String... args) throws Exception {
+        OptionParser parser = new OptionParser();
+
+        opts = new Options();
+        opts.setCommandName(NAME);
+        opts.setSummary(summary);
+        opts.setConnectionString(CommonOptions.DEFAULT_CONNECTION_STRING);
+        opts.registerOptionsFactory(ElasticIndexOptions.FACTORY);
+        opts.parseAndConfigure(parser, args);
+
+        indexOpts = opts.getOptionBean(ElasticIndexOptions.class);
+
+        //Clean up before setting up NodeStore as the temp
+        //directory might be used by NodeStore for cache stuff like persistentCache
+        //setupDirectories(indexOpts);
+        //setupLogging(indexOpts);
+
+        logCliArgs(args);
+
+        boolean success = false;
+        try {
+            try (Closer closer = Closer.create()) {
+                //configureCustomizer(opts, closer, true);
+                NodeStoreFixture fixture = NodeStoreFixtureProvider.create(opts);
+                closer.register(fixture);
+                execute(fixture, indexOpts, closer);
+                //tellReportPaths();
+            }
+
+            success = true;
+        } catch (Throwable e) {
+            log.error("Error occurred while performing index tasks", e);
+            e.printStackTrace(System.err);
+            if (disableExitOnError) {
+                throw e;
+            }
+        } finally {
+            shutdownLogging();
+        }
+
+        if (!success) {
+            System.exit(1);
+        }
+    }
+
+    private void execute(NodeStoreFixture fixture, IndexOptions indexOpts, Closer closer)
+            throws IOException, CommitFailedException {
+        IndexHelper indexHelper = createIndexHelper(fixture, indexOpts, closer);
+
+        // TODO : See if we need to support dumpIndexStats and index defs for elastic - not needed for now
+        //dumpIndexStats(indexOpts, indexHelper);
+        //dumpIndexDefinitions(indexOpts, indexHelper);
+        reindexOperation(indexOpts, indexHelper);
+    }
+
+    private IndexHelper createIndexHelper(NodeStoreFixture fixture,
+                                          IndexOptions indexOpts, Closer closer) throws IOException {
+        IndexHelper indexHelper = new IndexHelper(fixture.getStore(), fixture.getBlobStore(), fixture.getWhiteboard(),
+                indexOpts.getOutDir(), indexOpts.getWorkDir(), computeIndexPaths(indexOpts));
+
+        // TODO : See if pre text extraction is needed for elastic
+        //configurePreExtractionSupport(indexOpts, indexHelper);
+
+        closer.register(indexHelper);
+        return indexHelper;
+    }
+
+    private List<String> computeIndexPaths(IndexOptions indexOpts) throws IOException {
+        //Combine the indexPaths from json and cli args
+        Set<String> indexPaths = new LinkedHashSet<>(indexOpts.getIndexPaths());
+        File definitions = indexOpts.getIndexDefinitionsFile();
+        if (definitions != null) {
+            IndexDefinitionUpdater updater = new IndexDefinitionUpdater(definitions);
+            Set<String> indexPathsFromJson = updater.getIndexPaths();
+            Set<String> diff = Sets.difference(indexPathsFromJson, indexPaths);
+            if (!diff.isEmpty()) {
+                log.info("Augmenting the indexPaths with {} which are present in {}", diff, definitions);
+            }
+            indexPaths.addAll(indexPathsFromJson);
+        }
+        return new ArrayList<>(indexPaths);
+    }
+
+    private void reindexOperation(IndexOptions indexOpts, IndexHelper indexHelper) throws IOException, CommitFailedException {
+        if (!indexOpts.isReindex()) {
+            return;
+        }
+
+        String checkpoint = indexOpts.getCheckpoint();
+        reindex(indexOpts, indexHelper, checkpoint);
+    }
+
+    private void reindex(IndexOptions idxOpts, IndexHelper indexHelper, String checkpoint) throws IOException, CommitFailedException {
+        Preconditions.checkNotNull(checkpoint, "Checkpoint value is required for reindexing done in read only mode");
+
+        Stopwatch w = Stopwatch.createStarted();
+        IndexerSupport indexerSupport = createIndexerSupport(indexHelper, checkpoint);
+        log.info("Proceeding to index {} upto checkpoint {} {}", indexHelper.getIndexPaths(), checkpoint,
+                indexerSupport.getCheckpointInfo());
+
+        if (opts.getCommonOpts().isMongo() && idxOpts.isDocTraversalMode()) {
+            log.info("Using Document order traversal to perform reindexing");
+            try (ElasticDocumentStoreIndexer indexer = new ElasticDocumentStoreIndexer(indexHelper, indexerSupport, indexOpts.getIndexPrefix(),
+                    indexOpts.getElasticScheme(), indexOpts.getElasticHost(),
+                    indexOpts.getElasticPort(), indexOpts.getApiKeyId(), indexOpts.getApiKeySecret())) {
+                indexer.reindex();
+                // Wait for default flush interval before exiting the try block
+                // to make sure the client is not closed before the last flush
+                // TODO : See if this can be handled in a better manner
+                Thread.sleep(ElasticIndexDefinition.BULK_FLUSH_INTERVAL_MS_DEFAULT*2);
+            } catch (InterruptedException e) {
+                //

Review comment:
       Logging the exception would be nice (debug level might be OK if this happens normally; otherwise perhaps warn level).

##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/run/Main.java
##########
@@ -0,0 +1,35 @@
+package org.apache.jackrabbit.oak.run;
+
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.apache.jackrabbit.oak.run.commons.Utils;
+
+import java.util.Locale;
+
+import static java.util.Arrays.copyOfRange;
+import static org.apache.jackrabbit.oak.run.AvailableElasticModes.MODES;
+
+public final class Main {
+    private Main() {
+        // Prevent instantiation.
+    }
+
+    public static void main(String[] args) throws Exception {

Review comment:
       While it works, isn't the "new" way to declare this as `main(String... args)`?

##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java
##########
@@ -0,0 +1,204 @@
+package org.apache.jackrabbit.oak.index;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import joptsimple.OptionParser;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
+import org.apache.jackrabbit.oak.run.cli.CommonOptions;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixture;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixtureProvider;
+import org.apache.jackrabbit.oak.run.cli.Options;
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.apache.jackrabbit.oak.run.commons.LoggingInitializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class ElasticIndexCommand implements Command {
+    private static final Logger log = LoggerFactory.getLogger(ElasticIndexCommand.class);
+    private Options opts;
+    private ElasticIndexOptions indexOpts;
+    public static final String NAME = "index";
+
+    private final String summary = "Provides elastic index management related operations";
+    private static boolean disableExitOnError;
+
+
+    @Override
+    public void execute(String... args) throws Exception {
+        OptionParser parser = new OptionParser();
+
+        opts = new Options();
+        opts.setCommandName(NAME);
+        opts.setSummary(summary);
+        opts.setConnectionString(CommonOptions.DEFAULT_CONNECTION_STRING);
+        opts.registerOptionsFactory(ElasticIndexOptions.FACTORY);
+        opts.parseAndConfigure(parser, args);
+
+        indexOpts = opts.getOptionBean(ElasticIndexOptions.class);
+
+        //Clean up before setting up NodeStore as the temp
+        //directory might be used by NodeStore for cache stuff like persistentCache
+        //setupDirectories(indexOpts);
+        //setupLogging(indexOpts);
+
+        logCliArgs(args);
+
+        boolean success = false;
+        try {
+            try (Closer closer = Closer.create()) {
+                //configureCustomizer(opts, closer, true);
+                NodeStoreFixture fixture = NodeStoreFixtureProvider.create(opts);
+                closer.register(fixture);
+                execute(fixture, indexOpts, closer);
+                //tellReportPaths();
+            }
+
+            success = true;
+        } catch (Throwable e) {
+            log.error("Error occurred while performing index tasks", e);
+            e.printStackTrace(System.err);
+            if (disableExitOnError) {
+                throw e;
+            }
+        } finally {
+            shutdownLogging();
+        }
+
+        if (!success) {
+            System.exit(1);

Review comment:
       Is this needed? If yes, could you add a (one-liner) comment about why?

##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticOutOfBandIndexer.java
##########
@@ -0,0 +1,64 @@
+package org.apache.jackrabbit.oak.index;
+
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.plugins.index.CompositeIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
+import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static java.util.Arrays.asList;
+
+public class ElasticOutOfBandIndexer extends OutOfBandIndexer {
+    private final String indexPrefix;
+    private final String scheme;
+    private final String host;
+    private final int port;
+    private final String apiKeyId;
+    private final String apiSecretId;
+
+    public ElasticOutOfBandIndexer(IndexHelper indexHelper, IndexerSupport indexerSupport,
+                                   String indexPrefix, String scheme,
+                                   String host, int port,
+                                   String apiKeyId, String apiSecretId) {
+        super(indexHelper, indexerSupport);
+        this.indexPrefix = indexPrefix;
+        this.scheme = scheme;
+        this.host = host;
+        this.port = port;
+        this.apiKeyId = apiKeyId;
+        this.apiSecretId = apiSecretId;
+    }
+
+    @Override
+    protected IndexEditorProvider createIndexEditorProvider() {
+        IndexEditorProvider elastic = createElasticEditorProvider();
+

Review comment:
       The empty line is a bit strange

##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerProvider.java
##########
@@ -0,0 +1,55 @@
+package org.apache.jackrabbit.oak.index.indexer.document;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.index.IndexHelper;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticDocument;
+import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexWriterFactory;
+import org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
+import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.binary.FulltextBinaryTextExtractor;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriter;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
+
+public class ElasticIndexerProvider implements NodeStateIndexerProvider {
+    private final ExtractedTextCache textCache =
+            new ExtractedTextCache(FileUtils.ONE_MB * 5, TimeUnit.HOURS.toSeconds(5));
+    private final IndexHelper indexHelper;
+    private final ElasticIndexWriterFactory indexWriterFactory;
+    private final ElasticConnection coordinate;
+
+    public ElasticIndexerProvider(IndexHelper indexHelper, ElasticConnection coordinate) {
+        this.indexHelper = indexHelper;
+        this.indexWriterFactory = new ElasticIndexWriterFactory(coordinate);
+        this.coordinate = coordinate;
+    }
+
+
+    @Override
+    public @Nullable NodeStateIndexer getIndexer(@NotNull String type, @NotNull String indexPath, @NotNull NodeBuilder definition, @NotNull NodeState root, IndexingProgressReporter progressReporter) {
+        if (!ElasticIndexDefinition.TYPE_ELASTICSEARCH.equals(definition.getString(TYPE_PROPERTY_NAME))) {
+            return null;
+        }
+        ElasticIndexDefinition idxDefinition = (ElasticIndexDefinition) new ElasticIndexDefinition.Builder(coordinate.getIndexPrefix()).
+                root(root).indexPath(indexPath).defn(definition.getNodeState()).reindex().build();
+
+        FulltextIndexWriter <ElasticDocument> indexWriter = indexWriterFactory.newInstance(idxDefinition, definition, CommitInfo.EMPTY, true);
+        FulltextBinaryTextExtractor textExtractor = new FulltextBinaryTextExtractor(textCache, idxDefinition, true);
+        return new ElasticIndexer(idxDefinition, textExtractor, definition, progressReporter, indexWriter);
+    }
+
+    @Override
+    public void close() throws IOException {
+

Review comment:
       I would avoid the empty line

##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java
##########
@@ -0,0 +1,204 @@
+package org.apache.jackrabbit.oak.index;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import joptsimple.OptionParser;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
+import org.apache.jackrabbit.oak.run.cli.CommonOptions;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixture;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixtureProvider;
+import org.apache.jackrabbit.oak.run.cli.Options;
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.apache.jackrabbit.oak.run.commons.LoggingInitializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class ElasticIndexCommand implements Command {
+    private static final Logger log = LoggerFactory.getLogger(ElasticIndexCommand.class);
+    private Options opts;
+    private ElasticIndexOptions indexOpts;
+    public static final String NAME = "index";
+
+    private final String summary = "Provides elastic index management related operations";
+    private static boolean disableExitOnError;
+
+
+    @Override
+    public void execute(String... args) throws Exception {
+        OptionParser parser = new OptionParser();
+
+        opts = new Options();
+        opts.setCommandName(NAME);
+        opts.setSummary(summary);
+        opts.setConnectionString(CommonOptions.DEFAULT_CONNECTION_STRING);
+        opts.registerOptionsFactory(ElasticIndexOptions.FACTORY);
+        opts.parseAndConfigure(parser, args);
+
+        indexOpts = opts.getOptionBean(ElasticIndexOptions.class);
+
+        //Clean up before setting up NodeStore as the temp
+        //directory might be used by NodeStore for cache stuff like persistentCache
+        //setupDirectories(indexOpts);
+        //setupLogging(indexOpts);
+
+        logCliArgs(args);
+
+        boolean success = false;
+        try {
+            try (Closer closer = Closer.create()) {
+                //configureCustomizer(opts, closer, true);
+                NodeStoreFixture fixture = NodeStoreFixtureProvider.create(opts);
+                closer.register(fixture);
+                execute(fixture, indexOpts, closer);
+                //tellReportPaths();
+            }
+
+            success = true;
+        } catch (Throwable e) {
+            log.error("Error occurred while performing index tasks", e);
+            e.printStackTrace(System.err);
+            if (disableExitOnError) {
+                throw e;
+            }
+        } finally {
+            shutdownLogging();
+        }
+
+        if (!success) {
+            System.exit(1);
+        }
+    }
+
+    private void execute(NodeStoreFixture fixture, IndexOptions indexOpts, Closer closer)
+            throws IOException, CommitFailedException {
+        IndexHelper indexHelper = createIndexHelper(fixture, indexOpts, closer);
+
+        // TODO : See if we need to support dumpIndexStats and index defs for elastic - not needed for now
+        //dumpIndexStats(indexOpts, indexHelper);
+        //dumpIndexDefinitions(indexOpts, indexHelper);
+        reindexOperation(indexOpts, indexHelper);
+    }
+
+    private IndexHelper createIndexHelper(NodeStoreFixture fixture,
+                                          IndexOptions indexOpts, Closer closer) throws IOException {
+        IndexHelper indexHelper = new IndexHelper(fixture.getStore(), fixture.getBlobStore(), fixture.getWhiteboard(),
+                indexOpts.getOutDir(), indexOpts.getWorkDir(), computeIndexPaths(indexOpts));
+
+        // TODO : See if pre text extraction is needed for elastic
+        //configurePreExtractionSupport(indexOpts, indexHelper);
+
+        closer.register(indexHelper);
+        return indexHelper;
+    }
+
+    private List<String> computeIndexPaths(IndexOptions indexOpts) throws IOException {
+        //Combine the indexPaths from json and cli args
+        Set<String> indexPaths = new LinkedHashSet<>(indexOpts.getIndexPaths());
+        File definitions = indexOpts.getIndexDefinitionsFile();
+        if (definitions != null) {
+            IndexDefinitionUpdater updater = new IndexDefinitionUpdater(definitions);
+            Set<String> indexPathsFromJson = updater.getIndexPaths();
+            Set<String> diff = Sets.difference(indexPathsFromJson, indexPaths);
+            if (!diff.isEmpty()) {
+                log.info("Augmenting the indexPaths with {} which are present in {}", diff, definitions);
+            }
+            indexPaths.addAll(indexPathsFromJson);
+        }
+        return new ArrayList<>(indexPaths);
+    }
+
+    private void reindexOperation(IndexOptions indexOpts, IndexHelper indexHelper) throws IOException, CommitFailedException {
+        if (!indexOpts.isReindex()) {
+            return;
+        }
+
+        String checkpoint = indexOpts.getCheckpoint();
+        reindex(indexOpts, indexHelper, checkpoint);
+    }
+
+    private void reindex(IndexOptions idxOpts, IndexHelper indexHelper, String checkpoint) throws IOException, CommitFailedException {
+        Preconditions.checkNotNull(checkpoint, "Checkpoint value is required for reindexing done in read only mode");
+
+        Stopwatch w = Stopwatch.createStarted();
+        IndexerSupport indexerSupport = createIndexerSupport(indexHelper, checkpoint);
+        log.info("Proceeding to index {} upto checkpoint {} {}", indexHelper.getIndexPaths(), checkpoint,
+                indexerSupport.getCheckpointInfo());
+
+        if (opts.getCommonOpts().isMongo() && idxOpts.isDocTraversalMode()) {
+            log.info("Using Document order traversal to perform reindexing");
+            try (ElasticDocumentStoreIndexer indexer = new ElasticDocumentStoreIndexer(indexHelper, indexerSupport, indexOpts.getIndexPrefix(),
+                    indexOpts.getElasticScheme(), indexOpts.getElasticHost(),
+                    indexOpts.getElasticPort(), indexOpts.getApiKeyId(), indexOpts.getApiKeySecret())) {
+                indexer.reindex();
+                // Wait for default flush interval before exiting the try block
+                // to make sure the client is not closed before the last flush
+                // TODO : See if this can be handled in a better manner
+                Thread.sleep(ElasticIndexDefinition.BULK_FLUSH_INTERVAL_MS_DEFAULT*2);
+            } catch (InterruptedException e) {
+                //
+            }
+        } else {
+            try(ElasticOutOfBandIndexer indexer = new ElasticOutOfBandIndexer(indexHelper, indexerSupport, indexOpts.getIndexPrefix(),
+                    indexOpts.getElasticScheme(), indexOpts.getElasticHost(),
+                    indexOpts.getElasticPort(), indexOpts.getApiKeyId(), indexOpts.getApiKeySecret())) {
+
+                indexer.reindex();
+                // Wait for default flush interval before exiting the try block
+                // to make sure the client is not closed before the last flush
+                Thread.sleep(ElasticIndexDefinition.BULK_FLUSH_INTERVAL_MS_DEFAULT*2);
+            } catch (InterruptedException e) {
+                //

Review comment:
       Same here: please log the `InterruptedException` instead of silently ignoring it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [jackrabbit-oak] fabriziofortino commented on a change in pull request #263: OAK-9312 | Add reindex support for elastic in oak-run (new module added)

Posted by GitBox <gi...@apache.org>.
fabriziofortino commented on a change in pull request #263:
URL: https://github.com/apache/jackrabbit-oak/pull/263#discussion_r560303698



##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticOutOfBandIndexer.java
##########
@@ -0,0 +1,65 @@
+package org.apache.jackrabbit.oak.index;
+
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.plugins.index.CompositeIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
+import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static java.util.Arrays.asList;
+
+public class ElasticOutOfBandIndexer extends OutOfBandIndexer {
+    private final Logger log = LoggerFactory.getLogger(this.getClass());

Review comment:
       `log` is never used

##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerProvider.java
##########
@@ -0,0 +1,54 @@
+package org.apache.jackrabbit.oak.index.indexer.document;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.index.IndexHelper;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexWriterFactory;
+import org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
+import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.binary.FulltextBinaryTextExtractor;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriter;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
+
+public class ElasticIndexerProvider implements NodeStateIndexerProvider {
+    private final ExtractedTextCache textCache =
+            new ExtractedTextCache(FileUtils.ONE_MB * 5, TimeUnit.HOURS.toSeconds(5));
+    private final IndexHelper indexHelper;
+    private final ElasticIndexWriterFactory indexWriterFactory;
+    private final ElasticConnection coordinate;
+
+    public ElasticIndexerProvider(IndexHelper indexHelper, ElasticConnection coordinate) {
+        this.indexHelper = indexHelper;
+        this.indexWriterFactory = new ElasticIndexWriterFactory(coordinate);
+        this.coordinate = coordinate;
+    }
+
+
+    @Override
+    public @Nullable NodeStateIndexer getIndexer(@NotNull String type, @NotNull String indexPath, @NotNull NodeBuilder definition, @NotNull NodeState root, IndexingProgressReporter progressReporter) {
+        if (!ElasticIndexDefinition.TYPE_ELASTICSEARCH.equals(definition.getString(TYPE_PROPERTY_NAME))) {
+            return null;
+        }
+        ElasticIndexDefinition idxDefinition = (ElasticIndexDefinition) new ElasticIndexDefinition.Builder(coordinate.getIndexPrefix()).
+                root(root).indexPath(indexPath).defn(definition.getNodeState()).reindex().build();
+
+        FulltextIndexWriter indexWriter = indexWriterFactory.newInstance(idxDefinition, definition, CommitInfo.EMPTY, true);

Review comment:
       ```suggestion
           FulltextIndexWriter<ElasticDocument> indexWriter = indexWriterFactory.newInstance(idxDefinition, definition, CommitInfo.EMPTY, true);
   ```

##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java
##########
@@ -0,0 +1,201 @@
+package org.apache.jackrabbit.oak.index;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import joptsimple.OptionParser;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
+import org.apache.jackrabbit.oak.run.cli.CommonOptions;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixture;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixtureProvider;
+import org.apache.jackrabbit.oak.run.cli.Options;
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.apache.jackrabbit.oak.run.commons.LoggingInitializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ElasticIndexCommand implements Command {
+    private static final Logger log = LoggerFactory.getLogger(ElasticIndexCommand.class);
+    private Options opts;
+    private ElasticIndexOptions indexOpts;
+    public static final String NAME = "index";
+
+    private final String summary = "Provides elastic index management related operations";
+    private static boolean disableExitOnError;
+
+
+    @Override
+    public void execute(String... args) throws Exception {
+        OptionParser parser = new OptionParser();
+
+        opts = new Options();
+        opts.setCommandName(NAME);
+        opts.setSummary(summary);
+        opts.setConnectionString(CommonOptions.DEFAULT_CONNECTION_STRING);
+        opts.registerOptionsFactory(ElasticIndexOptions.FACTORY);
+        opts.parseAndConfigure(parser, args);
+
+        indexOpts = opts.getOptionBean(ElasticIndexOptions.class);
+
+        //Clean up before setting up NodeStore as the temp
+        //directory might be used by NodeStore for cache stuff like persistentCache
+        //setupDirectories(indexOpts);
+        //setupLogging(indexOpts);
+
+        logCliArgs(args);
+
+        boolean success = false;
+        try {
+            try (Closer closer = Closer.create()) {
+                //configureCustomizer(opts, closer, true);
+                NodeStoreFixture fixture = NodeStoreFixtureProvider.create(opts);
+                closer.register(fixture);
+                execute(fixture, indexOpts, closer);
+                //tellReportPaths();
+            }
+
+            success = true;
+        } catch (Throwable e) {
+            log.error("Error occurred while performing index tasks", e);
+            e.printStackTrace(System.err);
+            if (disableExitOnError) {
+                throw e;
+            }
+        } finally {
+            shutdownLogging();
+        }
+
+        if (!success) {
+            System.exit(1);
+        }
+    }
+
+    private void execute(NodeStoreFixture fixture, IndexOptions indexOpts, Closer closer)
+            throws IOException, CommitFailedException {
+        IndexHelper indexHelper = createIndexHelper(fixture, indexOpts, closer);
+
+        // TODO : See if we need to support dumpIndexStats and index defs for elastic - not needed for now
+        //dumpIndexStats(indexOpts, indexHelper);
+        //dumpIndexDefinitions(indexOpts, indexHelper);
+        reindexOperation(indexOpts, indexHelper);
+    }
+
+    private IndexHelper createIndexHelper(NodeStoreFixture fixture,
+                                          IndexOptions indexOpts, Closer closer) throws IOException {
+        IndexHelper indexHelper = new IndexHelper(fixture.getStore(), fixture.getBlobStore(), fixture.getWhiteboard(),
+                indexOpts.getOutDir(), indexOpts.getWorkDir(), computeIndexPaths(indexOpts));
+
+        // TODO : See if pre text extraction is needed for elastic
+        //configurePreExtractionSupport(indexOpts, indexHelper);
+
+        closer.register(indexHelper);
+        return indexHelper;
+    }
+
+    private List<String> computeIndexPaths(IndexOptions indexOpts) throws IOException {
+        //Combine the indexPaths from json and cli args
+        Set<String> indexPaths = new LinkedHashSet<>(indexOpts.getIndexPaths());
+        File definitions = indexOpts.getIndexDefinitionsFile();
+        if (definitions != null) {
+            IndexDefinitionUpdater updater = new IndexDefinitionUpdater(definitions);
+            Set<String> indexPathsFromJson = updater.getIndexPaths();
+            Set<String> diff = Sets.difference(indexPathsFromJson, indexPaths);
+            if (!diff.isEmpty()) {
+                log.info("Augmenting the indexPaths with {} which are present in {}", diff, definitions);
+            }
+            indexPaths.addAll(indexPathsFromJson);
+        }
+        return new ArrayList<>(indexPaths);
+    }
+
+    private void reindexOperation(IndexOptions indexOpts, IndexHelper indexHelper) throws IOException, CommitFailedException {
+        if (!indexOpts.isReindex()) {
+            return;
+        }
+
+        String checkpoint = indexOpts.getCheckpoint();
+        reindex(indexOpts, indexHelper, checkpoint);
+    }
+
+    private void reindex(IndexOptions idxOpts, IndexHelper indexHelper, String checkpoint) throws IOException, CommitFailedException {
+        Preconditions.checkNotNull(checkpoint, "Checkpoint value is required for reindexing done in read only mode");
+
+        Stopwatch w = Stopwatch.createStarted();

Review comment:
       The Stopwatch `w` is created but never used. Should we log how long the reindex operation takes?

##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java
##########
@@ -0,0 +1,201 @@
+package org.apache.jackrabbit.oak.index;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import joptsimple.OptionParser;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
+import org.apache.jackrabbit.oak.run.cli.CommonOptions;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixture;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixtureProvider;
+import org.apache.jackrabbit.oak.run.cli.Options;
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.apache.jackrabbit.oak.run.commons.LoggingInitializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ElasticIndexCommand implements Command {
+    private static final Logger log = LoggerFactory.getLogger(ElasticIndexCommand.class);
+    private Options opts;
+    private ElasticIndexOptions indexOpts;
+    public static final String NAME = "index";
+
+    private final String summary = "Provides elastic index management related operations";
+    private static boolean disableExitOnError;
+
+
+    @Override
+    public void execute(String... args) throws Exception {
+        OptionParser parser = new OptionParser();
+
+        opts = new Options();
+        opts.setCommandName(NAME);
+        opts.setSummary(summary);
+        opts.setConnectionString(CommonOptions.DEFAULT_CONNECTION_STRING);
+        opts.registerOptionsFactory(ElasticIndexOptions.FACTORY);
+        opts.parseAndConfigure(parser, args);
+
+        indexOpts = opts.getOptionBean(ElasticIndexOptions.class);
+
+        //Clean up before setting up NodeStore as the temp
+        //directory might be used by NodeStore for cache stuff like persistentCache
+        //setupDirectories(indexOpts);
+        //setupLogging(indexOpts);
+
+        logCliArgs(args);
+
+        boolean success = false;
+        try {
+            try (Closer closer = Closer.create()) {
+                //configureCustomizer(opts, closer, true);
+                NodeStoreFixture fixture = NodeStoreFixtureProvider.create(opts);
+                closer.register(fixture);
+                execute(fixture, indexOpts, closer);
+                //tellReportPaths();
+            }
+
+            success = true;
+        } catch (Throwable e) {
+            log.error("Error occurred while performing index tasks", e);
+            e.printStackTrace(System.err);
+            if (disableExitOnError) {
+                throw e;
+            }
+        } finally {
+            shutdownLogging();
+        }
+
+        if (!success) {
+            System.exit(1);
+        }
+    }
+
+    private void execute(NodeStoreFixture fixture, IndexOptions indexOpts, Closer closer)
+            throws IOException, CommitFailedException {
+        IndexHelper indexHelper = createIndexHelper(fixture, indexOpts, closer);
+
+        // TODO : See if we need to support dumpIndexStats and index defs for elastic - not needed for now
+        //dumpIndexStats(indexOpts, indexHelper);
+        //dumpIndexDefinitions(indexOpts, indexHelper);
+        reindexOperation(indexOpts, indexHelper);
+    }
+
+    private IndexHelper createIndexHelper(NodeStoreFixture fixture,
+                                          IndexOptions indexOpts, Closer closer) throws IOException {
+        IndexHelper indexHelper = new IndexHelper(fixture.getStore(), fixture.getBlobStore(), fixture.getWhiteboard(),
+                indexOpts.getOutDir(), indexOpts.getWorkDir(), computeIndexPaths(indexOpts));
+
+        // TODO : See if pre text extraction is needed for elastic
+        //configurePreExtractionSupport(indexOpts, indexHelper);
+
+        closer.register(indexHelper);
+        return indexHelper;
+    }
+
+    private List<String> computeIndexPaths(IndexOptions indexOpts) throws IOException {
+        //Combine the indexPaths from json and cli args
+        Set<String> indexPaths = new LinkedHashSet<>(indexOpts.getIndexPaths());
+        File definitions = indexOpts.getIndexDefinitionsFile();
+        if (definitions != null) {
+            IndexDefinitionUpdater updater = new IndexDefinitionUpdater(definitions);
+            Set<String> indexPathsFromJson = updater.getIndexPaths();
+            Set<String> diff = Sets.difference(indexPathsFromJson, indexPaths);
+            if (!diff.isEmpty()) {
+                log.info("Augmenting the indexPaths with {} which are present in {}", diff, definitions);
+            }
+            indexPaths.addAll(indexPathsFromJson);
+        }
+        return new ArrayList<>(indexPaths);
+    }
+
+    private void reindexOperation(IndexOptions indexOpts, IndexHelper indexHelper) throws IOException, CommitFailedException {
+        if (!indexOpts.isReindex()) {
+            return;
+        }
+
+        String checkpoint = indexOpts.getCheckpoint();
+        reindex(indexOpts, indexHelper, checkpoint);
+    }
+
+    private void reindex(IndexOptions idxOpts, IndexHelper indexHelper, String checkpoint) throws IOException, CommitFailedException {
+        Preconditions.checkNotNull(checkpoint, "Checkpoint value is required for reindexing done in read only mode");
+
+        Stopwatch w = Stopwatch.createStarted();
+        IndexerSupport indexerSupport = createIndexerSupport(indexHelper, checkpoint);
+        log.info("Proceeding to index {} upto checkpoint {} {}", indexHelper.getIndexPaths(), checkpoint,
+                indexerSupport.getCheckpointInfo());
+
+        if (opts.getCommonOpts().isMongo() && idxOpts.isDocTraversalMode()) {
+            log.info("Using Document order traversal to perform reindexing");
+            try (ElasticDocumentStoreIndexer indexer = new ElasticDocumentStoreIndexer(indexHelper, indexerSupport, indexOpts.getIndexPrefix(),
+                    indexOpts.getElasticScheme(), indexOpts.getElasticHost(),
+                    indexOpts.getElasticPort(), indexOpts.getApiKeyId(), indexOpts.getApiKeySecret())) {
+                indexer.reindex();
+                // Wait for default flush interval before exiting the try block
+                // to make sure the client is not closed before the last flush
+                // TODO : See if this can be handled in a better manner
+                Thread.sleep(ElasticIndexDefinition.BULK_FLUSH_INTERVAL_MS_DEFAULT*2);
+            } catch (InterruptedException e) {
+                //
+            }
+        } else {
+            try(ElasticOutOfBandIndexer indexer = new ElasticOutOfBandIndexer(indexHelper, indexerSupport, indexOpts.getIndexPrefix(),
+                    indexOpts.getElasticScheme(), indexOpts.getElasticHost(),
+                    indexOpts.getElasticPort(), indexOpts.getApiKeyId(), indexOpts.getApiKeySecret())) {
+
+                indexer.reindex();
+                // Wait for default flush interval before exiting the try block
+                // to make sure the client is not closed before the last flush
+                Thread.sleep(ElasticIndexDefinition.BULK_FLUSH_INTERVAL_MS_DEFAULT*2);
+            } catch (InterruptedException e) {
+                //
+            }
+        }
+        indexerSupport.writeMetaInfo(checkpoint);
+    }
+
+    private IndexerSupport createIndexerSupport(IndexHelper indexHelper, String checkpoint) {
+        IndexerSupport indexerSupport = new IndexerSupport(indexHelper, checkpoint);
+
+        File definitions = indexOpts.getIndexDefinitionsFile();
+        if (definitions != null) {
+            Preconditions.checkArgument(definitions.exists(), "Index definitions file [%s] not found", getPath(definitions));
+            indexerSupport.setIndexDefinitions(definitions);
+        }
+        return indexerSupport;
+    }
+
+    static Path getPath(File file) {
+        return file.toPath().normalize().toAbsolutePath();
+    }
+
+    private void shutdownLogging() {
+        LoggingInitializer.shutdownLogging();
+    }
+
+    private static void logCliArgs(String[] args) {
+        log.info("Command line arguments used for indexing [{}]", Joiner.on(' ').join(args));
+        List<String> inputArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
+        if (!inputArgs.isEmpty()) {
+            log.info("System properties and vm options passed {}", inputArgs);
+        }
+    }
+
+    public static void setDisableExitOnError(boolean disableExitOnError) {

Review comment:
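       `setDisableExitOnError` is never called, so `disableExitOnError` always stays `false` and the `throw e` branch in `execute` is never taken.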

##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexer.java
##########
@@ -0,0 +1,99 @@
+package org.apache.jackrabbit.oak.index.indexer.document;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticDocument;
+import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticDocumentMaker;
+import org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
+import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.binary.FulltextBinaryTextExtractor;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriter;
+import org.apache.jackrabbit.oak.spi.filter.PathFilter;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class ElasticIndexer implements NodeStateIndexer {
+
+    private final IndexDefinition definition;
+    private final FulltextBinaryTextExtractor binaryTextExtractor;
+    private final NodeBuilder definitionBuilder;
+    private final IndexingProgressReporter progressReporter;
+    //private final ElasticIndexEditorProvider elasticIndexEditorProvider;
+    private final FulltextIndexWriter indexWriter;
+
+    public ElasticIndexer(IndexDefinition definition, FulltextBinaryTextExtractor binaryTextExtractor,
+                          NodeBuilder definitionBuilder, IndexingProgressReporter progressReporter,
+                          FulltextIndexWriter indexWriter) {

Review comment:
       ```suggestion
                             FulltextIndexWriter<ElasticDocument> indexWriter) {
   ```

##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexer.java
##########
@@ -0,0 +1,99 @@
+package org.apache.jackrabbit.oak.index.indexer.document;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticDocument;
+import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticDocumentMaker;
+import org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
+import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.binary.FulltextBinaryTextExtractor;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriter;
+import org.apache.jackrabbit.oak.spi.filter.PathFilter;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class ElasticIndexer implements NodeStateIndexer {
+
+    private final IndexDefinition definition;
+    private final FulltextBinaryTextExtractor binaryTextExtractor;
+    private final NodeBuilder definitionBuilder;
+    private final IndexingProgressReporter progressReporter;
+    //private final ElasticIndexEditorProvider elasticIndexEditorProvider;
+    private final FulltextIndexWriter indexWriter;

Review comment:
       ```suggestion
       private final FulltextIndexWriter<ElasticDocument> indexWriter;
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [jackrabbit-oak] nit0906 commented on a change in pull request #263: OAK-9312 | Add reindex support for elastic in oak-run (new module added)

Posted by GitBox <gi...@apache.org>.
nit0906 commented on a change in pull request #263:
URL: https://github.com/apache/jackrabbit-oak/pull/263#discussion_r560795087



##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java
##########
@@ -0,0 +1,204 @@
+package org.apache.jackrabbit.oak.index;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import joptsimple.OptionParser;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
+import org.apache.jackrabbit.oak.run.cli.CommonOptions;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixture;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixtureProvider;
+import org.apache.jackrabbit.oak.run.cli.Options;
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.apache.jackrabbit.oak.run.commons.LoggingInitializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class ElasticIndexCommand implements Command {
+    private static final Logger log = LoggerFactory.getLogger(ElasticIndexCommand.class);
+    private Options opts;
+    private ElasticIndexOptions indexOpts;
+    public static final String NAME = "index";
+
+    private final String summary = "Provides elastic index management related operations";
+    private static boolean disableExitOnError;
+
+
+    @Override
+    public void execute(String... args) throws Exception {
+        OptionParser parser = new OptionParser();
+
+        opts = new Options();
+        opts.setCommandName(NAME);
+        opts.setSummary(summary);
+        opts.setConnectionString(CommonOptions.DEFAULT_CONNECTION_STRING);
+        opts.registerOptionsFactory(ElasticIndexOptions.FACTORY);
+        opts.parseAndConfigure(parser, args);
+
+        indexOpts = opts.getOptionBean(ElasticIndexOptions.class);
+
+        //Clean up before setting up NodeStore as the temp
+        //directory might be used by NodeStore for cache stuff like persistentCache
+        //setupDirectories(indexOpts);
+        //setupLogging(indexOpts);
+
+        logCliArgs(args);
+
+        boolean success = false;
+        try {
+            try (Closer closer = Closer.create()) {
+                //configureCustomizer(opts, closer, true);
+                NodeStoreFixture fixture = NodeStoreFixtureProvider.create(opts);
+                closer.register(fixture);
+                execute(fixture, indexOpts, closer);
+                //tellReportPaths();
+            }
+
+            success = true;
+        } catch (Throwable e) {
+            log.error("Error occurred while performing index tasks", e);
+            e.printStackTrace(System.err);
+            if (disableExitOnError) {
+                throw e;
+            }
+        } finally {
+            shutdownLogging();
+        }
+
+        if (!success) {
+            System.exit(1);
+        }
+    }
+
+    private void execute(NodeStoreFixture fixture, IndexOptions indexOpts, Closer closer)
+            throws IOException, CommitFailedException {
+        IndexHelper indexHelper = createIndexHelper(fixture, indexOpts, closer);
+
+        // TODO : See if we need to support dumpIndexStats and index defs for elastic - not needed for now
+        //dumpIndexStats(indexOpts, indexHelper);
+        //dumpIndexDefinitions(indexOpts, indexHelper);
+        reindexOperation(indexOpts, indexHelper);
+    }
+
+    private IndexHelper createIndexHelper(NodeStoreFixture fixture,
+                                          IndexOptions indexOpts, Closer closer) throws IOException {
+        IndexHelper indexHelper = new IndexHelper(fixture.getStore(), fixture.getBlobStore(), fixture.getWhiteboard(),
+                indexOpts.getOutDir(), indexOpts.getWorkDir(), computeIndexPaths(indexOpts));
+
+        // TODO : See if pre text extraction is needed for elastic
+        //configurePreExtractionSupport(indexOpts, indexHelper);
+
+        closer.register(indexHelper);
+        return indexHelper;
+    }
+
+    private List<String> computeIndexPaths(IndexOptions indexOpts) throws IOException {
+        //Combine the indexPaths from json and cli args
+        Set<String> indexPaths = new LinkedHashSet<>(indexOpts.getIndexPaths());
+        File definitions = indexOpts.getIndexDefinitionsFile();
+        if (definitions != null) {
+            IndexDefinitionUpdater updater = new IndexDefinitionUpdater(definitions);
+            Set<String> indexPathsFromJson = updater.getIndexPaths();
+            Set<String> diff = Sets.difference(indexPathsFromJson, indexPaths);
+            if (!diff.isEmpty()) {
+                log.info("Augmenting the indexPaths with {} which are present in {}", diff, definitions);
+            }
+            indexPaths.addAll(indexPathsFromJson);
+        }
+        return new ArrayList<>(indexPaths);
+    }
+
+    private void reindexOperation(IndexOptions indexOpts, IndexHelper indexHelper) throws IOException, CommitFailedException {
+        if (!indexOpts.isReindex()) {
+            return;
+        }
+
+        String checkpoint = indexOpts.getCheckpoint();
+        reindex(indexOpts, indexHelper, checkpoint);
+    }
+
+    private void reindex(IndexOptions idxOpts, IndexHelper indexHelper, String checkpoint) throws IOException, CommitFailedException {
+        Preconditions.checkNotNull(checkpoint, "Checkpoint value is required for reindexing done in read only mode");
+
+        Stopwatch w = Stopwatch.createStarted();
+        IndexerSupport indexerSupport = createIndexerSupport(indexHelper, checkpoint);
+        log.info("Proceeding to index {} upto checkpoint {} {}", indexHelper.getIndexPaths(), checkpoint,
+                indexerSupport.getCheckpointInfo());
+
+        if (opts.getCommonOpts().isMongo() && idxOpts.isDocTraversalMode()) {
+            log.info("Using Document order traversal to perform reindexing");
+            try (ElasticDocumentStoreIndexer indexer = new ElasticDocumentStoreIndexer(indexHelper, indexerSupport, indexOpts.getIndexPrefix(),
+                    indexOpts.getElasticScheme(), indexOpts.getElasticHost(),
+                    indexOpts.getElasticPort(), indexOpts.getApiKeyId(), indexOpts.getApiKeySecret())) {
+                indexer.reindex();
+                // Wait for default flush interval before exiting the try block
+                // to make sure the client is not closed before the last flush
+                // TODO : See if this can be handled in a better manner
+                Thread.sleep(ElasticIndexDefinition.BULK_FLUSH_INTERVAL_MS_DEFAULT*2);
+            } catch (InterruptedException e) {
+                //

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [jackrabbit-oak] nit0906 commented on a change in pull request #263: OAK-9312 | Add reindex support for elastic in oak-run (new module added)

Posted by GitBox <gi...@apache.org>.
nit0906 commented on a change in pull request #263:
URL: https://github.com/apache/jackrabbit-oak/pull/263#discussion_r560795897



##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticOutOfBandIndexer.java
##########
@@ -0,0 +1,64 @@
+package org.apache.jackrabbit.oak.index;
+
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.plugins.index.CompositeIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
+import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static java.util.Arrays.asList;
+
+public class ElasticOutOfBandIndexer extends OutOfBandIndexer {
+    private final String indexPrefix;
+    private final String scheme;
+    private final String host;
+    private final int port;
+    private final String apiKeyId;
+    private final String apiSecretId;
+
+    public ElasticOutOfBandIndexer(IndexHelper indexHelper, IndexerSupport indexerSupport,
+                                   String indexPrefix, String scheme,
+                                   String host, int port,
+                                   String apiKeyId, String apiSecretId) {
+        super(indexHelper, indexerSupport);
+        this.indexPrefix = indexPrefix;
+        this.scheme = scheme;
+        this.host = host;
+        this.port = port;
+        this.apiKeyId = apiKeyId;
+        this.apiSecretId = apiSecretId;
+    }
+
+    @Override
+    protected IndexEditorProvider createIndexEditorProvider() {
+        IndexEditorProvider elastic = createElasticEditorProvider();
+

Review comment:
       removed the empty line




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [jackrabbit-oak] nit0906 commented on a change in pull request #263: OAK-9312 | Add reindex support for elastic in oak-run (new module added)

Posted by GitBox <gi...@apache.org>.
nit0906 commented on a change in pull request #263:
URL: https://github.com/apache/jackrabbit-oak/pull/263#discussion_r560684885



##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java
##########
@@ -0,0 +1,201 @@
+package org.apache.jackrabbit.oak.index;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import joptsimple.OptionParser;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
+import org.apache.jackrabbit.oak.run.cli.CommonOptions;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixture;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixtureProvider;
+import org.apache.jackrabbit.oak.run.cli.Options;
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.apache.jackrabbit.oak.run.commons.LoggingInitializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ElasticIndexCommand implements Command {
+    private static final Logger log = LoggerFactory.getLogger(ElasticIndexCommand.class);
+    private Options opts;
+    private ElasticIndexOptions indexOpts;
+    public static final String NAME = "index";
+
+    private final String summary = "Provides elastic index management related operations";
+    private static boolean disableExitOnError;
+
+
+    @Override
+    public void execute(String... args) throws Exception {
+        OptionParser parser = new OptionParser();
+
+        opts = new Options();
+        opts.setCommandName(NAME);
+        opts.setSummary(summary);
+        opts.setConnectionString(CommonOptions.DEFAULT_CONNECTION_STRING);
+        opts.registerOptionsFactory(ElasticIndexOptions.FACTORY);
+        opts.parseAndConfigure(parser, args);
+
+        indexOpts = opts.getOptionBean(ElasticIndexOptions.class);
+
+        //Clean up before setting up NodeStore as the temp
+        //directory might be used by NodeStore for cache stuff like persistentCache
+        //setupDirectories(indexOpts);
+        //setupLogging(indexOpts);
+
+        logCliArgs(args);
+
+        boolean success = false;
+        try {
+            try (Closer closer = Closer.create()) {
+                //configureCustomizer(opts, closer, true);
+                NodeStoreFixture fixture = NodeStoreFixtureProvider.create(opts);
+                closer.register(fixture);
+                execute(fixture, indexOpts, closer);
+                //tellReportPaths();
+            }
+
+            success = true;
+        } catch (Throwable e) {
+            log.error("Error occurred while performing index tasks", e);
+            e.printStackTrace(System.err);
+            if (disableExitOnError) {
+                throw e;
+            }
+        } finally {
+            shutdownLogging();
+        }
+
+        if (!success) {
+            System.exit(1);
+        }
+    }
+
+    private void execute(NodeStoreFixture fixture, IndexOptions indexOpts, Closer closer)
+            throws IOException, CommitFailedException {
+        IndexHelper indexHelper = createIndexHelper(fixture, indexOpts, closer);
+
+        // TODO : See if we need to support dumpIndexStats and index defs for elastic - not needed for now
+        //dumpIndexStats(indexOpts, indexHelper);
+        //dumpIndexDefinitions(indexOpts, indexHelper);
+        reindexOperation(indexOpts, indexHelper);
+    }
+
+    private IndexHelper createIndexHelper(NodeStoreFixture fixture,
+                                          IndexOptions indexOpts, Closer closer) throws IOException {
+        IndexHelper indexHelper = new IndexHelper(fixture.getStore(), fixture.getBlobStore(), fixture.getWhiteboard(),
+                indexOpts.getOutDir(), indexOpts.getWorkDir(), computeIndexPaths(indexOpts));
+
+        // TODO : See if pre text extraction is needed for elastic
+        //configurePreExtractionSupport(indexOpts, indexHelper);
+
+        closer.register(indexHelper);
+        return indexHelper;
+    }
+
+    private List<String> computeIndexPaths(IndexOptions indexOpts) throws IOException {
+        //Combine the indexPaths from json and cli args
+        Set<String> indexPaths = new LinkedHashSet<>(indexOpts.getIndexPaths());
+        File definitions = indexOpts.getIndexDefinitionsFile();
+        if (definitions != null) {
+            IndexDefinitionUpdater updater = new IndexDefinitionUpdater(definitions);
+            Set<String> indexPathsFromJson = updater.getIndexPaths();
+            Set<String> diff = Sets.difference(indexPathsFromJson, indexPaths);
+            if (!diff.isEmpty()) {
+                log.info("Augmenting the indexPaths with {} which are present in {}", diff, definitions);
+            }
+            indexPaths.addAll(indexPathsFromJson);
+        }
+        return new ArrayList<>(indexPaths);
+    }
+
+    private void reindexOperation(IndexOptions indexOpts, IndexHelper indexHelper) throws IOException, CommitFailedException {
+        if (!indexOpts.isReindex()) {
+            return;
+        }
+
+        String checkpoint = indexOpts.getCheckpoint();
+        reindex(indexOpts, indexHelper, checkpoint);
+    }
+
+    private void reindex(IndexOptions idxOpts, IndexHelper indexHelper, String checkpoint) throws IOException, CommitFailedException {
+        Preconditions.checkNotNull(checkpoint, "Checkpoint value is required for reindexing done in read only mode");
+
+        Stopwatch w = Stopwatch.createStarted();
+        IndexerSupport indexerSupport = createIndexerSupport(indexHelper, checkpoint);
+        log.info("Proceeding to index {} upto checkpoint {} {}", indexHelper.getIndexPaths(), checkpoint,
+                indexerSupport.getCheckpointInfo());
+
+        if (opts.getCommonOpts().isMongo() && idxOpts.isDocTraversalMode()) {
+            log.info("Using Document order traversal to perform reindexing");
+            try (ElasticDocumentStoreIndexer indexer = new ElasticDocumentStoreIndexer(indexHelper, indexerSupport, indexOpts.getIndexPrefix(),
+                    indexOpts.getElasticScheme(), indexOpts.getElasticHost(),
+                    indexOpts.getElasticPort(), indexOpts.getApiKeyId(), indexOpts.getApiKeySecret())) {
+                indexer.reindex();
+                // Wait for default flush interval before exiting the try block
+                // to make sure the client is not closed before the last flush
+                // TODO : See if this can be handled in a better manner
+                Thread.sleep(ElasticIndexDefinition.BULK_FLUSH_INTERVAL_MS_DEFAULT*2);
+            } catch (InterruptedException e) {
+                //
+            }
+        } else {
+            try(ElasticOutOfBandIndexer indexer = new ElasticOutOfBandIndexer(indexHelper, indexerSupport, indexOpts.getIndexPrefix(),
+                    indexOpts.getElasticScheme(), indexOpts.getElasticHost(),
+                    indexOpts.getElasticPort(), indexOpts.getApiKeyId(), indexOpts.getApiKeySecret())) {
+
+                indexer.reindex();
+                // Wait for default flush interval before exiting the try block
+                // to make sure the client is not closed before the last flush
+                Thread.sleep(ElasticIndexDefinition.BULK_FLUSH_INTERVAL_MS_DEFAULT*2);
+            } catch (InterruptedException e) {
+                //
+            }
+        }
+        indexerSupport.writeMetaInfo(checkpoint);
+    }
+
+    private IndexerSupport createIndexerSupport(IndexHelper indexHelper, String checkpoint) {
+        IndexerSupport indexerSupport = new IndexerSupport(indexHelper, checkpoint);
+
+        File definitions = indexOpts.getIndexDefinitionsFile();
+        if (definitions != null) {
+            Preconditions.checkArgument(definitions.exists(), "Index definitions file [%s] not found", getPath(definitions));
+            indexerSupport.setIndexDefinitions(definitions);
+        }
+        return indexerSupport;
+    }
+
+    static Path getPath(File file) {
+        return file.toPath().normalize().toAbsolutePath();
+    }
+
+    private void shutdownLogging() {
+        LoggingInitializer.shutdownLogging();
+    }
+
+    private static void logCliArgs(String[] args) {
+        log.info("Command line arguments used for indexing [{}]", Joiner.on(' ').join(args));
+        List<String> inputArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
+        if (!inputArgs.isEmpty()) {
+            log.info("System properties and vm options passed {}", inputArgs);
+        }
+    }
+
+    public static void setDisableExitOnError(boolean disableExitOnError) {

Review comment:
       Actually, I took this from the Lucene index command - it is useful when this command is invoked from an external script. For the time being, we can just leave it as is.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [jackrabbit-oak] nit0906 commented on a change in pull request #263: OAK-9312 | Add reindex support for elastic in oak-run (new module added)

Posted by GitBox <gi...@apache.org>.
nit0906 commented on a change in pull request #263:
URL: https://github.com/apache/jackrabbit-oak/pull/263#discussion_r560796787



##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/ElasticIndexerProvider.java
##########
@@ -0,0 +1,55 @@
+package org.apache.jackrabbit.oak.index.indexer.document;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.index.IndexHelper;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticConnection;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticDocument;
+import org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexWriterFactory;
+import org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
+import org.apache.jackrabbit.oak.plugins.index.search.ExtractedTextCache;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.binary.FulltextBinaryTextExtractor;
+import org.apache.jackrabbit.oak.plugins.index.search.spi.editor.FulltextIndexWriter;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
+
+public class ElasticIndexerProvider implements NodeStateIndexerProvider {
+    private final ExtractedTextCache textCache =
+            new ExtractedTextCache(FileUtils.ONE_MB * 5, TimeUnit.HOURS.toSeconds(5));
+    private final IndexHelper indexHelper;
+    private final ElasticIndexWriterFactory indexWriterFactory;
+    private final ElasticConnection coordinate;
+
+    public ElasticIndexerProvider(IndexHelper indexHelper, ElasticConnection coordinate) {
+        this.indexHelper = indexHelper;
+        this.indexWriterFactory = new ElasticIndexWriterFactory(coordinate);
+        this.coordinate = coordinate;
+    }
+
+
+    @Override
+    public @Nullable NodeStateIndexer getIndexer(@NotNull String type, @NotNull String indexPath, @NotNull NodeBuilder definition, @NotNull NodeState root, IndexingProgressReporter progressReporter) {
+        if (!ElasticIndexDefinition.TYPE_ELASTICSEARCH.equals(definition.getString(TYPE_PROPERTY_NAME))) {
+            return null;
+        }
+        ElasticIndexDefinition idxDefinition = (ElasticIndexDefinition) new ElasticIndexDefinition.Builder(coordinate.getIndexPrefix()).
+                root(root).indexPath(indexPath).defn(definition.getNodeState()).reindex().build();
+
+        FulltextIndexWriter <ElasticDocument> indexWriter = indexWriterFactory.newInstance(idxDefinition, definition, CommitInfo.EMPTY, true);
+        FulltextBinaryTextExtractor textExtractor = new FulltextBinaryTextExtractor(textCache, idxDefinition, true);
+        return new ElasticIndexer(idxDefinition, textExtractor, definition, progressReporter, indexWriter);
+    }
+
+    @Override
+    public void close() throws IOException {
+

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [jackrabbit-oak] nit0906 closed pull request #263: OAK-9312 | Add reindex support for elastic in oak-run (new module added)

Posted by GitBox <gi...@apache.org>.
nit0906 closed pull request #263:
URL: https://github.com/apache/jackrabbit-oak/pull/263


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [jackrabbit-oak] nit0906 commented on a change in pull request #263: OAK-9312 | Add reindex support for elastic in oak-run (new module added)

Posted by GitBox <gi...@apache.org>.
nit0906 commented on a change in pull request #263:
URL: https://github.com/apache/jackrabbit-oak/pull/263#discussion_r560793575



##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java
##########
@@ -0,0 +1,204 @@
+package org.apache.jackrabbit.oak.index;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import joptsimple.OptionParser;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
+import org.apache.jackrabbit.oak.run.cli.CommonOptions;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixture;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixtureProvider;
+import org.apache.jackrabbit.oak.run.cli.Options;
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.apache.jackrabbit.oak.run.commons.LoggingInitializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class ElasticIndexCommand implements Command {
+    private static final Logger log = LoggerFactory.getLogger(ElasticIndexCommand.class);
+    private Options opts;
+    private ElasticIndexOptions indexOpts;
+    public static final String NAME = "index";
+
+    private final String summary = "Provides elastic index management related operations";
+    private static boolean disableExitOnError;
+
+
+    @Override
+    public void execute(String... args) throws Exception {
+        OptionParser parser = new OptionParser();
+
+        opts = new Options();
+        opts.setCommandName(NAME);
+        opts.setSummary(summary);
+        opts.setConnectionString(CommonOptions.DEFAULT_CONNECTION_STRING);
+        opts.registerOptionsFactory(ElasticIndexOptions.FACTORY);
+        opts.parseAndConfigure(parser, args);
+
+        indexOpts = opts.getOptionBean(ElasticIndexOptions.class);
+
+        //Clean up before setting up NodeStore as the temp
+        //directory might be used by NodeStore for cache stuff like persistentCache
+        //setupDirectories(indexOpts);
+        //setupLogging(indexOpts);
+
+        logCliArgs(args);
+
+        boolean success = false;
+        try {
+            try (Closer closer = Closer.create()) {
+                //configureCustomizer(opts, closer, true);
+                NodeStoreFixture fixture = NodeStoreFixtureProvider.create(opts);
+                closer.register(fixture);
+                execute(fixture, indexOpts, closer);
+                //tellReportPaths();
+            }
+
+            success = true;
+        } catch (Throwable e) {
+            log.error("Error occurred while performing index tasks", e);
+            e.printStackTrace(System.err);
+            if (disableExitOnError) {
+                throw e;
+            }
+        } finally {
+            shutdownLogging();
+        }
+
+        if (!success) {
+            System.exit(1);

Review comment:
       This seems to be needed for proper closure of logback - it was added to the indexing command here: https://github.com/oak-indexing/jackrabbit-oak/commit/cf2b87ff8302eaa877a72ab0dbf7dab27c0d2464




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [jackrabbit-oak] nit0906 commented on a change in pull request #263: OAK-9312 | Add reindex support for elastic in oak-run (new module added)

Posted by GitBox <gi...@apache.org>.
nit0906 commented on a change in pull request #263:
URL: https://github.com/apache/jackrabbit-oak/pull/263#discussion_r560792790



##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java
##########
@@ -0,0 +1,204 @@
+package org.apache.jackrabbit.oak.index;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import joptsimple.OptionParser;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
+import org.apache.jackrabbit.oak.run.cli.CommonOptions;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixture;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixtureProvider;
+import org.apache.jackrabbit.oak.run.cli.Options;
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.apache.jackrabbit.oak.run.commons.LoggingInitializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class ElasticIndexCommand implements Command {
+    private static final Logger log = LoggerFactory.getLogger(ElasticIndexCommand.class);
+    private Options opts;
+    private ElasticIndexOptions indexOpts;
+    public static final String NAME = "index";
+
+    private final String summary = "Provides elastic index management related operations";
+    private static boolean disableExitOnError;
+
+
+    @Override
+    public void execute(String... args) throws Exception {
+        OptionParser parser = new OptionParser();
+
+        opts = new Options();
+        opts.setCommandName(NAME);
+        opts.setSummary(summary);
+        opts.setConnectionString(CommonOptions.DEFAULT_CONNECTION_STRING);
+        opts.registerOptionsFactory(ElasticIndexOptions.FACTORY);
+        opts.parseAndConfigure(parser, args);
+
+        indexOpts = opts.getOptionBean(ElasticIndexOptions.class);
+
+        //Clean up before setting up NodeStore as the temp
+        //directory might be used by NodeStore for cache stuff like persistentCache
+        //setupDirectories(indexOpts);
+        //setupLogging(indexOpts);
+
+        logCliArgs(args);
+
+        boolean success = false;
+        try {
+            try (Closer closer = Closer.create()) {
+                //configureCustomizer(opts, closer, true);
+                NodeStoreFixture fixture = NodeStoreFixtureProvider.create(opts);
+                closer.register(fixture);
+                execute(fixture, indexOpts, closer);
+                //tellReportPaths();
+            }
+
+            success = true;
+        } catch (Throwable e) {
+            log.error("Error occurred while performing index tasks", e);
+            e.printStackTrace(System.err);

Review comment:
       removed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [jackrabbit-oak] nit0906 commented on pull request #263: OAK-9312 | Add reindex support for elastic in oak-run (new module added)

Posted by GitBox <gi...@apache.org>.
nit0906 commented on pull request #263:
URL: https://github.com/apache/jackrabbit-oak/pull/263#issuecomment-763348572


   @fabriziofortino  - Thanks, made the suggested changes. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [jackrabbit-oak] nit0906 commented on a change in pull request #263: OAK-9312 | Add reindex support for elastic in oak-run (new module added)

Posted by GitBox <gi...@apache.org>.
nit0906 commented on a change in pull request #263:
URL: https://github.com/apache/jackrabbit-oak/pull/263#discussion_r560795175



##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/index/ElasticIndexCommand.java
##########
@@ -0,0 +1,204 @@
+package org.apache.jackrabbit.oak.index;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import joptsimple.OptionParser;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexDefinition;
+import org.apache.jackrabbit.oak.plugins.index.importer.IndexDefinitionUpdater;
+import org.apache.jackrabbit.oak.run.cli.CommonOptions;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixture;
+import org.apache.jackrabbit.oak.run.cli.NodeStoreFixtureProvider;
+import org.apache.jackrabbit.oak.run.cli.Options;
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.apache.jackrabbit.oak.run.commons.LoggingInitializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class ElasticIndexCommand implements Command {
+    private static final Logger log = LoggerFactory.getLogger(ElasticIndexCommand.class);
+    private Options opts;
+    private ElasticIndexOptions indexOpts;
+    public static final String NAME = "index";
+
+    private final String summary = "Provides elastic index management related operations";
+    private static boolean disableExitOnError;
+
+
+    @Override
+    public void execute(String... args) throws Exception {
+        OptionParser parser = new OptionParser();
+
+        opts = new Options();
+        opts.setCommandName(NAME);
+        opts.setSummary(summary);
+        opts.setConnectionString(CommonOptions.DEFAULT_CONNECTION_STRING);
+        opts.registerOptionsFactory(ElasticIndexOptions.FACTORY);
+        opts.parseAndConfigure(parser, args);
+
+        indexOpts = opts.getOptionBean(ElasticIndexOptions.class);
+
+        //Clean up before setting up NodeStore as the temp
+        //directory might be used by NodeStore for cache stuff like persistentCache
+        //setupDirectories(indexOpts);
+        //setupLogging(indexOpts);
+
+        logCliArgs(args);
+
+        boolean success = false;
+        try {
+            try (Closer closer = Closer.create()) {
+                //configureCustomizer(opts, closer, true);
+                NodeStoreFixture fixture = NodeStoreFixtureProvider.create(opts);
+                closer.register(fixture);
+                execute(fixture, indexOpts, closer);
+                //tellReportPaths();
+            }
+
+            success = true;
+        } catch (Throwable e) {
+            log.error("Error occurred while performing index tasks", e);
+            e.printStackTrace(System.err);
+            if (disableExitOnError) {
+                throw e;
+            }
+        } finally {
+            shutdownLogging();
+        }
+
+        if (!success) {
+            System.exit(1);
+        }
+    }
+
+    private void execute(NodeStoreFixture fixture, IndexOptions indexOpts, Closer closer)
+            throws IOException, CommitFailedException {
+        IndexHelper indexHelper = createIndexHelper(fixture, indexOpts, closer);
+
+        // TODO : See if we need to support dumpIndexStats and index defs for elastic - not needed for now
+        //dumpIndexStats(indexOpts, indexHelper);
+        //dumpIndexDefinitions(indexOpts, indexHelper);
+        reindexOperation(indexOpts, indexHelper);
+    }
+
+    private IndexHelper createIndexHelper(NodeStoreFixture fixture,
+                                          IndexOptions indexOpts, Closer closer) throws IOException {
+        IndexHelper indexHelper = new IndexHelper(fixture.getStore(), fixture.getBlobStore(), fixture.getWhiteboard(),
+                indexOpts.getOutDir(), indexOpts.getWorkDir(), computeIndexPaths(indexOpts));
+
+        // TODO : See if pre text extraction is needed for elastic
+        //configurePreExtractionSupport(indexOpts, indexHelper);
+
+        closer.register(indexHelper);
+        return indexHelper;
+    }
+
+    private List<String> computeIndexPaths(IndexOptions indexOpts) throws IOException {
+        //Combine the indexPaths from json and cli args
+        Set<String> indexPaths = new LinkedHashSet<>(indexOpts.getIndexPaths());
+        File definitions = indexOpts.getIndexDefinitionsFile();
+        if (definitions != null) {
+            IndexDefinitionUpdater updater = new IndexDefinitionUpdater(definitions);
+            Set<String> indexPathsFromJson = updater.getIndexPaths();
+            Set<String> diff = Sets.difference(indexPathsFromJson, indexPaths);
+            if (!diff.isEmpty()) {
+                log.info("Augmenting the indexPaths with {} which are present in {}", diff, definitions);
+            }
+            indexPaths.addAll(indexPathsFromJson);
+        }
+        return new ArrayList<>(indexPaths);
+    }
+
+    private void reindexOperation(IndexOptions indexOpts, IndexHelper indexHelper) throws IOException, CommitFailedException {
+        if (!indexOpts.isReindex()) {
+            return;
+        }
+
+        String checkpoint = indexOpts.getCheckpoint();
+        reindex(indexOpts, indexHelper, checkpoint);
+    }
+
+    private void reindex(IndexOptions idxOpts, IndexHelper indexHelper, String checkpoint) throws IOException, CommitFailedException {
+        Preconditions.checkNotNull(checkpoint, "Checkpoint value is required for reindexing done in read only mode");
+
+        Stopwatch w = Stopwatch.createStarted();
+        IndexerSupport indexerSupport = createIndexerSupport(indexHelper, checkpoint);
+        log.info("Proceeding to index {} upto checkpoint {} {}", indexHelper.getIndexPaths(), checkpoint,
+                indexerSupport.getCheckpointInfo());
+
+        if (opts.getCommonOpts().isMongo() && idxOpts.isDocTraversalMode()) {
+            log.info("Using Document order traversal to perform reindexing");
+            try (ElasticDocumentStoreIndexer indexer = new ElasticDocumentStoreIndexer(indexHelper, indexerSupport, indexOpts.getIndexPrefix(),
+                    indexOpts.getElasticScheme(), indexOpts.getElasticHost(),
+                    indexOpts.getElasticPort(), indexOpts.getApiKeyId(), indexOpts.getApiKeySecret())) {
+                indexer.reindex();
+                // Wait for default flush interval before exiting the try block
+                // to make sure the client is not closed before the last flush
+                // TODO : See if this can be handled in a better manner
+                Thread.sleep(ElasticIndexDefinition.BULK_FLUSH_INTERVAL_MS_DEFAULT*2);
+            } catch (InterruptedException e) {
+                //
+            }
+        } else {
+            try(ElasticOutOfBandIndexer indexer = new ElasticOutOfBandIndexer(indexHelper, indexerSupport, indexOpts.getIndexPrefix(),
+                    indexOpts.getElasticScheme(), indexOpts.getElasticHost(),
+                    indexOpts.getElasticPort(), indexOpts.getApiKeyId(), indexOpts.getApiKeySecret())) {
+
+                indexer.reindex();
+                // Wait for default flush interval before exiting the try block
+                // to make sure the client is not closed before the last flush
+                Thread.sleep(ElasticIndexDefinition.BULK_FLUSH_INTERVAL_MS_DEFAULT*2);
+            } catch (InterruptedException e) {
+                //

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [jackrabbit-oak] nit0906 commented on pull request #263: OAK-9312 | Add reindex support for elastic in oak-run (new module added)

Posted by GitBox <gi...@apache.org>.
nit0906 commented on pull request #263:
URL: https://github.com/apache/jackrabbit-oak/pull/263#issuecomment-763456871


   @thomasmueller  - made the changes.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [jackrabbit-oak] nit0906 commented on a change in pull request #263: OAK-9312 | Add reindex support for elastic in oak-run (new module added)

Posted by GitBox <gi...@apache.org>.
nit0906 commented on a change in pull request #263:
URL: https://github.com/apache/jackrabbit-oak/pull/263#discussion_r560797435



##########
File path: oak-run-elastic/src/main/java/org/apache/jackrabbit/oak/run/Main.java
##########
@@ -0,0 +1,35 @@
+package org.apache.jackrabbit.oak.run;
+
+import org.apache.jackrabbit.oak.run.commons.Command;
+import org.apache.jackrabbit.oak.run.commons.Utils;
+
+import java.util.Locale;
+
+import static java.util.Arrays.copyOfRange;
+import static org.apache.jackrabbit.oak.run.AvailableElasticModes.MODES;
+
+public final class Main {
+    private Main() {
+        // Prevent instantiation.
+    }
+
+    public static void main(String[] args) throws Exception {

Review comment:
       Ah, I'm still in the habit of using String[]. Changed it to String... args now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org