Posted to commits@lucene.apache.org by yo...@apache.org on 2019/10/30 20:45:13 UTC
[lucene-solr] 08/11: @W-6684857 indentation cleanup, precommit error fixes, remove junit use in non-test classes (#400)
This is an automated email from the ASF dual-hosted git repository.
yonik pushed a commit to branch jira/SOLR-13101
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 535845327c1f0a9b848a723bddcb4dbb5a22ee93
Author: Andy Vuong <a....@salesforce.com>
AuthorDate: Thu Oct 10 18:18:04 2019 -0400
@W-6684857 indentation cleanup, precommit error fixes, remove junit use in non-test classes (#400)
* Fix indentation in the remaining ported classes
* Fix all precommit failures and remove junit from non-test classes
* Remove MetadataResolverTest which is no longer relevant
---
solr/core/ivy.xml | 5 +-
.../store/blob/DeletedCorruptCoreException.java | 6 +-
.../solr/store/blob/PullInProgressException.java | 6 +-
.../store/blob/client/BlobClientException.java | 20 +-
.../solr/store/blob/client/BlobCoreMetadata.java | 490 +++++++--------
.../store/blob/client/BlobCoreMetadataBuilder.java | 164 ++---
.../solr/store/blob/client/BlobException.java | 18 +-
.../store/blob/client/BlobServiceException.java | 18 +-
.../solr/store/blob/client/CoreStorageClient.java | 248 ++++----
.../apache/solr/store/blob/client/ToFromJson.java | 32 +-
.../solr/store/blob/metadata/BlobCoreSyncer.java | 662 ++++++++++-----------
.../store/blob/metadata/ServerSideMetadata.java | 406 ++++++-------
.../blob/metadata/SharedStoreResolutionUtil.java | 4 +-
.../solr/store/blob/process/BlobDeleteManager.java | 5 +
.../solr/store/blob/process/CoreSyncFeeder.java | 189 +++---
.../solr/store/blob/util/BlobStoreUtils.java | 4 +-
.../solr/store/blob/util/DeduplicatingList.java | 294 ++++-----
.../processor/DistributedZkUpdateProcessor.java | 3 +-
.../solr/store/blob/metadata/CorePushPullTest.java | 1 +
.../metadata/SharedStoreResolutionUtilTest.java | 8 +-
.../solr/store/blob/util/BlobStoreUtilsTest.java | 3 +-
.../SharedShardMetadataControllerTest.java | 3 +-
.../DistributedZkUpdateProcessorTest.java | 3 +-
23 files changed, 1296 insertions(+), 1296 deletions(-)
diff --git a/solr/core/ivy.xml b/solr/core/ivy.xml
index a09196c..eb6dc60 100644
--- a/solr/core/ivy.xml
+++ b/solr/core/ivy.xml
@@ -154,9 +154,6 @@
<dependency org="software.amazon.ion" name="ion-java" rev="${/software.amazon.ion/ion-java}" conf="compile"/>
<dependency org="com.google.code.gson" name="gson" rev="${/com.google/gson}" conf="compile"/>
- <!-- Junit -->
- <dependency org="junit" name="junit" rev="4.13-beta-1"/>
-
<exclude org="*" ext="*" matcher="regexp" type="${ivy.exclude.types}"/>
</dependencies>
-</ivy-module>
+</ivy-module>
\ No newline at end of file
diff --git a/solr/core/src/java/org/apache/solr/store/blob/DeletedCorruptCoreException.java b/solr/core/src/java/org/apache/solr/store/blob/DeletedCorruptCoreException.java
index a960c3f..ce7cf19 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/DeletedCorruptCoreException.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/DeletedCorruptCoreException.java
@@ -21,7 +21,7 @@ package org.apache.solr.store.blob;
* another version from Blob Store.
*/
public class DeletedCorruptCoreException extends Exception {
- public DeletedCorruptCoreException(String message) {
- super(message);
- }
+ public DeletedCorruptCoreException(String message) {
+ super(message);
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/store/blob/PullInProgressException.java b/solr/core/src/java/org/apache/solr/store/blob/PullInProgressException.java
index 20460db..9181e60 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/PullInProgressException.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/PullInProgressException.java
@@ -21,7 +21,7 @@ package org.apache.solr.store.blob;
* available so can't be used and request can't be handled.
*/
public class PullInProgressException extends Exception {
- public PullInProgressException(String message) {
- super(message);
- }
+ public PullInProgressException(String message) {
+ super(message);
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/store/blob/client/BlobClientException.java b/solr/core/src/java/org/apache/solr/store/blob/client/BlobClientException.java
index 0f5387b..e5c1b3b 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/client/BlobClientException.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/client/BlobClientException.java
@@ -20,19 +20,19 @@ package org.apache.solr.store.blob.client;
*
* Extension of BlobException that represents an error on the client-side when attempting to make
* a request to the underlying blob store service. An exception of this type indicates that the
- * client was unable to successfully make the service call.
+ * client was unable to successfully make the service call.
*/
public class BlobClientException extends BlobException {
- public BlobClientException(Throwable cause) {
- super(cause);
- }
+ public BlobClientException(Throwable cause) {
+ super(cause);
+ }
- public BlobClientException(String message) {
- super(message);
- }
+ public BlobClientException(String message) {
+ super(message);
+ }
- public BlobClientException(String message, Throwable cause) {
- super(message, cause);
- }
+ public BlobClientException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/store/blob/client/BlobCoreMetadata.java b/solr/core/src/java/org/apache/solr/store/blob/client/BlobCoreMetadata.java
index 8bf2f2a..dc36dc9 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/client/BlobCoreMetadata.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/client/BlobCoreMetadata.java
@@ -14,287 +14,287 @@ import java.util.UUID;
*/
public class BlobCoreMetadata {
+ /**
+ * Name of the shard index data that is shared by all replicas belonging to that shard. This
+ * name is to decouple the core name that Solr manages from the name of the core on blob store.
+ */
+ private final String sharedBlobName;
+
+ /**
+ * Generation number of index represented by this metadata.
+ * This generation number is only meant to identify a scenario where local index generation number is higher than
+ * what we have in blob. In that scenario we would switch index to a new directory when pulling contents from blob.
+ * This is because, in the presence of a higher local generation number, blob contents cannot establish their legitimacy.
+ */
+ private final long generation;
+
+ /**
+ * Unique identifier of this metadata, that changes on every update to the metadata (except generating a new corrupt metadata
+ * through {@link #getCorruptOf}).
+ */
+ private final String uniqueIdentifier;
+
+ /**
+ * Indicates that a Solr (search) server pulled this core and was then unable to open or use it. This flag is used as
+ * an indication to servers pushing blobs for that core into Blob Store to push a complete set of files if they have
+ * a locally working copy rather than just diffs (files missing on Blob Store).
+ */
+ private final boolean isCorrupt;
+
+ /**
+ * Indicates that this core has been deleted by the client. This flag is used as a marker to prevent other servers
+ * from pushing their version of this core to blob and to allow local copy cleanup.
+ */
+ private final boolean isDeleted;
+
+ /**
+ * The array of files that constitute the current commit point of the core (as known by the Blob store).
+ * This array is not ordered! There are no duplicate entries in it either (see how it's built in {@link BlobCoreMetadataBuilder}).
+ */
+ private final BlobFile[] blobFiles;
+
+ /**
+ * Files marked for delete but not yet removed from the Blob store. Each such file contains information indicating when
+ * it was marked for delete so we can actually remove the corresponding blob (and the entry from this array in the metadata)
+ * when it's safe to do so even if there are (unexpected) conflicting updates to the blob store by multiple solr servers...
+ * TODO: we might want to separate the metadata blob from the deletes as it's not required to always fetch the delete list when checking freshness of the local core...
+ */
+ private final BlobFileToDelete[] blobFilesToDelete;
+
+ /**
+ * This is the constructor called by {@link BlobCoreMetadataBuilder}.
+ * It always builds non "isCorrupt" and non "isDeleted" metadata.
+ * The only way to build an instance of "isCorrupt" metadata is to use {@link #getCorruptOf} and for "isDeleted" use {@link #getDeletedOf()}
+ */
+ BlobCoreMetadata(String sharedBlobName, BlobFile[] blobFiles, BlobFileToDelete[] blobFilesToDelete, long generation) {
+ this(sharedBlobName, blobFiles, blobFilesToDelete, generation, UUID.randomUUID().toString(), false,
+ false);
+ }
+
+ private BlobCoreMetadata(String sharedBlobName, BlobFile[] blobFiles, BlobFileToDelete[] blobFilesToDelete, long generation,
+ String uniqueIdentifier, boolean isCorrupt, boolean isDeleted) {
+ this.sharedBlobName = sharedBlobName;
+ this.blobFiles = blobFiles;
+ this.blobFilesToDelete = blobFilesToDelete;
+ this.generation = generation;
+ this.uniqueIdentifier = uniqueIdentifier;
+ this.isCorrupt = isCorrupt;
+ this.isDeleted = isDeleted;
+ }
+
+ /**
+ * Given a non-corrupt {@link BlobCoreMetadata} instance, creates an equivalent one based on it but marked as corrupt.<p>
+ * The new instance keeps all the rest of the metadata unchanged, including the {@link #uniqueIdentifier}.
+ */
+ public BlobCoreMetadata getCorruptOf() {
+ assert !isCorrupt;
+ return new BlobCoreMetadata(sharedBlobName, blobFiles, blobFilesToDelete, generation, uniqueIdentifier, true, isDeleted);
+ }
+
+ /**
+ * Given a {@link BlobCoreMetadata} instance, creates an equivalent one based on it but marked as deleted.
+ * <p>
+ * The new instance keeps all the rest of the metadata unchanged, including the {@link #uniqueIdentifier}.
+ */
+ public BlobCoreMetadata getDeletedOf() {
+ assert !isDeleted;
+ return new BlobCoreMetadata(sharedBlobName, blobFiles, blobFilesToDelete, generation, uniqueIdentifier, isCorrupt, true);
+ }
+
+ /**
+ * Returns true if the Blob metadata was marked as deleted
+ */
+ public boolean getIsDeleted() {
+ return isDeleted;
+ }
+
+ /**
+ * Returns the core name corresponding to this metadata
+ */
+ public String getSharedBlobName() {
+ return sharedBlobName;
+ }
+
+ /**
+ * Returns true if the Blob metadata was marked as corrupt, in which case the core should not be pulled from the Blob Store
+ * as it is useless.
+ */
+ public boolean getIsCorrupt() {
+ return isCorrupt;
+ }
+
+ /**
+ * Unique identifier of this blob core metadata. Allows quickly seeing that the core metadata has changed without comparing
+ * the whole content.<p>
+ * {@link #getCorruptOf()} is the only call allowing the creation of two instances of {@link BlobCoreMetadata} having
+ * the same unique identifier.
+ */
+ public String getUniqueIdentifier() {
+ return uniqueIdentifier;
+ }
+
+ public long getGeneration() {
+ return this.generation;
+ }
+
+ public BlobFile[] getBlobFiles() {
+ return blobFiles;
+ }
+
+ public BlobFileToDelete[] getBlobFilesToDelete() {
+ return blobFilesToDelete;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ BlobCoreMetadata that = (BlobCoreMetadata) o;
+
+ if (this.generation != that.generation) return false;
+ if (this.isCorrupt != that.isCorrupt) return false;
+ if (this.isDeleted != that.isDeleted) return false;
+ if (!this.uniqueIdentifier.equals(that.uniqueIdentifier)) return false;
+ if (!this.sharedBlobName.equals(that.sharedBlobName)) return false;
+
+ // blobFiles array is not ordered so not using Arrays.equals here but rather Set comparison (we also know all elements are distinct in the array)
+ Set<BlobFile> thisFiles = new HashSet<>(Arrays.asList(this.blobFiles));
+ Set<BlobFile> thatFiles = new HashSet<>(Arrays.asList(that.blobFiles));
+ if (!thisFiles.equals(thatFiles)) return false;
+
+ // same for the conf files
+ Set<BlobFileToDelete> thisFilesToDelete = new HashSet<>(Arrays.asList(this.blobFilesToDelete));
+ Set<BlobFileToDelete> thatFilesToDelete = new HashSet<>(Arrays.asList(that.blobFilesToDelete));
+ return thisFilesToDelete.equals(thatFilesToDelete);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(sharedBlobName, uniqueIdentifier, generation,
+ // The array of files is not ordered so need to compare as a set
+ new HashSet<>(Arrays.asList(this.blobFiles)).hashCode(),
+ new HashSet<>(Arrays.asList(this.blobFilesToDelete)).hashCode(),
+ isCorrupt, isDeleted);
+ }
+
+ @Override
+ public String toString() {
+ return "sharedBlobName=" + sharedBlobName + " generation=" + generation
+ + " isCorrupt=" + isCorrupt + " uniqueIdentifier=" + uniqueIdentifier;
+ }
+
+ /**
+ * A file (or blob) stored in the blob store.
+ */
+ public static class BlobFile {
/**
- * Name of the shard index data that is shared by all replicas belonging to that shard. This
- * name is to decouple the core name that Solr manages from the name of the core on blob store.
+ * Name the file should have on a Solr server retrieving it, not including the core specific part of the filename (i.e. the path)
*/
- private final String sharedBlobName;
+ private final String solrFileName;
/**
- * Generation number of index represented by this metadata.
- * This generation number is only meant to identify a scenario where local index generation number is higher than
- * what we have in blob. In that scenario we would switch index to a new directory when pulling contents from blob.
- * Because in the presence of higher generation number locally, blob contents cannot establish their legitimacy.
+ * Name of the blob representing the file on the blob store. This will initially be an absolute path on the Blob
+ * server (for compatibility with {@link org.apache.solr.store.blob.client.LocalStorageClient}) but eventually might not include
+ * the core name if cores are organized into per-core S3 buckets.
*/
- private final long generation;
+ private final String blobName;
- /**
- * Unique identifier of this metadata, that changes on every update to the metadata (except generating a new corrupt metadata
- * through {@link #getCorruptOf}).
- */
- private final String uniqueIdentifier;
-
- /**
- * Indicates that a Solr (search) server pulled this core and was then unable to open or use it. This flag is used as
- * an indication to servers pushing blobs for that core into Blob Store to push a complete set of files if they have
- * a locally working copy rather than just diffs (files missing on Blob Store).
- */
- private final boolean isCorrupt;
-
- /**
- * Indicates that this core has been deleted by the client. This flag is used as a marker to prevent other servers
- * from pushing their version of this core to blob and to allow local copy cleanup.
- */
- private final boolean isDeleted;
-
- /**
- * The array of files that constitute the current commit point of the core (as known by the Blob store).
- * This array is not ordered! There are no duplicate entries in it either (see how it's built in {@link BlobCoreMetadataBuilder}).
- */
- private final BlobFile[] blobFiles;
+ private final long fileSize;
/**
- * Files marked for delete but not yet removed from the Blob store. Each such file contains information indicating when
- * it was marked for delete so we can actually remove the corresponding blob (and the entry from this array in the metadata)
- * when it's safe to do so even if there are (unexpected) conflicting updates to the blob store by multiple solr servers...
- * TODO: we might want to separate the metadata blob with the deletes as it's not required to always fetch the delete list when checking freshness of local core...
+ * Lucene generated checksum of the file. It is used in addition to file size to compare local and blob files.
*/
- private final BlobFileToDelete[] blobFilesToDelete;
+ private final long checksum;
- /**
- * This is the constructor called by {@link BlobCoreMetadataBuilder}.
- * It always builds non "isCorrupt" and non "isDeleted" metadata.
- * The only way to build an instance of "isCorrupt" metadata is to use {@link #getCorruptOf} and for "isDeleted" use {@link #getDeletedOf()}
- */
- BlobCoreMetadata(String sharedBlobName, BlobFile[] blobFiles, BlobFileToDelete[] blobFilesToDelete, long generation) {
- this(sharedBlobName, blobFiles, blobFilesToDelete, generation, UUID.randomUUID().toString(), false,
- false);
+ public BlobFile(String solrFileName, String blobName, long fileSize, long checksum) {
+ this.solrFileName = solrFileName;
+ this.blobName = blobName;
+ this.fileSize = fileSize;
+ this.checksum = checksum;
}
- private BlobCoreMetadata(String sharedBlobName, BlobFile[] blobFiles, BlobFileToDelete[] blobFilesToDelete, long generation,
- String uniqueIdentifier, boolean isCorrupt, boolean isDeleted) {
- this.sharedBlobName = sharedBlobName;
- this.blobFiles = blobFiles;
- this.blobFilesToDelete = blobFilesToDelete;
- this.generation = generation;
- this.uniqueIdentifier = uniqueIdentifier;
- this.isCorrupt = isCorrupt;
- this.isDeleted = isDeleted;
- }
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
- /**
- * Given a non corrupt {@link BlobCoreMetadata} instance, creates an equivalent one based on it but marked as corrupt.<p>
- * The new instance keeps all the rest of the metadata unchanged, including the {@link #uniqueIdentifier}.
- */
- public BlobCoreMetadata getCorruptOf() {
- assert !isCorrupt;
- return new BlobCoreMetadata(sharedBlobName, blobFiles, blobFilesToDelete, generation, uniqueIdentifier, true, isDeleted);
- }
+ BlobFile other = (BlobFile) o;
- /**
- * Given a {@link BlobCoreMetadata} instance, creates an equivalent one based on it but marked as deleted.
- * <p>
- * The new instance keeps all the rest of the metadata unchanged, including the {@link #uniqueIdentifier}.
- */
- public BlobCoreMetadata getDeletedOf() {
- assert !isDeleted;
- return new BlobCoreMetadata(sharedBlobName, blobFiles, blobFilesToDelete, generation, uniqueIdentifier, isCorrupt, true);
+ return Objects.equals(solrFileName, other.solrFileName) &&
+ Objects.equals(blobName, other.blobName) &&
+ Objects.equals(checksum, other.checksum) &&
+ Objects.equals(fileSize, other.fileSize);
}
- /**
- * Returns true if the Blob metadata was marked as deleted
- */
- public boolean getIsDeleted() {
- return isDeleted;
+ @Override
+ public int hashCode() {
+ return Objects.hash(solrFileName, blobName, fileSize, checksum);
}
- /**
- * Returns the core name corresponding to this metadata
- */
- public String getSharedBlobName() {
- return sharedBlobName;
+ public String getSolrFileName() {
+ return this.solrFileName;
}
- /**
- * Returns true if the Blob metadata was marked as corrupt. In which case, the core should not be pulled from the Blob Store
- * as it is useless.
- */
- public boolean getIsCorrupt() {
- return isCorrupt;
+ public String getBlobName() {
+ return this.blobName;
}
- /**
- * Unique identifier of this blob core metadata. Allows quickly seeing that the core metadata has changed without comparing
- * the whole content.<p>
- * {@link #getCorruptOf()} is the only call allowing the creation of two instances of {@link BlobCoreMetadata} having
- * the same unique identifier.
- */
- public String getUniqueIdentifier() {
- return uniqueIdentifier;
+ public long getFileSize() {
+ return this.fileSize;
}
- public long getGeneration() {
- return this.generation;
+ public long getChecksum() {
+ return this.checksum;
}
-
- public BlobFile[] getBlobFiles() {
- return blobFiles;
+ }
+
+ /**
+ * A file (or blob) stored in the blob store that should be deleted (after a certain "delay" to make sure it's not used
+ * by a conflicting update to the core metadata on the Blob...).
+ */
+ public static class BlobFileToDelete extends BlobFile {
+
+ // TODO using the delete timestamp for now but likely need something else:
+ // a delete sequence number would allow multiple sequence numbers to pass before deleting, whereas
+ // an old delete (as told by its timestamp) might still be the latest update to the core if there is not a lot of indexing
+ // activity, so it is hard to judge that a delete is safe based on the timestamp alone. Possibly delete sequence and delete timestamp together are
+ // the safest bet, covering both very high indexing activity cores (we might want to wait until the timestamp
+ // ages a bit given the sequence number can increase quickly, yet we could have a race with a server doing a slow update)
+ // and slowly updating cores (if the delete date is a week ago, even if the sequence number hasn't changed, the
+ // likelihood of a really slow update by another server causing a race is low).
+ private final long deletedAt;
+
+ public BlobFileToDelete(String solrFileName, String blobName, long fileSize, long checksum, long deletedAt) {
+ super(solrFileName, blobName, fileSize, checksum);
+
+ this.deletedAt = deletedAt;
}
- public BlobFileToDelete[] getBlobFilesToDelete() {
- return blobFilesToDelete;
+ public BlobFileToDelete(BlobFile bf, long deletedAt) {
+ super(bf.solrFileName, bf.blobName, bf.fileSize, bf.checksum);
+
+ this.deletedAt = deletedAt;
}
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- BlobCoreMetadata that = (BlobCoreMetadata) o;
-
- if (this.generation != that.generation) return false;
- if (this.isCorrupt != that.isCorrupt) return false;
- if (this.isDeleted != that.isDeleted) return false;
- if (!this.uniqueIdentifier.equals(that.uniqueIdentifier)) return false;
- if (!this.sharedBlobName.equals(that.sharedBlobName)) return false;
-
- // blobFiles array is not ordered so not using Arrays.equals here but rather Set comparison (we also know all elements are distinct in the array)
- Set<BlobFile> thisFiles = new HashSet<>(Arrays.asList(this.blobFiles));
- Set<BlobFile> thatFiles = new HashSet<>(Arrays.asList(that.blobFiles));
- if (!thisFiles.equals(thatFiles)) return false;
-
- // same for the conf files
- Set<BlobFileToDelete> thisFilesToDelete = new HashSet<>(Arrays.asList(this.blobFilesToDelete));
- Set<BlobFileToDelete> thatFilesToDelete = new HashSet<>(Arrays.asList(that.blobFilesToDelete));
- return thisFilesToDelete.equals(thatFilesToDelete);
- }
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
- @Override
- public int hashCode() {
- return Objects.hash(sharedBlobName, uniqueIdentifier, generation,
- // The array of files is not ordered so need to compare as a set
- new HashSet<>(Arrays.asList(this.blobFiles)).hashCode(),
- new HashSet<>(Arrays.asList(this.blobFilesToDelete)).hashCode(),
- isCorrupt, isDeleted);
- }
+ BlobFileToDelete other = (BlobFileToDelete) o;
- @Override
- public String toString() {
- return "sharedBlobName=" + sharedBlobName + " generation=" + generation
- + " isCorrupt=" + isCorrupt + " uniqueIdentifier=" + uniqueIdentifier;
+ return deletedAt == other.deletedAt;
}
- /**
- * A file (or blob) stored in the blob store.
- */
- public static class BlobFile {
- /**
- * Name the file should have on a Solr server retrieving it, not including the core specific part of the filename (i.e. the path)
- */
- private final String solrFileName;
-
- /**
- * Name of the blob representing the file on the blob store. This will initially be an absolute path on the Blob
- * server (for compatibility with {@link org.apache.solr.store.blob.client.LocalStorageClient}) but eventually might not include
- * the core name if cores are organized into per core S3 buckets).
- */
- private final String blobName;
-
- private final long fileSize;
-
- /**
- * Lucene generated checksum of the file. It is used in addition to file size to compare local and blob files.
- */
- private final long checksum;
-
- public BlobFile(String solrFileName, String blobName, long fileSize, long checksum) {
- this.solrFileName = solrFileName;
- this.blobName = blobName;
- this.fileSize = fileSize;
- this.checksum = checksum;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- BlobFile other = (BlobFile) o;
-
- return Objects.equals(solrFileName, other.solrFileName) &&
- Objects.equals(blobName, other.blobName) &&
- Objects.equals(checksum, other.checksum) &&
- Objects.equals(fileSize, other.fileSize);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(solrFileName, blobName, fileSize, checksum);
- }
-
- public String getSolrFileName() {
- return this.solrFileName;
- }
-
- public String getBlobName() {
- return this.blobName;
- }
-
- public long getFileSize() {
- return this.fileSize;
- }
-
- public long getChecksum() {
- return this.checksum;
- }
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), deletedAt);
}
- /**
- * A file (or blob) stored in the blob store that should be deleted (after a certain "delay" to make sure it's not used
- * by a conflicting update to the core metadata on the Blob...).
- */
- public static class BlobFileToDelete extends BlobFile {
-
- // TODO using the delete timestamp for now but likely need something else:
- // deleted sequence number allows letting multiple sequence numbers to pass before deleting, whereas
- // an old delete (as told by its timestamp) might still be the latest update to the core if not a lot of indexing
- // activity, so hard to judge a delete is safe based on this. Possibly delete sequence and delete timestamp are
- // the safest bet, covering both cases of very high indexing activity cores (we might want to wait until timestamp
- // ages a bit given sequence number can increase quickly yet we could have a race with a server doing a slow update)
- // as well as slowly updating cores (if delete date is a week ago, even if sequence number hasn't changed, the
- // likelyhood of a really really slow update by another server causing a race is low).
- private final long deletedAt;
-
- public BlobFileToDelete(String solrFileName, String blobName, long fileSize, long checksum, long deletedAt) {
- super(solrFileName, blobName, fileSize, checksum);
-
- this.deletedAt = deletedAt;
- }
-
- public BlobFileToDelete(BlobFile bf, long deletedAt) {
- super(bf.solrFileName, bf.blobName, bf.fileSize, bf.checksum);
-
- this.deletedAt = deletedAt;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
-
- BlobFileToDelete other = (BlobFileToDelete) o;
-
- return deletedAt == other.deletedAt;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), deletedAt);
- }
-
- public long getDeletedAt() {
- return this.deletedAt;
- }
+ public long getDeletedAt() {
+ return this.deletedAt;
}
+ }
}
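As context for the class above, a minimal usage sketch of the corrupt/deleted marking flow. This snippet is illustrative only (the shared name is a hypothetical placeholder, and it assumes the classes from this commit are on the classpath):

    // Illustrative only: exercises the marking flow from this commit.
    static void markingFlowSketch() {
        // Empty metadata for a shard not yet on the blob store: generation is UNDEFINED_VALUE (-1).
        BlobCoreMetadata bcm = BlobCoreMetadataBuilder.buildEmptyCoreMetadata("myCollection_shard1");
        assert bcm.getGeneration() == BlobCoreMetadataBuilder.UNDEFINED_VALUE;

        // getCorruptOf() is the one documented way to get two instances sharing a uniqueIdentifier.
        BlobCoreMetadata corrupt = bcm.getCorruptOf();
        assert corrupt.getIsCorrupt();
        assert corrupt.getUniqueIdentifier().equals(bcm.getUniqueIdentifier());

        // getDeletedOf() likewise keeps everything else unchanged and flips only the deleted marker.
        BlobCoreMetadata deleted = bcm.getDeletedOf();
        assert deleted.getIsDeleted();
    }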
diff --git a/solr/core/src/java/org/apache/solr/store/blob/client/BlobCoreMetadataBuilder.java b/solr/core/src/java/org/apache/solr/store/blob/client/BlobCoreMetadataBuilder.java
index bc43464..503cbf0 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/client/BlobCoreMetadataBuilder.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/client/BlobCoreMetadataBuilder.java
@@ -6,98 +6,98 @@ import java.util.*;
* Builder for {@link BlobCoreMetadata}.
*/
public class BlobCoreMetadataBuilder {
- /**
- * Generation number in metadata of cores not existing on the Blob Store.
- */
- public static final long UNDEFINED_VALUE = -1L;
+ /**
+ * Generation number in metadata of cores not existing on the Blob Store.
+ */
+ public static final long UNDEFINED_VALUE = -1L;
- final private String sharedBlobName;
- final private long generation;
- final private Set<BlobCoreMetadata.BlobFile> blobFiles;
- final private Set<BlobCoreMetadata.BlobFileToDelete> blobFilesToDelete;
+ final private String sharedBlobName;
+ final private long generation;
+ final private Set<BlobCoreMetadata.BlobFile> blobFiles;
+ final private Set<BlobCoreMetadata.BlobFileToDelete> blobFilesToDelete;
- public BlobCoreMetadataBuilder(String sharedBlobName, long generation) {
- this.sharedBlobName = sharedBlobName;
- this.generation= generation;
- this.blobFiles = new HashSet<>();
- this.blobFilesToDelete = new HashSet<>();
- }
+ public BlobCoreMetadataBuilder(String sharedBlobName, long generation) {
+ this.sharedBlobName = sharedBlobName;
+ this.generation= generation;
+ this.blobFiles = new HashSet<>();
+ this.blobFilesToDelete = new HashSet<>();
+ }
- /**
- * Builder used for "cloning" then modifying an existing instance of {@link BlobCoreMetadata}.
- * The new generation has to be passed in because it is final and can't be set later.
- */
- public BlobCoreMetadataBuilder(BlobCoreMetadata bcm, long generation) {
- this.sharedBlobName = bcm.getSharedBlobName();
- this.generation = generation;
- this.blobFiles = new HashSet<>(Arrays.asList(bcm.getBlobFiles()));
- this.blobFilesToDelete = new HashSet<>(Arrays.asList(bcm.getBlobFilesToDelete()));
- }
+ /**
+ * Builder used for "cloning" then modifying an existing instance of {@link BlobCoreMetadata}.
+ * The new generation has to be passed in because it is final and can't be set later.
+ */
+ public BlobCoreMetadataBuilder(BlobCoreMetadata bcm, long generation) {
+ this.sharedBlobName = bcm.getSharedBlobName();
+ this.generation = generation;
+ this.blobFiles = new HashSet<>(Arrays.asList(bcm.getBlobFiles()));
+ this.blobFilesToDelete = new HashSet<>(Arrays.asList(bcm.getBlobFilesToDelete()));
+ }
- public String getSharedBlobName() {
- return this.sharedBlobName;
- }
+ public String getSharedBlobName() {
+ return this.sharedBlobName;
+ }
- /**
- * Builds a {@link BlobCoreMetadata} for a non existing core of a given name.
- */
- static public BlobCoreMetadata buildEmptyCoreMetadata(String sharedBlobName) {
- return (new BlobCoreMetadataBuilder(sharedBlobName, UNDEFINED_VALUE)).build();
- }
+ /**
+ * Builds a {@link BlobCoreMetadata} for a non existing core of a given name.
+ */
+ static public BlobCoreMetadata buildEmptyCoreMetadata(String sharedBlobName) {
+ return (new BlobCoreMetadataBuilder(sharedBlobName, UNDEFINED_VALUE)).build();
+ }
- /**
- * Adds a file to the set of "active" files listed in the metadata
- */
- public BlobCoreMetadataBuilder addFile(BlobCoreMetadata.BlobFile f) {
- this.blobFiles.add(f);
- return this;
- }
+ /**
+ * Adds a file to the set of "active" files listed in the metadata
+ */
+ public BlobCoreMetadataBuilder addFile(BlobCoreMetadata.BlobFile f) {
+ this.blobFiles.add(f);
+ return this;
+ }
- /**
- * Removes a file from the set of "active" files listed in the metadata
- */
- public BlobCoreMetadataBuilder removeFile(BlobCoreMetadata.BlobFile f) {
- boolean removed = this.blobFiles.remove(f);
- assert removed; // If we remove things that are not there, likely a bug in our code
- return this;
- }
+ /**
+ * Removes a file from the set of "active" files listed in the metadata
+ */
+ public BlobCoreMetadataBuilder removeFile(BlobCoreMetadata.BlobFile f) {
+ boolean removed = this.blobFiles.remove(f);
+ assert removed; // If we remove things that are not there, likely a bug in our code
+ return this;
+ }
- /**
- * Adds a file to the set of files to delete listed in the metadata<p>
- * This method should always be called with {@link #removeFile(BlobCoreMetadata.BlobFile)} above. Possibly it's
- * better to only have a single method doing both operations (TODO).
- */
- public BlobCoreMetadataBuilder addFileToDelete(BlobCoreMetadata.BlobFileToDelete f) {
- this.blobFilesToDelete.add(f);
- return this;
- }
+ /**
+ * Adds a file to the set of files to delete listed in the metadata<p>
+ * This method should always be called with {@link #removeFile(BlobCoreMetadata.BlobFile)} above. Possibly it's
+ * better to only have a single method doing both operations (TODO).
+ */
+ public BlobCoreMetadataBuilder addFileToDelete(BlobCoreMetadata.BlobFileToDelete f) {
+ this.blobFilesToDelete.add(f);
+ return this;
+ }
- /**
- * Returns an iterator on the set of files to delete.
- * The returned iterator will be used to remove files from the set (as they are enqueued for hard delete from the Blob store).
- */
- public Iterator<BlobCoreMetadata.BlobFileToDelete> getDeletedFilesIterator() {
- return this.blobFilesToDelete.iterator();
- }
+ /**
+ * Returns an iterator on the set of files to delete.
+ * The returned iterator will be used to remove files from the set (as they are enqueued for hard delete from the Blob store).
+ */
+ public Iterator<BlobCoreMetadata.BlobFileToDelete> getDeletedFilesIterator() {
+ return this.blobFilesToDelete.iterator();
+ }
- /**
- * Removes a file from the set of "deleted" files listed in the metadata
- */
- public BlobCoreMetadataBuilder removeFilesFromDeleted(Set<BlobCoreMetadata.BlobFileToDelete> files) {
- int originalSize = this.blobFilesToDelete.size();
- boolean removed = this.blobFilesToDelete.removeAll(files);
- int totalRemoved = originalSize - this.blobFilesToDelete.size();
-
- // If we remove things that are not there, likely a bug in our code
- assert removed && (totalRemoved == files.size());
- return this;
- }
+ /**
+ * Removes a file from the set of "deleted" files listed in the metadata
+ */
+ public BlobCoreMetadataBuilder removeFilesFromDeleted(Set<BlobCoreMetadata.BlobFileToDelete> files) {
+ int originalSize = this.blobFilesToDelete.size();
+ boolean removed = this.blobFilesToDelete.removeAll(files);
+ int totalRemoved = originalSize - this.blobFilesToDelete.size();
+
+ // If we remove things that are not there, likely a bug in our code
+ assert removed && (totalRemoved == files.size());
+ return this;
+ }
- public BlobCoreMetadata build() {
- // TODO make this fail if we find more than one segments_N files.
- BlobCoreMetadata.BlobFile[] blobFilesArray = this.blobFiles.toArray(new BlobCoreMetadata.BlobFile[this.blobFiles.size()]);
- BlobCoreMetadata.BlobFileToDelete[] blobFilesToDeleteArray = this.blobFilesToDelete.toArray(new BlobCoreMetadata.BlobFileToDelete[this.blobFilesToDelete.size()]);
+ public BlobCoreMetadata build() {
+ // TODO make this fail if we find more than one segments_N files.
+ BlobCoreMetadata.BlobFile[] blobFilesArray = this.blobFiles.toArray(new BlobCoreMetadata.BlobFile[this.blobFiles.size()]);
+ BlobCoreMetadata.BlobFileToDelete[] blobFilesToDeleteArray = this.blobFilesToDelete.toArray(new BlobCoreMetadata.BlobFileToDelete[this.blobFilesToDelete.size()]);
- return new BlobCoreMetadata(this.sharedBlobName, blobFilesArray, blobFilesToDeleteArray, generation);
- }
+ return new BlobCoreMetadata(this.sharedBlobName, blobFilesArray, blobFilesToDeleteArray, generation);
+ }
}
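A sketch of the clone-then-modify pattern this builder is designed for, assuming an existing BlobCoreMetadata with at least one active file (the helper name is illustrative, not part of this commit):

    // Illustrative: clone-then-modify, pairing removeFile with addFileToDelete as the
    // javadoc above recommends, and bumping the (final) generation via the constructor.
    static BlobCoreMetadata retireFirstFile(BlobCoreMetadata bcm) {
        BlobCoreMetadata.BlobFile stale = bcm.getBlobFiles()[0];
        return new BlobCoreMetadataBuilder(bcm, bcm.getGeneration() + 1)
            .removeFile(stale)
            .addFileToDelete(new BlobCoreMetadata.BlobFileToDelete(stale, System.currentTimeMillis()))
            .build();
    }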
diff --git a/solr/core/src/java/org/apache/solr/store/blob/client/BlobException.java b/solr/core/src/java/org/apache/solr/store/blob/client/BlobException.java
index 4c34aef..372ddec 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/client/BlobException.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/client/BlobException.java
@@ -20,15 +20,15 @@ package org.apache.solr.store.blob.client;
* Parent class of blob store related issues. Likely to change and maybe disappear but good enough for a PoC.
*/
public class BlobException extends Exception {
- public BlobException(Throwable cause) {
- super(cause);
- }
+ public BlobException(Throwable cause) {
+ super(cause);
+ }
- public BlobException(String message) {
- super(message);
- }
+ public BlobException(String message) {
+ super(message);
+ }
- public BlobException(String message, Throwable cause) {
- super(message, cause);
- }
+ public BlobException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/store/blob/client/BlobServiceException.java b/solr/core/src/java/org/apache/solr/store/blob/client/BlobServiceException.java
index 66ebcaa..792043d 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/client/BlobServiceException.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/client/BlobServiceException.java
@@ -24,15 +24,15 @@ package org.apache.solr.store.blob.client;
*/
public class BlobServiceException extends BlobException {
- public BlobServiceException(Throwable cause) {
- super(cause);
- }
+ public BlobServiceException(Throwable cause) {
+ super(cause);
+ }
- public BlobServiceException(String message) {
- super(message);
- }
+ public BlobServiceException(String message) {
+ super(message);
+ }
- public BlobServiceException(String message, Throwable cause) {
- super(message, cause);
- }
+ public BlobServiceException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
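With both subclasses in place, the hierarchy lets callers separate failures where the request never reached the blob store service from failures the service itself reported. A hypothetical caller (the helper and its handling are a sketch, not part of this commit):

    static void safePush(CoreStorageClient client, String sharedStoreName,
                         String blobCoreMetadataName, BlobCoreMetadata bcm) {
        try {
            client.pushCoreMetadata(sharedStoreName, blobCoreMetadataName, bcm);
        } catch (BlobClientException e) {
            // the client could not make the service call at all (e.g. serialization or connectivity)
        } catch (BlobServiceException e) {
            // the blob store service was reached but reported an error
        } catch (BlobException e) {
            // any other blob-store-related failure
        }
    }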
diff --git a/solr/core/src/java/org/apache/solr/store/blob/client/CoreStorageClient.java b/solr/core/src/java/org/apache/solr/store/blob/client/CoreStorageClient.java
index 280839e..5bf07db 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/client/CoreStorageClient.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/client/CoreStorageClient.java
@@ -10,140 +10,140 @@ import java.util.List;
*/
public interface CoreStorageClient {
- /**
- * Replaces the special CORE_METADATA_BLOB_FILENAME blob on the blob store for the core by a new version passed as a
- * {@link BlobCoreMetadata} instance.
- *
- * @param sharedStoreName name of the shared shard index data to write the metadata file for
- * @param blobCoreMetadataName name of metadata file to write to the blob store
- * @param bcm blob metadata to be serialized and written to the blob store
- */
- public void pushCoreMetadata(String sharedStoreName, String blobCoreMetadataName, BlobCoreMetadata bcm) throws BlobException;
+ /**
+ * Replaces the special CORE_METADATA_BLOB_FILENAME blob on the blob store for the core by a new version passed as a
+ * {@link BlobCoreMetadata} instance.
+ *
+ * @param sharedStoreName name of the shared shard index data to write the metadata file for
+ * @param blobCoreMetadataName name of metadata file to write to the blob store
+ * @param bcm blob metadata to be serialized and written to the blob store
+ */
+ public void pushCoreMetadata(String sharedStoreName, String blobCoreMetadataName, BlobCoreMetadata bcm) throws BlobException;
- /**
- * Reads the special CORE_METADATA_BLOB_FILENAME blob on the blob store for the core and returns the corresponding
- * {@link BlobCoreMetadata} object.
- *
- * @param sharedStoreName name of the shared shard index data to get metadata for
- * @param blobCoreMetadataName name of metadata file to pull from the blob store
- * @return <code>null</code> if the core does not exist on the Blob store or method {@link #pushCoreMetadata} was
- * never called for it. Otherwise returns the latest value written using {@link #pushCoreMetadata} ("latest" here
- * based on the consistency model of the underlying store, in practice the last value written by any server given the
- * strong consistency of the Salesforce S3 implementation).
- */
- public BlobCoreMetadata pullCoreMetadata(String sharedStoreName, String blobCoreMetadataName) throws BlobException;
+ /**
+ * Reads the special CORE_METADATA_BLOB_FILENAME blob on the blob store for the core and returns the corresponding
+ * {@link BlobCoreMetadata} object.
+ *
+ * @param sharedStoreName name of the shared shard index data to get metadata for
+ * @param blobCoreMetadataName name of metadata file to pull from the blob store
+ * @return <code>null</code> if the core does not exist on the Blob store or method {@link #pushCoreMetadata} was
+ * never called for it. Otherwise returns the latest value written using {@link #pushCoreMetadata} ("latest" here
+ * based on the consistency model of the underlying store, in practice the last value written by any server given the
+ * strong consistency of the Salesforce S3 implementation).
+ */
+ public BlobCoreMetadata pullCoreMetadata(String sharedStoreName, String blobCoreMetadataName) throws BlobException;
- /**
- * Returns an input stream for the given blob. The caller must close the stream when done.
- *
- * @param path the blob file key for the file to be pulled
- * @return the blob's input stream
- */
- public InputStream pullStream(String path) throws BlobException;
+ /**
+ * Returns an input stream for the given blob. The caller must close the stream when done.
+ *
+ * @param path the blob file key for the file to be pulled
+ * @return the blob's input stream
+ */
+ public InputStream pullStream(String path) throws BlobException;
- /**
- * Writes to the external store using an input stream for the given core. The unique
- * path to the written blob is returned.
- *
- * @param blobName name of the core to be pulled from the store
- * @param is input stream of the core
- * @param contentLength size of the stream's data
- * @param fileNamePrefix have this string followed by a "." be how the "filename" of the written blob starts
- * @return the unique path to the written blob. Expected to be of the form /path1/.../pathn/_filenamePrefix_._random string_
- */
- public String pushStream(String blobName, InputStream is, long contentLength, String fileNamePrefix) throws BlobException;
+ /**
+ * Writes to the external store using an input stream for the given core. The unique
+ * path to the written blob is returned.
+ *
+ * @param blobName name of the core being pushed to the store
+ * @param is input stream of the core
+ * @param contentLength size of the stream's data
+ * @param fileNamePrefix the written blob's "filename" starts with this string followed by a "."
+ * @return the unique path to the written blob. Expected to be of the form /path1/.../pathn/_filenamePrefix_._random string_
+ */
+ public String pushStream(String blobName, InputStream is, long contentLength, String fileNamePrefix) throws BlobException;
- /**
- * Checks if the shard index data with the given shard metadata file exists
- *
- * @param sharedStoreName name of the shared shard index data to check exists
- * @param blobCoreMetadataName name of metadata file to check exists
- * @return true if the core has blobs
- */
- public boolean coreMetadataExists(String sharedStoreName, String blobCoreMetadataName) throws BlobException;
+ /**
+ * Checks if the shard index data with the given shard metadata file exists
+ *
+ * @param sharedStoreName name of the shared shard index data to check exists
+ * @param blobCoreMetadataName name of metadata file to check exists
+ * @return true if the core has blobs
+ */
+ public boolean coreMetadataExists(String sharedStoreName, String blobCoreMetadataName) throws BlobException;
- /**
- * Deletes all blob files associated with this blobName.
- *
- * @param blobName core to delete
- */
- public void deleteCore(String blobName) throws BlobException;
+ /**
+ * Deletes all blob files associated with this blobName.
+ *
+ * @param blobName core to delete
+ */
+ public void deleteCore(String blobName) throws BlobException;
- /**
- * Batch delete blob files from the blob store. Any blob file path that specifies a non-existent blob file will
- * not be treated as an error and should return success.
- *
- * @param paths list of blob file keys to the files to be deleted
- */
- public void deleteBlobs(Collection<String> paths) throws BlobException;
+ /**
+ * Batch delete blob files from the blob store. Any blob file path that specifies a non-existent blob file will
+ * not be treated as an error and should return success.
+ *
+ * @param paths list of blob file keys to the files to be deleted
+ */
+ public void deleteBlobs(Collection<String> paths) throws BlobException;
+
+ /**
+ * Retrieves an identifier for the cloud service providing the blobstore
+ *
+ * @return string identifying the service provider
+ */
+ public BlobstoreProviderType getStorageProvider();
- /**
- * Retrieves an identifier for the cloud service providing the blobstore
- *
- * @return string identifying the service provider
- */
- public BlobstoreProviderType getStorageProvider();
+ /**
+ * Returns an identifier for the geographical region a bucket is located in. Most blob storage providers utilize the concept of regions. In the unlikely case that we use a provider that
+ * doesn't, we should update this method (and log lines that mostly use this) to reflect a more generic terminology.
+ *
+ * @return string identifying the bucket's geographical region
+ */
+ public String getBucketRegion();
- /**
- * Returns an identifier for the geographical region a bucket is located in. Most blob storage providers utilize the concept of regions. In the unlikely case that we use a provider that
- * doesn't, we should update this method (and log lines that mostly use this) to reflect a more generic terminology.
- *
- * @return string identifying the bucket's geographical region
- */
- public String getBucketRegion();
+ /**
+ * Returns the name of the bucket that this client will interact with
+ *
+ * @return string name of the bucket this client is configured with
+ */
+ public String getBucketName();
- /**
- * Returns the name of the bucket that this client will interact with
- *
- * @return string name of the bucket this client is configured with
- */
- public String getBucketName();
+ /**
+ * Returns the endpoint this client is configured with
+ *
+ * @return string name of the endpoint this client is configured with
+ */
+ public String getEndpoint();
- /**
- * Returns the the end point this client is configured with
- *
- * @return string name of the endpoint this client is configured with
- */
- public String getEndpoint();
+ /**
+ * Tests whether or not a bucket exists. Throws an exception if something goes wrong.
+ *
+ * @return true if it does
+ */
+ public boolean doesBucketExist() throws BlobException;
- /**
- * Tests whether or not a bucket exists. Throws an exception if something is wrong or fa
- *
- * @return true if it does
- */
- public boolean doesBucketExist() throws BlobException;
+ /**
+ * Closes any resources used by the core storage client
+ */
+ public void shutdown();
+
+ /**
+ * Lists the blob file names of all files under a given core name's blob store
+ * hierarchy that are older than the given timestamp value in milliseconds. Important to
+ * note that the wall clock of the caller will vary from that of the blob store service,
+ * so you'll likely see mismatches if you're trying to list very recent files.
+ *
+ * This method is intended for observing significantly older modified files where clock skew
+ * is less of an issue.
+ *
+ * @param blobName the core to be listed
+ * @param timestamp timestamp in milliseconds
+ * @return the list of blob files
+ */
+ public List<String> listCoreBlobFilesOlderThan(String blobName, long timestamp) throws BlobException;
- /**
- * Closes any resources used by the core storage client
- */
- public void shutdown();
-
- /**
- * Lists the blob file names of all of files listed under a given core name's blob store
- * hierarchy that are older than the given timestamp value in milliseconds. Important to
- * note that that the wall clock of your caller will vary with that of the blob store service
- * such that you'll likely see mismatch if you're trying to list very recent files.
- *
- * This method is intended for observing significantly older modified files where clock skew
- * is less of an issue.
- *
- * @param blobName the core to be listed
- * @param timestamp timestamp in milliseconds
- * @return the list of blob files
- */
- public List<String> listCoreBlobFilesOlderThan(String blobName, long timestamp) throws BlobException;
-
- /**
- * Lists the common delimiter-terminated prefixes by blob keys beginning with the
- * provided prefix string.
- *
- * e.g. "core1/name1", "core1/name2" and "core2/name2"
- * Passing the prefix argument "core" should return
- * "core1/" and "core2/" i.e. the common delimiter-terminated
- * prefixes shared by blob keys in the blob store.
- *
- * @param prefix of the blob key to list by
- * @return the list of common prefixes
- */
- public List<String> listCommonBlobPrefix(String prefix) throws BlobException;
+ /**
+ * Lists the common delimiter-terminated prefixes of blob keys beginning with the
+ * provided prefix string.
+ *
+ * e.g. "core1/name1", "core1/name2" and "core2/name2"
+ * Passing the prefix argument "core" should return
+ * "core1/" and "core2/" i.e. the common delimiter-terminated
+ * prefixes shared by blob keys in the blob store.
+ *
+ * @param prefix of the blob key to list by
+ * @return the list of common prefixes
+ */
+ public List<String> listCommonBlobPrefix(String prefix) throws BlobException;
}
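One point worth calling out in the interface above: pullCoreMetadata returns null for a core that was never pushed, so first-time readers need a fallback. A sketch under that assumption (the helper is illustrative, not part of this commit):

    static BlobCoreMetadata readOrCreate(CoreStorageClient client, String sharedStoreName,
                                         String blobCoreMetadataName) throws BlobException {
        BlobCoreMetadata bcm = client.pullCoreMetadata(sharedStoreName, blobCoreMetadataName);
        if (bcm == null) {
            // Never pushed: start from empty metadata and publish it.
            bcm = BlobCoreMetadataBuilder.buildEmptyCoreMetadata(sharedStoreName);
            client.pushCoreMetadata(sharedStoreName, blobCoreMetadataName, bcm);
        }
        return bcm;
    }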
diff --git a/solr/core/src/java/org/apache/solr/store/blob/client/ToFromJson.java b/solr/core/src/java/org/apache/solr/store/blob/client/ToFromJson.java
index dd9b142..7c21577 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/client/ToFromJson.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/client/ToFromJson.java
@@ -23,22 +23,22 @@ import com.google.gson.GsonBuilder;
* Serializing/deserializing Json (mostly for {@link org.apache.solr.store.blob.client.BlobCoreMetadata}).
*/
public class ToFromJson<T> {
- /** Create easier to (human) read but a bit longer json output */
- static final boolean PRETTY_JSON = true;
+ /** Produce JSON output that is easier for humans to read, at the cost of being a bit longer */
+ static final boolean PRETTY_JSON = true;
- /**
- * Builds an object instance from a String representation of the Json.
- */
- public T fromJson(String input, Class<T> c) throws Exception {
- Gson gson = new Gson();
- return gson.fromJson(input, c);
- }
+ /**
+ * Builds an object instance from a String representation of the Json.
+ */
+ public T fromJson(String input, Class<T> c) throws Exception {
+ Gson gson = new Gson();
+ return gson.fromJson(input, c);
+ }
- /**
- * Returns the Json String of the passed object instance.
- */
- public String toJson(T t) throws Exception {
- Gson gson = PRETTY_JSON ? new GsonBuilder().setPrettyPrinting().create() : new Gson();
- return gson.toJson(t);
- }
+ /**
+ * Returns the Json String of the passed object instance.
+ */
+ public String toJson(T t) throws Exception {
+ Gson gson = PRETTY_JSON ? new GsonBuilder().setPrettyPrinting().create() : new Gson();
+ return gson.toJson(t);
+ }
}
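A round-trip sketch for the helper above (the method is illustrative; both ToFromJson methods declare throws Exception, so callers handle or propagate it):

    static String debugJson(BlobCoreMetadata bcm) throws Exception {
        ToFromJson<BlobCoreMetadata> converter = new ToFromJson<>();
        String json = converter.toJson(bcm);   // pretty-printed while PRETTY_JSON is true
        BlobCoreMetadata roundTripped = converter.fromJson(json, BlobCoreMetadata.class);
        // content-based equality (see BlobCoreMetadata.equals) should hold after a round trip
        assert roundTripped.equals(bcm);
        return json;
    }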
diff --git a/solr/core/src/java/org/apache/solr/store/blob/metadata/BlobCoreSyncer.java b/solr/core/src/java/org/apache/solr/store/blob/metadata/BlobCoreSyncer.java
index fceb569..c352e50 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/metadata/BlobCoreSyncer.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/metadata/BlobCoreSyncer.java
@@ -41,358 +41,356 @@ import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
/**
- * Class to sync between local and blob copies of a core using {@link org.apache.solr.store.blob.metadata.CorePushPull}
+ * Class to sync between local and blob copies of a core using
+ * {@link org.apache.solr.store.blob.metadata.CorePushPull}
*/
public class BlobCoreSyncer {
-
- private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- /**
- * Threads wait for at most this duration before giving up on async pull to finish and returning with a PullInProgressException.
- */
- public static final long PULL_WAITING_MS = TimeUnit.SECONDS.toMillis(5);
-
- /**
- * Max number of threads for a core that can concurrently wait for the pull
- * to complete instead of returning a PullInProgressException right away
- */
- public static final int MAX_PULL_WAITING_PER_CORE = 5;
-
- /**
- * Allow at most that number of threads to wait for async pulls to finish over all cores instead of returning
- * a PullInProgressException right away.
- */
- public static final int MAX_PULL_WAITING_TOTAL = 20;
-
- /** "Skipping pulling core" string is checked on Core App in SolrExchange.onResponseComplete()
- * and defined there in constant SolrExchange.CORE_BEING_PULLED. Change here, change there!
- * Use this string as prefix of any message returned in a PullInProgressException.
- */
- private static final String SKIPPING_PULLING_CORE = "Skipping pulling core";
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- @GuardedBy("coreSyncsInFlight")
- private int total_waiting_threads = 0;
-
- /** The shared store name for the core currently being pulled from blob. Value is collection of objects used for synchronization by all waiting threads.
- * If both the locks on this map and on a specific SyncOnPullWait in the map are needed, the lock on the map must be acquired first.
- */
- @GuardedBy("itself")
- private final Map<String, Collection<SyncOnPullWait>> coreSyncsInFlight = Maps.newHashMap();
-
- /**
- * @return Total number of threads across all cores waiting for their respective core to be pulled from blob store
- */
- @VisibleForTesting
- protected int getTotalWaitingThreads() {
- synchronized (coreSyncsInFlight) {
- return total_waiting_threads;
- }
- }
-
- /**
- * Each thread waiting for async pull to complete uses its own new instance of this class. This is done
- * so that the pulling thread upon completion can do two things:
- * <ol><li>
- * Notify each waiting (or about to wait) thread that the pull is done,
- * </li><li>
- * Pass pull results to the waiting thread (for example indicate if an exception was thrown and the pull failed).
- * </li></ol>
- * A data structure in which all waiter threads share the same lock object (and the same flag/data indicating completion
- * and its result) can be considered simpler but has the problem of when can this data structure be freed, often resorting
- * to relatively complex code using weak references.<p>
- * In the implementation done here, the entry in the map of cores to "waiting lists" ({@link BlobCoreSyncer#coreSyncsInFlight})
- * can be removed at any time once all waiter threads have been notified, then the actual data structure for each
- * waiter will be reclaimable by the JVM when the waiter has finished using it, without complex tricks.
- */
- private static class SyncOnPullWait {
- private final CountDownLatch latch = new CountDownLatch(1);
- private Exception exception = null; // If non null once latch is counted down (to 0), exception encountered by pulling thread
- }
-
- /**
- * Returns a _hint_ that the given core might be locally empty because it is awaiting pull from Blob store.
- * This is just a hint because as soon as the lock is released when the method returns, the status of the core could change.
- * Because of that, in method {@link #pull(PushPullData, boolean, boolean, CoreContainer)} we need to check again.
- */
- public boolean isEmptyCoreAwaitingPull(CoreContainer cores, String coreName) {
- return CorePullerFeeder.isEmptyCoreAwaitingPull(cores, coreName);
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ /**
+ * Threads wait for at most this duration before giving up on async pull to finish and returning with a PullInProgressException.
+ */
+ public static final long PULL_WAITING_MS = TimeUnit.SECONDS.toMillis(5);
+
+ /**
+ * Max number of threads for a core that can concurrently wait for the pull
+ * to complete instead of returning a PullInProgressException right away
+ */
+ public static final int MAX_PULL_WAITING_PER_CORE = 5;
+
+ /**
+ * Allow at most that number of threads to wait for async pulls to finish over all cores instead of returning
+ * a PullInProgressException right away.
+ */
+ public static final int MAX_PULL_WAITING_TOTAL = 20;
+
+ /** "Skipping pulling core" string is checked on Core App in SolrExchange.onResponseComplete()
+ * and defined there in constant SolrExchange.CORE_BEING_PULLED. Change here, change there!
+ * Use this string as prefix of any message returned in a PullInProgressException.
+ */
+ private static final String SKIPPING_PULLING_CORE = "Skipping pulling core";
+
+ @GuardedBy("coreSyncsInFlight")
+ private int total_waiting_threads = 0;
+
+ /** The shared store name for the core currently being pulled from blob. Value is a collection of objects used for synchronization by all waiting threads.
+ * If both the locks on this map and on a specific SyncOnPullWait in the map are needed, the lock on the map must be acquired first.
+ */
+ @GuardedBy("itself")
+ private final Map<String, Collection<SyncOnPullWait>> coreSyncsInFlight = Maps.newHashMap();
+
+ /**
+ * @return Total number of threads across all cores waiting for their respective core to be pulled from blob store
+ */
+ @VisibleForTesting
+ protected int getTotalWaitingThreads() {
+ synchronized (coreSyncsInFlight) {
+ return total_waiting_threads;
}
+ }
+
+ /**
+ * Each thread waiting for async pull to complete uses its own new instance of this class. This is done
+ * so that the pulling thread upon completion can do two things:
+ * <ol><li>
+ * Notify each waiting (or about to wait) thread that the pull is done,
+ * </li><li>
+ * Pass pull results to the waiting thread (for example indicate if an exception was thrown and the pull failed).
+ * </li></ol>
+   * A data structure in which all waiter threads share the same lock object (and the same flag/data indicating completion
+   * and its result) might seem simpler, but it raises the question of when that structure can be freed, which often leads
+   * to relatively complex code using weak references.<p>
+   * In the implementation done here, the entry in the map of cores to "waiting lists" ({@link BlobCoreSyncer#coreSyncsInFlight})
+   * can be removed at any time once all waiter threads have been notified; the per-waiter data structure is then
+   * reclaimable by the JVM as soon as its waiter has finished using it, without complex tricks.
+ */
+ private static class SyncOnPullWait {
+ private final CountDownLatch latch = new CountDownLatch(1);
+    private Exception exception = null; // If non-null once the latch has counted down to 0, this is the exception encountered by the pulling thread
+ }
+
+ /**
+ * Returns a _hint_ that the given core might be locally empty because it is awaiting pull from Blob store.
+ * This is just a hint because as soon as the lock is released when the method returns, the status of the core could change.
+ * Because of that, in method {@link #pull(PushPullData, boolean, boolean, CoreContainer)} we need to check again.
+ */
+ public boolean isEmptyCoreAwaitingPull(CoreContainer cores, String coreName) {
+ return CorePullerFeeder.isEmptyCoreAwaitingPull(cores, coreName);
+ }
- public void pull(String coreName, String shardName, String collectionName, CoreContainer cores,
- boolean waitForSearcher, boolean emptyCoreAwaitingPull) {
- // Initialize variables
- SharedShardMetadataController sharedShardMetadataController = cores.getSharedStoreManager().getSharedShardMetadataController();
- DocCollection collection = cores.getZkController().getClusterState().getCollection(collectionName);
-
- Slice shard = collection.getSlicesMap().get(shardName);
- if (shard != null) {
- try {
- if (!collection.getActiveSlices().contains(shard)) {
- // unclear if there are side effects but logging for now
- log.warn("Pulling shard " + shardName + " that is inactive!");
- }
- log.info("Pulling for collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
- // creates the metadata node if it doesn't exist
- sharedShardMetadataController.ensureMetadataNodeExists(collectionName, shardName);
-
- /*
- * Get the metadataSuffix value from ZooKeeper or from a cache if an entry exists for the
- * given collection and shardName. If the leader has already changed, the conditional update
- * later will fail and invalidate the cache entry if it exists.
- */
- VersionedData data = sharedShardMetadataController.readMetadataValue(collectionName, shardName,
- /* readFromCache */ false);
- Map<String, String> nodeUserData = (Map<String, String>) Utils.fromJSON(data.getData());
- String metadataSuffix = nodeUserData.get(SharedShardMetadataController.SUFFIX_NODE_NAME);
-
- String sharedShardName = (String) shard.get(ZkStateReader.SHARED_SHARD_NAME);
-
- PushPullData pushPullData = new PushPullData.Builder()
- .setCollectionName(collectionName)
- .setShardName(shardName)
- .setCoreName(coreName)
- .setSharedStoreName(sharedShardName)
- .setLastReadMetadataSuffix(metadataSuffix)
- .setNewMetadataSuffix(BlobStoreUtils.generateMetadataSuffix())
- .setZkVersion(data.getVersion())
- .build();
- pull(pushPullData, waitForSearcher, emptyCoreAwaitingPull, cores);
- } catch (Exception ex) {
- // wrap every thrown exception in a solr exception
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error trying to pull from blob store", ex);
+ public void pull(String coreName, String shardName, String collectionName, CoreContainer cores,
+ boolean waitForSearcher, boolean emptyCoreAwaitingPull) {
+ // Initialize variables
+ SharedShardMetadataController sharedShardMetadataController = cores.getSharedStoreManager().getSharedShardMetadataController();
+ DocCollection collection = cores.getZkController().getClusterState().getCollection(collectionName);
+
+ Slice shard = collection.getSlicesMap().get(shardName);
+ if (shard != null) {
+ try {
+ if (!collection.getActiveSlices().contains(shard)) {
+ // unclear if there are side effects but logging for now
+ log.warn("Pulling shard " + shardName + " that is inactive!");
}
+ log.info("Pulling for collection=" + collectionName + " shard=" + shardName + " coreName=" + coreName);
+ // creates the metadata node if it doesn't exist
+ sharedShardMetadataController.ensureMetadataNodeExists(collectionName, shardName);
+
+ /*
+ * Get the metadataSuffix value from ZooKeeper or from a cache if an entry exists for the
+ * given collection and shardName. If the leader has already changed, the conditional update
+ * later will fail and invalidate the cache entry if it exists.
+ */
+ VersionedData data = sharedShardMetadataController.readMetadataValue(collectionName, shardName,
+ /* readFromCache */ false);
+ Map<String, String> nodeUserData = (Map<String, String>) Utils.fromJSON(data.getData());
+ String metadataSuffix = nodeUserData.get(SharedShardMetadataController.SUFFIX_NODE_NAME);
+
+ String sharedShardName = (String) shard.get(ZkStateReader.SHARED_SHARD_NAME);
+
+ PushPullData pushPullData = new PushPullData.Builder()
+ .setCollectionName(collectionName)
+ .setShardName(shardName)
+ .setCoreName(coreName)
+ .setSharedStoreName(sharedShardName)
+ .setLastReadMetadataSuffix(metadataSuffix)
+ .setNewMetadataSuffix(BlobStoreUtils.generateMetadataSuffix())
+ .setZkVersion(data.getVersion())
+ .build();
+ pull(pushPullData, waitForSearcher, emptyCoreAwaitingPull, cores);
+ } catch (Exception ex) {
+ // wrap every thrown exception in a solr exception
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error trying to pull from blob store", ex);
+ }
+ }
+ }
+
+ /**
+ * This method is used to pull in updates from blob when there is no local copy of the core available and when a thread
+ * needs the core to be available (i.e. waits for it).<p>
+ *
+   * The parameter <code>waitForSearcher</code> is always <code>true</code>
+   * as of today, but it exists so that any additional callers of this method must decide whether to wait for the searcher or not.
+   * Note that when multiple threads wait for the same core, currently only the puller will create the index searcher and
+   * either wait for it to be available (waitForSearcher) or not (!waitForSearcher). If a thread waiting for the pull to complete
+   * wanted waitForSearcher while the puller thread did not, the code would need to be modified. This doesn't happen as of today because this
+   * method is always called with waitForSearcher set to true...
+ *
+ * TODO if Blob is not available on pull, we might treat this as a missing core exception from Core app and reindex. Likely need to treat Blob unavailable differently from no such core on blob
+ *
+ * @param emptyCoreAwaitingPull <code>true</code> if this pull method is called after the core got created empty locally
+ * but before the actual pull happened, in which case this method is called to wait until
+ * the pull finishes to avoid erroneously returning no results.
+ *
+ * @throws PullInProgressException In case a thread does not wait or times out before the async pull is finished
+ */
+ public void pull(PushPullData pushPullData, boolean waitForSearcher, boolean emptyCoreAwaitingPull, CoreContainer cores) throws PullInProgressException {
+ // Is there another thread already working on the async pull?
+ final boolean pullAlreadyInProgress;
+ // Indicates if thread waits for the pull to finish or too many waiters already
+ final boolean iWait;
+    // When iWait is true, myWait is the object to use for the wait. Note the pull thread might have completed before
+    // we start waiting, so we need to test the state of the myWait object first.
+ final SyncOnPullWait myWait;
+
+    // Capture the number of waiting threads so we can log the reason we can't wait... This is mostly for autobuild debugging and likely to be removed afterwards. -1 should never be logged
+ int countCoreWaiters = -1;
+ int countTotalWaiters = -1;
+
+    // Only one thread can be working on the async pull of this core (and we do no logging while holding the lock)
+ // Let's understand what our role and actions are while holding the global lock and then execute on them without the lock.
+ synchronized (coreSyncsInFlight) {
+ if (emptyCoreAwaitingPull && !isEmptyCoreAwaitingPull(cores, pushPullData.getCoreName())) {
+ // Core was observed empty awaiting pull and is no longer awaiting pull. This means the pull happened.
+ return;
+ }
+ countTotalWaiters = total_waiting_threads; // for logging
+
+ Collection<SyncOnPullWait> collectionOfWaiters = coreSyncsInFlight.get(pushPullData.getSharedStoreName());
+ if (collectionOfWaiters != null) {
+ // Somebody is already working on async pull of this core. If possible let's add ourselves to the list of those who wait (then wait)
+ pullAlreadyInProgress = true;
+ } else {
+        // We are the first thread trying to pull this core; before releasing the lock we must set up everything
+        // to let the world know, and to let this thread and other threads wait until the async pull is done if/when they so decide.
+ collectionOfWaiters = new ArrayList<>(MAX_PULL_WAITING_PER_CORE);
+ coreSyncsInFlight.put(pushPullData.getSharedStoreName(), collectionOfWaiters);
+ pullAlreadyInProgress = false;
+ }
+ int waiters = collectionOfWaiters.size();
+
+ countCoreWaiters = waiters; // for logging
+
+ iWait = total_waiting_threads < MAX_PULL_WAITING_TOTAL && waiters < MAX_PULL_WAITING_PER_CORE;
+ if (iWait) {
+ myWait = new SyncOnPullWait();
+ // Increase total waiting count and add ourselves as waiting for the current core.
+ total_waiting_threads++;
+ collectionOfWaiters.add(myWait);
+ } else {
+        // We cannot throw a pull in progress exception here because this whole synchronized block is calculating
+        // subsequent actions and we might need to enqueue a pull before throwing that exception, i.e. (!pullAlreadyInProgress)
+        //
+        // We could make it work that way if we really wanted to (although I don't see a reason) by separating this waiters calculation
+        // into its own synchronized block placed after the block that enqueues the pull request
+ myWait = null; // Not used but has to be set
}
}
- /**
- * This method is used to pull in updates from blob when there is no local copy of the core available and when a thread
- * needs the core to be available (i.e. waits for it).<p>
- *
- * A parameter <code>waitForSearcher</code> is therefore added and is always <code>true</code>
- * as of today, but it is added so any additional calls to this method need to decide if they want wait for searcher or not.
- * Note that when multiple threads wait for the same core, currently only the puller will create the index searcher and
- * either wait for it to be available (waitForSearcher) or not (!waitForSearcher). If another thread waiting for the pull to complete wanted
- * a waitForSearcher and the puller thread did not, the code should be modified. This doesn't happen as of today because this
- * method is always called with waitForSearcher being true...
- *
- * TODO if Blob is not available on pull, we might treat this as a missing core exception from Core app and reindex. Likely need to treat Blob unavailable differently from no such core on blob
- *
- * @param emptyCoreAwaitingPull <code>true</code> if this pull method is called after the core got created empty locally
- * but before the actual pull happened, in which case this method is called to wait until
- * the pull finishes to avoid erroneously returning no results.
- *
- * @throws PullInProgressException In case a thread does not wait or times out before the async pull is finished
- */
- public void pull(PushPullData pushPullData, boolean waitForSearcher, boolean emptyCoreAwaitingPull, CoreContainer cores) throws PullInProgressException {
- // Is there another thread already working on the async pull?
- final boolean pullAlreadyInProgress;
- // Indicates if thread waits for the pull to finish or too many waiters already
- final boolean iWait;
- // When iWait (is true), myWait is the object to use for the wait. Note pull thread might have completed before
- // we start wait, so we need to test the state of the myWait object first.
- final SyncOnPullWait myWait;
-
- // Capturing the number of waiting threads to log the reason we can't wait... This is mostly for autobuild debug and likely to be removed afterwards. -1 should never be logged
- int countCoreWaiters = -1;
- int countTotalWaiters = -1;
-
- // Only can have only one thread working on async pull of this core (and we do no logging while holding the lock)
- // Let's understand what our role and actions are while holding the global lock and then execute on them without the lock.
- synchronized (coreSyncsInFlight) {
- if (emptyCoreAwaitingPull && !isEmptyCoreAwaitingPull(cores, pushPullData.getCoreName())) {
- // Core was observed empty awaiting pull and is no longer awaiting pull. This means the pull happened.
- return;
- }
- countTotalWaiters = total_waiting_threads; // for logging
-
- Collection<SyncOnPullWait> collectionOfWaiters = coreSyncsInFlight.get(pushPullData.getSharedStoreName());
- if (collectionOfWaiters != null) {
- // Somebody is already working on async pull of this core. If possible let's add ourselves to the list of those who wait (then wait)
- pullAlreadyInProgress = true;
- } else {
- // We are the first thread trying to pull this core, before releasing the lock we must setup everything
- // to let the world know and let itself and other threads be able to wait until async pull is done if/when they so decide.
- collectionOfWaiters = new ArrayList<>(MAX_PULL_WAITING_PER_CORE);
- coreSyncsInFlight.put(pushPullData.getSharedStoreName(), collectionOfWaiters);
- pullAlreadyInProgress = false;
- }
- int waiters = collectionOfWaiters.size();
-
- countCoreWaiters = waiters; // for logging
-
- iWait = total_waiting_threads < MAX_PULL_WAITING_TOTAL && waiters < MAX_PULL_WAITING_PER_CORE;
- if (iWait) {
- myWait = new SyncOnPullWait();
- // Increase total waiting count and add ourselves as waiting for the current core.
- total_waiting_threads++;
- collectionOfWaiters.add(myWait);
- } else {
- // We cannot throw pull in progress exception here because this whole synchronized block is calculating
- // subsequent actions and we might need to enqueue a pull before throwing that exception i.e. (!pullAlreadyInProgress)
- //
- // We can make it work that way if we really want to(although I don't see a reason) if we separate out this waiters calculation
- // into a separate synchronized block after the enqueuing of pull request block
- myWait = null; // Not used but has to be set
- }
- }
-
- // Verify that emptyCoreAwaitingPull implies pullAlreadyInProgress: if the core was previously observed empty awaiting pull,
- // it means a thread is already on it (and if that thread has finished we would have returned earlier from this call)
- assert !emptyCoreAwaitingPull || pullAlreadyInProgress;
-
- if (!pullAlreadyInProgress) {
- // We are the first in here for that core so we get to enqueue async pull.
- // If we are successful in enqueuing then pullFinished callback will take care of notifying and clearing
- // out state around waiting threads(including this one if it end up waiting as well in the next block).
- // That callback will take care of both successful and failed pulls.
- // But if we fail in enqueuing then catch block here will declare pull as failed and will take care of notifying
- // and clearing out state around waiting threads (it is possible that another thread has already started waiting before this thread
- // get a chance to run this block)
- try {
- log.info("About to enqueue pull of core " + pushPullData.getSharedStoreName() + " (countTotalWaiters=" + countTotalWaiters + ")");
-
- // enqueue an async pull
- CorePullTracker corePullTracker = cores.getSharedStoreManager().getCorePullTracker();
- corePullTracker.enqueueForPull(pushPullData, true, waitForSearcher);
-
- } catch (Exception e) {
- // as mentioned above in case of failed enqueue we are responsible for clearing up all waiting state
- notifyEndOfPull(pushPullData.getSharedStoreName(), e);
- if (e instanceof InterruptedException) {
- // We swallow the InterruptedException and spit instead a SolrException.
- // Need to let layers higher up know an interruption happened.
- Thread.currentThread().interrupt();
- }
-
- String msg = "Failed to enqueue pull of core " + pushPullData.getSharedStoreName() + " from blob";
- SolrException se;
- log.warn(msg, e);
- if (e instanceof SolrException) {
- se = (SolrException) e;
- } else {
- // Wrapping in SolrException to percolate all the way to the catch Throwable at end of SolrDispatchFilter.doFilter()
- // A failed pull should not appear as a missing core otherwise Core App reindexes in another core...
- // TODO remains to be seen if all pull's() need to have this exception thrown or if some prefer silent failures.
- se = new SolrException(SolrException.ErrorCode.SERVER_ERROR, msg, e);
- }
- throw se;
- }
+ // Verify that emptyCoreAwaitingPull implies pullAlreadyInProgress: if the core was previously observed empty awaiting pull,
+ // it means a thread is already on it (and if that thread has finished we would have returned earlier from this call)
+ assert !emptyCoreAwaitingPull || pullAlreadyInProgress;
+
+ if (!pullAlreadyInProgress) {
+      // We are the first in here for that core so we get to enqueue the async pull.
+      // If we are successful in enqueuing then the pullFinished callback will take care of notifying and clearing
+      // out state around waiting threads (including this one if it ends up waiting as well in the next block).
+      // That callback will take care of both successful and failed pulls.
+      // But if we fail in enqueuing then the catch block here will declare the pull as failed and will take care of notifying
+      // and clearing out state around waiting threads (it is possible that another thread has already started waiting before this thread
+      // gets a chance to run this block)
+ try {
+ log.info("About to enqueue pull of core " + pushPullData.getSharedStoreName() + " (countTotalWaiters=" + countTotalWaiters + ")");
+
+ // enqueue an async pull
+ CorePullTracker corePullTracker = cores.getSharedStoreManager().getCorePullTracker();
+ corePullTracker.enqueueForPull(pushPullData, true, waitForSearcher);
+
+ } catch (Exception e) {
+ // as mentioned above in case of failed enqueue we are responsible for clearing up all waiting state
+ notifyEndOfPull(pushPullData.getSharedStoreName(), e);
+ if (e instanceof InterruptedException) {
+          // We swallow the InterruptedException and throw a SolrException instead.
+ // Need to let layers higher up know an interruption happened.
+ Thread.currentThread().interrupt();
}
- // Now irrespective of us being a thread who initiated an async pull or just came after that, we might have
- // qualified for waiting for async pull to finish. If so we will just do that.
- if (iWait) {
- try {
- log.info("About to wait for pull of core " + pushPullData.getSharedStoreName() + " (countCoreWaiters=" + countCoreWaiters + " countTotalWaiters=" + countTotalWaiters + ")");
-
- // Let's wait a bit maybe the pull completes in which case we don't have to throw an exception back.
- // The other end of this lock activity happens in notifyEndOfPull() below
- try {
- // The pull might have completed by now (i.e. completed since we left the synchronized(coreSyncsInFlight) block)
- // and that's ok the await() below will not block if not needed
- myWait.latch.await(PULL_WAITING_MS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Interrupted while waiting for core pull " + pushPullData.getSharedStoreName(), ie);
- }
-
- // Our wait finished...
- // Memory consistency effects of CountDownLatch: Until the count reaches zero, actions in a thread prior
- // to calling countDown() happen-before actions following a successful return from a corresponding await() in another thread.
- // We'll therefore see any updates done to the myWait object by the puller thread if it has finished pulling.
- if (myWait.latch.getCount() != 0) {
- throwPullInProgressException(pushPullData.getSharedStoreName(), "still in progress and didn't complete while we waited");
- } else {
- // Pull has completed. Success or failure?
- if (myWait.exception != null) {
- // We wrap in a SolrException the original exception from the puller thread.
- // (even if it's already a SolrException to make understanding what happens at runtime easier)
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Puller thread failed to pull", myWait.exception);
- }
-
- // The pull has finished during our wait.
- }
- } finally {
- // if wait is ended because of notifyEndOfPull() then it would have taken care of this state already and this essentially will be a no op.
- // But if it has ended because of some other reason e.g. InterruptedException, then we will remove this thread from list of waiters.
- //
- // pullFinished notification may come later but that should be ok since it will only see waiters that are still waiting or none.
- synchronized (coreSyncsInFlight) {
- Collection<SyncOnPullWait> collectionOfWaiters = coreSyncsInFlight.get(pushPullData.getSharedStoreName());
- if(collectionOfWaiters != null) {
- if(collectionOfWaiters.remove(myWait)){
- total_waiting_threads--;
- }
- }
- }
- }
+ String msg = "Failed to enqueue pull of core " + pushPullData.getSharedStoreName() + " from blob";
+ SolrException se;
+ log.warn(msg, e);
+ if (e instanceof SolrException) {
+ se = (SolrException) e;
} else {
- // Pull is in progress and we're not waiting for it to finish.
- throwPullInProgressException(pushPullData.getSharedStoreName(), "as it is already in progress and enough threads waiting");
+ // Wrapping in SolrException to percolate all the way to the catch Throwable at end of SolrDispatchFilter.doFilter()
+ // A failed pull should not appear as a missing core otherwise Core App reindexes in another core...
+          // TODO it remains to be seen whether all pull() calls need this exception thrown or whether some prefer silent failures.
+ se = new SolrException(SolrException.ErrorCode.SERVER_ERROR, msg, e);
}
+ throw se;
+ }
}
- /**
- * This is called whenever core from {@link CorePullTracker} finish its async pull(successfully or unsuccessfully)
- * We use this to notify all waiting threads for a core that their wait has ended (if there are some waiting).
- */
- public void finishedPull(String sharedStoreName, CoreSyncStatus status, BlobCoreMetadata blobMetadata, String message) {
- Exception pullException = null;
- final boolean isPullSuccessful = (status.isSuccess() ||
- // Following statuses are not considered success in strictest definition of pull but for BlobSyncer
- // they are not error either and we would let the normal query flow do its job (likely return missing core exception)
- status == CoreSyncStatus.BLOB_MISSING ||
- status == CoreSyncStatus.BLOB_DELETED_FOR_PULL ||
- // TODO: likely changes needed here for W-5388477 Blob store corruption repair
- status == CoreSyncStatus.BLOB_CORRUPT);
- if (!isPullSuccessful) {
- pullException = new SolrException(SolrException.ErrorCode.SERVER_ERROR, message);
- }
- notifyEndOfPull(sharedStoreName, pullException);
- }
+    // Now, regardless of whether we are the thread that initiated the async pull or one that came after, we might have
+    // qualified to wait for the async pull to finish. If so, we just do that.
+ if (iWait) {
+ try {
+ log.info("About to wait for pull of core " + pushPullData.getSharedStoreName() + " (countCoreWaiters=" + countCoreWaiters + " countTotalWaiters=" + countTotalWaiters + ")");
- private void throwPullInProgressException(String corename, String msgSuffix) throws PullInProgressException {
- String msg = SKIPPING_PULLING_CORE + " " + corename + " from blob " + msgSuffix;
- log.info(msg);
- // Note that longer term, this is the place where we could decide that if the async
- // pull was enqueued too long ago and nothing happened, then there's an issue worth gacking/alerting for.
- throw new PullInProgressException(msg);
+        // Let's wait a bit: maybe the pull completes, in which case we don't have to throw an exception back.
+ // The other end of this lock activity happens in notifyEndOfPull() below
+ try {
+ // The pull might have completed by now (i.e. completed since we left the synchronized(coreSyncsInFlight) block)
+          // and that's ok, the await() below will not block if not needed
+ myWait.latch.await(PULL_WAITING_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Interrupted while waiting for core pull " + pushPullData.getSharedStoreName(), ie);
+ }
- }
+ // Our wait finished...
+ // Memory consistency effects of CountDownLatch: Until the count reaches zero, actions in a thread prior
+ // to calling countDown() happen-before actions following a successful return from a corresponding await() in another thread.
+ // We'll therefore see any updates done to the myWait object by the puller thread if it has finished pulling.
+ if (myWait.latch.getCount() != 0) {
+ throwPullInProgressException(pushPullData.getSharedStoreName(), "still in progress and didn't complete while we waited");
+ } else {
+ // Pull has completed. Success or failure?
+ if (myWait.exception != null) {
+            // We wrap the original exception from the puller thread in a SolrException
+            // (even if it's already a SolrException) to make understanding what happens at runtime easier
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Puller thread failed to pull", myWait.exception);
+ }
- /**
- * Called by the puller thread once pull has completed (successfully or not), to notify all waiter threads of the issue
- * of the pull and to clean up this core in bookkeeping related to in-progress pulls({@link #pullEnded(String)}).
- * Also serves the purpose of being a memory barrier so that the waiting threads can check their SyncOnPullWait instances
- * for updates.
- */
- private void notifyEndOfPull(String sharedStoreName, Exception e) {
- final Collection<SyncOnPullWait> collectionOfWaiters = pullEnded(sharedStoreName);
- if (collectionOfWaiters != null) {
- for (SyncOnPullWait w : collectionOfWaiters) {
- // Need to set the exception before counting down on the latch because of the memory barrier effect of countDown()
- w.exception = e;
- w.latch.countDown();
- }
+ // The pull has finished during our wait.
}
- }
-
- /**
- * Cleans up the core from bookkeeping related to in-progress pulls and returns the collection of waiters for that core.
- * Collection of returned waiters could be null as well.
- */
- private Collection<SyncOnPullWait> pullEnded(String sharedStoreName) {
- final Collection<SyncOnPullWait> collectionOfWaiters;
+ } finally {
+        // If the wait ended because of notifyEndOfPull() then that method has already taken care of this state and this is essentially a no-op.
+        // But if it ended for some other reason, e.g. an InterruptedException, then we remove this thread from the list of waiters.
+ //
+ // pullFinished notification may come later but that should be ok since it will only see waiters that are still waiting or none.
synchronized (coreSyncsInFlight) {
- // Note that threads waiting for the pull to finish have references on their individual SyncOnPullWait instances,
- // so removing the entry from coreSyncsInFlight before the waiter threads are done waiting is ok.
- collectionOfWaiters = coreSyncsInFlight.remove(sharedStoreName);
- if(collectionOfWaiters != null) {
- total_waiting_threads -= collectionOfWaiters.size();
+ Collection<SyncOnPullWait> collectionOfWaiters = coreSyncsInFlight.get(pushPullData.getSharedStoreName());
+ if (collectionOfWaiters != null) {
+            if (collectionOfWaiters.remove(myWait)) {
+ total_waiting_threads--;
}
+ }
}
- return collectionOfWaiters;
+ }
+ } else {
+ // Pull is in progress and we're not waiting for it to finish.
+ throwPullInProgressException(pushPullData.getSharedStoreName(), "as it is already in progress and enough threads waiting");
+ }
+ }
+
+ /**
+   * This is called whenever a core from {@link CorePullTracker} finishes its async pull (successfully or unsuccessfully).
+   * We use this to notify all threads waiting on that core that their wait has ended (if any are waiting).
+ */
+ public void finishedPull(String sharedStoreName, CoreSyncStatus status, BlobCoreMetadata blobMetadata, String message) {
+ Exception pullException = null;
+ final boolean isPullSuccessful = (status.isSuccess() ||
+        // The following statuses are not considered successes in the strictest definition of a pull, but for the BlobCoreSyncer
+        // they are not errors either, and we let the normal query flow do its job (likely returning a missing core exception)
+ status == CoreSyncStatus.BLOB_MISSING ||
+ status == CoreSyncStatus.BLOB_DELETED_FOR_PULL ||
+ // TODO: likely changes needed here for W-5388477 Blob store corruption repair
+ status == CoreSyncStatus.BLOB_CORRUPT);
+ if (!isPullSuccessful) {
+ pullException = new SolrException(SolrException.ErrorCode.SERVER_ERROR, message);
+ }
+ notifyEndOfPull(sharedStoreName, pullException);
+ }
+
+ private void throwPullInProgressException(String corename, String msgSuffix) throws PullInProgressException {
+ String msg = SKIPPING_PULLING_CORE + " " + corename + " from blob " + msgSuffix;
+ log.info(msg);
+ // Note that longer term, this is the place where we could decide that if the async
+ // pull was enqueued too long ago and nothing happened, then there's an issue worth gacking/alerting for.
+ throw new PullInProgressException(msg);
+ }
+
+ /**
+   * Called by the puller thread once the pull has completed (successfully or not), to notify all waiting threads of the outcome
+   * of the pull and to clean up this core's bookkeeping related to in-progress pulls ({@link #pullEnded(String)}).
+ * Also serves the purpose of being a memory barrier so that the waiting threads can check their SyncOnPullWait instances
+ * for updates.
+ */
+ private void notifyEndOfPull(String sharedStoreName, Exception e) {
+ final Collection<SyncOnPullWait> collectionOfWaiters = pullEnded(sharedStoreName);
+ if (collectionOfWaiters != null) {
+ for (SyncOnPullWait w : collectionOfWaiters) {
+ // Need to set the exception before counting down on the latch because of the memory barrier effect of countDown()
+ w.exception = e;
+ w.latch.countDown();
+ }
+ }
+ }
+
+ /**
+ * Cleans up the core from bookkeeping related to in-progress pulls and returns the collection of waiters for that core.
+   * The returned collection of waiters may be null.
+ */
+ private Collection<SyncOnPullWait> pullEnded(String sharedStoreName) {
+ final Collection<SyncOnPullWait> collectionOfWaiters;
+ synchronized (coreSyncsInFlight) {
+      // Note that threads waiting for the pull to finish have references to their individual SyncOnPullWait instances,
+ // so removing the entry from coreSyncsInFlight before the waiter threads are done waiting is ok.
+ collectionOfWaiters = coreSyncsInFlight.remove(sharedStoreName);
+ if (collectionOfWaiters != null) {
+ total_waiting_threads -= collectionOfWaiters.size();
+ }
}
+ return collectionOfWaiters;
+ }
}
\ No newline at end of file
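
For readers skimming the diff: the synchronization scheme BlobCoreSyncer implements above boils down to one CountDownLatch per waiting thread, registered in a map keyed by the core's shared store name and counted down by the pulling thread. The following minimal, self-contained sketch shows just that pattern; every class, field and method name in it is illustrative, not code from this commit.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class PullWaitSketch {

      /** One instance per waiting thread, playing the role of SyncOnPullWait. */
      private static class Waiter {
        final CountDownLatch latch = new CountDownLatch(1);
        Exception exception; // written by the puller before countDown(), read after await()
      }

      private final Map<String, List<Waiter>> inFlight = new HashMap<>();

      /** A thread wanting the pull of the given core registers itself, then waits. */
      public boolean awaitPull(String sharedStoreName, long maxWaitMs) throws Exception {
        Waiter w = new Waiter();
        synchronized (inFlight) {
          inFlight.computeIfAbsent(sharedStoreName, k -> new ArrayList<>()).add(w);
        }
        if (!w.latch.await(maxWaitMs, TimeUnit.MILLISECONDS)) {
          return false; // pull still in progress; caller reports "pull in progress"
        }
        if (w.exception != null) {
          throw w.exception; // countDown() guarantees we see the puller's write
        }
        return true; // pull completed successfully
      }

      /** Called once by the pulling thread when the pull ends (exception == null on success). */
      public void pullEnded(String sharedStoreName, Exception exception) {
        List<Waiter> waiters;
        synchronized (inFlight) {
          // Each waiter holds its own reference, so removing the map entry early is safe.
          waiters = inFlight.remove(sharedStoreName);
        }
        if (waiters == null) return;
        for (Waiter w : waiters) {
          w.exception = exception; // must happen before countDown() for visibility
          w.latch.countDown();
        }
      }
    }

The countDown()/await() happens-before edge is what lets the puller publish its exception to each waiter without extra locking, which is exactly the property the SyncOnPullWait javadoc above relies on.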
diff --git a/solr/core/src/java/org/apache/solr/store/blob/metadata/ServerSideMetadata.java b/solr/core/src/java/org/apache/solr/store/blob/metadata/ServerSideMetadata.java
index bd3562b..88d76ad 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/metadata/ServerSideMetadata.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/metadata/ServerSideMetadata.java
@@ -34,227 +34,227 @@ import com.google.common.collect.ImmutableSet;
* and by implementing it separately we can add additional metadata to it as needed.
*/
public class ServerSideMetadata {
-
- /**
- * Files composing the core. They are are referenced from the core's current commit point's segments_N file
- * which is ALSO included in this collection.
- */
- private final ImmutableCollection<CoreFileData> latestCommitFiles;
-
- /**
- * Index files related to current and previous commit points(if any).
- * These files do not matter when pushing contents to blob but they do matter if blob content being pulled conflicts with them.
- */
- private final ImmutableCollection<CoreFileData> allCommitsFiles;
-
- /**
- * Hash of the directory content used to make sure the content doesn't change as we proceed to pull new files from Blob
- * (if we need to pull new files from Blob)
- */
- private final String directoryHash;
-
- /**
- * Generation number of the local index.
- * This generation number is only meant to identify a scenario where local index generation number is higher than
- * what we have in blob. In that scenario we would switch index to a new directory when pulling contents from blob.
- * Because in the presence of higher generation number locally, blob contents cannot establish their legitimacy.
- */
- private final long generation;
- private final SolrCore core;
- private final String coreName;
- private final CoreContainer container;
-
- /**
- * Given a core name, builds the local metadata
- *
- *
- * @throws Exception if core corresponding to <code>coreName</code> can't be found.
- */
- public ServerSideMetadata(String coreName, CoreContainer container) throws Exception {
- this.coreName = coreName;
- this.container = container;
- this.core = container.getCore(coreName);
-
- if (core == null) {
- throw new Exception("Can't find core " + coreName);
- }
-
- try {
- IndexCommit latestCommit = core.getDeletionPolicy().getLatestCommit();
- if (latestCommit == null) {
- throw new BlobException("Core " + coreName + " has no available commit point");
- }
-
- generation = latestCommit.getGeneration();
-
- // Work around possible bug returning same file multiple times by using a set here
- // See org.apache.solr.handler.ReplicationHandler.getFileList()
- ImmutableCollection.Builder<CoreFileData> latestCommitBuilder = new ImmutableSet.Builder<>();
- ImmutableCollection.Builder<CoreFileData> allCommitsBuilder;
-
- Directory coreDir = core.getDirectoryFactory().get(core.getIndexDir(), DirectoryFactory.DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
- try {
- // Capture now the hash and verify again if we need to pull content from the Blob store into this directory,
- // to make sure there are no local changes at the same time that might lead to a corruption in case of interaction
- // with the download.
- directoryHash = getSolrDirectoryHash(coreDir);
-
- buildCommitFiles(coreDir, latestCommit, latestCommitBuilder);
-
- // A note on listCommits says that it does not guarantee consistent results if a commit is in progress.
- // But in blob context we serialize commits and pulls by proper locking therefore we should be good here.
- List<IndexCommit> allCommits = DirectoryReader.listCommits(coreDir);
-
- // optimization: normally we would only be dealing with one commit point. In that case just reuse latest commit files builder.
- if (allCommits.size() > 1 ){
- allCommitsBuilder = new ImmutableSet.Builder<>();
- for (IndexCommit commit: allCommits) {
- buildCommitFiles(coreDir, commit, allCommitsBuilder);
- }
- } else {
- // we should always have a commit point as verified in the beginning of this method.
- assert allCommits.size() == 1 && allCommits.get(0).equals(latestCommit);
- allCommitsBuilder = latestCommitBuilder;
- }
- } finally {
- core.getDirectoryFactory().release(coreDir);
- }
- latestCommitFiles = latestCommitBuilder.build();
- allCommitsFiles = allCommitsBuilder.build();
- } finally {
- core.close();
- }
+
+ /**
+   * Files composing the core. They are referenced from the core's current commit point's segments_N file
+ * which is ALSO included in this collection.
+ */
+ private final ImmutableCollection<CoreFileData> latestCommitFiles;
+
+ /**
+   * Index files related to current and previous commit points (if any).
+ * These files do not matter when pushing contents to blob but they do matter if blob content being pulled conflicts with them.
+ */
+ private final ImmutableCollection<CoreFileData> allCommitsFiles;
+
+ /**
+ * Hash of the directory content used to make sure the content doesn't change as we proceed to pull new files from Blob
+ * (if we need to pull new files from Blob)
+ */
+ private final String directoryHash;
+
+ /**
+ * Generation number of the local index.
+   * This generation number is only meant to identify a scenario where the local index generation number is higher than
+   * what we have in blob. In that scenario we would switch the index to a new directory when pulling contents from blob,
+   * because in the presence of a higher local generation number, blob contents cannot establish their legitimacy.
+ */
+ private final long generation;
+ private final SolrCore core;
+ private final String coreName;
+ private final CoreContainer container;
+
+ /**
+   * Given a core name, builds the local metadata.
+   *
+   * @throws Exception if the core corresponding to <code>coreName</code> can't be found.
+ */
+ public ServerSideMetadata(String coreName, CoreContainer container) throws Exception {
+ this.coreName = coreName;
+ this.container = container;
+ this.core = container.getCore(coreName);
+
+ if (core == null) {
+ throw new Exception("Can't find core " + coreName);
}
- private void buildCommitFiles(Directory coreDir, IndexCommit commit, ImmutableCollection.Builder<CoreFileData> builder) throws IOException {
- for (String fileName : commit.getFileNames()) {
- // Note we add here all segment related files as well as the commit point's segments_N file
- // Note commit points do not contain lock (write.lock) files.
- try (final IndexInput indexInput = coreDir.openInput(fileName, IOContext.READONCE)) {
- long length = indexInput.length();
- long checksum = CodecUtil.retrieveChecksum(indexInput);
- builder.add(new CoreFileData(fileName, length, checksum));
- }
+ try {
+ IndexCommit latestCommit = core.getDeletionPolicy().getLatestCommit();
+ if (latestCommit == null) {
+ throw new BlobException("Core " + coreName + " has no available commit point");
+ }
+
+ generation = latestCommit.getGeneration();
+
+ // Work around possible bug returning same file multiple times by using a set here
+ // See org.apache.solr.handler.ReplicationHandler.getFileList()
+ ImmutableCollection.Builder<CoreFileData> latestCommitBuilder = new ImmutableSet.Builder<>();
+ ImmutableCollection.Builder<CoreFileData> allCommitsBuilder;
+
+ Directory coreDir = core.getDirectoryFactory().get(core.getIndexDir(), DirectoryFactory.DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
+ try {
+ // Capture now the hash and verify again if we need to pull content from the Blob store into this directory,
+ // to make sure there are no local changes at the same time that might lead to a corruption in case of interaction
+ // with the download.
+ directoryHash = getSolrDirectoryHash(coreDir);
+
+ buildCommitFiles(coreDir, latestCommit, latestCommitBuilder);
+
+ // A note on listCommits says that it does not guarantee consistent results if a commit is in progress.
+        // But in the blob context we serialize commits and pulls by proper locking, therefore we should be good here.
+ List<IndexCommit> allCommits = DirectoryReader.listCommits(coreDir);
+
+ // optimization: normally we would only be dealing with one commit point. In that case just reuse latest commit files builder.
+        if (allCommits.size() > 1) {
+ allCommitsBuilder = new ImmutableSet.Builder<>();
+ for (IndexCommit commit: allCommits) {
+ buildCommitFiles(coreDir, commit, allCommitsBuilder);
+ }
+ } else {
+ // we should always have a commit point as verified in the beginning of this method.
+ assert allCommits.size() == 1 && allCommits.get(0).equals(latestCommit);
+ allCommitsBuilder = latestCommitBuilder;
}
+ } finally {
+ core.getDirectoryFactory().release(coreDir);
+ }
+ latestCommitFiles = latestCommitBuilder.build();
+ allCommitsFiles = allCommitsBuilder.build();
+ } finally {
+ core.close();
}
-
- public String getCoreName() {
- return this.coreName;
- }
-
- public CoreContainer getCoreContainer() {
- return this.container;
+ }
+
+ private void buildCommitFiles(Directory coreDir, IndexCommit commit, ImmutableCollection.Builder<CoreFileData> builder) throws IOException {
+ for (String fileName : commit.getFileNames()) {
+ // Note we add here all segment related files as well as the commit point's segments_N file
+ // Note commit points do not contain lock (write.lock) files.
+ try (final IndexInput indexInput = coreDir.openInput(fileName, IOContext.READONCE)) {
+ long length = indexInput.length();
+ long checksum = CodecUtil.retrieveChecksum(indexInput);
+ builder.add(new CoreFileData(fileName, length, checksum));
+ }
}
-
- public long getGeneration() {
- return this.generation;
+ }
+
+ public String getCoreName() {
+ return this.coreName;
+ }
+
+ public CoreContainer getCoreContainer() {
+ return this.container;
+ }
+
+ public long getGeneration() {
+ return this.generation;
+ }
+
+ public String getDirectoryHash() {
+ return this.directoryHash;
+ }
+
+  public ImmutableCollection<CoreFileData> getLatestCommitFiles() {
+ return this.latestCommitFiles;
+ }
+
+ public ImmutableCollection<CoreFileData> getAllCommitsFiles() {
+ return this.allCommitsFiles;
+ }
+
+ /**
+   * Returns <code>true</code> if the contents of the directory passed into this method are identical to the contents of
+ * the directory of the Solr core of this instance, taken at instance creation time.<p>
+ *
+ * Passing in the Directory (expected to be the directory of the same core used during construction) because it seems
+ * safer than trying to get it again here...
+ */
+ public boolean isSameDirectoryContent(Directory coreDir) throws NoSuchAlgorithmException, IOException {
+ return directoryHash.equals(getSolrDirectoryHash(coreDir));
+ }
+
+ /**
+ * Computes a hash of a Solr Directory in order to make sure the directory doesn't change as we pull content into it (if we need to
+ * pull content into it)
+ */
+ private String getSolrDirectoryHash(Directory coreDir) throws NoSuchAlgorithmException, IOException {
+ MessageDigest digest = MessageDigest.getInstance("sha1"); // not sure MD5 is available in Solr jars
+
+ String[] filesNames = coreDir.listAll();
+ // Computing the hash requires items to be submitted in the same order...
+ Arrays.sort(filesNames);
+
+ for (String fileName : filesNames) {
+ // .lock files come and go. Ignore them (we're closing the Index Writer before adding any pulled files to the Core)
+ if (!fileName.endsWith(".lock")) {
+ // Hash the file name and file size so we can tell if any file has changed (or files appeared or vanished)
+ digest.update(fileName.getBytes());
+ try {
+ digest.update(Long.toString(coreDir.fileLength(fileName)).getBytes());
+ } catch (FileNotFoundException fnf) {
+ // The file was deleted between the listAll() and the check, use an impossible size to not match a digest
+ // for which the file is completely present or completely absent.
+ digest.update(Long.toString(-42).getBytes());
+ }
+ }
}
- public String getDirectoryHash() {
- return this.directoryHash;
+ final String hash = new String(Hex.encodeHex(digest.digest()));
+ return hash;
+ }
+
+ @Override
+ public String toString() {
+ return "collectionName=" + core.getCoreDescriptor().getCollectionName() +
+ " shardName=" + core.getCoreDescriptor().getCloudDescriptor().getShardId() +
+ " coreName=" + core.getName() +
+ " generation=" + generation;
+ }
+
+ /**
+ * Information we capture per local core file (segments_N file *included*)
+ */
+ public static class CoreFileData {
+ /** Local file name, no path */
+ private final String fileName;
+ /** Size in bytes */
+ private final long fileSize;
+ private final long checksum;
+
+ CoreFileData(String fileName, long fileSize, long checksum) {
+ this.fileName = fileName;
+ this.fileSize = fileSize;
+ this.checksum = checksum;
}
- public ImmutableCollection<CoreFileData> getLatestCommitFiles(){
- return this.latestCommitFiles;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ CoreFileData other = (CoreFileData) o;
+
+ return Objects.equals(fileName, other.fileName) &&
+ Objects.equals(fileSize, other.fileSize) &&
+ Objects.equals(checksum, other.checksum);
}
- public ImmutableCollection<CoreFileData> getAllCommitsFiles() {
- return this.allCommitsFiles;
+ public String getFileName() {
+ return fileName;
}
- /**
- * Returns <code>true</code> if the contents of the directory passed into this method is identical to the contents of
- * the directory of the Solr core of this instance, taken at instance creation time.<p>
- *
- * Passing in the Directory (expected to be the directory of the same core used during construction) because it seems
- * safer than trying to get it again here...
- */
- public boolean isSameDirectoryContent(Directory coreDir) throws NoSuchAlgorithmException, IOException {
- return directoryHash.equals(getSolrDirectoryHash(coreDir));
+ public long getFileSize() {
+ return fileSize;
}
- /**
- * Computes a hash of a Solr Directory in order to make sure the directory doesn't change as we pull content into it (if we need to
- * pull content into it)
- */
- private String getSolrDirectoryHash(Directory coreDir) throws NoSuchAlgorithmException, IOException {
- MessageDigest digest = MessageDigest.getInstance("sha1"); // not sure MD5 is available in Solr jars
-
- String[] filesNames = coreDir.listAll();
- // Computing the hash requires items to be submitted in the same order...
- Arrays.sort(filesNames);
-
- for (String fileName : filesNames) {
- // .lock files come and go. Ignore them (we're closing the Index Writer before adding any pulled files to the Core)
- if (!fileName.endsWith(".lock")) {
- // Hash the file name and file size so we can tell if any file has changed (or files appeared or vanished)
- digest.update(fileName.getBytes());
- try {
- digest.update(Long.toString(coreDir.fileLength(fileName)).getBytes());
- } catch (FileNotFoundException fnf) {
- // The file was deleted between the listAll() and the check, use an impossible size to not match a digest
- // for which the file is completely present or completely absent.
- digest.update(Long.toString(-42).getBytes());
- }
- }
- }
-
- final String hash = new String(Hex.encodeHex(digest.digest()));
- return hash;
+ public long getChecksum() {
+ return checksum;
}
@Override
- public String toString() {
- return "collectionName=" + core.getCoreDescriptor().getCollectionName() +
- " shardName=" + core.getCoreDescriptor().getCloudDescriptor().getShardId() +
- " coreName=" + core.getName() +
- " generation=" + generation;
- }
-
- /**
- * Information we capture per local core file (segments_N file *included*)
- */
- public static class CoreFileData {
- /** Local file name, no path */
- private final String fileName;
- /** Size in bytes */
- private final long fileSize;
- private final long checksum;
-
- CoreFileData(String fileName, long fileSize, long checksum) {
- this.fileName = fileName;
- this.fileSize = fileSize;
- this.checksum = checksum;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- CoreFileData other = (CoreFileData) o;
-
- return Objects.equals(fileName, other.fileName) &&
- Objects.equals(fileSize, other.fileSize) &&
- Objects.equals(checksum, other.checksum);
- }
-
- public String getFileName() {
- return fileName;
- }
-
- public long getFileSize() {
- return fileSize;
- }
-
- public long getChecksum() {
- return checksum;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(fileName, fileSize, checksum);
- }
+ public int hashCode() {
+ return Objects.hash(fileName, fileSize, checksum);
}
+ }
}
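
The getSolrDirectoryHash logic above amounts to: sort the file names, skip lock files, and digest each name together with its length. Below is a standalone sketch of the same scheme over a plain filesystem directory; using java.nio.file instead of a Lucene Directory, and manual hex encoding instead of commons-codec, are assumptions made for illustration only.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.NoSuchFileException;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;
    import java.util.stream.Stream;

    public class DirHashSketch {

      public static String hashDir(Path dir) throws IOException, NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-1");
        String[] names;
        try (Stream<Path> files = Files.list(dir)) {
          names = files.map(p -> p.getFileName().toString()).toArray(String[]::new);
        }
        Arrays.sort(names); // digesting is order sensitive, so fix the order first

        for (String name : names) {
          if (name.endsWith(".lock")) continue; // lock files come and go; ignore them
          digest.update(name.getBytes(StandardCharsets.UTF_8));
          long length;
          try {
            length = Files.size(dir.resolve(name));
          } catch (NoSuchFileException e) {
            length = -42; // file vanished after listing: an impossible size, so the digest can't match
          }
          digest.update(Long.toString(length).getBytes(StandardCharsets.UTF_8));
        }

        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest()) {
          hex.append(String.format("%02x", b));
        }
        return hex.toString();
      }

      public static void main(String[] args) throws Exception {
        System.out.println(hashDir(Paths.get(args[0])));
      }
    }

Hashing name plus length (rather than full file contents) keeps the check cheap while still detecting files that appeared, vanished, or changed size between the two hash computations.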
diff --git a/solr/core/src/java/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtil.java b/solr/core/src/java/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtil.java
index 2bc8054..b01ee94 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtil.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtil.java
@@ -21,7 +21,7 @@ import org.slf4j.LoggerFactory;
*/
public class SharedStoreResolutionUtil {
- private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static String SEGMENTS_N_PREFIX = "segments_";
@@ -153,7 +153,7 @@ public class SharedStoreResolutionUtil {
" localSize=%s blobSize=%s localChecksum=%s blobCheckSum=%s",
distant.getSharedBlobName(), local.getCoreName(), bf.getSolrFileName(), bf.getBlobName(),
cf.getFileSize(), bf.getFileSize(), cf.getChecksum(), bf.getChecksum());
- logger.info(message);
+ log.info(message);
localConflictingWithBlob = true;
}
}
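
The rename above (logger to log) brings the class in line with the logger declaration used throughout these files, where MethodHandles avoids hard-coding a class literal. For reference, the idiom on a throwaway class:

    import java.lang.invoke.MethodHandles;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggerIdiomSketch {

      // lookupClass() resolves to the enclosing class at runtime, so this line can be
      // copied between classes without editing a class literal.
      private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

      public void doWork() {
        log.info("work started"); // logged under this class's name
      }
    }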
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteManager.java b/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteManager.java
index 3212cdf..a50139a 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteManager.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/BlobDeleteManager.java
@@ -1,5 +1,6 @@
package org.apache.solr.store.blob.process;
+import java.lang.invoke.MethodHandles;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
@@ -11,12 +12,16 @@ import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.solr.store.blob.client.CoreStorageClient;
import org.apache.solr.store.blob.metadata.CorePushPull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Manager of blobs (files) to delete, putting them in a queue (if space left on the queue) then consumed and processed
* by {@link BlobDeleterTask}
*/
public class BlobDeleteManager {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
* Limit to the number of blob files to delete accepted on the delete queue (and lost in case of server crash). When
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncFeeder.java b/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncFeeder.java
index b92693e..0bae5f6 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncFeeder.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncFeeder.java
@@ -34,109 +34,108 @@ import org.slf4j.LoggerFactory;
*/
public abstract class CoreSyncFeeder implements Runnable, Closeable {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- protected final CoreContainer cores;
-
- /**
- * Maximum number of elements in the queue, NOT counting re-inserts after failures. Total queue size might therefore
- * exceed this value by the number of syncThreads which is around 5 or 10...
- * <p>
- * Note that this queue sits behind other tracking queues (see {@link CoreUpdateTracker} and
- * {@link CorePullTracker}). The other queue has to be large, this one does not.
- */
- protected static final int ALMOST_MAX_WORKER_QUEUE_SIZE = 2000;
-
- /**
- * When a transient error occurs, the number of attempts to sync with Blob before giving up. Attempts are spaced by
- * at least 10 seconds (see {@link CorePullTask#MIN_RETRY_DELAY_MS}) which means we'll retry for at least 90 seconds
- * before giving up. This is something to adjust as the implementation of the delay between retries is cleaned up,
- * see {@link CorePullTask}.
- */
- protected static final int MAX_ATTEMPTS = 10;
-
- private final int numSyncThreads;
-
- /**
- * Used to interrupt close()
- */
- private volatile Thread executionThread;
- private volatile boolean closed = false;
-
- protected CoreSyncFeeder(CoreContainer cores, int numSyncThreads) {
- this.numSyncThreads = numSyncThreads;
- this.cores = cores;
- }
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ protected final CoreContainer cores;
+
+ /**
+ * Maximum number of elements in the queue, NOT counting re-inserts after failures. Total queue size might therefore
+ * exceed this value by the number of syncThreads which is around 5 or 10...
+ * <p>
+ * Note that this queue sits behind other tracking queues (see {@link CoreUpdateTracker} and
+   * {@link CorePullTracker}). Those queues have to be large; this one does not.
+ */
+ protected static final int ALMOST_MAX_WORKER_QUEUE_SIZE = 2000;
+
+ /**
+ * When a transient error occurs, the number of attempts to sync with Blob before giving up. Attempts are spaced by
+ * at least 10 seconds (see {@link CorePullTask#MIN_RETRY_DELAY_MS}) which means we'll retry for at least 90 seconds
+ * before giving up. This is something to adjust as the implementation of the delay between retries is cleaned up,
+ * see {@link CorePullTask}.
+ */
+ protected static final int MAX_ATTEMPTS = 10;
+
+ private final int numSyncThreads;
+
+ /**
+   * Lets close() interrupt the execution thread.
+ */
+ private volatile Thread executionThread;
+ private volatile boolean closed = false;
+
+ protected CoreSyncFeeder(CoreContainer cores, int numSyncThreads) {
+ this.numSyncThreads = numSyncThreads;
+ this.cores = cores;
+ }
+
+ @Override
+ public void run() {
+    // Record where we run so we can be interrupted from close(). If we get interrupted before this point we will not
+    // close anything and not log anything, but that's ok, we haven't started anything either.
+ this.executionThread = Thread.currentThread();
+    // If close() executed before executionThread was set, we need to exit
+ if (closed) { return; }
+ try {
+ // We'll be submitting tasks to a queue from which threads are going to pick them up and execute them.
+ // PeerSyncer uses a ThreadPoolExecutor for this. Here instead we explicitly start the threads to
+ // simplify our life (and the code), because by not using an executor we're not forced to use a
+ // BlockingQueue<Runnable> as the work queue and instead use a DeduplicatingList.
+ NamedThreadFactory threadFactory = new NamedThreadFactory(getMonsterThreadName());
+ // TODO set daemon?
+ Set<Thread> syncerThreads = new HashSet<>();
+ for (int i = 0; i < numSyncThreads; i++) {
+ Thread t = threadFactory.newThread(this.getSyncer());
+ syncerThreads.add(t);
+ }
- @Override
- public void run() {
- // Record where we run so we can be interrupted from close(). If we get interrupted before this point we will
- // not
- // close anything and not log anything, but that's ok we haven't started anything either.
- this.executionThread = Thread.currentThread();
- // If close() executed before runningFeeder was set, we need to exit
- if (closed) { return; }
- try {
- // We'll be submitting tasks to a queue from which threads are going to pick them up and execute them.
- // PeerSyncer uses a ThreadPoolExecutor for this. Here instead we explicitly start the threads to
- // simplify our life (and the code), because by not using an executor we're not forced to use a
- // BlockingQueue<Runnable> as the work queue and instead use a DeduplicatingList.
- NamedThreadFactory threadFactory = new NamedThreadFactory(getMonsterThreadName());
- // TODO set daemon?
- Set<Thread> syncerThreads = new HashSet<>();
- for (int i = 0; i < numSyncThreads; i++) {
- Thread t = threadFactory.newThread(this.getSyncer());
- syncerThreads.add(t);
- }
-
- try {
- // Starting the threads after having created them so that we can interrupt all threads in the finally
- for (Thread t : syncerThreads) {
- t.start();
- }
-
- feedTheMonsters();
- } catch (Throwable e) {
- if (!closed) {
- log.error("CoreSyncFeeder thread encountered an error and is exiting "
- + "while close() was not called. " + ExceptionUtils.getStackTrace(e));
- }
- } finally {
- // If we stop, have our syncer "thread pool" stop as well since there's not much they can do anyway
- // then...
- for (Thread t : syncerThreads) {
- t.interrupt();
- }
- }
-
- } finally {
- this.executionThread = null;
+ try {
+ // Starting the threads after having created them so that we can interrupt all threads in the finally
+ for (Thread t : syncerThreads) {
+ t.start();
}
- }
-
- boolean shouldContinueRunning() {
- return !this.cores.isShutDown();
- }
- @Override
- public void close() {
- if (!closed) {
- closed = true;
- Thread thread = this.executionThread;
- if (thread != null) {
- this.executionThread = null; // race to set to null but ok to try to interrupt twice
- log.info(String.format("Closing CoreSyncFeeder; interrupting execution thread %s.", thread.getName()));
- thread.interrupt();
- } else {
- log.warn("Closing CoreSyncFeeder before any syncer thread was started. Weird.");
+ feedTheMonsters();
+ } catch (Throwable e) {
+ if (!closed) {
+          log.error("CoreSyncFeeder thread encountered an error and is exiting "
+              + "even though close() was not called. " + ExceptionUtils.getStackTrace(e));
+ }
+ } finally {
+        // If we stop, have our syncer "thread pool" stop as well, since there's not much
+        // the syncer threads could do on their own at that point.
+ for (Thread t : syncerThreads) {
+ t.interrupt();
}
}
+ } finally {
+ this.executionThread = null;
+ }
+ }
+
+ boolean shouldContinueRunning() {
+ return !this.cores.isShutDown();
+ }
+
+ @Override
+ public void close() {
+ if (!closed) {
+ closed = true;
+ Thread thread = this.executionThread;
+ if (thread != null) {
+ this.executionThread = null; // race to set to null but ok to try to interrupt twice
+ log.info(String.format("Closing CoreSyncFeeder; interrupting execution thread %s.", thread.getName()));
+ thread.interrupt();
+ } else {
+ log.warn("Closing CoreSyncFeeder before any syncer thread was started. Weird.");
+ }
}
+ }
- abstract String getMonsterThreadName();
+ abstract String getMonsterThreadName();
- abstract void feedTheMonsters() throws InterruptedException;
+ abstract void feedTheMonsters() throws InterruptedException;
- abstract Runnable getSyncer();
+ abstract Runnable getSyncer();
}
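
For context on the class above: the feeder thread produces work while numSyncThreads "syncer" threads consume it, and concrete feeders supply the three abstract hooks. Below is a minimal, hypothetical subclass sketching the contract; the class name and its sleep-based behavior are illustrative only and not part of this commit (a real feeder, e.g. the pull-side feeder that schedules {@link CorePullTask}s, enqueues onto and drains a DeduplicatingList instead of sleeping):

    package org.apache.solr.store.blob.process; // the abstract hooks are package-private

    import org.apache.solr.core.CoreContainer;

    class SleepySyncFeeder extends CoreSyncFeeder {
      SleepySyncFeeder(CoreContainer cores) {
        super(cores, 3); // three syncer threads, an arbitrary choice
      }

      @Override
      String getMonsterThreadName() {
        return "sleepySyncer"; // used by NamedThreadFactory to name the syncer threads
      }

      @Override
      void feedTheMonsters() throws InterruptedException {
        // Producer loop: a real feeder discovers cores to sync and enqueues tasks here.
        while (shouldContinueRunning()) {
          Thread.sleep(1000L);
        }
      }

      @Override
      Runnable getSyncer() {
        // Consumer: each syncer thread runs this until close() interrupts it.
        return () -> {
          try {
            while (true) {
              Thread.sleep(1000L); // a real syncer blocks on the work queue instead
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // interrupted by close(): exit
          }
        };
      }
    }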
diff --git a/solr/core/src/java/org/apache/solr/store/blob/util/BlobStoreUtils.java b/solr/core/src/java/org/apache/solr/store/blob/util/BlobStoreUtils.java
index 48d065e..4a428c2 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/util/BlobStoreUtils.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/util/BlobStoreUtils.java
@@ -1,6 +1,4 @@
package org.apache.solr.store.blob.util;
-import static org.junit.Assert.assertTrue;
-
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.UUID;
@@ -57,7 +55,7 @@ public class BlobStoreUtils {
* @throws SolrException if the local core was not successfully sync'd.
*/
public static void syncLocalCoreWithSharedStore(String collectionName, String coreName, String shardName, CoreContainer coreContainer) throws SolrException {
- assertTrue(coreContainer.isZooKeeperAware());
+ assert coreContainer.isZooKeeperAware();
ZkController zkController = coreContainer.getZkController();
SharedShardMetadataController sharedMetadataController = coreContainer.getSharedStoreManager().getSharedShardMetadataController();
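
Note that this substitution is not strictly behavior-preserving: org.junit.Assert.assertTrue is always enforced at runtime, whereas a Java assert statement is only checked when the JVM runs with assertions enabled (java -ea), which the Lucene/Solr test runner does but production JVMs typically do not. A sketch of the two forms (the message string below is illustrative):

    // JUnit (removed): checked unconditionally, regardless of JVM flags
    // org.junit.Assert.assertTrue(coreContainer.isZooKeeperAware());

    // Java assert (added): checked only under -ea; a no-op otherwise
    assert coreContainer.isZooKeeperAware() : "expected a ZooKeeper-aware CoreContainer";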
diff --git a/solr/core/src/java/org/apache/solr/store/blob/util/DeduplicatingList.java b/solr/core/src/java/org/apache/solr/store/blob/util/DeduplicatingList.java
index df46227..ed03fa1 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/util/DeduplicatingList.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/util/DeduplicatingList.java
@@ -24,161 +24,161 @@ import java.util.Map;
*/
public class DeduplicatingList<K, V extends DeduplicatingList.Deduplicatable<K>> {
- /**
- * We need the "V" parameter of {@link DeduplicatingList} to be able to provide us the "K" parameter used as key.
- */
- public interface Deduplicatable<DK> {
- DK getDedupeKey();
+ /**
+   * The "V" type parameter of {@link DeduplicatingList} must be able to provide the "K" value used as its dedupe key.
+ */
+ public interface Deduplicatable<DK> {
+ DK getDedupeKey();
+ }
+
+ /**
+ * Given v1 and v2 (corresponding to type V in {@link DeduplicatingList}) to be stored in the list and
+ * that have the same key (as per {@link Deduplicatable#getDedupeKey()}, parameter MK here), returns a merged value that
+ * has the same key and that can replace the two entries in the list (based on domain specific knowledge that this is ok).<p>
+ * The behaviour of this method is undefined and totally unpredictable if v1 and v2 do not have the same key. Be warned :)
+ */
+ public interface Merger<MK, MV extends DeduplicatingList.Deduplicatable<MK>> {
+ MV merge(MV v1, MV v2);
+ }
+
+ /**
+ * Building a linked list from scratch here to allow access to specific list entries (tracked in another
+ * data structure) in constant time.
+ * Instances must only be accessed with {@link #lock} held, as they are mutable.
+ */
+ static private class TaskListNode<TV> {
+    TaskListNode<TV> next;
+ TV elem;
+
+    TaskListNode(TaskListNode<TV> next, TV elem) {
+ this.next = next;
+ this.elem = elem;
}
-
- /**
- * Given v1 and v2 (corresponding to type V in {@link DeduplicatingList}) to be stored in the list and
- * that have the same key (as per {@link Deduplicatable#getDedupeKey()}, parameter MK here), returns a merged value that
- * has the same key and that can replace the two entries in the list (based on domain specific knowledge that this is ok).<p>
- * The behaviour of this method is undefined and totally unpredictable if v1 and v2 do not have the same key. Be warned :)
- */
- public interface Merger<MK, MV extends DeduplicatingList.Deduplicatable<MK>> {
- MV merge(MV v1, MV v2);
- }
-
- /**
- * Building a linked list from scratch here to allow access to specific list entries (tracked in another
- * data structure) in constant time.
- * Instances must only be accessed with {@link #lock} held, as they are mutable.
- */
- static private class TaskListNode<TV> {
- TaskListNode next;
- TV elem;
-
- TaskListNode(TaskListNode next, TV elem) {
- this.next = next;
- this.elem = elem;
+ }
+
+ /**
+ * Guards access to all the rest of the data here.
+ */
+ private final Object lock = new Object();
+
+ private final int almostMaxListSize;
+ private final DeduplicatingList.Merger<K, V> merger;
+
+ private TaskListNode<V> head = null;
+ private TaskListNode<V> tail = null;
+
+ private final Map<K, TaskListNode<V>> keyToListNode = new HashMap<>();
+
+ private int size = 0;
+
+ /**
+ * Builds an empty list of a given maximum size.
+   * @param almostMaxListSize The maximum number of "new" elements accepted in the list, excluding reenqueued
+   *                          elements (of which there are expected to be very few, on the order of the number of
+   *                          threads consuming the list). When that number is reached, {@link #addDeduplicated}
+   *                          blocks until the list size has decreased enough.
+ */
+ public DeduplicatingList(int almostMaxListSize, DeduplicatingList.Merger<K, V> merger) {
+ this.almostMaxListSize = almostMaxListSize;
+ this.merger = merger;
+ }
+
+ /**
+   * Adds an entry at the tail of the list, unless the list already contains an entry the added one can be merged
+   * with, in which case the two are merged rather than a new entry being appended.
+   * @param isReenqueue <ul><li>when <code>true</code>, the put is allowed to make the list exceed its {@link #almostMaxListSize};
+   *                    in practice the excess is at most a few entries.</li>
+   *                    <li>when <code>false</code>, if the list is too full, the call blocks until the insert can be done</li></ul>
+ */
+ public void addDeduplicated(final V value, boolean isReenqueue) throws InterruptedException {
+ synchronized (lock) {
+ TaskListNode<V> existingEntry = keyToListNode.get(value.getDedupeKey());
+
+ if (existingEntry != null) {
+ // We already have an entry for that core, merge.
+ existingEntry.elem = merger.merge(existingEntry.elem, value);
+
+ assert existingEntry.elem != null;
+ } else {
+ // Wait (if needed) until the insert can be done
+ while (!isReenqueue && size >= almostMaxListSize) {
+ lock.wait();
}
- }
-
- /**
- * Guards access to all the rest of the data here.
- */
- private final Object lock = new Object();
-
- private final int almostMaxListSize;
- private final DeduplicatingList.Merger<K, V> merger;
- private TaskListNode<V> head = null;
- private TaskListNode<V> tail = null;
-
- private final Map<K, TaskListNode<V>> keyToListNode = new HashMap<>();
-
- private int size = 0;
-
- /**
- * Builds an empty list of a given maximum size.
- * @param almostMaxListSize The maximum number of "new" elements accepted in the list, excluding element reenqueues (of which
- * there are expected to be very few and related to number of processing threads in a thread pool for
- * example). When that number is reached, {@link #addDeduplicated} blocks until the List size
- * is reduced enough.
- */
- public DeduplicatingList(int almostMaxListSize, DeduplicatingList.Merger<K, V> merger) {
- this.almostMaxListSize = almostMaxListSize;
- this.merger = merger;
+ addToTail(value);
+ }
}
-
- /**
- * Adds an entry to the tail of the list unless there's already an existing entry in the list the added entry can be merged with.
- * If that's the case, the added entry is merged instead of being added.
- * @param isReenqueue <ul><li>when <code>true</code>, the put is allowed to make the list exceed its {@link #almostMaxListSize}. In practice
- * we'll exceed at most by a few units.</li>
- * <li>when <code>false</code>, if the list is too full, the call blocks waiting until the insert can be done</li></ul>
- */
- public void addDeduplicated(final V value, boolean isReenqueue) throws InterruptedException {
- synchronized (lock) {
- TaskListNode<V> existingEntry = keyToListNode.get(value.getDedupeKey());
-
- if (existingEntry != null) {
- // We already have an entry for that core, merge.
- existingEntry.elem = merger.merge(existingEntry.elem, value);
-
- assert existingEntry.elem != null;
- } else {
- // Wait (if needed) until the insert can be done
- while (!isReenqueue && size >= almostMaxListSize) {
- lock.wait();
- }
-
- addToTail(value);
- }
- }
+ }
+
+ /**
+   * @return the value at the head of the list; if the list is empty, blocks until there is one to return.
+ */
+ public V removeFirst() throws InterruptedException {
+ synchronized (lock) {
+ while (size == 0) {
+ lock.wait();
+ }
+
+ return removeHead();
}
-
- /**
- * @return the value sitting at the head of the list or blocks waiting until there's one to return if the list is empty.
- */
- public V removeFirst() throws InterruptedException {
- synchronized (lock) {
- while (size == 0) {
- lock.wait();
- }
-
- return removeHead();
- }
+ }
+
+ /**
+ * @return number of entries in the list
+ */
+ public int size() {
+ synchronized (lock) {
+ return size;
}
-
- /**
- * @return number of entries in the list
- */
- public int size() {
- synchronized (lock) {
- return size;
- }
+ }
+
+ /**
+ * The key for the value being added should not already be present in the list.
+ * The {@link #lock} must be held when calling this method.
+ */
+ private void addToTail(final V value) {
+ TaskListNode<V> newNode = new TaskListNode<>(null, value);
+
+ if (tail == null) {
+ // List is empty
+ assert head == null;
+ assert size == 0;
+ head = newNode;
+      // Wake up threads blocked in removeFirst(), if any
+ lock.notifyAll();
+ } else {
+ // List not empty (means nobody is blocked sleeping)
+ assert head != null;
+ tail.next = newNode;
}
- /**
- * The key for the value being added should not already be present in the list.
- * The {@link #lock} must be held when calling this method.
- */
- private void addToTail(final V value) {
- TaskListNode<V> newNode = new TaskListNode<>(null, value);
-
- if (tail == null) {
- // List is empty
- assert head == null;
- assert size == 0;
- head = newNode;
- // Wake up threads blocked in getTask(), if any
- lock.notifyAll();
- } else {
- // List not empty (means nobody is blocked sleeping)
- assert head != null;
- tail.next = newNode;
- }
-
- TaskListNode<V> old = keyToListNode.put(value.getDedupeKey(), newNode);
- assert old == null;
-
- tail = newNode;
- size++;
+ TaskListNode<V> old = keyToListNode.put(value.getDedupeKey(), newNode);
+ assert old == null;
+
+ tail = newNode;
+ size++;
+ }
+
+ /**
+ * The {@link #lock} must be held while calling this method.
+ * This method can only be called if the list is not empty.
+   * @return the value at the head of the list (the oldest value currently in the list).
+ */
+ private V removeHead() {
+ assert size > 0;
+ TaskListNode<V> oldHead = head;
+ head = oldHead.next;
+ if (head == null) {
+ // list now empty, we've removed the last element
+ assert size == 1;
+ assert tail == oldHead;
+
+ tail = null;
}
+    size--;
+    // Wake up producers blocked in addDeduplicated() waiting for the list to drop below almostMaxListSize
+    lock.notifyAll();
+ TaskListNode<V> fromSet = keyToListNode.remove(oldHead.elem.getDedupeKey());
+ assert oldHead == fromSet;
- /**
- * The {@link #lock} must be held while calling this method.
- * This method can only be called if the list is not empty.
- * @return the value at the head of the list (inserted before all other values into list).
- */
- private V removeHead() {
- assert size > 0;
- TaskListNode<V> oldHead = head;
- head = oldHead.next;
- if (head == null) {
- // list now empty, we've removed the last element
- assert size == 1;
- assert tail == oldHead;
-
- tail = null;
- }
- size--;
- TaskListNode<V> fromSet = keyToListNode.remove(oldHead.elem.getDedupeKey());
- assert oldHead == fromSet;
-
- return oldHead.elem;
- }
+ return oldHead.elem;
+ }
}
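
The interplay of Deduplicatable, Merger and the list itself is easiest to see in a usage sketch. Everything below (PullWork, the max-of-attempts merge semantics, the capacity of 100) is hypothetical and for illustration only; in this branch the actual producers and consumers are the core sync feeders:

    import org.apache.solr.store.blob.util.DeduplicatingList;

    // Hypothetical work item: successive requests for the same core collapse into one entry.
    class PullWork implements DeduplicatingList.Deduplicatable<String> {
      final String coreName;
      final int attempts;

      PullWork(String coreName, int attempts) {
        this.coreName = coreName;
        this.attempts = attempts;
      }

      @Override
      public String getDedupeKey() {
        return coreName; // two PullWork instances for the same core get merged
      }
    }

    // Domain-specific merge: both values target the same core, so keeping the larger attempt count is safe.
    class PullWorkMerger implements DeduplicatingList.Merger<String, PullWork> {
      @Override
      public PullWork merge(PullWork v1, PullWork v2) {
        return new PullWork(v1.coreName, Math.max(v1.attempts, v2.attempts));
      }
    }

    class PullWorkDemo {
      static PullWork demo() throws InterruptedException {
        DeduplicatingList<String, PullWork> queue = new DeduplicatingList<>(100, new PullWorkMerger());
        queue.addDeduplicated(new PullWork("core1", 0), false);
        queue.addDeduplicated(new PullWork("core1", 1), false); // merged into the existing entry, size() stays 1
        return queue.removeFirst(); // attempts == 1; the queue is now empty
      }
    }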
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index d3c1087..9b44ef4 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -18,7 +18,6 @@
package org.apache.solr.update.processor;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
-import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
@@ -1096,7 +1095,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
String coreName = req.getCore().getName();
String shardName = cloudDesc.getShardId();
String collectionName = cloudDesc.getCollectionName();
- assertEquals(replicaType, Replica.Type.SHARED);
+ assert Replica.Type.SHARED.equals(replicaType);
// Peers and subShardLeaders should only forward the update request to leader replica,
// hence not need to sync with the blob store at this point.
if (!isLeader || isSubShardLeader) {
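
As with the BlobStoreUtils change, the JUnit assertEquals becomes a plain Java assert, checked only under -ea. The equals-on-constant form fails the assertion (rather than throwing NullPointerException) when replicaType is null; since Replica.Type is an enum, an identity comparison would be an equivalent null-safe alternative:

    assert Replica.Type.SHARED.equals(replicaType); // as committed: false, not NPE, for a null replicaType
    assert replicaType == Replica.Type.SHARED;      // equivalent enum identity comparison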
diff --git a/solr/core/src/test/org/apache/solr/store/blob/metadata/CorePushPullTest.java b/solr/core/src/test/org/apache/solr/store/blob/metadata/CorePushPullTest.java
index 6121199..9ccbf39 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/metadata/CorePushPullTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/metadata/CorePushPullTest.java
@@ -54,6 +54,7 @@ public class CorePushPullTest extends SolrTestCaseJ4 {
@BeforeClass
public static void setupTest() throws Exception {
+ assumeWorkingMockito();
// set up the temp directory for a local blob store
localBlobDir = createTempDir("tempDir");
System.setProperty(LocalStorageClient.BLOB_STORE_LOCAL_FS_ROOT_DIR_PROPERTY, localBlobDir.resolve("LocalBlobStore/").toString());
diff --git a/solr/core/src/test/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtilTest.java b/solr/core/src/test/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtilTest.java
index acc07ea..d145d7c 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtilTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/metadata/SharedStoreResolutionUtilTest.java
@@ -25,13 +25,13 @@ import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
import org.apache.solr.store.blob.client.BlobCoreMetadata;
import org.apache.solr.store.blob.client.BlobCoreMetadata.BlobFile;
import org.apache.solr.store.blob.client.BlobCoreMetadataBuilder;
import org.apache.solr.store.blob.metadata.ServerSideMetadata.CoreFileData;
import org.apache.solr.store.blob.metadata.SharedStoreResolutionUtil.SharedMetadataResolutionResult;
-import org.apache.solr.store.shared.SolrCloudSharedStoreTestCase;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -40,10 +40,10 @@ import com.google.common.collect.ImmutableSet;
/**
* Unit tests for {@link SharedStoreResolutionUtil}.
*/
-public class SharedStoreResolutionUtilTest extends SolrCloudSharedStoreTestCase {
-
+public class SharedStoreResolutionUtilTest extends SolrTestCaseJ4 {
+
@BeforeClass
- public static void beforeClass() {
+ public static void setupTests() {
assumeWorkingMockito();
}
diff --git a/solr/core/src/test/org/apache/solr/store/blob/util/BlobStoreUtilsTest.java b/solr/core/src/test/org/apache/solr/store/blob/util/BlobStoreUtilsTest.java
index d1d5803..3cb3c86 100644
--- a/solr/core/src/test/org/apache/solr/store/blob/util/BlobStoreUtilsTest.java
+++ b/solr/core/src/test/org/apache/solr/store/blob/util/BlobStoreUtilsTest.java
@@ -58,7 +58,8 @@ public class BlobStoreUtilsTest extends SolrCloudSharedStoreTestCase {
private static CoreStorageClient storageClient;
@BeforeClass
- public static void setupTestClass() throws Exception {
+ public static void setupTestClass() throws Exception {
+ assumeWorkingMockito();
sharedStoreRootPath = createTempDir("tempDir");
storageClient = setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
diff --git a/solr/core/src/test/org/apache/solr/store/shared/metadata/SharedShardMetadataControllerTest.java b/solr/core/src/test/org/apache/solr/store/shared/metadata/SharedShardMetadataControllerTest.java
index 041cd52..845e780 100644
--- a/solr/core/src/test/org/apache/solr/store/shared/metadata/SharedShardMetadataControllerTest.java
+++ b/solr/core/src/test/org/apache/solr/store/shared/metadata/SharedShardMetadataControllerTest.java
@@ -50,7 +50,8 @@ public class SharedShardMetadataControllerTest extends SolrCloudTestCase {
static SolrCloudManager cloudManager;
@BeforeClass
- public static void setupCluster() throws Exception {
+ public static void setupCluster() throws Exception {
+ assumeWorkingMockito();
configureCluster(1)
.addConfig("conf", configset("cloud-minimal"))
.configure();
diff --git a/solr/core/src/test/org/apache/solr/update/processor/DistributedZkUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/DistributedZkUpdateProcessorTest.java
index 0a5f9e6..254180f 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/DistributedZkUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/DistributedZkUpdateProcessorTest.java
@@ -57,7 +57,8 @@ public class DistributedZkUpdateProcessorTest extends SolrCloudSharedStoreTestCa
private static CoreStorageClient storageClient;
@BeforeClass
- public static void setupTestClass() throws Exception {
+ public static void setupTestClass() throws Exception {
+ assumeWorkingMockito();
sharedStoreRootPath = createTempDir("tempDir");
storageClient = setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);