You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2015/08/25 11:09:15 UTC

svn commit: r1697616 - /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java

Author: mreutegg
Date: Tue Aug 25 09:09:14 2015
New Revision: 1697616

URL: http://svn.apache.org/r1697616
Log:
OAK-3283: Background read does not close StringSort

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1697616&r1=1697615&r2=1697616&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java Tue Aug 25 09:09:14 2015
@@ -76,6 +76,7 @@ import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 
 import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.commons.IOUtils;
 import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
 import org.apache.jackrabbit.oak.commons.json.JsopReader;
 import org.apache.jackrabbit.oak.commons.json.JsopTokenizer;
@@ -1802,118 +1803,123 @@ public final class DocumentNodeStore
         Revision otherSeen = Revision.newRevision(0);
 
         StringSort externalSort = JournalEntry.newSorter();
-        
-        Map<Revision, Revision> externalChanges = Maps.newHashMap();
-        for (Map.Entry<Integer, Revision> e : lastRevMap.entrySet()) {
-            int machineId = e.getKey();
-            if (machineId == clusterId) {
-                // ignore own lastRev
-                continue;
-            }
-            Revision r = e.getValue();
-            Revision last = lastKnownRevision.get(machineId);
-            if (last == null || r.compareRevisionTime(last) > 0) {
-                lastKnownRevision.put(machineId, r);
-                // OAK-2345
-                // only consider as external change if
-                // - the revision changed for the machineId
-                // or
-                // - the revision is within the time frame we remember revisions
-                if (last != null
-                        || r.getTimestamp() > revisionPurgeMillis()) {
-                    externalChanges.put(r, otherSeen);
-                }
-                // collect external changes
-                if (last != null && externalSort != null) {
-                    // add changes for this particular clusterId to the externalSort
-                    try {
-                        fillExternalChanges(externalSort, last, r, store);
-                    } catch (IOException e1) {
-                        LOG.error("backgroundRead: Exception while reading external changes from journal: "+e1, e1);
-                        externalSort = null;
+
+        try {
+            Map<Revision, Revision> externalChanges = Maps.newHashMap();
+            for (Map.Entry<Integer, Revision> e : lastRevMap.entrySet()) {
+                int machineId = e.getKey();
+                if (machineId == clusterId) {
+                    // ignore own lastRev
+                    continue;
+                }
+                Revision r = e.getValue();
+                Revision last = lastKnownRevision.get(machineId);
+                if (last == null || r.compareRevisionTime(last) > 0) {
+                    lastKnownRevision.put(machineId, r);
+                    // OAK-2345
+                    // only consider as external change if
+                    // - the revision changed for the machineId
+                    // or
+                    // - the revision is within the time frame we remember revisions
+                    if (last != null
+                            || r.getTimestamp() > revisionPurgeMillis()) {
+                        externalChanges.put(r, otherSeen);
+                    }
+                    // collect external changes
+                    if (last != null && externalSort != null) {
+                        // add changes for this particular clusterId to the externalSort
+                        try {
+                            fillExternalChanges(externalSort, last, r, store);
+                        } catch (IOException e1) {
+                            LOG.error("backgroundRead: Exception while reading external changes from journal: " + e1, e1);
+                            IOUtils.closeQuietly(externalSort);
+                            externalSort = null;
+                        }
                     }
                 }
             }
-        }
 
-        stats.readHead = clock.getTime() - time;
-        time = clock.getTime();
+            stats.readHead = clock.getTime() - time;
+            time = clock.getTime();
 
-        if (!externalChanges.isEmpty()) {
-            // invalidate caches
-            if (externalSort == null) {
-                // if no externalSort available, then invalidate the classic way: everything
-                stats.cacheStats = store.invalidateCache();
-                docChildrenCache.invalidateAll();
-            } else {
-                try {
-                    externalSort.sort();
-                    stats.cacheStats = store.invalidateCache(pathToId(externalSort));
-                    // OAK-3002: only invalidate affected items (using journal)
-                    long origSize = docChildrenCache.size();
-                    if (origSize == 0) {
-                        // if docChildrenCache is empty, don't bother
-                        // calling invalidateAll either way 
-                        // (esp calling invalidateAll(Iterable) will
-                        // potentially iterate over all keys even though
-                        // there's nothing to be deleted)
-                        LOG.trace("backgroundRead: docChildrenCache nothing to invalidate");
-                    } else {
-                        // however, if the docChildrenCache is not empty,
-                        // use the invalidateAll(Iterable) variant,
-                        // passing it a Iterable<StringValue>, as that's
-                        // what is contained in the cache
-                        docChildrenCache.invalidateAll(asStringValueIterable(externalSort));
-                        long newSize = docChildrenCache.size();
-                        LOG.trace("backgroundRead: docChildrenCache invalidation result: orig: {}, new: {} ", origSize, newSize);
-                    }
-                } catch (Exception ioe) {
-                    LOG.error("backgroundRead: got IOException during external sorting/cache invalidation (as a result, invalidating entire cache): "+ioe, ioe);
+            if (!externalChanges.isEmpty()) {
+                // invalidate caches
+                if (externalSort == null) {
+                    // if no externalSort available, then invalidate the classic way: everything
                     stats.cacheStats = store.invalidateCache();
                     docChildrenCache.invalidateAll();
+                } else {
+                    try {
+                        externalSort.sort();
+                        stats.cacheStats = store.invalidateCache(pathToId(externalSort));
+                        // OAK-3002: only invalidate affected items (using journal)
+                        long origSize = docChildrenCache.size();
+                        if (origSize == 0) {
+                            // if docChildrenCache is empty, don't bother
+                            // calling invalidateAll either way
+                            // (esp calling invalidateAll(Iterable) will
+                            // potentially iterate over all keys even though
+                            // there's nothing to be deleted)
+                            LOG.trace("backgroundRead: docChildrenCache nothing to invalidate");
+                        } else {
+                            // however, if the docChildrenCache is not empty,
+                            // use the invalidateAll(Iterable) variant,
+                            // passing it an Iterable<StringValue>, as that's
+                            // what is contained in the cache
+                            docChildrenCache.invalidateAll(asStringValueIterable(externalSort));
+                            long newSize = docChildrenCache.size();
+                            LOG.trace("backgroundRead: docChildrenCache invalidation result: orig: {}, new: {} ", origSize, newSize);
+                        }
+                    } catch (Exception ioe) {
+                        LOG.error("backgroundRead: got IOException during external sorting/cache invalidation (as a result, invalidating entire cache): "+ioe, ioe);
+                        stats.cacheStats = store.invalidateCache();
+                        docChildrenCache.invalidateAll();
+                    }
                 }
-            }
-            stats.cacheInvalidationTime = clock.getTime() - time;
-            time = clock.getTime();
+                stats.cacheInvalidationTime = clock.getTime() - time;
+                time = clock.getTime();
 
-            // make sure update to revision comparator is atomic
-            // and no local commit is in progress
-            backgroundOperationLock.writeLock().lock();
-            try {
-                stats.lock = clock.getTime() - time;
+                // make sure update to revision comparator is atomic
+                // and no local commit is in progress
+                backgroundOperationLock.writeLock().lock();
+                try {
+                    stats.lock = clock.getTime() - time;
 
-                // the latest revisions of the current cluster node
-                // happened before the latest revisions of other cluster nodes
-                revisionComparator.add(newRevision(), headSeen);
-                // then we saw other revisions
-                for (Map.Entry<Revision, Revision> e : externalChanges.entrySet()) {
-                    revisionComparator.add(e.getKey(), e.getValue());
-                }
+                    // the latest revisions of the current cluster node
+                    // happened before the latest revisions of other cluster nodes
+                    revisionComparator.add(newRevision(), headSeen);
+                    // then we saw other revisions
+                    for (Map.Entry<Revision, Revision> e : externalChanges.entrySet()) {
+                        revisionComparator.add(e.getKey(), e.getValue());
+                    }
 
-                Revision oldHead = headRevision;
-                // the new head revision is after other revisions
-                setHeadRevision(newRevision());
-                if (dispatchChange) {
-                    time = clock.getTime();
-                    if (externalSort != null) {
-                        // then there were external changes and reading them
-                        // was successful -> apply them to the diff cache
-                        try {
-                            JournalEntry.applyTo(externalSort, diffCache, oldHead, headRevision);
-                        } catch (Exception e1) {
-                            LOG.error("backgroundRead: Exception while processing external changes from journal: "+e1, e1);
+                    Revision oldHead = headRevision;
+                    // the new head revision is after other revisions
+                    setHeadRevision(newRevision());
+                    if (dispatchChange) {
+                        time = clock.getTime();
+                        if (externalSort != null) {
+                            // then there were external changes and reading them
+                            // was successful -> apply them to the diff cache
+                            try {
+                                JournalEntry.applyTo(externalSort, diffCache, oldHead, headRevision);
+                            } catch (Exception e1) {
+                                LOG.error("backgroundRead: Exception while processing external changes from journal: "+e1, e1);
+                            }
                         }
-                    }
-                    stats.populateDiffCache = clock.getTime() - time;
-                    time = clock.getTime();
+                        stats.populateDiffCache = clock.getTime() - time;
+                        time = clock.getTime();
 
-                    dispatcher.contentChanged(getRoot().fromExternalChange(), null);
+                        dispatcher.contentChanged(getRoot().fromExternalChange(), null);
+                    }
+                } finally {
+                    backgroundOperationLock.writeLock().unlock();
                 }
-            } finally {
-                backgroundOperationLock.writeLock().unlock();
+                stats.dispatchChanges = clock.getTime() - time;
+                time = clock.getTime();
             }
-            stats.dispatchChanges = clock.getTime() - time;
-            time = clock.getTime();
+        } finally {
+            IOUtils.closeQuietly(externalSort);
         }
         revisionComparator.purge(revisionPurgeMillis());
         stats.purge = clock.getTime() - time;