You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ds...@apache.org on 2020/12/22 21:52:07 UTC

[lucene-solr] branch jira/solr-15051-blob created (now a511e77)

This is an automated email from the ASF dual-hosted git repository.

dsmiley pushed a change to branch jira/solr-15051-blob
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


      at a511e77  SOLR-15051: Initial WIP

This branch includes the following new commits:

     new a511e77  SOLR-15051: Initial WIP

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[lucene-solr] 01/01: SOLR-15051: Initial WIP

Posted by ds...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dsmiley pushed a commit to branch jira/solr-15051-blob
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit a511e77aec7db1b7436603e1267e40498030d253
Author: David Smiley <ds...@salesforce.com>
AuthorDate: Tue Dec 22 16:51:12 2020 -0500

    SOLR-15051: Initial WIP
    
    Co-authored-by: Bruno Roustant <br...@salesforce.com>
---
 gradle/maven/defaults-maven.gradle                 |   1 +
 settings.gradle                                    |   1 +
 solr/contrib/blob-directory/build.gradle           |  27 +++
 solr/contrib/blob-directory/codeDiagram.png        | Bin 0 -> 61354 bytes
 .../java/org/apache/solr/blob/BlobDirectory.java   | 231 +++++++++++++++++++++
 .../org/apache/solr/blob/BlobDirectoryFactory.java | 229 ++++++++++++++++++++
 .../src/java/org/apache/solr/blob/BlobFile.java    |  63 ++++++
 .../src/java/org/apache/solr/blob/BlobListing.java | 135 ++++++++++++
 .../src/java/org/apache/solr/blob/BlobPusher.java  | 107 ++++++++++
 .../src/java/org/apache/solr/blob/BlobStore.java   |  37 ++++
 .../org/apache/solr/blob/FilterIndexOutput.java    | 100 +++++++++
 .../apache/solr/blob/IndexInputInputStream.java    |  90 ++++++++
 solr/packaging/build.gradle                        |   1 +
 13 files changed, 1022 insertions(+)

