Posted to issues@lucene.apache.org by "msokolov (via GitHub)" <gi...@apache.org> on 2023/05/08 15:33:25 UTC

[GitHub] [lucene] msokolov commented on a diff in pull request #12254: add ConcurrentOnHeapHnswGraph and Builder

msokolov commented on code in PR #12254:
URL: https://github.com/apache/lucene/pull/12254#discussion_r1187517239


##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentOnHeapHnswGraph.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util.hnsw;
+
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.RamUsageEstimator;
+
+/**
+ * An {@link HnswGraph} that offers concurrent access; for typical graphs you will get significant
+ * speedups in construction and searching as you add threads.
+ *
+ * <p>To search this graph, you should use a View obtained from {@link #getView()} to perform `seek`
+ * and `nextNeighbor` operations. For convenience, you can use these methods directly on the graph
+ * instance, which will give you a ThreadLocal View, but you can call `getView` directly if you need
+ * more control, e.g. for performing a second search in the same thread while the first is still in
+ * progress.
+ */
+public final class ConcurrentOnHeapHnswGraph extends HnswGraph implements Accountable {
+  private final AtomicReference<NodeAtLevel>
+      entryPoint; // the current graph entry node on the top level. -1 if not set
+
+  // views for compatibility with HnswGraph interface; prefer creating views explicitly
+  private final ThreadLocal<ConcurrentHnswGraphView> views =

Review Comment:
   I really do think we need to remove these convenience methods based on ThreadLocals. We don't ever expect this to be used by the codec, and we don't need to support any other use case. They seem to be driven by conformance to the pre-existing HnswGraph API, but that isn't required, and it results in a lot of extra complexity here (new interface abstractions, generics, etc.) that obscures the basic idea for reviewers trying to understand what's going on. The concurrent algorithm is a complex, powerful new contribution; let's not clutter the PR with unrelated changes.
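   To make the explicit-view alternative concrete, here is a minimal, self-contained sketch (not the PR's code). Callers obtain an independent cursor from the graph and own it, so two interleaved traversals in the same thread never share state and no ThreadLocal is needed. `ToyGraph`, `View`, `seek`, `nextNeighbor`, and the `-1` end sentinel are hypothetical stand-ins loosely modeled on names in the diff, not Lucene's actual API.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical toy graph: one level of adjacency lists keyed by node ordinal.
final class ToyGraph {
  private final Map<Integer, List<Integer>> adjacency = new ConcurrentHashMap<>();

  void addNode(int node, List<Integer> neighbors) {
    adjacency.put(node, List.copyOf(neighbors));
  }

  // Each call returns a fresh cursor; the caller controls its lifetime (no ThreadLocal).
  View getView() {
    return new View();
  }

  final class View {
    private List<Integer> current = List.of();
    private int upto = 0;

    // Positions this cursor on the neighbor list of the given node.
    void seek(int node) {
      current = adjacency.getOrDefault(node, List.of());
      upto = 0;
    }

    // Returns the next neighbor, or -1 (hypothetical sentinel) when exhausted.
    int nextNeighbor() {
      return upto < current.size() ? current.get(upto++) : -1;
    }
  }
}
```

   Because each `View` carries its own position, a second search in the same thread simply uses a second view, which is exactly the case the class javadoc above cites as the reason to call `getView` directly.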



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentOnHeapHnswGraph.java:
##########
@@ -0,0 +1,281 @@
+/**
+ * An {@link HnswGraph} that offers concurrent access; for typical graphs you will get significant
+ * speedups in construction and searching as you add threads.
+ *
+ * <p>To search this graph, you should use a View obtained from {@link #getView()} to perform `seek`
+ * and `nextNeighbor` operations. For convenience, you can use these methods directly on the graph
+ * instance, which will give you a ThreadLocal View, but you can call `getView` directly if you need
+ * more control, e.g. for performing a second search in the same thread while the first is still in
+ * progress.
+ */
+public final class ConcurrentOnHeapHnswGraph extends HnswGraph implements Accountable {
+  private final AtomicReference<NodeAtLevel>
+      entryPoint; // the current graph entry node on the top level. -1 if not set
+
+  // views for compatibility with HnswGraph interface; prefer creating views explicitly
+  private final ThreadLocal<ConcurrentHnswGraphView> views =
+      ThreadLocal.withInitial(ConcurrentHnswGraphView::new);
+
+  // Unlike OnHeapHnswGraph (OHHG), we use the same data structure for Level 0 and higher node
+  // lists,
+  // a ConcurrentHashMap.  While the ArrayList used for L0 in OHHG is faster for single-threaded
+  // workloads, it imposes an unacceptable contention burden for concurrent workloads.
+  private final ConcurrentMap<Integer, ConcurrentMap<Integer, ConcurrentNeighborSet>> graphLevels;
+
+  // Neighbours' size on upper levels (nsize) and level 0 (nsize0)
+  private final int nsize;
+  private final int nsize0;
+
+  ConcurrentOnHeapHnswGraph(int M) {
+    this.entryPoint =
+        new AtomicReference<>(
+            new NodeAtLevel(0, -1)); // Entry node should be negative until a node is added
+    this.nsize = M;
+    this.nsize0 = 2 * M;
+
+    this.graphLevels = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Returns the neighbors connected to the given node.
+   *
+   * @param level level of the graph
+   * @param node the node whose neighbors are returned, represented as an ordinal on the level 0.
+   */
+  public ConcurrentNeighborSet getNeighbors(int level, int node) {
+    return graphLevels.get(level).get(node);
+  }
+
+  @Override
+  public synchronized int size() {
+    return graphLevels.get(0).size(); // all nodes are located on the 0th level
+  }
+
+  @Override
+  public void addNode(int level, int node) {
+    if (level >= graphLevels.size()) {
+      for (int i = graphLevels.size(); i <= level; i++) {
+        graphLevels.putIfAbsent(i, new ConcurrentHashMap<>());
+      }
+    }
+
+    graphLevels.get(level).put(node, new ConcurrentNeighborSet(connectionsOnLevel(level)));

Review Comment:
   How do we know two threads aren't adding the same node at once? I guess something must guarantee that; let's at least add a comment here referencing what it is.
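   Besides a comment, the single-writer assumption could be enforced cheaply. The sketch below is a hypothetical variant of `addNode` on a toy level map (not the PR's code) that uses `ConcurrentMap.putIfAbsent`, which atomically inserts only when the key is absent and otherwise returns the existing value, so a duplicate insertion fails loudly instead of silently replacing a neighbor set. `ToyLevel` and the `int[]` placeholder for a neighbor set are illustrative assumptions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical toy: one graph level, mapping node ordinal -> neighbor storage placeholder.
final class ToyLevel {
  private final ConcurrentMap<Integer, int[]> nodes = new ConcurrentHashMap<>();

  // Adds a node; putIfAbsent is atomic, so if two callers race on the same ordinal,
  // exactly one sees null and the other gets an exception.
  void addNode(int node, int maxConn) {
    int[] previous = nodes.putIfAbsent(node, new int[maxConn]);
    if (previous != null) {
      throw new IllegalStateException("node " + node + " was already added");
    }
  }

  int size() {
    return nodes.size();
  }
}
```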



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentOnHeapHnswGraph.java:
##########
@@ -0,0 +1,281 @@
+/**
+ * An {@link HnswGraph} that offers concurrent access; for typical graphs you will get significant
+ * speedups in construction and searching as you add threads.
+ *
+ * <p>To search this graph, you should use a View obtained from {@link #getView()} to perform `seek`
+ * and `nextNeighbor` operations. For convenience, you can use these methods directly on the graph
+ * instance, which will give you a ThreadLocal View, but you can call `getView` directly if you need
+ * more control, e.g. for performing a second search in the same thread while the first is still in
+ * progress.
+ */
+public final class ConcurrentOnHeapHnswGraph extends HnswGraph implements Accountable {
+  private final AtomicReference<NodeAtLevel>
+      entryPoint; // the current graph entry node on the top level. -1 if not set
+
+  // views for compatibility with HnswGraph interface; prefer creating views explicitly
+  private final ThreadLocal<ConcurrentHnswGraphView> views =
+      ThreadLocal.withInitial(ConcurrentHnswGraphView::new);
+
+  // Unlike OnHeapHnswGraph (OHHG), we use the same data structure for Level 0 and higher node
+  // lists,
+  // a ConcurrentHashMap.  While the ArrayList used for L0 in OHHG is faster for single-threaded
+  // workloads, it imposes an unacceptable contention burden for concurrent workloads.
+  private final ConcurrentMap<Integer, ConcurrentMap<Integer, ConcurrentNeighborSet>> graphLevels;
+
+  // Neighbours' size on upper levels (nsize) and level 0 (nsize0)
+  private final int nsize;
+  private final int nsize0;
+
+  ConcurrentOnHeapHnswGraph(int M) {
+    this.entryPoint =
+        new AtomicReference<>(
+            new NodeAtLevel(0, -1)); // Entry node should be negative until a node is added
+    this.nsize = M;
+    this.nsize0 = 2 * M;
+
+    this.graphLevels = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Returns the neighbors connected to the given node.
+   *
+   * @param level level of the graph
+   * @param node the node whose neighbors are returned, represented as an ordinal on the level 0.
+   */
+  public ConcurrentNeighborSet getNeighbors(int level, int node) {
+    return graphLevels.get(level).get(node);
+  }
+
+  @Override
+  public synchronized int size() {
+    return graphLevels.get(0).size(); // all nodes are located on the 0th level
+  }
+
+  @Override
+  public void addNode(int level, int node) {
+    if (level >= graphLevels.size()) {
+      for (int i = graphLevels.size(); i <= level; i++) {
+        graphLevels.putIfAbsent(i, new ConcurrentHashMap<>());
+      }
+    }
+
+    graphLevels.get(level).put(node, new ConcurrentNeighborSet(connectionsOnLevel(level)));
+  }
+
+  /**
+   * must be called after addNode to a level > 0
+   *
+   * <p>we don't do this as part of addNode itself, since it may not yet have been added to all the
+   * levels
+   */
+  void maybeUpdateEntryNode(int level, int node) {
+    while (true) {
+      NodeAtLevel oldEntry = entryPoint.get();
+      if (oldEntry.node >= 0 && oldEntry.level >= level) {
+        break;
+      }
+      entryPoint.compareAndSet(oldEntry, new NodeAtLevel(level, node));
+    }
+  }
+
+  private int connectionsOnLevel(int level) {
+    return level == 0 ? nsize0 : nsize;
+  }
+
+  @Override

Review Comment:
   Please remove this, at least for now. I think we will find a better way to integrate with the codec that doesn't require conforming to this API.



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswGraphBuilder.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util.hnsw;
+
+import static java.lang.Math.log;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.util.AtomicBitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.lucene.util.hnsw.ConcurrentOnHeapHnswGraph.NodeAtLevel;
+
+/**
+ * Builder for Concurrent HNSW graph. See {@link HnswGraph} for a high level overview, and the
+ * comments to `addGraphNode` for details on the concurrent building approach.
+ *
+ * @param <T> the type of vector
+ */
+public final class ConcurrentHnswGraphBuilder<T> implements IHnswGraphBuilder<T> {
+
+  /** Default number of maximum connections per node */
+  public static final int DEFAULT_MAX_CONN = 16;
+
+  /**
+   * Default number of the size of the queue maintained while searching during a graph construction.
+   */
+  public static final int DEFAULT_BEAM_WIDTH = 100;
+
+  /** A name for the HNSW component for the info-stream * */
+  public static final String HNSW_COMPONENT = "HNSW";
+
+  private final int beamWidth;
+  private final double ml;
+  private final ThreadLocal<NeighborArray> scratchNeighbors;
+
+  private final VectorSimilarityFunction similarityFunction;
+  private final VectorEncoding vectorEncoding;
+  private final RandomAccessVectorValues<T> vectors;
+  private final ThreadLocal<HnswGraphSearcher<T>> graphSearcher;
+
+  final ConcurrentOnHeapHnswGraph hnsw;
+  private final ConcurrentSkipListSet<NodeAtLevel> insertionsInProgress =
+      new ConcurrentSkipListSet<>();
+
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  // we need two sources of vectors in order to perform diversity check comparisons without
+  // colliding
+  private final RandomAccessVectorValues<T> vectorsCopy;
+  private final AtomicBitSet initializedNodes;
+
+  /**
+   * Reads all the vectors from vector values, builds a graph connecting them by their dense
+   * ordinals, using the given hyperparameter settings, and returns the resulting graph.
+   *
+   * @param vectors the vectors whose relations are represented by the graph - must provide a
+   *     different view over those vectors than the one used to add via addGraphNode.
+   * @param M – graph fanout parameter used to calculate the maximum number of connections a node
+   *     can have – M on upper layers, and M * 2 on the lowest level.
+   * @param beamWidth the size of the beam search to use when finding nearest neighbors.
+   */
+  public ConcurrentHnswGraphBuilder(
+      RandomAccessVectorValues<T> vectors,
+      VectorEncoding vectorEncoding,
+      VectorSimilarityFunction similarityFunction,
+      int M,
+      int beamWidth)
+      throws IOException {
+    this.vectors = vectors;
+    this.vectorsCopy = vectors.copy();
+    this.vectorEncoding = Objects.requireNonNull(vectorEncoding);
+    this.similarityFunction = Objects.requireNonNull(similarityFunction);
+    if (M <= 0) {
+      throw new IllegalArgumentException("maxConn must be positive");
+    }
+    if (beamWidth <= 0) {
+      throw new IllegalArgumentException("beamWidth must be positive");
+    }
+    this.beamWidth = beamWidth;
+    // normalization factor for level generation; currently not configurable
+    this.ml = M == 1 ? 1 : 1 / Math.log(1.0 * M);
+    this.hnsw = new ConcurrentOnHeapHnswGraph(M);
+    this.graphSearcher =
+        ThreadLocal.withInitial(
+            () -> {
+              return new HnswGraphSearcher<>(
+                  vectorEncoding,
+                  similarityFunction,
+                  new NeighborQueue(beamWidth, true),
+                  new FixedBitSet(this.vectors.size()));
+            });
+    // in scratch we store candidates in reverse order: worse candidates are first
+    scratchNeighbors =
+        ThreadLocal.withInitial(() -> new NeighborArray(Math.max(beamWidth, M + 1), false));
+    this.initializedNodes = new AtomicBitSet(vectors.size());
+  }
+
+  /**
+   * Reads all the vectors from two copies of a {@link RandomAccessVectorValues}. Providing two
+   * copies enables efficient retrieval without extra data copying, while avoiding collision of the
+   * returned values.
+   *
+   * @param vectorsToAdd the vectors for which to build a nearest neighbors graph. Must be an
+   *     independent accessor for the vectors
+   * @param autoParallel if true, the builder will allocate one thread per core to building the
+   *     graph; if false, it will use a single thread. For more fine-grained control, use the
+   *     ExecutorService (ThreadPoolExecutor) overload.
+   */
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, boolean autoParallel) throws IOException {
+    if (autoParallel) {
+      return build(
+          vectorsToAdd,
+          Executors.newFixedThreadPool(
+              Runtime.getRuntime().availableProcessors(),
+              new NamedThreadFactory("Concurrent HNSW builder")));
+    } else {
+      return build(
+          vectorsToAdd,
+          Executors.newSingleThreadExecutor(new NamedThreadFactory("Concurrent HNSW builder")));
+    }
+  }
+
+  @Override
+  public ConcurrentOnHeapHnswGraph build(RandomAccessVectorValues<T> vectorsToAdd)
+      throws IOException {
+    return build(vectorsToAdd, true);
+  }
+
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, ExecutorService pool) {
+    if (vectorsToAdd == this.vectors) {
+      throw new IllegalArgumentException(
+          "Vectors to build must be independent of the source of vectors provided to HnswGraphBuilder()");
+    }
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "build graph from " + vectorsToAdd.size() + " vectors");
+    }
+    if (!(pool instanceof ThreadPoolExecutor)) {

Review Comment:
   Can we declare the argument to be of type ThreadPoolExecutor and let the compiler enforce this?
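   A sketch of the suggestion: with the parameter typed as `ThreadPoolExecutor`, the runtime `instanceof` check and cast disappear and other `ExecutorService` types are rejected at compile time. It would also surface an issue in the `autoParallel == false` path above: per the JDK, `Executors.newSingleThreadExecutor(...)` returns a non-reconfigurable delegating wrapper rather than a `ThreadPoolExecutor`, so it would fail the `instanceof` check. Constructing `new ThreadPoolExecutor(1, 1, ...)` directly gives a single-threaded pool of the required type. `ExecutorDemo`, `runAll`, `singleThreadPool`, and `fixedPool` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ExecutorDemo {
  // Hypothetical stand-in for build(vectorsToAdd, pool): the declared parameter type already
  // guarantees ThreadPoolExecutor methods (e.g. getMaximumPoolSize()) are available.
  static int runAll(int tasks, ThreadPoolExecutor pool) {
    AtomicInteger done = new AtomicInteger();
    List<Future<?>> futures = new ArrayList<>();
    for (int i = 0; i < tasks; i++) {
      futures.add(pool.submit(() -> { done.incrementAndGet(); }));
    }
    for (Future<?> f : futures) {
      try {
        f.get(); // wait for each task and propagate failures
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      } catch (ExecutionException e) {
        throw new IllegalStateException(e);
      }
    }
    return done.get();
  }

  // Single-threaded pool that is statically a ThreadPoolExecutor.
  static ThreadPoolExecutor singleThreadPool() {
    return fixedPool(1);
  }

  static ThreadPoolExecutor fixedPool(int threads) {
    return new ThreadPoolExecutor(
        threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
  }
}
```

   Since `Executors.newFixedThreadPool` is statically typed as `ExecutorService`, the convenience overload would also need to build its pool via the constructor (or cast). Separately, pools created internally by the builder are not shut down in the code shown here, which is worth handling.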



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswGraphBuilder.java:
##########
@@ -0,0 +1,329 @@
+/**
+ * Builder for Concurrent HNSW graph. See {@link HnswGraph} for a high level overview, and the
+ * comments to `addGraphNode` for details on the concurrent building approach.
+ *
+ * @param <T> the type of vector
+ */
+public final class ConcurrentHnswGraphBuilder<T> implements IHnswGraphBuilder<T> {
+
+  /** Default number of maximum connections per node */
+  public static final int DEFAULT_MAX_CONN = 16;
+
+  /**
+   * Default number of the size of the queue maintained while searching during a graph construction.
+   */
+  public static final int DEFAULT_BEAM_WIDTH = 100;
+
+  /** A name for the HNSW component for the info-stream * */
+  public static final String HNSW_COMPONENT = "HNSW";
+
+  private final int beamWidth;
+  private final double ml;
+  private final ThreadLocal<NeighborArray> scratchNeighbors;
+
+  private final VectorSimilarityFunction similarityFunction;
+  private final VectorEncoding vectorEncoding;
+  private final RandomAccessVectorValues<T> vectors;
+  private final ThreadLocal<HnswGraphSearcher<T>> graphSearcher;
+
+  final ConcurrentOnHeapHnswGraph hnsw;
+  private final ConcurrentSkipListSet<NodeAtLevel> insertionsInProgress =
+      new ConcurrentSkipListSet<>();
+
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  // we need two sources of vectors in order to perform diversity check comparisons without
+  // colliding
+  private final RandomAccessVectorValues<T> vectorsCopy;
+  private final AtomicBitSet initializedNodes;
+
+  /**
+   * Reads all the vectors from vector values, builds a graph connecting them by their dense
+   * ordinals, using the given hyperparameter settings, and returns the resulting graph.
+   *
+   * @param vectors the vectors whose relations are represented by the graph - must provide a
+   *     different view over those vectors than the one used to add via addGraphNode.
+   * @param M – graph fanout parameter used to calculate the maximum number of connections a node
+   *     can have – M on upper layers, and M * 2 on the lowest level.
+   * @param beamWidth the size of the beam search to use when finding nearest neighbors.
+   */
+  public ConcurrentHnswGraphBuilder(
+      RandomAccessVectorValues<T> vectors,
+      VectorEncoding vectorEncoding,
+      VectorSimilarityFunction similarityFunction,
+      int M,
+      int beamWidth)
+      throws IOException {
+    this.vectors = vectors;
+    this.vectorsCopy = vectors.copy();
+    this.vectorEncoding = Objects.requireNonNull(vectorEncoding);
+    this.similarityFunction = Objects.requireNonNull(similarityFunction);
+    if (M <= 0) {
+      throw new IllegalArgumentException("maxConn must be positive");
+    }
+    if (beamWidth <= 0) {
+      throw new IllegalArgumentException("beamWidth must be positive");
+    }
+    this.beamWidth = beamWidth;
+    // normalization factor for level generation; currently not configurable
+    this.ml = M == 1 ? 1 : 1 / Math.log(1.0 * M);
+    this.hnsw = new ConcurrentOnHeapHnswGraph(M);
+    this.graphSearcher =
+        ThreadLocal.withInitial(
+            () -> {
+              return new HnswGraphSearcher<>(
+                  vectorEncoding,
+                  similarityFunction,
+                  new NeighborQueue(beamWidth, true),
+                  new FixedBitSet(this.vectors.size()));
+            });
+    // in scratch we store candidates in reverse order: worse candidates are first
+    scratchNeighbors =
+        ThreadLocal.withInitial(() -> new NeighborArray(Math.max(beamWidth, M + 1), false));
+    this.initializedNodes = new AtomicBitSet(vectors.size());
+  }
+
+  /**
+   * Reads all the vectors from two copies of a {@link RandomAccessVectorValues}. Providing two
+   * copies enables efficient retrieval without extra data copying, while avoiding collision of the
+   * returned values.
+   *
+   * @param vectorsToAdd the vectors for which to build a nearest neighbors graph. Must be an
+   *     independent accessor for the vectors
+   * @param autoParallel if true, the builder will allocate one thread per core to building the
+   *     graph; if false, it will use a single thread. For more fine-grained control, use the
+   *     ExecutorService (ThreadPoolExecutor) overload.
+   */
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, boolean autoParallel) throws IOException {
+    if (autoParallel) {
+      return build(
+          vectorsToAdd,
+          Executors.newFixedThreadPool(
+              Runtime.getRuntime().availableProcessors(),
+              new NamedThreadFactory("Concurrent HNSW builder")));
+    } else {
+      return build(
+          vectorsToAdd,
+          Executors.newSingleThreadExecutor(new NamedThreadFactory("Concurrent HNSW builder")));
+    }
+  }
+
+  @Override
+  public ConcurrentOnHeapHnswGraph build(RandomAccessVectorValues<T> vectorsToAdd)
+      throws IOException {
+    return build(vectorsToAdd, true);
+  }
+
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, ExecutorService pool) {
+    if (vectorsToAdd == this.vectors) {
+      throw new IllegalArgumentException(
+          "Vectors to build must be independent of the source of vectors provided to HnswGraphBuilder()");
+    }
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "build graph from " + vectorsToAdd.size() + " vectors");
+    }
+    if (!(pool instanceof ThreadPoolExecutor)) {
+      throw new IllegalArgumentException("ExecutorService must be a ThreadPoolExecutor");
+    }
+    addVectors(vectorsToAdd, (ThreadPoolExecutor) pool);
+    return hnsw;
+  }
+
+  // the goal here is to keep all the ExecutorService threads busy, but not to create potentially
+  // millions of futures by naively throwing everything at submit at once.  So, we use
+  // a semaphore to wait until a thread is free before adding a new task.
+  private void addVectors(RandomAccessVectorValues<T> vectorsToAdd, ThreadPoolExecutor pool) {
+    Semaphore semaphore = new Semaphore(pool.getMaximumPoolSize());
+
+    for (int i = 0; i < vectorsToAdd.size(); i++) {
+      final int node = i; // copy for closure

Review Comment:
   I see: here is where we guarantee that each node is added only once. Nodes might now be added out of sequence, but that's OK.
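   The pattern behind this guarantee can be sketched in isolation: a single producer loop hands each ordinal to exactly one task, and a semaphore sized to the pool's maximum thread count bounds how many tasks are in flight, so millions of futures are never queued at once. This is a minimal sketch under those assumptions, not the PR's `addVectors`; `BoundedSubmit` is hypothetical and the counter increment stands in for the real per-node insertion.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicIntegerArray;

final class BoundedSubmit {
  // Submits one task per node ordinal, with at most getMaximumPoolSize() tasks in flight.
  // Returns how many times each ordinal was processed (expected: exactly once each).
  static AtomicIntegerArray addAll(int size, ThreadPoolExecutor pool) {
    int maxInFlight = pool.getMaximumPoolSize();
    Semaphore permits = new Semaphore(maxInFlight);
    AtomicIntegerArray timesAdded = new AtomicIntegerArray(size);
    for (int i = 0; i < size; i++) {
      final int node = i; // copy for the lambda
      permits.acquireUninterruptibly(); // wait until a worker slot is free
      pool.execute(() -> {
        try {
          timesAdded.incrementAndGet(node); // stand-in for adding the node to the graph
        } finally {
          permits.release();
        }
      });
    }
    // Reclaiming every permit waits until all in-flight tasks have finished.
    permits.acquireUninterruptibly(maxInFlight);
    return timesAdded;
  }
}
```

   Completion order is up to the scheduler, which matches the observation that nodes may be added out of sequence.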



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswGraphBuilder.java:
##########
@@ -0,0 +1,329 @@
+/**
+ * Builder for Concurrent HNSW graph. See {@link HnswGraph} for a high level overview, and the
+ * comments to `addGraphNode` for details on the concurrent building approach.
+ *
+ * @param <T> the type of vector
+ */
+public final class ConcurrentHnswGraphBuilder<T> implements IHnswGraphBuilder<T> {
+
+  /** Default number of maximum connections per node */
+  public static final int DEFAULT_MAX_CONN = 16;
+
+  /**
+   * Default number of the size of the queue maintained while searching during a graph construction.
+   */
+  public static final int DEFAULT_BEAM_WIDTH = 100;
+
+  /** A name for the HNSW component for the info-stream * */
+  public static final String HNSW_COMPONENT = "HNSW";
+
+  private final int beamWidth;
+  private final double ml;
+  private final ThreadLocal<NeighborArray> scratchNeighbors;
+
+  private final VectorSimilarityFunction similarityFunction;
+  private final VectorEncoding vectorEncoding;
+  private final RandomAccessVectorValues<T> vectors;
+  private final ThreadLocal<HnswGraphSearcher<T>> graphSearcher;
+
+  final ConcurrentOnHeapHnswGraph hnsw;
+  private final ConcurrentSkipListSet<NodeAtLevel> insertionsInProgress =
+      new ConcurrentSkipListSet<>();
+
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  // we need two sources of vectors in order to perform diversity check comparisons without
+  // colliding
+  private final RandomAccessVectorValues<T> vectorsCopy;
+  private final AtomicBitSet initializedNodes;
+
+  /**
+   * Reads all the vectors from vector values, builds a graph connecting them by their dense
+   * ordinals, using the given hyperparameter settings, and returns the resulting graph.
+   *
+   * @param vectors the vectors whose relations are represented by the graph - must provide a
+   *     different view over those vectors than the one used to add via addGraphNode.
+   * @param M – graph fanout parameter used to calculate the maximum number of connections a node
+   *     can have – M on upper layers, and M * 2 on the lowest level.
+   * @param beamWidth the size of the beam search to use when finding nearest neighbors.
+   */
+  public ConcurrentHnswGraphBuilder(
+      RandomAccessVectorValues<T> vectors,
+      VectorEncoding vectorEncoding,
+      VectorSimilarityFunction similarityFunction,
+      int M,
+      int beamWidth)
+      throws IOException {
+    this.vectors = vectors;
+    this.vectorsCopy = vectors.copy();
+    this.vectorEncoding = Objects.requireNonNull(vectorEncoding);
+    this.similarityFunction = Objects.requireNonNull(similarityFunction);
+    if (M <= 0) {
+      throw new IllegalArgumentException("maxConn must be positive");
+    }
+    if (beamWidth <= 0) {
+      throw new IllegalArgumentException("beamWidth must be positive");
+    }
+    this.beamWidth = beamWidth;
+    // normalization factor for level generation; currently not configurable
+    this.ml = M == 1 ? 1 : 1 / Math.log(1.0 * M);
+    this.hnsw = new ConcurrentOnHeapHnswGraph(M);
+    this.graphSearcher =

Review Comment:
   Instead of relying on ThreadLocal, can we manage this thread-to-searcher association explicitly? E.g. in addVectors, can we create a collection of per-thread builder objects (like the views we have on the graph) and manage them ourselves? It could even be a map of thread ID -> builder view. In this project we've had issues with GC not reclaiming ThreadLocal storage; see e.g. https://markmail.org/thread/fbklvo4tkdd5d5u5. If the storage is held as a reference from the builder, it should be cleaned up properly once the builder itself is no longer reachable.
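
   A minimal sketch of the explicit alternative, assuming a hypothetical `PerThreadScratch` holder (not an existing Lucene class) that the builder would own as a field in place of each `ThreadLocal`; the type parameter `S` stands in for the per-thread `HnswGraphSearcher`/`NeighborArray` scratch state:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/**
 * Hypothetical helper: one scratch object per worker thread, stored in a map owned by the
 * builder instead of in a ThreadLocal. When the builder becomes unreachable, the map and every
 * scratch object it holds become unreachable with it.
 */
final class PerThreadScratch<S> {
  private final Map<Thread, S> byThread = new ConcurrentHashMap<>();
  private final Supplier<S> factory;

  PerThreadScratch(Supplier<S> factory) {
    this.factory = factory;
  }

  /** Returns the calling thread's scratch object, creating it on first use. */
  S get() {
    return byThread.computeIfAbsent(Thread.currentThread(), t -> factory.get());
  }

  /** Number of threads that have allocated scratch state so far. */
  int size() {
    return byThread.size();
  }

  /** Drops all scratch state, e.g. once a build has finished. */
  void clear() {
    byThread.clear();
  }
}
```

   Keying by `Thread` rather than by thread ID is a judgment call in this sketch: it avoids `Thread.getId()` (deprecated since Java 19), at the cost of holding thread references until `clear()` is called or the builder is collected.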



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswGraphBuilder.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util.hnsw;
+
+import static java.lang.Math.log;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.util.AtomicBitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.lucene.util.hnsw.ConcurrentOnHeapHnswGraph.NodeAtLevel;
+
+/**
+ * Builder for Concurrent HNSW graph. See {@link HnswGraph} for a high level overview, and the
+ * comments to `addGraphNode` for details on the concurrent building approach.
+ *
+ * @param <T> the type of vector
+ */
+public final class ConcurrentHnswGraphBuilder<T> implements IHnswGraphBuilder<T> {
+
+  /** Default number of maximum connections per node */
+  public static final int DEFAULT_MAX_CONN = 16;
+
+  /**
+   * Default number of the size of the queue maintained while searching during a graph construction.
+   */
+  public static final int DEFAULT_BEAM_WIDTH = 100;
+
+  /** A name for the HNSW component for the info-stream * */
+  public static final String HNSW_COMPONENT = "HNSW";
+
+  private final int beamWidth;
+  private final double ml;
+  private final ThreadLocal<NeighborArray> scratchNeighbors;
+
+  private final VectorSimilarityFunction similarityFunction;
+  private final VectorEncoding vectorEncoding;
+  private final RandomAccessVectorValues<T> vectors;
+  private final ThreadLocal<HnswGraphSearcher<T>> graphSearcher;
+
+  final ConcurrentOnHeapHnswGraph hnsw;
+  private final ConcurrentSkipListSet<NodeAtLevel> insertionsInProgress =
+      new ConcurrentSkipListSet<>();
+
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  // we need two sources of vectors in order to perform diversity check comparisons without
+  // colliding
+  private final RandomAccessVectorValues<T> vectorsCopy;
+  private final AtomicBitSet initializedNodes;
+
+  /**
+   * Reads all the vectors from vector values, builds a graph connecting them by their dense
+   * ordinals, using the given hyperparameter settings, and returns the resulting graph.
+   *
+   * @param vectors the vectors whose relations are represented by the graph - must provide a
+   *     different view over those vectors than the one used to add via addGraphNode.
+   * @param M – graph fanout parameter used to calculate the maximum number of connections a node
+   *     can have – M on upper layers, and M * 2 on the lowest level.
+   * @param beamWidth the size of the beam search to use when finding nearest neighbors.
+   */
+  public ConcurrentHnswGraphBuilder(
+      RandomAccessVectorValues<T> vectors,
+      VectorEncoding vectorEncoding,
+      VectorSimilarityFunction similarityFunction,
+      int M,
+      int beamWidth)
+      throws IOException {
+    this.vectors = vectors;
+    this.vectorsCopy = vectors.copy();
+    this.vectorEncoding = Objects.requireNonNull(vectorEncoding);
+    this.similarityFunction = Objects.requireNonNull(similarityFunction);
+    if (M <= 0) {
+      throw new IllegalArgumentException("maxConn must be positive");
+    }
+    if (beamWidth <= 0) {
+      throw new IllegalArgumentException("beamWidth must be positive");
+    }
+    this.beamWidth = beamWidth;
+    // normalization factor for level generation; currently not configurable
+    this.ml = M == 1 ? 1 : 1 / Math.log(1.0 * M);
+    this.hnsw = new ConcurrentOnHeapHnswGraph(M);
+    this.graphSearcher =
+        ThreadLocal.withInitial(
+            () -> {
+              return new HnswGraphSearcher<>(
+                  vectorEncoding,
+                  similarityFunction,
+                  new NeighborQueue(beamWidth, true),
+                  new FixedBitSet(this.vectors.size()));
+            });
+    // in scratch we store candidates in reverse order: worse candidates are first
+    scratchNeighbors =
+        ThreadLocal.withInitial(() -> new NeighborArray(Math.max(beamWidth, M + 1), false));
+    this.initializedNodes = new AtomicBitSet(vectors.size());
+  }
+
+  /**
+   * Reads all the vectors from two copies of a {@link RandomAccessVectorValues}. Providing two
+   * copies enables efficient retrieval without extra data copying, while avoiding collision of the
+   * returned values.
+   *
+   * @param vectorsToAdd the vectors for which to build a nearest neighbors graph. Must be an
+   *     independent accessor for the vectors
+   * @param autoParallel if true, the builder will allocate one thread per core to building the
+   *     graph; if false, it will use a single thread. For more fine-grained control, use the
+   *     ExecutorService (ThreadPoolExecutor) overload.
+   */
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, boolean autoParallel) throws IOException {
+    if (autoParallel) {
+      return build(
+          vectorsToAdd,
+          Executors.newFixedThreadPool(
+              Runtime.getRuntime().availableProcessors(),
+              new NamedThreadFactory("Concurrent HNSW builder")));
+    } else {
+      return build(
+          vectorsToAdd,
+          Executors.newSingleThreadExecutor(new NamedThreadFactory("Concurrent HNSW builder")));
+    }
+  }
+
+  @Override
+  public ConcurrentOnHeapHnswGraph build(RandomAccessVectorValues<T> vectorsToAdd)
+      throws IOException {
+    return build(vectorsToAdd, true);
+  }
+
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, ExecutorService pool) {
+    if (vectorsToAdd == this.vectors) {
+      throw new IllegalArgumentException(
+          "Vectors to build must be independent of the source of vectors provided to HnswGraphBuilder()");
+    }
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "build graph from " + vectorsToAdd.size() + " vectors");
+    }
+    if (!(pool instanceof ThreadPoolExecutor)) {
+      throw new IllegalArgumentException("ExecutorService must be a ThreadPoolExecutor");
+    }
+    addVectors(vectorsToAdd, (ThreadPoolExecutor) pool);
+    return hnsw;
+  }
+
+  // the goal here is to keep all the ExecutorService threads busy, but not to create potentially
+  // millions of futures by naively throwing everything at submit at once.  So, we use
+  // a semaphore to wait until a thread is free before adding a new task.
+  private void addVectors(RandomAccessVectorValues<T> vectorsToAdd, ThreadPoolExecutor pool) {
+    Semaphore semaphore = new Semaphore(pool.getMaximumPoolSize());
+
+    for (int i = 0; i < vectorsToAdd.size(); i++) {
+      final int node = i; // copy for closure
+      try {
+        semaphore.acquire();
+        pool.submit(
+            () -> {
+              try {
+                addGraphNode(node, vectorsToAdd);
+              } catch (IOException e) {
+                throw new RuntimeException(e);

Review Comment:
   We need a way to propagate these exceptions to the caller so that it can rethrow them. Also, UncheckedIOException is usually preferable to a bare RuntimeException for wrapping IOExceptions.
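
   One possible shape for this, sketched under assumptions: `NodeTask` is a hypothetical stand-in for `addGraphNode`, workers record the first failure in an `AtomicReference`, and the submitting thread rethrows it after all in-flight tasks have drained. Draining is done by reacquiring every semaphore permit, so the pool itself is never shut down:

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: bounded task submission that surfaces worker failures to the caller. */
final class BoundedNodeAdder {
  /** Hypothetical stand-in for addGraphNode(node, vectorsToAdd). */
  interface NodeTask {
    void addNode(int node) throws IOException;
  }

  static void addAll(int count, NodeTask task, Executor pool, int maxInFlight)
      throws IOException {
    if (maxInFlight <= 0) {
      throw new IllegalArgumentException("maxInFlight must be positive");
    }
    Semaphore permits = new Semaphore(maxInFlight);
    AtomicReference<Throwable> firstFailure = new AtomicReference<>();
    try {
      // Stop submitting new nodes as soon as any worker has failed.
      for (int i = 0; i < count && firstFailure.get() == null; i++) {
        final int node = i;
        permits.acquire();
        pool.execute(
            () -> {
              try {
                task.addNode(node);
              } catch (Throwable t) {
                firstFailure.compareAndSet(null, t); // keep only the first error
              } finally {
                permits.release();
              }
            });
      }
      // Every permit comes back once all in-flight tasks have finished.
      permits.acquire(maxInFlight);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      InterruptedIOException ioe = new InterruptedIOException("interrupted while adding nodes");
      ioe.initCause(e);
      throw ioe;
    }
    // Rethrow the first failure on the caller's thread, unwrapped where possible.
    Throwable failure = firstFailure.get();
    if (failure instanceof IOException) {
      throw (IOException) failure;
    } else if (failure instanceof RuntimeException) {
      throw (RuntimeException) failure;
    } else if (failure instanceof Error) {
      throw (Error) failure;
    } else if (failure != null) {
      throw new IOException(failure);
    }
  }
}
```

   Because the original `IOException` reaches the caller unchanged, this variant does not need any `RuntimeException`/`UncheckedIOException` wrapping at all.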



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswGraphBuilder.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util.hnsw;
+
+import static java.lang.Math.log;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.util.AtomicBitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.lucene.util.hnsw.ConcurrentOnHeapHnswGraph.NodeAtLevel;
+
+/**
+ * Builder for Concurrent HNSW graph. See {@link HnswGraph} for a high level overview, and the
+ * comments to `addGraphNode` for details on the concurrent building approach.
+ *
+ * @param <T> the type of vector
+ */
+public final class ConcurrentHnswGraphBuilder<T> implements IHnswGraphBuilder<T> {
+
+  /** Default number of maximum connections per node */
+  public static final int DEFAULT_MAX_CONN = 16;
+
+  /**
+   * Default number of the size of the queue maintained while searching during a graph construction.
+   */
+  public static final int DEFAULT_BEAM_WIDTH = 100;
+
+  /** A name for the HNSW component for the info-stream * */
+  public static final String HNSW_COMPONENT = "HNSW";
+
+  private final int beamWidth;
+  private final double ml;
+  private final ThreadLocal<NeighborArray> scratchNeighbors;
+
+  private final VectorSimilarityFunction similarityFunction;
+  private final VectorEncoding vectorEncoding;
+  private final RandomAccessVectorValues<T> vectors;
+  private final ThreadLocal<HnswGraphSearcher<T>> graphSearcher;
+
+  final ConcurrentOnHeapHnswGraph hnsw;
+  private final ConcurrentSkipListSet<NodeAtLevel> insertionsInProgress =
+      new ConcurrentSkipListSet<>();
+
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  // we need two sources of vectors in order to perform diversity check comparisons without
+  // colliding
+  private final RandomAccessVectorValues<T> vectorsCopy;
+  private final AtomicBitSet initializedNodes;
+
+  /**
+   * Reads all the vectors from vector values, builds a graph connecting them by their dense
+   * ordinals, using the given hyperparameter settings, and returns the resulting graph.
+   *
+   * @param vectors the vectors whose relations are represented by the graph - must provide a
+   *     different view over those vectors than the one used to add via addGraphNode.
+   * @param M – graph fanout parameter used to calculate the maximum number of connections a node
+   *     can have – M on upper layers, and M * 2 on the lowest level.
+   * @param beamWidth the size of the beam search to use when finding nearest neighbors.
+   */
+  public ConcurrentHnswGraphBuilder(
+      RandomAccessVectorValues<T> vectors,
+      VectorEncoding vectorEncoding,
+      VectorSimilarityFunction similarityFunction,
+      int M,
+      int beamWidth)
+      throws IOException {
+    this.vectors = vectors;
+    this.vectorsCopy = vectors.copy();
+    this.vectorEncoding = Objects.requireNonNull(vectorEncoding);
+    this.similarityFunction = Objects.requireNonNull(similarityFunction);
+    if (M <= 0) {
+      throw new IllegalArgumentException("maxConn must be positive");
+    }
+    if (beamWidth <= 0) {
+      throw new IllegalArgumentException("beamWidth must be positive");
+    }
+    this.beamWidth = beamWidth;
+    // normalization factor for level generation; currently not configurable
+    this.ml = M == 1 ? 1 : 1 / Math.log(1.0 * M);
+    this.hnsw = new ConcurrentOnHeapHnswGraph(M);
+    this.graphSearcher =
+        ThreadLocal.withInitial(
+            () -> {
+              return new HnswGraphSearcher<>(
+                  vectorEncoding,
+                  similarityFunction,
+                  new NeighborQueue(beamWidth, true),
+                  new FixedBitSet(this.vectors.size()));
+            });
+    // in scratch we store candidates in reverse order: worse candidates are first
+    scratchNeighbors =
+        ThreadLocal.withInitial(() -> new NeighborArray(Math.max(beamWidth, M + 1), false));
+    this.initializedNodes = new AtomicBitSet(vectors.size());
+  }
+
+  /**
+   * Reads all the vectors from two copies of a {@link RandomAccessVectorValues}. Providing two
+   * copies enables efficient retrieval without extra data copying, while avoiding collision of the
+   * returned values.
+   *
+   * @param vectorsToAdd the vectors for which to build a nearest neighbors graph. Must be an
+   *     independent accessor for the vectors
+   * @param autoParallel if true, the builder will allocate one thread per core to building the
+   *     graph; if false, it will use a single thread. For more fine-grained control, use the
+   *     ExecutorService (ThreadPoolExecutor) overload.
+   */
+  public ConcurrentOnHeapHnswGraph build(

Review Comment:
   Let's remove this method. Callers can always get the same behavior from a thin convenience wrapper that creates the appropriate kind of thread pool.
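
   For illustration, such a wrapper could live entirely on the caller side. This is a hypothetical sketch (`PoolSugar`, `withPool`, and `PoolTask` are made-up names); the callback would be something like `pool -> builder.build(vectorsToAdd, pool)`. It creates a fixed-size pool (which OpenJDK's `Executors.newFixedThreadPool` implements as a `ThreadPoolExecutor`) and shuts it down once the callback returns:

```java
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical caller-side sugar replacing build(vectorsToAdd, autoParallel). */
final class PoolSugar {
  /** Work to run against a pool, e.g. pool -> builder.build(vectorsToAdd, pool). */
  interface PoolTask<R> {
    R run(ExecutorService pool) throws IOException;
  }

  static <R> R withPool(boolean autoParallel, PoolTask<R> task) throws IOException {
    int threads = autoParallel ? Runtime.getRuntime().availableProcessors() : 1;
    ExecutorService pool =
        Executors.newFixedThreadPool(
            threads,
            r -> {
              Thread t = new Thread(r, "Concurrent HNSW builder");
              t.setDaemon(true); // daemon threads never block JVM exit
              return t;
            });
    try {
      return task.run(pool);
    } finally {
      // Whoever creates the pool owns its lifecycle, so it is shut down here, not in the builder.
      pool.shutdown();
    }
  }
}
```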



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswGraphBuilder.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util.hnsw;
+
+import static java.lang.Math.log;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.util.AtomicBitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.lucene.util.hnsw.ConcurrentOnHeapHnswGraph.NodeAtLevel;
+
+/**
+ * Builder for Concurrent HNSW graph. See {@link HnswGraph} for a high level overview, and the
+ * comments to `addGraphNode` for details on the concurrent building approach.
+ *
+ * @param <T> the type of vector
+ */
+public final class ConcurrentHnswGraphBuilder<T> implements IHnswGraphBuilder<T> {
+
+  /** Default number of maximum connections per node */
+  public static final int DEFAULT_MAX_CONN = 16;
+
+  /**
+   * Default number of the size of the queue maintained while searching during a graph construction.
+   */
+  public static final int DEFAULT_BEAM_WIDTH = 100;
+
+  /** A name for the HNSW component for the info-stream * */
+  public static final String HNSW_COMPONENT = "HNSW";
+
+  private final int beamWidth;
+  private final double ml;
+  private final ThreadLocal<NeighborArray> scratchNeighbors;
+
+  private final VectorSimilarityFunction similarityFunction;
+  private final VectorEncoding vectorEncoding;
+  private final RandomAccessVectorValues<T> vectors;
+  private final ThreadLocal<HnswGraphSearcher<T>> graphSearcher;
+
+  final ConcurrentOnHeapHnswGraph hnsw;
+  private final ConcurrentSkipListSet<NodeAtLevel> insertionsInProgress =
+      new ConcurrentSkipListSet<>();
+
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  // we need two sources of vectors in order to perform diversity check comparisons without
+  // colliding
+  private final RandomAccessVectorValues<T> vectorsCopy;
+  private final AtomicBitSet initializedNodes;
+
+  /**
+   * Reads all the vectors from vector values, builds a graph connecting them by their dense
+   * ordinals, using the given hyperparameter settings, and returns the resulting graph.
+   *
+   * @param vectors the vectors whose relations are represented by the graph - must provide a
+   *     different view over those vectors than the one used to add via addGraphNode.
+   * @param M – graph fanout parameter used to calculate the maximum number of connections a node
+   *     can have – M on upper layers, and M * 2 on the lowest level.
+   * @param beamWidth the size of the beam search to use when finding nearest neighbors.
+   */
+  public ConcurrentHnswGraphBuilder(
+      RandomAccessVectorValues<T> vectors,
+      VectorEncoding vectorEncoding,
+      VectorSimilarityFunction similarityFunction,
+      int M,
+      int beamWidth)
+      throws IOException {
+    this.vectors = vectors;
+    this.vectorsCopy = vectors.copy();
+    this.vectorEncoding = Objects.requireNonNull(vectorEncoding);
+    this.similarityFunction = Objects.requireNonNull(similarityFunction);
+    if (M <= 0) {
+      throw new IllegalArgumentException("maxConn must be positive");
+    }
+    if (beamWidth <= 0) {
+      throw new IllegalArgumentException("beamWidth must be positive");
+    }
+    this.beamWidth = beamWidth;
+    // normalization factor for level generation; currently not configurable
+    this.ml = M == 1 ? 1 : 1 / Math.log(1.0 * M);
+    this.hnsw = new ConcurrentOnHeapHnswGraph(M);
+    this.graphSearcher =
+        ThreadLocal.withInitial(
+            () -> {
+              return new HnswGraphSearcher<>(
+                  vectorEncoding,
+                  similarityFunction,
+                  new NeighborQueue(beamWidth, true),
+                  new FixedBitSet(this.vectors.size()));
+            });
+    // in scratch we store candidates in reverse order: worse candidates are first
+    scratchNeighbors =
+        ThreadLocal.withInitial(() -> new NeighborArray(Math.max(beamWidth, M + 1), false));
+    this.initializedNodes = new AtomicBitSet(vectors.size());
+  }
+
+  /**
+   * Reads all the vectors from two copies of a {@link RandomAccessVectorValues}. Providing two
+   * copies enables efficient retrieval without extra data copying, while avoiding collision of the
+   * returned values.
+   *
+   * @param vectorsToAdd the vectors for which to build a nearest neighbors graph. Must be an
+   *     independent accessor for the vectors
+   * @param autoParallel if true, the builder will allocate one thread per core to building the
+   *     graph; if false, it will use a single thread. For more fine-grained control, use the
+   *     ExecutorService (ThreadPoolExecutor) overload.
+   */
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, boolean autoParallel) throws IOException {
+    if (autoParallel) {
+      return build(
+          vectorsToAdd,
+          Executors.newFixedThreadPool(
+              Runtime.getRuntime().availableProcessors(),
+              new NamedThreadFactory("Concurrent HNSW builder")));
+    } else {
+      return build(
+          vectorsToAdd,
+          Executors.newSingleThreadExecutor(new NamedThreadFactory("Concurrent HNSW builder")));
+    }
+  }
+
+  @Override
+  public ConcurrentOnHeapHnswGraph build(RandomAccessVectorValues<T> vectorsToAdd)
+      throws IOException {
+    return build(vectorsToAdd, true);
+  }
+
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, ExecutorService pool) {
+    if (vectorsToAdd == this.vectors) {
+      throw new IllegalArgumentException(
+          "Vectors to build must be independent of the source of vectors provided to HnswGraphBuilder()");
+    }
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "build graph from " + vectorsToAdd.size() + " vectors");
+    }
+    if (!(pool instanceof ThreadPoolExecutor)) {
+      throw new IllegalArgumentException("ExecutorService must be a ThreadPoolExecutor");
+    }
+    addVectors(vectorsToAdd, (ThreadPoolExecutor) pool);
+    return hnsw;
+  }
+
+  // the goal here is to keep all the ExecutorService threads busy, but not to create potentially
+  // millions of futures by naively throwing everything at submit at once.  So, we use
+  // a semaphore to wait until a thread is free before adding a new task.
+  private void addVectors(RandomAccessVectorValues<T> vectorsToAdd, ThreadPoolExecutor pool) {
+    Semaphore semaphore = new Semaphore(pool.getMaximumPoolSize());
+
+    for (int i = 0; i < vectorsToAdd.size(); i++) {
+      final int node = i; // copy for closure
+      try {
+        semaphore.acquire();
+        pool.submit(
+            () -> {
+              try {
+                addGraphNode(node, vectorsToAdd);
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              } finally {
+                semaphore.release();
+              }
+            });
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);

Review Comment:
   Lucene generally reports low-level failures as IOException and has no mechanism for handling uncaught unchecked exceptions, so we should convert this to an IOException and add `throws IOException` to every method here that needs it.
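
   A small sketch of that conversion for the `semaphore.acquire()` call, assuming a hypothetical `acquireOrThrow` helper. `java.io.InterruptedIOException` is an `IOException` subclass, so it fits this convention while still signaling interruption, and the interrupt flag is restored before throwing:

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.Semaphore;

final class Interruptions {
  /** Hypothetical helper: acquires one permit, reporting interruption as an IOException. */
  static void acquireOrThrow(Semaphore semaphore) throws IOException {
    try {
      semaphore.acquire();
    } catch (InterruptedException e) {
      // Restore the interrupt flag so code further up the stack can still observe it.
      Thread.currentThread().interrupt();
      InterruptedIOException ioe =
          new InterruptedIOException("interrupted while waiting for a worker thread");
      ioe.initCause(e);
      throw ioe;
    }
  }
}
```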



##########
lucene/core/src/java/org/apache/lucene/util/hnsw/ConcurrentHnswGraphBuilder.java:
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.util.hnsw;
+
+import static java.lang.Math.log;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.lucene.index.VectorEncoding;
+import org.apache.lucene.index.VectorSimilarityFunction;
+import org.apache.lucene.util.AtomicBitSet;
+import org.apache.lucene.util.FixedBitSet;
+import org.apache.lucene.util.InfoStream;
+import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.lucene.util.hnsw.ConcurrentOnHeapHnswGraph.NodeAtLevel;
+
+/**
+ * Builder for Concurrent HNSW graph. See {@link HnswGraph} for a high level overview, and the
+ * comments to `addGraphNode` for details on the concurrent building approach.
+ *
+ * @param <T> the type of vector
+ */
+public final class ConcurrentHnswGraphBuilder<T> implements IHnswGraphBuilder<T> {
+
+  /** Default number of maximum connections per node */
+  public static final int DEFAULT_MAX_CONN = 16;
+
+  /**
+   * Default number of the size of the queue maintained while searching during a graph construction.
+   */
+  public static final int DEFAULT_BEAM_WIDTH = 100;
+
+  /** A name for the HNSW component for the info-stream * */
+  public static final String HNSW_COMPONENT = "HNSW";
+
+  private final int beamWidth;
+  private final double ml;
+  private final ThreadLocal<NeighborArray> scratchNeighbors;
+
+  private final VectorSimilarityFunction similarityFunction;
+  private final VectorEncoding vectorEncoding;
+  private final RandomAccessVectorValues<T> vectors;
+  private final ThreadLocal<HnswGraphSearcher<T>> graphSearcher;
+
+  final ConcurrentOnHeapHnswGraph hnsw;
+  private final ConcurrentSkipListSet<NodeAtLevel> insertionsInProgress =
+      new ConcurrentSkipListSet<>();
+
+  private InfoStream infoStream = InfoStream.getDefault();
+
+  // we need two sources of vectors in order to perform diversity check comparisons without
+  // colliding
+  private final RandomAccessVectorValues<T> vectorsCopy;
+  private final AtomicBitSet initializedNodes;
+
+  /**
+   * Reads all the vectors from vector values, builds a graph connecting them by their dense
+   * ordinals, using the given hyperparameter settings, and returns the resulting graph.
+   *
+   * @param vectors the vectors whose relations are represented by the graph - must provide a
+   *     different view over those vectors than the one used to add via addGraphNode.
+   * @param M – graph fanout parameter used to calculate the maximum number of connections a node
+   *     can have – M on upper layers, and M * 2 on the lowest level.
+   * @param beamWidth the size of the beam search to use when finding nearest neighbors.
+   */
+  public ConcurrentHnswGraphBuilder(
+      RandomAccessVectorValues<T> vectors,
+      VectorEncoding vectorEncoding,
+      VectorSimilarityFunction similarityFunction,
+      int M,
+      int beamWidth)
+      throws IOException {
+    this.vectors = vectors;
+    this.vectorsCopy = vectors.copy();
+    this.vectorEncoding = Objects.requireNonNull(vectorEncoding);
+    this.similarityFunction = Objects.requireNonNull(similarityFunction);
+    if (M <= 0) {
+      throw new IllegalArgumentException("maxConn must be positive");
+    }
+    if (beamWidth <= 0) {
+      throw new IllegalArgumentException("beamWidth must be positive");
+    }
+    this.beamWidth = beamWidth;
+    // normalization factor for level generation; currently not configurable
+    this.ml = M == 1 ? 1 : 1 / Math.log(1.0 * M);
+    this.hnsw = new ConcurrentOnHeapHnswGraph(M);
+    this.graphSearcher =
+        ThreadLocal.withInitial(
+            () -> {
+              return new HnswGraphSearcher<>(
+                  vectorEncoding,
+                  similarityFunction,
+                  new NeighborQueue(beamWidth, true),
+                  new FixedBitSet(this.vectors.size()));
+            });
+    // in scratch we store candidates in reverse order: worse candidates are first
+    scratchNeighbors =
+        ThreadLocal.withInitial(() -> new NeighborArray(Math.max(beamWidth, M + 1), false));
+    this.initializedNodes = new AtomicBitSet(vectors.size());
+  }
+
+  /**
+   * Reads all the vectors from two copies of a {@link RandomAccessVectorValues}. Providing two
+   * copies enables efficient retrieval without extra data copying, while avoiding collision of the
+   * returned values.
+   *
+   * @param vectorsToAdd the vectors for which to build a nearest neighbors graph. Must be an
+   *     independent accessor for the vectors
+   * @param autoParallel if true, the builder will allocate one thread per core to building the
+   *     graph; if false, it will use a single thread. For more fine-grained control, use the
+   *     ExecutorService (ThreadPoolExecutor) overload.
+   */
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, boolean autoParallel) throws IOException {
+    if (autoParallel) {
+      return build(
+          vectorsToAdd,
+          Executors.newFixedThreadPool(
+              Runtime.getRuntime().availableProcessors(),
+              new NamedThreadFactory("Concurrent HNSW builder")));
+    } else {
+      return build(
+          vectorsToAdd,
+          Executors.newSingleThreadExecutor(new NamedThreadFactory("Concurrent HNSW builder")));
+    }
+  }
+
+  @Override
+  public ConcurrentOnHeapHnswGraph build(RandomAccessVectorValues<T> vectorsToAdd)
+      throws IOException {
+    return build(vectorsToAdd, true);
+  }
+
+  public ConcurrentOnHeapHnswGraph build(
+      RandomAccessVectorValues<T> vectorsToAdd, ExecutorService pool) {
+    if (vectorsToAdd == this.vectors) {
+      throw new IllegalArgumentException(
+          "Vectors to build must be independent of the source of vectors provided to HnswGraphBuilder()");
+    }
+    if (infoStream.isEnabled(HNSW_COMPONENT)) {
+      infoStream.message(HNSW_COMPONENT, "build graph from " + vectorsToAdd.size() + " vectors");
+    }
+    if (!(pool instanceof ThreadPoolExecutor)) {
+      throw new IllegalArgumentException("ExecutorService must be a ThreadPoolExecutor");
+    }
+    addVectors(vectorsToAdd, (ThreadPoolExecutor) pool);
+    return hnsw;
+  }
+
+  // the goal here is to keep all the ExecutorService threads busy, but not to create potentially
+  // millions of futures by naively throwing everything at submit at once.  So, we use
+  // a semaphore to wait until a thread is free before adding a new task.
+  private void addVectors(RandomAccessVectorValues<T> vectorsToAdd, ThreadPoolExecutor pool) {
+    Semaphore semaphore = new Semaphore(pool.getMaximumPoolSize());
+
+    for (int i = 0; i < vectorsToAdd.size(); i++) {
+      final int node = i; // copy for closure
+      try {
+        semaphore.acquire();
+        pool.submit(
+            () -> {
+              try {
+                addGraphNode(node, vectorsToAdd);
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              } finally {
+                semaphore.release();
+              }
+            });
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    // Wait for any remaining tasks to complete
+    pool.shutdown();
+    try {

Review Comment:
   Perhaps we could return a Future and let the caller decide how long to wait and how to manage the ThreadPoolExecutor that *they supplied to us*. We don't want to be shutting down the pool here.
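
   A rough sketch of a Future-returning variant, under assumptions: `NodeTask` is a hypothetical stand-in for `addGraphNode`, and instead of the semaphore-bounded submit loop it starts a fixed number of workers that each claim the next node index from an `AtomicInteger`. That keeps the number of submitted tasks equal to the worker count rather than the vector count, never blocks the calling thread, and leaves the caller's pool open. The returned future completes only after every worker has exited, exceptionally with the first failure if there was one:

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: asynchronous node insertion that leaves the caller's pool open. */
final class AsyncNodeAdder {
  /** Hypothetical stand-in for addGraphNode(node, vectorsToAdd). */
  interface NodeTask {
    void addNode(int node) throws IOException;
  }

  static CompletableFuture<Void> addAllAsync(
      int count, NodeTask task, Executor pool, int workers) {
    if (workers <= 0) {
      throw new IllegalArgumentException("workers must be positive");
    }
    CompletableFuture<Void> done = new CompletableFuture<>();
    AtomicInteger nextNode = new AtomicInteger();
    AtomicInteger liveWorkers = new AtomicInteger(workers);
    AtomicReference<Throwable> failure = new AtomicReference<>();

    // Called exactly once per worker; the last one to finish completes the future.
    Runnable finishWorker =
        () -> {
          if (liveWorkers.decrementAndGet() == 0) {
            Throwable t = failure.get();
            if (t == null) {
              done.complete(null);
            } else {
              done.completeExceptionally(t);
            }
          }
        };

    for (int w = 0; w < workers; w++) {
      try {
        pool.execute(
            () -> {
              try {
                // Claim node indexes until they run out or some worker has failed.
                while (failure.get() == null) {
                  int node = nextNode.getAndIncrement();
                  if (node >= count) {
                    break;
                  }
                  task.addNode(node);
                }
              } catch (Throwable t) {
                failure.compareAndSet(null, t);
              } finally {
                finishWorker.run();
              }
            });
      } catch (RuntimeException e) { // e.g. RejectedExecutionException
        failure.compareAndSet(null, e);
        finishWorker.run();
      }
    }
    return done;
  }
}
```

   The caller can then pick `get(timeout, unit)`, `join()`, or further composition, and shut down its own executor whenever it sees fit.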



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@lucene.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-help@lucene.apache.org