You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2019/10/30 20:45:07 UTC
[lucene-solr] 02/11: @W-6635251 Refactor async pull code to avoid
static initialized data structures from colliding in unit tests (#390)
This is an automated email from the ASF dual-hosted git repository.
yonik pushed a commit to branch jira/SOLR-13101
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit ce12e737e17225fed8d623a18fdf5ab52df825fe
Author: Andy Vuong <a....@salesforce.com>
AuthorDate: Thu Sep 26 11:58:12 2019 -0400
@W-6635251 Refactor async pull code to avoid static initialized data structures from colliding in unit tests (#390)
* Refactor async pulls to use non-static initialized pulling data structures
* Update Jenkinsfile to run tests on blob
---
Jenkinsfile | 0
.../java/org/apache/solr/servlet/HttpSolrCall.java | 3 +-
.../solr/store/blob/metadata/BlobCoreSyncer.java | 30 ++++++-----
.../solr/store/blob/process/BlobProcessUtil.java | 6 ++-
.../solr/store/blob/process/CorePullTask.java | 54 ++++++++------------
.../solr/store/blob/process/CorePullerFeeder.java | 59 ++++++++++++++++++++--
.../solr/store/blob/process/CoreSyncFeeder.java | 6 +--
.../solr/store/shared/SharedStoreManager.java | 7 +++
.../shared/SimpleSharedStoreEndToEndPullTest.java | 52 ++++++++++---------
9 files changed, 136 insertions(+), 81 deletions(-)
diff --git a/Jenkinsfile b/Jenkinsfile
new file mode 100644
index 0000000..e69de29
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index adfbc5d..56235fa 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -305,7 +305,8 @@ public class HttpSolrCall {
if (replica != null) {
String coreName = replica.getCoreName();
String shardName = getShardName(collectionName, coreName);
- BlobCoreSyncer.pull(coreName, shardName, collectionName, cores, true, false);
+ BlobCoreSyncer syncer = cores.getSharedStoreManager().getBlobCoreSyncer();
+ syncer.pull(coreName, shardName, collectionName, cores, true, false);
core = cores.getCore(coreName);
if (!retry) {
action = RETRY;
diff --git a/solr/core/src/java/org/apache/solr/store/blob/metadata/BlobCoreSyncer.java b/solr/core/src/java/org/apache/solr/store/blob/metadata/BlobCoreSyncer.java
index 520a361..b281b27 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/metadata/BlobCoreSyncer.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/metadata/BlobCoreSyncer.java
@@ -45,6 +45,8 @@ import java.lang.invoke.MethodHandles;
*/
public class BlobCoreSyncer {
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
/**
* Threads wait for at most this duration before giving up on async pull to finish and returning with a PullInProgressException.
*/
@@ -70,20 +72,20 @@ public class BlobCoreSyncer {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ @GuardedBy("coreSyncsInFlight")
+ private int total_waiting_threads = 0;
+
/** The shared store name for the core currently being pulled from blob. Value is collection of objects used for synchronization by all waiting threads.
* If both the locks on this map and on a specific SyncOnPullWait in the map are needed, the lock on the map must be acquired first.
*/
@GuardedBy("itself")
- private static final Map<String, Collection<SyncOnPullWait>> coreSyncsInFlight = Maps.newHashMap();
-
- @GuardedBy("coreSyncsInFlight")
- private static int total_waiting_threads = 0;
+ private final Map<String, Collection<SyncOnPullWait>> coreSyncsInFlight = Maps.newHashMap();
/**
* @return Total number of threads across all cores waiting for their respective core to be pulled from blob store
*/
@VisibleForTesting
- protected static int getTotalWaitingThreads() {
+ protected int getTotalWaitingThreads() {
synchronized (coreSyncsInFlight) {
return total_waiting_threads;
}
@@ -114,11 +116,11 @@ public class BlobCoreSyncer {
* This is just a hint because as soon as the lock is released when the method returns, the status of the core could change.
* Because of that, in method {@link #pull(PushPullData, boolean, boolean, CoreContainer)} we need to check again.
*/
- public static boolean isEmptyCoreAwaitingPull(String coreName) {
- return CorePullTask.isEmptyCoreAwaitingPull(coreName);
+ public boolean isEmptyCoreAwaitingPull(CoreContainer cores, String coreName) {
+ return CorePullerFeeder.isEmptyCoreAwaitingPull(cores, coreName);
}
- public static void pull(String coreName, String shardName, String collectionName, CoreContainer cores,
+ public void pull(String coreName, String shardName, String collectionName, CoreContainer cores,
boolean waitForSearcher, boolean emptyCoreAwaitingPull) {
// Initialize variables
SharedShardMetadataController sharedShardMetadataController = cores.getSharedStoreManager().getSharedShardMetadataController();
@@ -183,7 +185,7 @@ public class BlobCoreSyncer {
*
* @throws PullInProgressException In case a thread does not wait or times out before the async pull is finished
*/
- public static void pull(PushPullData pushPullData, boolean waitForSearcher, boolean emptyCoreAwaitingPull, CoreContainer cores) throws PullInProgressException {
+ public void pull(PushPullData pushPullData, boolean waitForSearcher, boolean emptyCoreAwaitingPull, CoreContainer cores) throws PullInProgressException {
// Is there another thread already working on the async pull?
final boolean pullAlreadyInProgress;
// Indicates if thread waits for the pull to finish or too many waiters already
@@ -199,7 +201,7 @@ public class BlobCoreSyncer {
// Only can have only one thread working on async pull of this core (and we do no logging while holding the lock)
// Let's understand what our role and actions are while holding the global lock and then execute on them without the lock.
synchronized (coreSyncsInFlight) {
- if (emptyCoreAwaitingPull && !isEmptyCoreAwaitingPull(pushPullData.getCoreName())) {
+ if (emptyCoreAwaitingPull && !isEmptyCoreAwaitingPull(cores, pushPullData.getCoreName())) {
// Core was observed empty awaiting pull and is no longer awaiting pull. This means the pull happened.
return;
}
@@ -336,7 +338,7 @@ public class BlobCoreSyncer {
* This is called whenever core from {@link CorePullTracker} finish its async pull(successfully or unsuccessfully)
* We use this to notify all waiting threads for a core that their wait has ended (if there are some waiting).
*/
- public static void finishedPull(String sharedStoreName, CoreSyncStatus status, BlobCoreMetadata blobMetadata, String message) {
+ public void finishedPull(String sharedStoreName, CoreSyncStatus status, BlobCoreMetadata blobMetadata, String message) {
Exception pullException = null;
final boolean isPullSuccessful = (status.isSuccess() ||
// Following statuses are not considered success in strictest definition of pull but for BlobSyncer
@@ -351,7 +353,7 @@ public class BlobCoreSyncer {
notifyEndOfPull(sharedStoreName, pullException);
}
- private static void throwPullInProgressException(String corename, String msgSuffix) throws PullInProgressException {
+ private void throwPullInProgressException(String corename, String msgSuffix) throws PullInProgressException {
String msg = SKIPPING_PULLING_CORE + " " + corename + " from blob " + msgSuffix;
log.info(msg);
// Note that longer term, this is the place where we could decide that if the async
@@ -366,7 +368,7 @@ public class BlobCoreSyncer {
* Also serves the purpose of being a memory barrier so that the waiting threads can check their SyncOnPullWait instances
* for updates.
*/
- private static void notifyEndOfPull(String sharedStoreName, Exception e) {
+ private void notifyEndOfPull(String sharedStoreName, Exception e) {
final Collection<SyncOnPullWait> collectionOfWaiters = pullEnded(sharedStoreName);
if (collectionOfWaiters != null) {
for (SyncOnPullWait w : collectionOfWaiters) {
@@ -381,7 +383,7 @@ public class BlobCoreSyncer {
* Cleans up the core from bookkeeping related to in-progress pulls and returns the collection of waiters for that core.
* Collection of returned waiters could be null as well.
*/
- private static Collection<SyncOnPullWait> pullEnded(String sharedStoreName) {
+ private Collection<SyncOnPullWait> pullEnded(String sharedStoreName) {
final Collection<SyncOnPullWait> collectionOfWaiters;
synchronized (coreSyncsInFlight) {
// Note that threads waiting for the pull to finish have references on their individual SyncOnPullWait instances,
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/BlobProcessUtil.java b/solr/core/src/java/org/apache/solr/store/blob/process/BlobProcessUtil.java
index 164818d..7df8aa4 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/BlobProcessUtil.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/BlobProcessUtil.java
@@ -44,6 +44,10 @@ public class BlobProcessUtil {
runningFeeder = initializeCorePullerFeeder(cpf);
}
+ public CorePullerFeeder getCorePullerFeeder() {
+ return runningFeeder;
+ }
+
/**
* Shutdown background blob puller process
*/
@@ -71,10 +75,10 @@ public class BlobProcessUtil {
*/
private void shutdownCorePullerFeeder() {
final CoreSyncFeeder rf = runningFeeder;
- runningFeeder = null;
if (rf != null) {
log.info("Shutting down CorePullerFeeder");
rf.close();
}
+ runningFeeder = null;
}
}
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTask.java b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTask.java
index e8af082..6450135 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTask.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullTask.java
@@ -43,8 +43,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Throwables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
/**
* Code for pulling updates on a specific core to the Blob store. see {@CorePushTask} for the push version of this.
@@ -60,52 +58,42 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
*/
private static final long MIN_RETRY_DELAY_MS = 20000;
- /** Cores currently being pulled and timestamp of pull start (to identify stuck ones in logs) */
- private static final HashMap<String, Long> pullsInFlight = Maps.newHashMap();
-
- /** Cores unknown locally that got created as part of the pull process but for which no data has been pulled yet
- * from Blob store. If we ignore this transitory state, these cores can be accessed locally and simply look empty.
- * We'd rather treat threads attempting to access such cores like threads attempting to access an unknown core and
- * do a pull (or more likely wait for an ongoing pull to finish).<p>
- *
- * When this lock has to be taken as well as {@link #pullsInFlight}, then {@link #pullsInFlight} has to be taken first.
- * Reading this set implies acquiring the monitor of the set (as if @GuardedBy("itself")), but writing to the set
- * additionally implies holding the {@link #pullsInFlight}. This guarantees that while {@link #pullsInFlight}
- * is held, no element in the set is changing.
- */
- private static final Set<String> coresCreatedNotPulledYet = Sets.newHashSet();
-
private final CoreContainer coreContainer;
private final PullCoreInfo pullCoreInfo;
+
+ /**
+ * Data structures injected as dependencies that track the core pulls occurring
+ * in flight and the cores that have been created and not pulled. These should
+ * be passed in via a constructor from CorePullerFeeder where they are defined
+ * and are unique per CorePullerFeeder (itself a singleton).
+ */
+ private final HashMap<String, Long> pullsInFlight;
+ private final Set<String> coresCreatedNotPulledYet;
+
private final long queuedTimeMs;
private int attempts;
private long lastAttemptTimestamp;
private final PullCoreCallback callback;
- CorePullTask(CoreContainer coreContainer, PullCoreInfo pullCoreInfo, PullCoreCallback callback) {
- this(coreContainer, pullCoreInfo, System.currentTimeMillis(), 0, 0L, callback);
+ CorePullTask(CoreContainer coreContainer, PullCoreInfo pullCoreInfo, PullCoreCallback callback,
+ HashMap<String, Long> pullsInFlight, Set<String> coresCreatedNotPulledYet) {
+ this(coreContainer, pullCoreInfo, System.currentTimeMillis(), 0, 0L,
+ callback, pullsInFlight, coresCreatedNotPulledYet);
}
private CorePullTask(CoreContainer coreContainer, PullCoreInfo pullCoreInfo, long queuedTimeMs, int attempts,
- long lastAttemptTimestamp, PullCoreCallback callback) {
+ long lastAttemptTimestamp, PullCoreCallback callback, HashMap<String, Long> pullsInFlight,
+ Set<String> coresCreatedNotPulledYet) {
this.coreContainer = coreContainer;
this.pullCoreInfo = pullCoreInfo;
this.queuedTimeMs = queuedTimeMs;
this.attempts = attempts;
this.lastAttemptTimestamp = lastAttemptTimestamp;
this.callback = callback;
+ this.pullsInFlight = pullsInFlight;
+ this.coresCreatedNotPulledYet = coresCreatedNotPulledYet;
}
-
- /**
- * Returns a _hint_ that the given core might be locally empty because it is awaiting pull from Blob store.
- * This is just a hint because as soon as the lock is released when the method returns, the status of the core could change.
- */
- public static boolean isEmptyCoreAwaitingPull(String corename) {
- synchronized (coresCreatedNotPulledYet) {
- return coresCreatedNotPulledYet.contains(corename);
- }
- }
-
+
/**
* Needed for the {@link CorePullTask} to be used in a {@link DeduplicatingList}.
*/
@@ -164,7 +152,7 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
// We merge the tasks.
return new CorePullTask(task1.coreContainer, mergedPullCoreInfo,
Math.min(task1.queuedTimeMs, task2.queuedTimeMs), mergedAttempts, mergedLatAttemptsTimestamp,
- task1.callback);
+ task1.callback, task1.pullsInFlight, task1.coresCreatedNotPulledYet);
}
}
@@ -303,7 +291,7 @@ public class CorePullTask implements DeduplicatingList.Deduplicatable<String> {
// The following call can fail if blob is corrupt (in non trivial ways, trivial ways are identified by other cases)
// pull was successful
- if(isEmptyCoreAwaitingPull(pullCoreInfo.getCoreName())){
+ if (CorePullerFeeder.isEmptyCoreAwaitingPull(coreContainer, pullCoreInfo.getCoreName())) {
// the javadoc for pulledBlob suggests that it is only meant to be called if we pulled from scratch
// therefore only limiting this call when we created the local core for this pull ourselves
// BlobTransientLog.get().getCorruptCoreTracker().pulledBlob(pullCoreInfo.coreName, blobMetadata);
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullerFeeder.java b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullerFeeder.java
index b950d67..50bdff2 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CorePullerFeeder.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CorePullerFeeder.java
@@ -17,6 +17,8 @@
package org.apache.solr.store.blob.process;
import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Set;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.store.blob.client.BlobCoreMetadata;
@@ -26,6 +28,9 @@ import org.apache.solr.store.blob.util.DeduplicatingList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
/**
* A pull version of {@link CoreSyncFeeder} then will continually ({@link #feedTheMonsters()}) to load up a work queue (
* {@link #pullTaskQueue}) with such tasks {@link CorePullTask} to keep the created threads busy :) The tasks will be
@@ -35,19 +40,54 @@ import org.slf4j.LoggerFactory;
public class CorePullerFeeder extends CoreSyncFeeder {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final CorePullTask.PullCoreCallback callback;
- protected final DeduplicatingList<String, CorePullTask> pullTaskQueue;
protected static final String PULLER_THREAD_PREFIX = "puller";
private static final int numPullerThreads = 5; // TODO : make configurable
+ private final CorePullTask.PullCoreCallback callback;
+
+ protected final DeduplicatingList<String, CorePullTask> pullTaskQueue;
+
+ /**
+ * Cores currently being pulled and timestamp of pull start (to identify stuck ones in logs)
+ *
+ * Note, it is the client's responsibility to synchronize accesses
+ */
+ private final HashMap<String, Long> pullsInFlight = Maps.newHashMap();
+
+ /** Cores unknown locally that got created as part of the pull process but for which no data has been pulled yet
+ * from Blob store. If we ignore this transitory state, these cores can be accessed locally and simply look empty.
+ * We'd rather treat threads attempting to access such cores like threads attempting to access an unknown core and
+ * do a pull (or more likely wait for an ongoing pull to finish).<p>
+ *
+ * When this lock has to be taken as well as {@link #pullsInFlight}, then {@link #pullsInFlight} has to be taken first.
+ * Reading this set implies acquiring the monitor of the set (as if @GuardedBy("itself")), but writing to the set
+ * additionally implies holding the {@link #pullsInFlight}. This guarantees that while {@link #pullsInFlight}
+ * is held, no element in the set is changing.
+ *
+ * Note, it is the client's responsibility to synchronize accesses
+ */
+ private final Set<String> coresCreatedNotPulledYet = Sets.newHashSet();
+
protected CorePullerFeeder(CoreContainer cores) {
super(cores, numPullerThreads);
this.pullTaskQueue = new DeduplicatingList<>(ALMOST_MAX_WORKER_QUEUE_SIZE, new CorePullTask.PullTaskMerger());
this.callback = new CorePullResult();
}
+ /**
+ * Returns a _hint_ that the given core might be locally empty because it is awaiting pull from Blob store.
+ * This is just a hint because as soon as the lock is released when the method returns, the status of the core could change.
+ */
+ public static boolean isEmptyCoreAwaitingPull(CoreContainer cores, String corename) {
+ CorePullerFeeder cpf = cores.getSharedStoreManager().getBlobProcessManager().getCorePullerFeeder();
+ Set<String> coresCreatedNotPulledYet = cpf.getCoresCreatedNotPulledYet();
+ synchronized (coresCreatedNotPulledYet) {
+ return coresCreatedNotPulledYet.contains(corename);
+ }
+ }
+
@Override
public Runnable getSyncer() {
return new CorePullerThread(this, pullTaskQueue);
@@ -62,6 +102,14 @@ public class CorePullerFeeder extends CoreSyncFeeder {
return callback;
}
+ protected HashMap<String, Long> getPullsInFlight() {
+ return pullsInFlight;
+ }
+
+ protected Set<String> getCoresCreatedNotPulledYet() {
+ return coresCreatedNotPulledYet;
+ }
+
@Override
void feedTheMonsters() throws InterruptedException {
CorePullTracker tracker = cores.getSharedStoreManager().getCorePullTracker();
@@ -73,7 +121,7 @@ public class CorePullerFeeder extends CoreSyncFeeder {
PullCoreInfo pci = tracker.getCoreToPull();
// Add the core to the list consumed by the thread doing the actual work
- CorePullTask pt = new CorePullTask(cores, pci, getCorePullTaskCallback());
+ CorePullTask pt = new CorePullTask(cores, pci, getCorePullTaskCallback(), pullsInFlight, coresCreatedNotPulledYet);
pullTaskQueue.addDeduplicated(pt, /* isReenqueue */ false);
syncsEnqueuedSinceLastLog++;
@@ -95,7 +143,7 @@ public class CorePullerFeeder extends CoreSyncFeeder {
* deduplicated on core name (the same core requiring two pulls from Blob will only be recorded one if the first
* pull has not been processed yet).
*/
- static class PullCoreInfo extends PushPullData implements DeduplicatingList.Deduplicatable<String> {
+ public static class PullCoreInfo extends PushPullData implements DeduplicatingList.Deduplicatable<String> {
private final boolean waitForSearcher;
private final boolean createCoreIfAbsent;
@@ -182,7 +230,8 @@ public class CorePullerFeeder extends CoreSyncFeeder {
log.warn(String.format("Pulling core %s failed. Giving up. Last status=%s attempts=%s . %s",
pullCoreInfo.getSharedStoreName(), status, pullTask.getAttempts(), message == null ? "" : message));
}
- BlobCoreSyncer.finishedPull(pullCoreInfo.getSharedStoreName(), status, blobMetadata, message);
+ BlobCoreSyncer syncer = cores.getSharedStoreManager().getBlobCoreSyncer();
+ syncer.finishedPull(pullCoreInfo.getSharedStoreName(), status, blobMetadata, message);
} catch (InterruptedException ie) {
close();
throw ie;
diff --git a/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncFeeder.java b/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncFeeder.java
index 820b4ee..c9cef36 100644
--- a/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncFeeder.java
+++ b/solr/core/src/java/org/apache/solr/store/blob/process/CoreSyncFeeder.java
@@ -17,17 +17,16 @@
package org.apache.solr.store.blob.process;
import java.io.Closeable;
+import java.lang.invoke.MethodHandles;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.solr.core.CoreContainer;
-
//import com.force.commons.util.concurrent.NamedThreadFactory; difference?
import org.apache.lucene.util.NamedThreadFactory;
+import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
/**
* A {@link Runnable} that will start a set of threads {@link CorePullerThread} to process tasks
@@ -131,6 +130,7 @@ public abstract class CoreSyncFeeder implements Runnable, Closeable {
this.executionThread = null; // race to set to null but ok to try to interrupt twice
log.info(String.format("Closing CoreSyncFeeder; interrupting execution thread %s.", thread.getName()));
thread.interrupt();
+
} else {
log.warn("Closing CoreSyncFeeder before any syncer thread was started. Weird.");
}
diff --git a/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java b/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java
index 3dd525a..8a17f2b 100644
--- a/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java
+++ b/solr/core/src/java/org/apache/solr/store/shared/SharedStoreManager.java
@@ -17,6 +17,7 @@
package org.apache.solr.store.shared;
import org.apache.solr.cloud.ZkController;
+import org.apache.solr.store.blob.metadata.BlobCoreSyncer;
import org.apache.solr.store.blob.process.BlobDeleteManager;
import org.apache.solr.store.blob.process.BlobProcessUtil;
import org.apache.solr.store.blob.process.CorePullTracker;
@@ -38,11 +39,13 @@ public class SharedStoreManager {
private BlobDeleteManager blobDeleteManager;
private BlobProcessUtil blobProcessUtil;
private CorePullTracker corePullTracker;
+ private BlobCoreSyncer blobCoreSyncer;
public SharedStoreManager(ZkController controller) {
zkController = controller;
// initialize BlobProcessUtil with the SharedStoreManager for background processes to be ready
blobProcessUtil = new BlobProcessUtil(zkController.getCoreContainer());
+ blobCoreSyncer = new BlobCoreSyncer();
}
@VisibleForTesting
@@ -106,4 +109,8 @@ public class SharedStoreManager {
corePullTracker = new CorePullTracker();
return corePullTracker ;
}
+
+ public BlobCoreSyncer getBlobCoreSyncer() {
+ return blobCoreSyncer;
+ }
}
\ No newline at end of file
diff --git a/solr/core/src/test/org/apache/solr/store/shared/SimpleSharedStoreEndToEndPullTest.java b/solr/core/src/test/org/apache/solr/store/shared/SimpleSharedStoreEndToEndPullTest.java
index 4a7921e..2c1bd7a 100644
--- a/solr/core/src/test/org/apache/solr/store/shared/SimpleSharedStoreEndToEndPullTest.java
+++ b/solr/core/src/test/org/apache/solr/store/shared/SimpleSharedStoreEndToEndPullTest.java
@@ -75,23 +75,21 @@ public class SimpleSharedStoreEndToEndPullTest extends SolrCloudSharedStoreTestC
setupCluster(2);
CloudSolrClient cloudClient = cluster.getSolrClient();
- // setup the test harnesses
- Map<String, CountDownLatch> cdlMap = new HashMap<>();
+ // this map tracks the async pull queues per solr process
+ Map<String, Map<String, CountDownLatch>> solrProcessesTaskTracker = new HashMap<>();
- CountDownLatch latch1 = new CountDownLatch(1);
JettySolrRunner solrProcess1 = cluster.getJettySolrRunner(0);
CoreStorageClient storageClient1 = setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient1), solrProcess1);
- configureTestBlobProcessForNode(solrProcess1, setupCallback(latch1));
+ Map<String, CountDownLatch> asyncPullLatches1 = configureTestBlobProcessForNode(solrProcess1);
- CountDownLatch latch2 = new CountDownLatch(1);
JettySolrRunner solrProcess2 = cluster.getJettySolrRunner(1);
CoreStorageClient storageClient2 = setupLocalBlobStoreClient(sharedStoreRootPath, DEFAULT_BLOB_DIR_NAME);
setupTestSharedClientForNode(getBlobStorageProviderTestInstance(storageClient2), solrProcess2);
- configureTestBlobProcessForNode(solrProcess2, setupCallback(latch2));
+ Map<String, CountDownLatch> asyncPullLatches2 = configureTestBlobProcessForNode(solrProcess2);
- cdlMap.put(solrProcess1.getNodeName(), latch1);
- cdlMap.put(solrProcess2.getNodeName(), latch2);
+ solrProcessesTaskTracker.put(solrProcess1.getNodeName(), asyncPullLatches1);
+ solrProcessesTaskTracker.put(solrProcess2.getNodeName(), asyncPullLatches2);
String collectionName = "sharedCollection";
int maxShardsPerNode = 1;
@@ -138,14 +136,20 @@ public class SimpleSharedStoreEndToEndPullTest extends SolrCloudSharedStoreTestC
assertEquals(1, resp.getResults().getNumFound());
assertEquals("1", (String) resp.getResults().get(0).getFieldValue("id"));
+ // we want to wait until the pull completes, so register a count down latch for the
+ // follower's core; the test awaits this latch below until the async pull has finished
+ CountDownLatch latch = new CountDownLatch(1);
+ Map<String, CountDownLatch> asyncPullTasks = solrProcessesTaskTracker.get(followerReplica.getNodeName());
+ asyncPullTasks.put(followerReplica.getCoreName(), latch);
+
// query the follower directly to trigger the pull, this query should yield no results
- // as we don't wait
+ // as it returns immediately
followerDirectClient = getHttpSolrClient(followerReplica.getBaseUrl() + "/" + followerReplica.getCoreName());
resp = followerDirectClient.query(params);
assertEquals(0, resp.getResults().getNumFound());
// wait until pull is finished
- CountDownLatch latch = cdlMap.get(followerReplica.getNodeName());
+ //CountDownLatch latch = asyncPullTasks.get(followerReplica.getCoreName());
assertTrue(latch.await(120, TimeUnit.SECONDS));
// do another query to verify we've pulled everything
@@ -164,26 +168,26 @@ public class SimpleSharedStoreEndToEndPullTest extends SolrCloudSharedStoreTestC
}
}
- private BlobProcessUtil configureTestBlobProcessForNode(JettySolrRunner runner, PullCoreCallback callback) {
- CorePullerFeeder cpf = new CorePullerFeeder(runner.getCoreContainer()) {
+ private Map<String, CountDownLatch> configureTestBlobProcessForNode(JettySolrRunner runner) {
+ Map<String, CountDownLatch> asyncPullTracker = new HashMap<>();
+
+ CorePullerFeeder cpf = new CorePullerFeeder(runner.getCoreContainer()) {
@Override
protected CorePullTask.PullCoreCallback getCorePullTaskCallback() {
- return callback;
+ return new PullCoreCallback() {
+ @Override
+ public void finishedPull(CorePullTask pullTask, BlobCoreMetadata blobMetadata, CoreSyncStatus status,
+ String message) throws InterruptedException {
+ CountDownLatch latch = asyncPullTracker.get(pullTask.getPullCoreInfo().getCoreName());
+ latch.countDown();
+ }
+ };
}
};
+
BlobProcessUtil testUtil = new BlobProcessUtil(runner.getCoreContainer(), cpf);
setupTestBlobProcessUtilForNode(testUtil, runner);
- return testUtil;
+ return asyncPullTracker;
}
- private PullCoreCallback setupCallback(CountDownLatch latch) {
- return new PullCoreCallback() {
- @Override
- public void finishedPull(CorePullTask pullTask, BlobCoreMetadata blobMetadata, CoreSyncStatus status,
- String message) throws InterruptedException {
- assertTrue(status.isSuccess());
- latch.countDown();
- }
- };
- }
}