You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2015/08/25 11:09:15 UTC
svn commit: r1697616 - /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
Author: mreutegg
Date: Tue Aug 25 09:09:14 2015
New Revision: 1697616
URL: http://svn.apache.org/r1697616
Log:
OAK-3283: Background read does not close StringSort
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1697616&r1=1697615&r2=1697616&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Tue Aug 25 09:09:14 2015
@@ -76,6 +76,7 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.UncheckedExecutionException;
import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
import org.apache.jackrabbit.oak.commons.json.JsopReader;
import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
@@ -1802,118 +1803,123 @@ public final class DocumentNodeStore
Revision otherSeen = Revision.newRevision(0);
StringSort externalSort = JournalEntry.newSorter();
-
- Map<Revision, Revision> externalChanges = Maps.newHashMap();
- for (Map.Entry<Integer, Revision> e : lastRevMap.entrySet()) {
- int machineId = e.getKey();
- if (machineId == clusterId) {
- // ignore own lastRev
- continue;
- }
- Revision r = e.getValue();
- Revision last = lastKnownRevision.get(machineId);
- if (last == null || r.compareRevisionTime(last) > 0) {
- lastKnownRevision.put(machineId, r);
- // OAK-2345
- // only consider as external change if
- // - the revision changed for the machineId
- // or
- // - the revision is within the time frame we remember revisions
- if (last != null
- || r.getTimestamp() > revisionPurgeMillis()) {
- externalChanges.put(r, otherSeen);
- }
- // collect external changes
- if (last != null && externalSort != null) {
- // add changes for this particular clusterId to the externalSort
- try {
- fillExternalChanges(externalSort, last, r, store);
- } catch (IOException e1) {
- LOG.error("backgroundRead: Exception while reading external changes from journal: "+e1, e1);
- externalSort = null;
+
+ try {
+ Map<Revision, Revision> externalChanges = Maps.newHashMap();
+ for (Map.Entry<Integer, Revision> e : lastRevMap.entrySet()) {
+ int machineId = e.getKey();
+ if (machineId == clusterId) {
+ // ignore own lastRev
+ continue;
+ }
+ Revision r = e.getValue();
+ Revision last = lastKnownRevision.get(machineId);
+ if (last == null || r.compareRevisionTime(last) > 0) {
+ lastKnownRevision.put(machineId, r);
+ // OAK-2345
+ // only consider as external change if
+ // - the revision changed for the machineId
+ // or
+ // - the revision is within the time frame we remember revisions
+ if (last != null
+ || r.getTimestamp() > revisionPurgeMillis()) {
+ externalChanges.put(r, otherSeen);
+ }
+ // collect external changes
+ if (last != null && externalSort != null) {
+ // add changes for this particular clusterId to the externalSort
+ try {
+ fillExternalChanges(externalSort, last, r, store);
+ } catch (IOException e1) {
+ LOG.error("backgroundRead: Exception while reading external changes from journal: " + e1, e1);
+ IOUtils.closeQuietly(externalSort);
+ externalSort = null;
+ }
}
}
}
- }
- stats.readHead = clock.getTime() - time;
- time = clock.getTime();
+ stats.readHead = clock.getTime() - time;
+ time = clock.getTime();
- if (!externalChanges.isEmpty()) {
- // invalidate caches
- if (externalSort == null) {
- // if no externalSort available, then invalidate the classic way: everything
- stats.cacheStats = store.invalidateCache();
- docChildrenCache.invalidateAll();
- } else {
- try {
- externalSort.sort();
- stats.cacheStats = store.invalidateCache(pathToId(externalSort));
- // OAK-3002: only invalidate affected items (using journal)
- long origSize = docChildrenCache.size();
- if (origSize == 0) {
- // if docChildrenCache is empty, don't bother
- // calling invalidateAll either way
- // (esp calling invalidateAll(Iterable) will
- // potentially iterate over all keys even though
- // there's nothing to be deleted)
- LOG.trace("backgroundRead: docChildrenCache nothing to invalidate");
- } else {
- // however, if the docChildrenCache is not empty,
- // use the invalidateAll(Iterable) variant,
- // passing it a Iterable<StringValue>, as that's
- // what is contained in the cache
- docChildrenCache.invalidateAll(asStringValueIterable(externalSort));
- long newSize = docChildrenCache.size();
- LOG.trace("backgroundRead: docChildrenCache invalidation result: orig: {}, new: {} ", origSize, newSize);
- }
- } catch (Exception ioe) {
- LOG.error("backgroundRead: got IOException during external sorting/cache invalidation (as a result, invalidating entire cache): "+ioe, ioe);
+ if (!externalChanges.isEmpty()) {
+ // invalidate caches
+ if (externalSort == null) {
+ // if no externalSort available, then invalidate the classic way: everything
stats.cacheStats = store.invalidateCache();
docChildrenCache.invalidateAll();
+ } else {
+ try {
+ externalSort.sort();
+ stats.cacheStats = store.invalidateCache(pathToId(externalSort));
+ // OAK-3002: only invalidate affected items (using journal)
+ long origSize = docChildrenCache.size();
+ if (origSize == 0) {
+ // if docChildrenCache is empty, don't bother
+ // calling invalidateAll either way
+ // (esp calling invalidateAll(Iterable) will
+ // potentially iterate over all keys even though
+ // there's nothing to be deleted)
+ LOG.trace("backgroundRead: docChildrenCache nothing to invalidate");
+ } else {
+ // however, if the docChildrenCache is not empty,
+ // use the invalidateAll(Iterable) variant,
+ // passing it a Iterable<StringValue>, as that's
+ // what is contained in the cache
+ docChildrenCache.invalidateAll(asStringValueIterable(externalSort));
+ long newSize = docChildrenCache.size();
+ LOG.trace("backgroundRead: docChildrenCache invalidation result: orig: {}, new: {} ", origSize, newSize);
+ }
+ } catch (Exception ioe) {
+ LOG.error("backgroundRead: got IOException during external sorting/cache invalidation (as a result, invalidating entire cache): "+ioe, ioe);
+ stats.cacheStats = store.invalidateCache();
+ docChildrenCache.invalidateAll();
+ }
}
- }
- stats.cacheInvalidationTime = clock.getTime() - time;
- time = clock.getTime();
+ stats.cacheInvalidationTime = clock.getTime() - time;
+ time = clock.getTime();
- // make sure update to revision comparator is atomic
- // and no local commit is in progress
- backgroundOperationLock.writeLock().lock();
- try {
- stats.lock = clock.getTime() - time;
+ // make sure update to revision comparator is atomic
+ // and no local commit is in progress
+ backgroundOperationLock.writeLock().lock();
+ try {
+ stats.lock = clock.getTime() - time;
- // the latest revisions of the current cluster node
- // happened before the latest revisions of other cluster nodes
- revisionComparator.add(newRevision(), headSeen);
- // then we saw other revisions
- for (Map.Entry<Revision, Revision> e : externalChanges.entrySet()) {
- revisionComparator.add(e.getKey(), e.getValue());
- }
+ // the latest revisions of the current cluster node
+ // happened before the latest revisions of other cluster nodes
+ revisionComparator.add(newRevision(), headSeen);
+ // then we saw other revisions
+ for (Map.Entry<Revision, Revision> e : externalChanges.entrySet()) {
+ revisionComparator.add(e.getKey(), e.getValue());
+ }
- Revision oldHead = headRevision;
- // the new head revision is after other revisions
- setHeadRevision(newRevision());
- if (dispatchChange) {
- time = clock.getTime();
- if (externalSort != null) {
- // then there were external changes and reading them
- // was successful -> apply them to the diff cache
- try {
- JournalEntry.applyTo(externalSort, diffCache, oldHead, headRevision);
- } catch (Exception e1) {
- LOG.error("backgroundRead: Exception while processing external changes from journal: "+e1, e1);
+ Revision oldHead = headRevision;
+ // the new head revision is after other revisions
+ setHeadRevision(newRevision());
+ if (dispatchChange) {
+ time = clock.getTime();
+ if (externalSort != null) {
+ // then there were external changes and reading them
+ // was successful -> apply them to the diff cache
+ try {
+ JournalEntry.applyTo(externalSort, diffCache, oldHead, headRevision);
+ } catch (Exception e1) {
+ LOG.error("backgroundRead: Exception while processing external changes from journal: "+e1, e1);
+ }
}
- }
- stats.populateDiffCache = clock.getTime() - time;
- time = clock.getTime();
+ stats.populateDiffCache = clock.getTime() - time;
+ time = clock.getTime();
- dispatcher.contentChanged(getRoot().fromExternalChange(), null);
+ dispatcher.contentChanged(getRoot().fromExternalChange(), null);
+ }
+ } finally {
+ backgroundOperationLock.writeLock().unlock();
}
- } finally {
- backgroundOperationLock.writeLock().unlock();
+ stats.dispatchChanges = clock.getTime() - time;
+ time = clock.getTime();
}
- stats.dispatchChanges = clock.getTime() - time;
- time = clock.getTime();
+ } finally {
+ IOUtils.closeQuietly(externalSort);
}
revisionComparator.purge(revisionPurgeMillis());
stats.purge = clock.getTime() - time;