You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by yo...@apache.org on 2008/10/24 18:50:46 UTC
svn commit: r707683 - in /lucene/solr/trunk/src/java/org/apache/solr:
core/IndexDeletionPolicyWrapper.java handler/ReplicationHandler.java
handler/SnapPuller.java
Author: yonik
Date: Fri Oct 24 09:50:45 2008
New Revision: 707683
URL: http://svn.apache.org/viewvc?rev=707683&view=rev
Log:
SOLR-561: thread safe fixes, commit point reservation fixes
Modified:
lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java
lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java
Modified: lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java?rev=707683&r1=707682&r2=707683&view=diff
==============================================================================
--- lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java (original)
+++ lucene/solr/trunk/src/java/org/apache/solr/core/IndexDeletionPolicyWrapper.java Fri Oct 24 09:50:45 2008
@@ -19,10 +19,10 @@
* @see org.apache.lucene.index.IndexDeletionPolicy
*/
public class IndexDeletionPolicyWrapper implements IndexDeletionPolicy {
- private IndexDeletionPolicy deletionPolicy;
- private Map<Long, IndexCommit> solrVersionVsCommits = new ConcurrentHashMap<Long, IndexCommit>();
- private Map<Long, Long> reserves = new ConcurrentHashMap<Long,Long>();
- private IndexCommit latestCommit;
+ private final IndexDeletionPolicy deletionPolicy;
+ private volatile Map<Long, IndexCommit> solrVersionVsCommits = new ConcurrentHashMap<Long, IndexCommit>();
+ private final Map<Long, Long> reserves = new ConcurrentHashMap<Long,Long>();
+ private volatile IndexCommit latestCommit;
public IndexDeletionPolicyWrapper(IndexDeletionPolicy deletionPolicy) {
this.deletionPolicy = deletionPolicy;
@@ -51,7 +51,20 @@
* @param reserveTime time in milliseconds for which the commit point is to be reserved
*/
public void setReserveDuration(Long indexVersion, long reserveTime) {
- reserves.put(indexVersion, System.currentTimeMillis() + reserveTime);
+ long timeToSet = System.currentTimeMillis() + reserveTime;
+ for(;;) {
+ Long previousTime = reserves.put(indexVersion, timeToSet);
+
+ // this is the common success case: the older time didn't exist, or
+ // came before the new time.
+ if (previousTime == null || previousTime <= timeToSet) break;
+
+ // At this point, we overwrote a longer reservation, so we want to restore the older one.
+ // The problem is that an even longer reservation may come in concurrently
+ // and we don't want to overwrite that one too. We simply keep retrying in a loop
+ // with the maximum time value we have seen.
+ timeToSet = previousTime;
+ }
}
private void cleanReserves() {
Modified: lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java?rev=707683&r1=707682&r2=707683&view=diff
==============================================================================
--- lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java (original)
+++ lucene/solr/trunk/src/java/org/apache/solr/handler/ReplicationHandler.java Fri Oct 24 09:50:45 2008
@@ -96,7 +96,7 @@
private Integer reserveCommitDuration = SnapPuller.readInterval("00:00:10");
- private IndexCommit indexCommitPoint;
+ private volatile IndexCommit indexCommitPoint;
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
rsp.setHttpCaching(false);
@@ -107,9 +107,10 @@
return;
}
if (command.equals(CMD_INDEX_VERSION)) {
- if (indexCommitPoint != null) {
- rsp.add(CMD_INDEX_VERSION, indexCommitPoint.getVersion());
- rsp.add(GENERATION, indexCommitPoint.getGeneration());
+ IndexCommit commitPoint = indexCommitPoint; // make a copy so it won't change
+ if (commitPoint != null) {
+ rsp.add(CMD_INDEX_VERSION, commitPoint.getVersion());
+ rsp.add(GENERATION, commitPoint.getGeneration());
} else {
// must never happen
rsp.add(CMD_INDEX_VERSION, 0L);
@@ -201,9 +202,8 @@
void doSnapPull() {
if (!isSlave)
return;
- if (snapPullLock.isLocked())
+ if (!snapPullLock.tryLock())
return;
- snapPullLock.lock();
try {
snapPuller.fetchLatestIndex(core);
} catch (Exception e) {
@@ -214,7 +214,6 @@
}
boolean isReplicating() {
- boolean b = snapPullLock.isLocked();
return snapPullLock.isLocked();
}
@@ -445,9 +444,10 @@
long[] versionAndGeneration = getIndexVersion();
details.add(CMD_INDEX_VERSION, versionAndGeneration[0]);
details.add(GENERATION, versionAndGeneration[1]);
- if (isMaster && indexCommitPoint != null) {
- details.add("replicatable" + CMD_INDEX_VERSION, indexCommitPoint.getVersion());
- details.add("replicatable" + GENERATION, indexCommitPoint.getGeneration());
+ IndexCommit commit = indexCommitPoint; // make a copy so it won't change
+ if (isMaster && commit != null) {
+ details.add("replicatable" + CMD_INDEX_VERSION, commit.getVersion());
+ details.add("replicatable" + GENERATION, commit.getGeneration());
}
if (isSlave) {
Modified: lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java?rev=707683&r1=707682&r2=707683&view=diff
==============================================================================
--- lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/solr/trunk/src/java/org/apache/solr/handler/SnapPuller.java Fri Oct 24 09:50:45 2008
@@ -58,35 +58,35 @@
public class SnapPuller {
private static final Logger LOG = LoggerFactory.getLogger(SnapPuller.class.getName());
- private String masterUrl;
+ private final String masterUrl;
- private ReplicationHandler replicationHandler;
+ private final ReplicationHandler replicationHandler;
- private Integer pollInterval;
+ private final Integer pollInterval;
private String pollIntervalStr;
private ScheduledExecutorService executorService;
- private long executorStartTime;
+ private volatile long executorStartTime;
- private long replicationStartTime;
+ private volatile long replicationStartTime;
- private SolrCore solrCore;
+ private final SolrCore solrCore;
- private List<Map<String, Object>> filesToDownload;
+ private volatile List<Map<String, Object>> filesToDownload;
- private List<Map<String, Object>> confFilesToDownload;
+ private volatile List<Map<String, Object>> confFilesToDownload;
- private List<Map<String, Object>> filesDownloaded;
+ private volatile List<Map<String, Object>> filesDownloaded;
- private List<Map<String, Object>> confFilesDownloaded;
+ private volatile List<Map<String, Object>> confFilesDownloaded;
- private Map<String, Object> currentFile;
+ private volatile Map<String, Object> currentFile;
- private FileFetcher fileFetcher;
+ private volatile FileFetcher fileFetcher;
- private boolean stop = false;
+ private volatile boolean stop = false;
/**
* Disable the timer task for polling
@@ -214,7 +214,7 @@
IndexCommit commit;
RefCounted<SolrIndexSearcher> searcherRefCounted = null;
try {
- searcherRefCounted = core.getSearcher();
+ searcherRefCounted = core.getNewestSearcher(false);
commit = searcherRefCounted.get().getReader().getIndexCommit();
} finally {
if (searcherRefCounted != null)
@@ -587,6 +587,7 @@
List<Map<String, Object>> getConfFilesDownloaded() {
//make a copy first because it can be null later
List<Map<String, Object>> tmp = confFilesDownloaded;
+ // NOTE: it's safe to make a copy of a SynchronizedCollection(ArrayList)
return tmp == null ? Collections.EMPTY_LIST : new ArrayList<Map<String, Object>>(tmp);
}