diff --git a/gradle/maven/defaults-maven.gradle b/gradle/maven/defaults-maven.gradle
index 2662a69..f64ddf0 100644
--- a/gradle/maven/defaults-maven.gradle
+++ b/gradle/maven/defaults-maven.gradle
@@ -61,6 +61,7 @@ configure(rootProject) {
         ":solr:solrj",
         ":solr:contrib:analysis-extras",
         ":solr:contrib:analytics",
+        ":solr:contrib:blob-directory",
         ":solr:contrib:clustering",
         ":solr:contrib:extraction",
         ":solr:contrib:langid",
diff --git a/settings.gradle b/settings.gradle
index be2c09c..a683ddf 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -60,6 +60,7 @@ include "solr:core"
 include "solr:server"
 include "solr:contrib:analysis-extras"
 include "solr:contrib:analytics"
+include "solr:contrib:blob-directory"
 include "solr:contrib:clustering"
 include "solr:contrib:extraction"
 include "solr:contrib:langid"
diff --git a/solr/contrib/blob-directory/build.gradle b/solr/contrib/blob-directory/build.gradle
new file mode 100644
index 0000000..5fd53c5
--- /dev/null
+++ b/solr/contrib/blob-directory/build.gradle
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+apply plugin: 'java-library'
+
+description = 'A shared storage approach based on Lucene\'s Directory abstraction'
+
+dependencies {
+  // BlobDirectoryFactory builds on solr-core classes (CachingDirectoryFactory, CoreContainer).
+  implementation project(':solr:core')
+
+  testImplementation project(':solr:test-framework')
+}
diff --git a/solr/contrib/blob-directory/codeDiagram.png b/solr/contrib/blob-directory/codeDiagram.png
new file mode 100644
index 0000000..7afe3af
Binary files /dev/null and b/solr/contrib/blob-directory/codeDiagram.png differ
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java
new file mode 100644
index 0000000..a94af92
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.solr.common.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link FilterDirectory} that records the files created, synced, renamed and deleted through
+ * it, and pushes those changes with a {@link BlobPusher} each time {@link #syncMetaData()} is
+ * called.
+ *
+ * <p>NOTE(review): the tracking collections are unsynchronized {@link HashMap}, {@link HashSet}
+ * and {@link ArrayList} instances, so this class presumably expects single-threaded access —
+ * confirm.
+ */
+public class BlobDirectory extends FilterDirectory {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** {@link IOContext} used to read local files back when pushing them. */
+  private static final IOContext SYNC_IO_CONTEXT = new IOContext();
+
+  /**
+   * Pushes the synced files and the deletions. NOTE(review): the factory currently passes null
+   * here (work in progress), in which case {@link #syncMetaData()} fails with a
+   * NullPointerException.
+   */
+  private final BlobPusher blobPusher;
+
+  /**
+   * Map of {@link BlobFileSupplier} for each file created by this directory. Keys are file names.
+   * Each {@link BlobFileSupplier} keeps a reference to the {@link IndexOutput} created for the
+   * file, to provide its length and checksum when the file is pushed. The reference is freed as
+   * soon as the {@link IndexOutput} is closed, by capturing the length and checksum at that time.
+   */
+  private final Map<String, BlobFileSupplier> blobFileSupplierMap;
+
+  /** Names passed to {@link #sync(Collection)} since the last push or {@link #release()}. */
+  private final Set<String> synchronizedFileNames;
+
+  /** Names passed to {@link #deleteFile(String)} since the last push or {@link #release()}. */
+  private final Collection<String> deletedFileNames;
+
+  /** Whether this directory is open. Starts true and is cleared by {@link #close()}. */
+  private volatile boolean isOpen = true;
+
+  public BlobDirectory(Directory delegate, BlobPusher blobPusher) {
+    super(delegate);
+    this.blobPusher = blobPusher;
+    blobFileSupplierMap = new HashMap<>();
+    synchronizedFileNames = new HashSet<>();
+    deletedFileNames = new ArrayList<>();
+  }
+
+  /**
+   * Deletes the file from the delegate and records the deletion so it is pushed on the next
+   * {@link #syncMetaData()}.
+   */
+  @Override
+  public void deleteFile(String name) throws IOException {
+    log.debug("deleteFile {}", name);
+    in.deleteFile(name);
+    // The file no longer exists locally: stop tracking it so the next push does not try to read
+    // it back (which would fail) and upload it.
+    blobFileSupplierMap.remove(name);
+    synchronizedFileNames.remove(name);
+    deletedFileNames.add(name);
+  }
+
+  /** Creates the output in the delegate and tracks it so its length and checksum can be pushed. */
+  @Override
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    log.debug("createOutput {}", name);
+    IndexOutput indexOutput = in.createOutput(name, context);
+    BlobFileSupplier blobFileSupplier = new BlobFileSupplier(indexOutput);
+    blobFileSupplierMap.put(name, blobFileSupplier);
+    return new BlobIndexOutput(indexOutput, blobFileSupplier);
+  }
+
+  // createTempOutput(): We don't track tmp files since they are not synced.
+
+  /** Syncs the files in the delegate and records them as candidates for the next push. */
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+    log.debug("sync {}", names);
+    in.sync(names);
+    synchronizedFileNames.addAll(names);
+  }
+
+  /** Renames the file in the delegate and carries its tracked state over to the new name. */
+  @Override
+  public void rename(String source, String dest) throws IOException {
+    log.debug("rename {} to {}", source, dest);
+    in.rename(source, dest);
+    // Also rename the corresponding BlobFile.
+    BlobFileSupplier blobFileSupplier = blobFileSupplierMap.remove(source);
+    if (blobFileSupplier != null) {
+      blobFileSupplier.rename(source, dest);
+      blobFileSupplierMap.put(dest, blobFileSupplier);
+    }
+    // Also rename the tracked synchronized file.
+    if (synchronizedFileNames.remove(source)) {
+      synchronizedFileNames.add(dest);
+    }
+  }
+
+  /** Syncs the delegate's metadata, then pushes the tracked changes. */
+  @Override
+  public void syncMetaData() throws IOException {
+    log.debug("syncMetaData");
+    in.syncMetaData();
+    syncToBlobStore();
+  }
+
+  /**
+   * Pushes the synced files that were created through this directory, plus the recorded
+   * deletions, then clears both. If the push throws, nothing is cleared, so the same changes are
+   * pushed again by the next call.
+   */
+  private void syncToBlobStore() throws IOException {
+    log.debug("File names to sync {}", synchronizedFileNames);
+
+    Collection<BlobFile> writes = new ArrayList<>(synchronizedFileNames.size());
+    for (String fileName : synchronizedFileNames) {
+      BlobFileSupplier blobFileSupplier = blobFileSupplierMap.get(fileName);
+      if (blobFileSupplier != null) {
+        // Only push files created through this directory since it was last released. Files
+        // created before that don't need to be pushed.
+        writes.add(blobFileSupplier.getBlobFile());
+      }
+    }
+
+    log.debug("Sync to BlobStore writes={} deleted={}", writes, deletedFileNames);
+    blobPusher.push(writes, this::openInputStream, deletedFileNames);
+    synchronizedFileNames.clear();
+    deletedFileNames.clear();
+  }
+
+  /** Opens the local copy of the file as a stream to upload. */
+  private InputStream openInputStream(BlobFile blobFile) throws IOException {
+    return new IndexInputInputStream(in.openInput(blobFile.fileName(), SYNC_IO_CONTEXT));
+  }
+
+  /** Forgets all tracked files and deletions. */
+  public void release() {
+    log.debug("release");
+    blobFileSupplierMap.clear();
+    synchronizedFileNames.clear();
+    deletedFileNames.clear();
+  }
+
+  // obtainLock(): We get the delegate Directory lock.
+
+  /** Marks this directory closed, then quietly closes the delegate and the pusher. */
+  @Override
+  public void close() {
+    log.debug("close");
+    isOpen = false;
+    IOUtils.closeQuietly(in);
+    IOUtils.closeQuietly(blobPusher);
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "(" + in.toString() + ")";
+  }
+
+  @Override
+  protected void ensureOpen() throws AlreadyClosedException {
+    if (!isOpen) {
+      throw new AlreadyClosedException("This Directory is closed");
+    }
+  }
+
+  /**
+   * Delegating {@link IndexOutput} that hooks the {@link #close()} method to compute the checksum.
+   * The goal is to free the reference to the delegate {@link IndexOutput} when it is closed because
+   * we only need it to get the checksum.
+   */
+  private static class BlobIndexOutput extends FilterIndexOutput {
+
+    private final BlobFileSupplier blobFileSupplier;
+
+    BlobIndexOutput(IndexOutput delegate, BlobFileSupplier blobFileSupplier) {
+      super("Blob " + delegate.toString(), delegate.getName(), delegate);
+      this.blobFileSupplier = blobFileSupplier;
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        // Capture the length and checksum while the delegate is still open.
+        blobFileSupplier.getBlobFileFromIndexOutput();
+      } finally {
+        // Always close the delegate, even if capturing the checksum failed.
+        super.close();
+      }
+    }
+  }
+
+  /**
+   * Supplies the length and checksum of a file created in this directory. Keeps a reference to the
+   * file {@link IndexOutput} to be able to get its final length and checksum. However we try to
+   * free the reference as soon as we can (when the {@link IndexOutput} is closed so we know the
+   * content is final).
+   */
+  private static class BlobFileSupplier {
+
+    IndexOutput indexOutput;
+    String name;
+    BlobFile blobFile;
+
+    BlobFileSupplier(IndexOutput indexOutput) {
+      this.indexOutput = indexOutput;
+      name = indexOutput.getName();
+    }
+
+    /** Updates the tracked name, and the already captured {@link BlobFile} if any. */
+    void rename(String source, String dest) {
+      assert name.equals(source);
+      name = dest;
+      if (blobFile != null) {
+        blobFile = new BlobFile(name, blobFile.size(), blobFile.checksum());
+      }
+    }
+
+    /** Returns the {@link BlobFile}, capturing it from the {@link IndexOutput} if not done yet. */
+    BlobFile getBlobFile() throws IOException {
+      if (blobFile == null) {
+        getBlobFileFromIndexOutput();
+      }
+      return blobFile;
+    }
+
+    /**
+     * Gets the {@link BlobFile} of the referenced {@link IndexOutput} and then frees the reference.
+     * Does nothing if the reference was already freed (e.g. the output is closed twice).
+     */
+    void getBlobFileFromIndexOutput() throws IOException {
+      if (indexOutput == null) {
+        return; // Already captured; keep the existing BlobFile.
+      }
+      blobFile = new BlobFile(name, indexOutput.getFilePointer(), indexOutput.getChecksum());
+      indexOutput = null; // Free the reference since we have the checksum.
+    }
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectoryFactory.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectoryFactory.java
new file mode 100644
index 0000000..24b6cac
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectoryFactory.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.LockFactory;
+import org.apache.lucene.store.MMapDirectory;
+import org.apache.lucene.store.NativeFSLockFactory;
+import org.apache.lucene.store.NoLockFactory;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CachingDirectoryFactory;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.DirectoryFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO now: still failing tests (run CoreSynonymLoadTest no SolrCloud) because we have to support
+// removing dir and old indexes in BlobStore. See all "TODO now".
+
+/**
+ * {@link CachingDirectoryFactory} creating {@link BlobDirectory} instances that wrap a local
+ * {@link MMapDirectory}.
+ *
+ * <p>Work in progress: {@code delegateFactory} is instantiated and initialized but not yet used to
+ * create directories, and directories are created with a null {@link BlobPusher}.
+ *
+ * <p>Init args: {@code delegateFactory}, {@code delegateLockType} and {@code blobPath} are
+ * required; {@code maxChunkSize} (default {@link MMapDirectory#DEFAULT_MAX_CHUNK_SIZE}, must be
+ * greater than 0), {@code unmap} (default true) and {@code preload} (default false) configure the
+ * {@link MMapDirectory}.
+ */
+public class BlobDirectoryFactory extends CachingDirectoryFactory {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private DirectoryFactory delegateFactory;
+  private String delegateLockType;
+  private String blobPath;
+
+  // Parameters for MMapDirectory.
+  // TODO: Change DirectoryFactory.get() upstream to allow us to provide a
+  //  Function<Directory, Directory> to wrap the directory when it is created. This would unblock
+  //  the delegation of DirectoryFactory here, and we could get rid of these params by simply
+  //  delegating to delegateFactory instead.
+  private boolean unmapHack;
+  private boolean preload;
+  private int maxChunk;
+
+  @Override
+  public void initCoreContainer(CoreContainer cc) {
+    super.initCoreContainer(cc);
+    if (delegateFactory != null) {
+      delegateFactory.initCoreContainer(cc);
+    }
+    // TODO: blobListingManager = BlobListingManager.getInstance(cc, "/blobDirListings");
+  }
+
+  /**
+   * Reads and validates the init args, then instantiates and initializes the delegate factory.
+   *
+   * @throws IllegalArgumentException if a required arg is missing or maxChunkSize is not positive.
+   * @throws IllegalStateException if no CoreContainer was set with {@link
+   *     #initCoreContainer(CoreContainer)}; its resource loader is needed to load the delegate.
+   */
+  @Override
+  public void init(NamedList args) {
+    super.init(args);
+    SolrParams params = args.toSolrParams();
+
+    String delegateFactoryClass = params.get("delegateFactory");
+    if (delegateFactoryClass == null) {
+      throw new IllegalArgumentException("delegateFactory class is required");
+    }
+    if (coreContainer == null) {
+      throw new IllegalStateException("initCoreContainer() must be called before init()");
+    }
+    delegateFactory =
+        coreContainer.getResourceLoader().newInstance(delegateFactoryClass, DirectoryFactory.class);
+    delegateFactory.initCoreContainer(coreContainer);
+    delegateFactory.init(args);
+
+    delegateLockType = params.get("delegateLockType");
+    if (delegateLockType == null) {
+      throw new IllegalArgumentException("delegateLockType is required");
+    }
+
+    blobPath = params.get("blobPath");
+    if (blobPath == null) {
+      throw new IllegalArgumentException("blobPath is required");
+    }
+
+    maxChunk = params.getInt("maxChunkSize", MMapDirectory.DEFAULT_MAX_CHUNK_SIZE);
+    if (maxChunk <= 0) {
+      // Name the actual init arg so the misconfiguration is easy to locate.
+      throw new IllegalArgumentException("maxChunkSize must be greater than 0");
+    }
+    unmapHack = params.getBool("unmap", true);
+    preload = params.getBool("preload", false); // default turn-off
+  }
+
+  /** Releases the {@link BlobDirectory} tracking state before the default handling. */
+  @Override
+  public void doneWithDirectory(Directory directory) throws IOException {
+    log.debug("doneWithDirectory {}", directory);
+    ((BlobDirectory) directory).release();
+    // TODO delegateFactory.doneWithDirectory(directory);
+    super.doneWithDirectory(directory);
+  }
+
+  @Override
+  public void close() throws IOException {
+    log.debug("close");
+    // TODO delegateFactory.close();
+    super.close();
+  }
+
+  /**
+   * Returns {@link NoLockFactory} for {@link DirectoryFactory#LOCK_TYPE_NONE}, otherwise {@link
+   * NativeFSLockFactory}.
+   */
+  @Override
+  protected LockFactory createLockFactory(String rawLockType) throws IOException {
+    // TODO return rawLockType.equals(DirectoryFactory.LOCK_TYPE_NONE) ? NoLockFactory.INSTANCE :
+    //  DELEGATE_LOCK_FACTORY;
+    return rawLockType.equals(DirectoryFactory.LOCK_TYPE_NONE)
+        ? NoLockFactory.INSTANCE
+        : NativeFSLockFactory.INSTANCE;
+  }
+
+  /** Creates a {@link BlobDirectory} wrapping an {@link MMapDirectory} at {@code path}. */
+  @Override
+  protected Directory create(String path, LockFactory lockFactory, DirContext dirContext)
+      throws IOException {
+    log.debug("Create Directory {}", path);
+    MMapDirectory mapDirectory = new MMapDirectory(new File(path).toPath(), lockFactory, maxChunk);
+    try {
+      mapDirectory.setUseUnmap(unmapHack);
+    } catch (IllegalArgumentException e) {
+      log.warn("Unmap not supported on this JVM, continuing on without setting unmap", e);
+    }
+    mapDirectory.setPreload(preload);
+    Directory delegateDirectory = mapDirectory;
+    // TODO
+    // String delegateLockType = lockFactory == NoLockFactory.INSTANCE ?
+    //     DirectoryFactory.LOCK_TYPE_NONE : this.delegateLockType;
+    // Directory delegateDirectory = delegateFactory.get(path, dirContext, delegateLockType);
+    // nocommit new BlobPusher(new BlobStore(blobPath, path)); TODO now: Reuse BlobPusher
+    BlobPusher blobPusher = null;
+    return new BlobDirectory(delegateDirectory, blobPusher);
+  }
+
+  @Override
+  public boolean exists(String path) throws IOException {
+    boolean exists = super.exists(path);
+    log.debug("exists {} = {}", path, exists);
+    return exists;
+    // TODO return delegateFactory.exists(path);
+  }
+
+  /** Deletes the local directory. Removal from the BlobStore is not implemented yet. */
+  @Override
+  protected void removeDirectory(CacheValue cacheValue) throws IOException {
+    log.debug("removeDirectory {}", cacheValue);
+    File dirFile = new File(cacheValue.path);
+    FileUtils.deleteDirectory(dirFile);
+    // TODO delegateFactory.remove(cacheValue.path);
+    // nocommit new BlobPusher(new BlobStore(blobPath, cacheValue.path)); TODO now: Reuse
+    //  BlobPusher
+    // TODO now: blobPusher.deleteDirectory();
+  }
+
+  @Override
+  public void move(Directory fromDir, Directory toDir, String fileName, IOContext ioContext)
+      throws IOException {
+    // TODO: override for efficiency?
+    log.debug("move {} {} to {}", fromDir, fileName, toDir);
+    super.move(fromDir, toDir, fileName, ioContext);
+  }
+
+  @Override
+  public void renameWithOverwrite(Directory dir, String fileName, String toName)
+      throws IOException {
+    // TODO: override to perform an atomic rename if possible?
+    log.debug("renameWithOverwrite {} {} to {}", dir, fileName, toName);
+    super.renameWithOverwrite(dir, fileName, toName);
+  }
+
+  @Override
+  public boolean isPersistent() {
+    return true;
+  }
+
+  @Override
+  public boolean isSharedStorage() {
+    return true;
+  }
+
+  /** Releases the {@link BlobDirectory} tracking state before the default handling. */
+  @Override
+  public void release(Directory directory) throws IOException {
+    log.debug("release {}", directory);
+    ((BlobDirectory) directory).release();
+    // TODO delegateFactory.release(directory);
+    super.release(directory);
+  }
+
+  @Override
+  public boolean isAbsolute(String path) {
+    boolean isAbsolute = new File(path).isAbsolute();
+    log.debug("isAbsolute {} = {}", path, isAbsolute);
+    return isAbsolute;
+    // TODO return delegateFactory.isAbsolute(path);
+  }
+
+  @Override
+  public boolean searchersReserveCommitPoints() {
+    return false; // TODO: double check
+  }
+
+  @Override
+  public String getDataHome(CoreDescriptor cd) throws IOException {
+    String dataHome = super.getDataHome(cd);
+    log.debug("getDataHome {}", dataHome);
+    return dataHome;
+  }
+
+  @Override
+  public void cleanupOldIndexDirectories(
+      final String dataDirPath, final String currentIndexDirPath, boolean afterCoreReload) {
+    log.debug("cleanupOldIndexDirectories {} {}", dataDirPath, currentIndexDirPath);
+    super.cleanupOldIndexDirectories(dataDirPath, currentIndexDirPath, afterCoreReload);
+    // TODO now: cleanup with BlobPusher
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobFile.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobFile.java
new file mode 100644
index 0000000..6763483
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobFile.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.util.Objects;
+
+/** A file in Blob, consisting of a name, size, and checksum. */
+public class BlobFile {
+  protected final String fileName;
+  protected final long size;
+  protected final long checksum;
+
+  public BlobFile(String fileName, long size, long checksum) {
+    this.fileName = fileName;
+    this.size = size;
+    this.checksum = checksum;
+  }
+
+  public String fileName() {
+    return fileName;
+  }
+
+  public long size() {
+    return size;
+  }
+
+  public long checksum() {
+    return checksum;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof BlobFile)) return false;
+    BlobFile impl = (BlobFile) o;
+    return size == impl.size && checksum == impl.checksum && fileName.equals(impl.fileName);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(fileName, size, checksum);
+  }
+
+  @Override
+  public String toString() {
+    return fileName + " size=" + size + " chk=" + checksum;
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobListing.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobListing.java
new file mode 100644
index 0000000..bacf616
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobListing.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A listing of files with references to files in other listings. Lucene/Solr agnostic.
+ *
+ * <p>Local files (stored by this listing) are keyed by {@link BlobFile}; referenced files
+ * (stored by another listing) are keyed by file name. A file name is never in both.
+ */
+public class BlobListing {
+
+  /** A file stored by this listing, with the paths of the listings that refer to it. */
+  public static final class LocalEntry {
+    private final BlobFile blobFile;
+    /**
+     * Sorted list of relative paths to the listings that refer to this file. It must stay sorted
+     * because {@link #copyWithRef(String)} binary-searches it. The current listing is "." and is
+     * always first if present.
+     */
+    private final List<String> references;
+
+    public LocalEntry(BlobFile blobFile, List<String> references) {
+      this.blobFile = blobFile;
+      this.references = references;
+    }
+
+    public BlobFile getBlobFile() {
+      return blobFile;
+    }
+
+    /**
+     * Returns true when the current listing (".") no longer refers to this file, i.e. only other
+     * listings (if any) refer to it. An entry without any reference is considered locally deleted.
+     */
+    public boolean localDeleted() {
+      // "." is always first when present, so checking the first reference is enough.
+      return references.isEmpty() || !references.get(0).equals(".");
+    }
+
+    public List<String> references() {
+      return references;
+    }
+
+    @Override
+    public String toString() {
+      return "LocalEntry{" + "blobFile=" + blobFile + ", references=" + references + '}';
+    }
+
+    /**
+     * Returns a new entry for the same file whose references are a copy of this entry's with
+     * {@code path} inserted at its sorted position. This entry is not modified. If {@code path} is
+     * already referenced, an AssertionError is thrown when assertions are enabled; otherwise this
+     * entry is returned unchanged.
+     */
+    public LocalEntry copyWithRef(String path) {
+      final int idx = Collections.binarySearch(references, path);
+      assert idx < 0 : "Reference already present: " + path;
+      if (idx >= 0) {
+        return this; // Already referenced; only reachable with assertions disabled.
+      }
+      final int insertionPoint = -idx - 1;
+      ArrayList<String> newList = new ArrayList<>(references.size() + 1);
+      newList.addAll(references); // Keep the existing references.
+      newList.add(insertionPoint, path);
+      return new LocalEntry(blobFile, newList);
+    }
+  }
+
+  /**
+   * A file stored by another listing, referenced by name. NOTE(review): {@code sourcePath} is
+   * presumably the relative path of that listing — confirm.
+   */
+  public static final class RefEntry {
+    private final String fileName;
+    private final String sourcePath;
+
+    public RefEntry(String fileName, String sourcePath) {
+      this.fileName = fileName;
+      this.sourcePath = sourcePath;
+    }
+
+    public String fileName() {
+      return fileName;
+    }
+
+    public String sourcePath() {
+      return sourcePath;
+    }
+
+    @Override
+    public String toString() {
+      return "RefEntry{"
+          + "fileName='"
+          + fileName
+          + '\''
+          + ", sourcePath='"
+          + sourcePath
+          + '\''
+          + '}';
+    }
+  }
+
+  public static BlobListing fromJson(byte[] bytes) {
+    throw new UnsupportedOperationException("TODO"); // TODO
+  }
+
+  public byte[] toJson() {
+    throw new UnsupportedOperationException("TODO"); // TODO
+  }
+
+  private final Map<BlobFile, LocalEntry> localFiles;
+  private final Map<String, RefEntry> refFiles;
+
+  /**
+   * @param localFiles the files stored by this listing, keyed by {@link BlobFile}.
+   * @param refFiles the files stored by other listings, keyed by file name; none of these names
+   *     may also be a local file name (checked with an assertion).
+   */
+  public BlobListing(Map<BlobFile, LocalEntry> localFiles, Map<String, RefEntry> refFiles) {
+    this.localFiles = localFiles;
+    this.refFiles = refFiles;
+
+    final Set<String> refFileSet = refFiles.keySet();
+    assert localFiles.keySet().stream().noneMatch(bf -> refFileSet.contains(bf.fileName()));
+    // TODO assert sorted
+  }
+
+  /** Returns the local entry for exactly this file (name, size and checksum), or null. */
+  public LocalEntry lookupLocalEntry(BlobFile blobFile) {
+    return localFiles.get(blobFile);
+  }
+
+  /** Returns the local {@link BlobFile} with the given name, or null; linear in local files. */
+  public BlobFile lookupLocalBlobFile(String fileName) {
+    for (BlobFile blobFile : localFiles.keySet()) {
+      if (blobFile.fileName().equals(fileName)) {
+        return blobFile;
+      }
+    }
+    return null;
+  }
+
+  /** Returns the entry of the file with the given name stored by another listing, or null. */
+  public RefEntry lookupRemoteEntry(String fileName) {
+    return refFiles.get(fileName);
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobPusher.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobPusher.java
new file mode 100644
index 0000000..324ca99
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobPusher.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.lucene.util.IOUtils;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pushes a set of files to a {@link BlobStore}, and works with listings.
+ *
+ * <p>Work in progress: updating the listings is not implemented yet.
+ */
+public class BlobPusher implements Closeable {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // WORK IN PROGRESS!!
+
+  private final BlobStore blobStore;
+
+  /** Runs the uploads in parallel. Created by this pusher, so it is shut down by close(). */
+  private final ExecutorService executor = ExecutorUtil.newMDCAwareCachedThreadPool("blobPusher");
+
+  public BlobPusher(BlobStore blobStore) {
+    this.blobStore = blobStore;
+  }
+
+  /**
+   * Uploads the {@code writes} in parallel and, once they have all succeeded, deletes the {@code
+   * deletes} from the blob store.
+   *
+   * @param writes the files to upload.
+   * @param inputStreamSupplier opens the content of a file to upload; each stream is closed when
+   *     its upload returns.
+   * @param deletes the names of the files to delete.
+   * @throws IOException if an upload fails or the calling thread is interrupted while waiting.
+   */
+  public void push(
+      Collection<BlobFile> writes,
+      IOUtils.IOFunction<BlobFile, InputStream> inputStreamSupplier,
+      Collection<String> deletes)
+      throws IOException {
+
+    // update "foreign" listings
+    //      TODO David
+
+    // send files to BlobStore and delete our files too
+    log.debug("Pushing {}", writes);
+    executeAll(pushFiles(writes, inputStreamSupplier));
+    log.debug("Deleting {}", deletes);
+    deleteFiles(deletes);
+
+    // update "our" listing
+    //      TODO David
+  }
+
+  /**
+   * Runs all the actions on the executor and waits for all of them to complete.
+   *
+   * @throws IOException the IOException thrown by the first failed action (in list order) as is;
+   *     any other failure, or an interruption, wrapped in an IOException.
+   */
+  private void executeAll(List<Callable<Void>> actions) throws IOException {
+    try {
+      for (Future<Void> future : executor.invokeAll(actions)) {
+        future.get();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      // Surface the action's own IOException instead of double-wrapping it.
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      }
+      throw new IOException(e);
+    }
+  }
+
+  /** Builds one upload action per file; each action closes the stream it opens. */
+  private List<Callable<Void>> pushFiles(
+      Collection<BlobFile> blobFiles,
+      IOUtils.IOFunction<BlobFile, InputStream> inputStreamSupplier) {
+    return blobFiles.stream()
+        .map(
+            (blobFile) ->
+                (Callable<Void>)
+                    () -> {
+                      try (InputStream in = inputStreamSupplier.apply(blobFile)) {
+                        blobStore.create(blobFile.fileName(), in, blobFile.size());
+                      }
+                      return null;
+                    })
+        .collect(Collectors.toList());
+  }
+
+  private void deleteFiles(Collection<String> fileNames) throws IOException {
+    blobStore.delete(fileNames);
+  }
+
+  /**
+   * Shuts down the upload executor created by this pusher and waits for it to terminate.
+   * NOTE(review): the blobStore is not closed here; its ownership is still to be decided.
+   */
+  @Override
+  public void close() {
+    // TODO: decide whether this pusher owns (and must close) the blobStore.
+    ExecutorUtil.shutdownAndAwaitTermination(executor);
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStore.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStore.java
new file mode 100644
index 0000000..b411d6d
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStore.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.Closeable;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Abstraction over the storage that holds blob files. Implementations must also provide {@link
+ * Closeable#close()}.
+ */
+public abstract class BlobStore implements Closeable {
+
+  /**
+   * Creates a blob from the given content.
+   *
+   * @param fileName name of the blob to create
+   * @param inputStream content of the blob
+   * @param contentLength length of the content, as supplied by the caller
+   */
+  public abstract void create(String fileName, InputStream inputStream, long contentLength);
+
+  /**
+   * Deletes blobs.
+   *
+   * @param fileNames names of the blobs to delete
+   */
+  public abstract void delete(Collection<String> fileNames);
+
+  /**
+   * Opens the blob at the given path for reading.
+   *
+   * @param blobPath path of the blob to read
+   * @return a stream over the blob's content
+   */
+  public abstract InputStream read(String blobPath);
+
+  /**
+   * Lists blob files for the given path; the matching semantics are implementation-defined.
+   *
+   * @param blobPath path to list
+   * @return the names of the listed blob files
+   */
+  public abstract List<String> list(String blobPath);
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/FilterIndexOutput.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/FilterIndexOutput.java
new file mode 100644
index 0000000..ff9331d
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/FilterIndexOutput.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.IndexOutput;
+
+/**
+ * An {@link IndexOutput} that forwards every overridden operation to a wrapped {@link
+ * IndexOutput}. The wrapped output is {@code protected}, so subclasses can intercept selected
+ * calls.
+ */
+public class FilterIndexOutput extends IndexOutput {
+
+  /** The output that receives every forwarded call. */
+  protected final IndexOutput delegate;
+
+  /**
+   * @param resourceDescription resource description handed to {@link IndexOutput}
+   * @param name name handed to {@link IndexOutput}
+   * @param delegate output that receives every forwarded call
+   */
+  public FilterIndexOutput(String resourceDescription, String name, IndexOutput delegate) {
+    super(resourceDescription, name);
+    this.delegate = delegate;
+  }
+
+  // -- Single-value writes ---------------------------------------------------------------------
+
+  @Override
+  public void writeByte(byte value) throws IOException {
+    delegate.writeByte(value);
+  }
+
+  @Override
+  public void writeShort(short value) throws IOException {
+    delegate.writeShort(value);
+  }
+
+  @Override
+  public void writeInt(int value) throws IOException {
+    delegate.writeInt(value);
+  }
+
+  @Override
+  public void writeLong(long value) throws IOException {
+    delegate.writeLong(value);
+  }
+
+  @Override
+  public void writeString(String value) throws IOException {
+    delegate.writeString(value);
+  }
+
+  // -- Bulk and collection writes --------------------------------------------------------------
+
+  @Override
+  public void writeBytes(byte[] bytes, int length) throws IOException {
+    delegate.writeBytes(bytes, length);
+  }
+
+  @Override
+  public void writeBytes(byte[] bytes, int offset, int length) throws IOException {
+    delegate.writeBytes(bytes, offset, length);
+  }
+
+  @Override
+  public void copyBytes(DataInput source, long numBytes) throws IOException {
+    delegate.copyBytes(source, numBytes);
+  }
+
+  @Override
+  public void writeMapOfStrings(Map<String, String> map) throws IOException {
+    delegate.writeMapOfStrings(map);
+  }
+
+  @Override
+  public void writeSetOfStrings(Set<String> set) throws IOException {
+    delegate.writeSetOfStrings(set);
+  }
+
+  // -- Position, checksum and lifecycle --------------------------------------------------------
+
+  @Override
+  public long getFilePointer() {
+    return delegate.getFilePointer();
+  }
+
+  @Override
+  public long getChecksum() throws IOException {
+    return delegate.getChecksum();
+  }
+
+  @Override
+  public void close() throws IOException {
+    delegate.close();
+  }
+}
diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/IndexInputInputStream.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/IndexInputInputStream.java
new file mode 100644
index 0000000..4dc1adc
--- /dev/null
+++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/IndexInputInputStream.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.blob;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.lucene.store.IndexInput;
+
+/**
+ * An {@link InputStream} which wraps an {@link IndexInput}. Exact copy of {@code
+ * org.apache.lucene.replicator.IndexInputInputStream}.
+ */
+public final class IndexInputInputStream extends InputStream {
+
+  private final IndexInput in;
+
+  private long remaining;
+
+  public IndexInputInputStream(IndexInput in) {
+    this.in = in;
+    remaining = in.length();
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (remaining == 0) {
+      return -1;
+    } else {
+      --remaining;
+      return in.readByte();
+    }
+  }
+
+  @Override
+  public int available() throws IOException {
+    return (int) in.length();
+  }
+
+  @Override
+  public void close() throws IOException {
+    in.close();
+  }
+
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (remaining == 0) {
+      return -1;
+    }
+    if (remaining < len) {
+      len = (int) remaining;
+    }
+    in.readBytes(b, off, len);
+    remaining -= len;
+    return len;
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    if (remaining == 0) {
+      return -1;
+    }
+    if (remaining < n) {
+      n = remaining;
+    }
+    in.seek(in.getFilePointer() + n);
+    remaining -= n;
+    return n;
+  }
+}
diff --git a/solr/packaging/build.gradle b/solr/packaging/build.gradle
index 27e8e3a..d3aacde 100644
--- a/solr/packaging/build.gradle
+++ b/solr/packaging/build.gradle
@@ -46,6 +46,7 @@ dependencies {
 
   [":solr:contrib:analysis-extras",
    ":solr:contrib:analytics",
+   ":solr:contrib:blob-directory",
    ":solr:contrib:extraction",
    ":solr:contrib:clustering",
    ":solr:contrib:jaegertracer-configurator",