You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/07/17 14:48:47 UTC

svn commit: r1504104 - /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java

Author: jukka
Date: Wed Jul 17 12:48:47 2013
New Revision: 1504104

URL: http://svn.apache.org/r1504104
Log:
OAK-788: File backend for the SegmentMK

Keep only up to a thousand Segment instances ready at a time to avoid running out of memory.

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1504104&r1=1504103&r2=1504104&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Wed Jul 17 12:48:47 2013
@@ -20,6 +20,8 @@ import static com.google.common.base.Cha
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Lists.newArrayListWithCapacity;
+import static com.google.common.collect.Lists.newLinkedList;
+import static com.google.common.collect.Maps.newHashMap;
 import static java.nio.channels.FileChannel.MapMode.READ_WRITE;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 
@@ -30,9 +32,11 @@ import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.jackrabbit.oak.plugins.segment.Journal;
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
@@ -45,7 +49,8 @@ import org.apache.jackrabbit.oak.spi.sta
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 
 public class FileStore implements SegmentStore {
 
@@ -70,16 +75,59 @@ public class FileStore implements Segmen
 
     private static final byte[] PADDING_BYTES = new byte[SEGMENT_SIZE];
 
-    private final Map<String, Journal> journals = Maps.newHashMap();
+    private class SegmentReference { // Records where a segment entry sits in a buffer; the Segment is only built by getSegment()
+
+        private final UUID id; // identifier this reference is registered under in the references map
+
+        private final ByteBuffer buffer; // buffer that contains the segment entry
+
+        private final int position; // offset in buffer of the entry's length field (just past the 8-byte magic)
+
+        SegmentReference(UUID id, ByteBuffer buffer, int position) {
+            this.id = id;
+            this.buffer = checkNotNull(buffer);
+            this.position = position;
+        }
+
+        UUID getSegmentId() {
+            return id;
+        }
+
+        Segment getSegment() { // Parses the entry header at position and returns a new Segment over its data bytes
+            ByteBuffer ro = buffer.asReadOnlyBuffer(); // separate position/limit, so the shared buffer is not moved
+            ro.position(position);
+
+            int length = ro.getInt(); // number of data bytes that follow the header
+            int count = ro.getInt(); // number of referenced segment ids in the header
+
+            checkState(id.equals(new UUID(ro.getLong(), ro.getLong()))); // stored id must match the registered one
+
+            Collection<UUID> referencedIds = newArrayListWithCapacity(count);
+            for (int i = 0; i < count; i++) {
+                SegmentReference reference =
+                        references.get(new UUID(ro.getLong(), ro.getLong()));
+                checkState(reference != null); // a referenced segment must itself be registered
+                referencedIds.add(reference.getSegmentId()); // adds the UUID instance held by that reference
+            }
+
+            ro.limit(ro.position() + length); // restrict the view to the data bytes only
+            return new Segment(FileStore.this, id, ro.slice(), referencedIds,
+                    Collections.<String, RecordId>emptyMap(),
+                    Collections.<Template, RecordId>emptyMap());
+        }
+
+    }
 
     private final File directory;
-    private int index;
 
-    private MappedByteBuffer rw;
-    private ByteBuffer ro;
+    private final LinkedList<MappedByteBuffer> files = newLinkedList();
 
-    private final ConcurrentMap<UUID, Segment> segments =
-            Maps.newConcurrentMap();
+    private final Map<String, Journal> journals = newHashMap();
+
+    private final Map<UUID, SegmentReference> references = newHashMap();
+
+    private final Cache<UUID, Segment> segments =
+            CacheBuilder.newBuilder().maximumSize(1000).build();
 
     public FileStore(File directory, NodeState root) throws IOException {
         // http://www.oracle.com/technetwork/java/hotspotfaq-138619.html#64bit_detection
@@ -90,9 +138,24 @@ public class FileStore implements Segmen
 
         checkNotNull(directory).mkdirs();
         this.directory = directory;
-        this.index = 0;
-        while (loadSegments()) {
-            this.index++;
+
+        for (int i = 0; true; i++) {
+            String name = String.format(FILE_NAME_FORMAT, i);
+            File file = new File(directory, name);
+            if (file.isFile()) {
+                RandomAccessFile f = new RandomAccessFile(file, "rw");
+                try {
+                    files.add(f.getChannel().map(READ_WRITE, 0, FILE_SIZE));
+                } finally {
+                    f.close();
+                }
+            } else {
+                break;
+            }
+        }
+
+        for (MappedByteBuffer file : files) {
+            loadSegments(file);
         }
 
         if (!journals.containsKey("root")) {
@@ -110,87 +173,56 @@ public class FileStore implements Segmen
         this(new File(directory));
     }
 
-    public void close() {
-        rw.force();
-
-        segments.clear();
-        rw = null;
-        ro = null;
-
+    public synchronized void close() { // Forces the last mapped file to disk and drops all in-memory state
+        if (!files.isEmpty()) {
+            files.getLast().force(); // only the last file is forced here; prepare() forces a file when it fills it
+        }
+        files.clear(); // release this store's references to the mapped buffers
+        references.clear();
+        segments.invalidateAll(); // discard every cached Segment
+        segments.cleanUp();
         System.gc();
     }
 
-    private boolean loadSegments() throws IOException {
-        String name = String.format(FILE_NAME_FORMAT, index);
-        File file = new File(directory, name);
-        long size = FILE_SIZE;
-        if (file.isFile()) {
-            size = file.length();
-        }
-        RandomAccessFile f = new RandomAccessFile(file, "rw");
-        try {
-            rw = f.getChannel().map(READ_WRITE, 0, size);
-            ro = rw.asReadOnlyBuffer();
-
-            while (ro.remaining() >= 4 * SEGMENT_SIZE) {
-                // skip tar header and get the magic bytes; TODO: verify?
-                long magic = ro.getLong(ro.position() + SEGMENT_SIZE);
-                if (magic == SEGMENT_MAGIC) {
-                    ro.position(ro.position() + SEGMENT_SIZE + 8);
-
-                    int length = ro.getInt();
-                    int count = ro.getInt();
-
-                    UUID segmentId = new UUID(ro.getLong(), ro.getLong());
-                    Collection<UUID> referencedSegmentIds =
-                            newArrayListWithCapacity(count);
-                    for (int i = 0; i < count; i++) {
-                        referencedSegmentIds.add(
-                                new UUID(ro.getLong(), ro.getLong()));
-                    }
-
-                    ro.limit(ro.position() + length);
-                    ByteBuffer data = ro.slice();
-                    ro.limit(rw.limit());
-
-                    Segment segment = new Segment(
-                            this, segmentId, data, referencedSegmentIds,
-                            Collections.<String, RecordId>emptyMap(),
-                            Collections.<Template, RecordId>emptyMap());
-                    segments.put(segmentId, segment);
-
-                    // advance to next entry in the file
-                    ro.position((ro.position() + length + 0x1ff) & ~0x1ff);
-                } else if (magic == JOURNAL_MAGIC) {
-                    ro.position(ro.position() + SEGMENT_SIZE + 8);
-
-                    int count = ro.getInt();
-                    for (int i = 0; i < count; i++) {
-                        byte[] n = new byte[ro.getInt()];
-                        ro.get(n);
-                        SegmentNodeState h = new SegmentNodeState(this, new RecordId(
-                                new UUID(ro.getLong(), ro.getLong()),
-                                ro.getInt()));
-                        journals.put(
-                                new String(n, UTF_8),
-                                new FileJournal(this, h));
-                    }
-
-                    // advance to next entry in the file
-                    ro.position((ro.position() + 0x1ff) & ~0x1ff);
-                } else if (magic == PADDING_MAGIC) {
-                    return true;
-                } else {
-                    // still space for more segments: position the write
-                    // buffer at this point and return false to stop looking
-                    // for more entries
-                    rw.position(ro.position());
-                    return false;
+    private void loadSegments(MappedByteBuffer ro) throws IOException { // Scans one mapped file, registering segment references and journals
+        while (ro.remaining() >= 4 * SEGMENT_SIZE) {
+            // skip tar header and get the magic bytes; TODO: verify?
+            long magic = ro.getLong(ro.position() + SEGMENT_SIZE);
+            if (magic == SEGMENT_MAGIC) {
+                int position = ro.position() + SEGMENT_SIZE + 8; // just past the header block and the 8-byte magic
+                int length = ro.getInt(position); // absolute reads: ro's own position is not moved here
+                int count = ro.getInt(position + 4);
+                UUID id = new UUID(
+                        ro.getLong(position + 8),
+                        ro.getLong(position + 16));
+                references.put(id, new SegmentReference(id, ro, position)); // only the location is recorded; no Segment is built
+
+                // advance to next entry in the file
+                position += 24 + count * 16 + length; // 24 = length (4) + count (4) + id (16); then referenced ids and data
+                ro.position((position + 0x1ff) & ~0x1ff); // round up to the next multiple of 0x200
+            } else if (magic == JOURNAL_MAGIC) {
+                ro.position(ro.position() + SEGMENT_SIZE + 8);
+
+                int count = ro.getInt(); // number of journal entries that follow
+                for (int i = 0; i < count; i++) {
+                    byte[] n = new byte[ro.getInt()]; // length-prefixed journal name, decoded as UTF-8 below
+                    ro.get(n);
+                    SegmentReference reference = references.get(
+                            new UUID(ro.getLong(), ro.getLong()));
+                    checkState(reference != null); // the head's segment must already have been registered
+                    SegmentNodeState h = new SegmentNodeState(this, new RecordId(
+                            reference.getSegmentId(), ro.getInt()));
+                    journals.put(
+                            new String(n, UTF_8),
+                            new FileJournal(this, h));
                 }
+
+                // advance to next entry in the file
+                ro.position((ro.position() + 0x1ff) & ~0x1ff);
+            } else {
+                // neither a segment nor a journal entry: stop, leaving ro positioned at this entry
+                return;
             }
-            return true;
-        } finally {
-            f.close();
         }
     }
 
@@ -205,12 +237,18 @@ public class FileStore implements Segmen
     }
 
     @Override
-    public Segment readSegment(UUID id) {
-        Segment segment = segments.get(id);
-        if (segment != null) {
-            return segment;
-        } else {
-            throw new IllegalArgumentException("Segment not found: " + id);
+    public synchronized Segment readSegment(final UUID id) { // Returns the cached Segment for id, building it from its reference on a miss
+        try {
+            return segments.get(id, new Callable<Segment>() { // the Callable runs only when id is not in the cache
+                @Override
+                public Segment call() throws Exception {
+                    SegmentReference reference = references.get(id);
+                    checkState(reference != null); // unknown id now fails here rather than with IllegalArgumentException
+                    return reference.getSegment();
+                }
+            });
+        } catch (ExecutionException e) { // NOTE(review): Guava documents unchecked loader failures as UncheckedExecutionException, not caught here - confirm
+            throw new RuntimeException("Failed to load segment " + id, e);
         }
     }
 
@@ -221,11 +259,12 @@ public class FileStore implements Segmen
             Map<String, RecordId> strings, Map<Template, RecordId> templates) {
         int size = 8 + 4 + 4 + 16 + 16 * referencedSegmentIds.size() + length;
 
-        prepare(size);
+        MappedByteBuffer rw = prepare(size);
 
         rw.put(createTarHeader(segmentId.toString(), size));
-
         rw.putLong(SEGMENT_MAGIC);
+        int position = rw.position();
+
         rw.putInt(length);
         rw.putInt(referencedSegmentIds.size());
         rw.putLong(segmentId.getMostSignificantBits());
@@ -235,28 +274,31 @@ public class FileStore implements Segmen
             rw.putLong(referencedSegmentId.getLeastSignificantBits());
         }
 
-        ro.position(rw.position());
+        ByteBuffer ro = rw.asReadOnlyBuffer();
+        ro.limit(ro.position() + length);
+        ro = ro.slice();
+
         rw.put(data, offset, length);
-        ro.limit(rw.position());
-        ByteBuffer buffer = ro.slice();
-        ro.limit(rw.limit());
 
         int n = rw.position() % SEGMENT_SIZE;
         if (n > 0) {
             rw.put(PADDING_BYTES, 0, SEGMENT_SIZE - n);
         }
 
-        Segment segment = new Segment(
-                this, segmentId, buffer,
-                referencedSegmentIds, strings, templates);
-        checkState(segments.put(segmentId, segment) == null);
+        SegmentReference previous = references.put(
+                segmentId, new SegmentReference(segmentId, rw, position));
+        checkState(previous == null);
+
+        segments.put(segmentId, new Segment(
+                this, segmentId, ro, referencedSegmentIds, strings, templates));
     }
 
     @Override
-    public void deleteSegment(UUID segmentId) {
-        if (segments.remove(segmentId) == null) {
+    public synchronized void deleteSegment(UUID segmentId) { // Unregisters the segment and evicts any cached instance
+        if (references.remove(segmentId) == null) { // the reference map, not the cache, decides whether the segment exists
             throw new IllegalStateException("Missing segment: " + segmentId);
         }
+        segments.invalidate(segmentId); // drop the cached Segment too; nothing is written to the file in this method
     }
 
     synchronized void writeJournals() {
@@ -265,7 +307,7 @@ public class FileStore implements Segmen
             size += 4 + name.getBytes(UTF_8).length + 16 + 4;
         }
 
-        prepare(size);
+        MappedByteBuffer rw = prepare(size);
 
         rw.put(createTarHeader(JOURNALS_UUID, size));
 
@@ -287,37 +329,48 @@ public class FileStore implements Segmen
         }
     }
 
-    private void prepare(int size) {
-        int segments = (size + SEGMENT_SIZE - 1) / SEGMENT_SIZE;
-        if ((1 + segments + 2) * SEGMENT_SIZE > rw.remaining()) {
-            if (rw.remaining() >= 3 * SEGMENT_SIZE) {
+    private synchronized MappedByteBuffer prepare(int size) { // Returns a mapped file with room for an entry of size bytes, starting a new file if needed
+        if (!files.isEmpty()) {
+            MappedByteBuffer last = files.getLast();
+
+            // Check if there's still enough room in the last open file
+            int segments = (size + SEGMENT_SIZE - 1) / SEGMENT_SIZE; // size rounded up to whole SEGMENT_SIZE blocks
+            if ((1 + segments + 2) * SEGMENT_SIZE <= last.remaining()) { // one block before the data, two after - presumably tar header and end markers; confirm
+                return last;
+            }
+
+            // No more room, so finish the last file with padding as needed
+            if (last.remaining() >= 3 * SEGMENT_SIZE) {
                 // Add a padding entry to avoid problems during reopening
-                rw.put(createTarHeader(
+                last.put(createTarHeader(
                         PADDING_UUID,
-                        rw.remaining() - 3 * SEGMENT_SIZE));
-                if (rw.remaining() > 2 * SEGMENT_SIZE) {
-                    rw.putLong(PADDING_MAGIC);
-                    rw.put(PADDING_BYTES, 0, SEGMENT_SIZE - 8);
+                        last.remaining() - 3 * SEGMENT_SIZE)); // declared entry size leaves three blocks unaccounted for
+                if (last.remaining() > 2 * SEGMENT_SIZE) {
+                    last.putLong(PADDING_MAGIC); // magic is written directly after the header
+                    last.put(PADDING_BYTES, 0, SEGMENT_SIZE - 8); // complete that block with zero bytes
                 }
             }
-            while (rw.remaining() > 0) {
-                rw.put(PADDING_BYTES);
+            while (last.remaining() > 0) { // zero-fill whatever is left of the file
+                last.put(PADDING_BYTES);
             }
-            rw.force();
+            last.force(); // flush the finished file before moving on
+        }
 
-            String name = String.format(FILE_NAME_FORMAT, ++index);
+        // Need to start a new file
+        try {
+            String name = String.format(FILE_NAME_FORMAT, files.size()); // next index is the number of files already open
             File file = new File(directory, name);
+            RandomAccessFile f = new RandomAccessFile(file, "rw");
             try {
-                RandomAccessFile f = new RandomAccessFile(file, "rw");
-                try {
-                    rw = f.getChannel().map(READ_WRITE, 0, FILE_SIZE);
-                    ro = rw.asReadOnlyBuffer();
-                } finally {
-                    f.close();
-                }
-            } catch (IOException e) {
-                throw new RuntimeException("Unable to create a new segment", e);
+                MappedByteBuffer buffer =
+                        f.getChannel().map(READ_WRITE, 0, FILE_SIZE);
+                files.add(buffer); // becomes the new last file
+                return buffer;
+            } finally {
+                f.close(); // per FileChannel.map docs the mapping stays valid after the channel is closed
             }
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to create a new segment", e);
         }
     }