You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/07/17 14:48:47 UTC
svn commit: r1504104 -
/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Author: jukka
Date: Wed Jul 17 12:48:47 2013
New Revision: 1504104
URL: http://svn.apache.org/r1504104
Log:
OAK-788: File backend for the SegmentMK
Keep at most a thousand Segment instances cached in memory at a time to avoid running out of memory; segments that are not in the cache are re-created on demand from the memory-mapped files.
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1504104&r1=1504103&r2=1504104&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Wed Jul 17 12:48:47 2013
@@ -20,6 +20,8 @@ import static com.google.common.base.Cha
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Lists.newArrayListWithCapacity;
+import static com.google.common.collect.Lists.newLinkedList;
+import static com.google.common.collect.Maps.newHashMap;
import static java.nio.channels.FileChannel.MapMode.READ_WRITE;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
@@ -30,9 +32,11 @@ import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import org.apache.jackrabbit.oak.plugins.segment.Journal;
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
@@ -45,7 +49,8 @@ import org.apache.jackrabbit.oak.spi.sta
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
public class FileStore implements SegmentStore {
@@ -70,16 +75,59 @@ public class FileStore implements Segmen
private static final byte[] PADDING_BYTES = new byte[SEGMENT_SIZE];
- private final Map<String, Journal> journals = Maps.newHashMap();
+ private class SegmentReference {
+
+ private final UUID id;
+
+ private final ByteBuffer buffer;
+
+ private final int position;
+
+ SegmentReference(UUID id, ByteBuffer buffer, int position) {
+ this.id = id;
+ this.buffer = checkNotNull(buffer);
+ this.position = position;
+ }
+
+ UUID getSegmentId() {
+ return id;
+ }
+
+ Segment getSegment() {
+ ByteBuffer ro = buffer.asReadOnlyBuffer();
+ ro.position(position);
+
+ int length = ro.getInt();
+ int count = ro.getInt();
+
+ checkState(id.equals(new UUID(ro.getLong(), ro.getLong())));
+
+ Collection<UUID> referencedIds = newArrayListWithCapacity(count);
+ for (int i = 0; i < count; i++) {
+ SegmentReference reference =
+ references.get(new UUID(ro.getLong(), ro.getLong()));
+ checkState(reference != null);
+ referencedIds.add(reference.getSegmentId());
+ }
+
+ ro.limit(ro.position() + length);
+ return new Segment(FileStore.this, id, ro.slice(), referencedIds,
+ Collections.<String, RecordId>emptyMap(),
+ Collections.<Template, RecordId>emptyMap());
+ }
+
+ }
private final File directory;
- private int index;
- private MappedByteBuffer rw;
- private ByteBuffer ro;
+ private final LinkedList<MappedByteBuffer> files = newLinkedList();
- private final ConcurrentMap<UUID, Segment> segments =
- Maps.newConcurrentMap();
+ private final Map<String, Journal> journals = newHashMap();
+
+ private final Map<UUID, SegmentReference> references = newHashMap();
+
+ private final Cache<UUID, Segment> segments =
+ CacheBuilder.newBuilder().maximumSize(1000).build();
public FileStore(File directory, NodeState root) throws IOException {
// http://www.oracle.com/technetwork/java/hotspotfaq-138619.html#64bit_detection
@@ -90,9 +138,24 @@ public class FileStore implements Segmen
checkNotNull(directory).mkdirs();
this.directory = directory;
- this.index = 0;
- while (loadSegments()) {
- this.index++;
+
+ for (int i = 0; true; i++) {
+ String name = String.format(FILE_NAME_FORMAT, i);
+ File file = new File(directory, name);
+ if (file.isFile()) {
+ RandomAccessFile f = new RandomAccessFile(file, "rw");
+ try {
+ files.add(f.getChannel().map(READ_WRITE, 0, FILE_SIZE));
+ } finally {
+ f.close();
+ }
+ } else {
+ break;
+ }
+ }
+
+ for (MappedByteBuffer file : files) {
+ loadSegments(file);
}
if (!journals.containsKey("root")) {
@@ -110,87 +173,56 @@ public class FileStore implements Segmen
this(new File(directory));
}
- public void close() {
- rw.force();
-
- segments.clear();
- rw = null;
- ro = null;
-
+ public synchronized void close() {
+ if (!files.isEmpty()) {
+ files.getLast().force();
+ }
+ files.clear();
+ references.clear();
+ segments.invalidateAll();
+ segments.cleanUp();
System.gc();
}
- private boolean loadSegments() throws IOException {
- String name = String.format(FILE_NAME_FORMAT, index);
- File file = new File(directory, name);
- long size = FILE_SIZE;
- if (file.isFile()) {
- size = file.length();
- }
- RandomAccessFile f = new RandomAccessFile(file, "rw");
- try {
- rw = f.getChannel().map(READ_WRITE, 0, size);
- ro = rw.asReadOnlyBuffer();
-
- while (ro.remaining() >= 4 * SEGMENT_SIZE) {
- // skip tar header and get the magic bytes; TODO: verify?
- long magic = ro.getLong(ro.position() + SEGMENT_SIZE);
- if (magic == SEGMENT_MAGIC) {
- ro.position(ro.position() + SEGMENT_SIZE + 8);
-
- int length = ro.getInt();
- int count = ro.getInt();
-
- UUID segmentId = new UUID(ro.getLong(), ro.getLong());
- Collection<UUID> referencedSegmentIds =
- newArrayListWithCapacity(count);
- for (int i = 0; i < count; i++) {
- referencedSegmentIds.add(
- new UUID(ro.getLong(), ro.getLong()));
- }
-
- ro.limit(ro.position() + length);
- ByteBuffer data = ro.slice();
- ro.limit(rw.limit());
-
- Segment segment = new Segment(
- this, segmentId, data, referencedSegmentIds,
- Collections.<String, RecordId>emptyMap(),
- Collections.<Template, RecordId>emptyMap());
- segments.put(segmentId, segment);
-
- // advance to next entry in the file
- ro.position((ro.position() + length + 0x1ff) & ~0x1ff);
- } else if (magic == JOURNAL_MAGIC) {
- ro.position(ro.position() + SEGMENT_SIZE + 8);
-
- int count = ro.getInt();
- for (int i = 0; i < count; i++) {
- byte[] n = new byte[ro.getInt()];
- ro.get(n);
- SegmentNodeState h = new SegmentNodeState(this, new RecordId(
- new UUID(ro.getLong(), ro.getLong()),
- ro.getInt()));
- journals.put(
- new String(n, UTF_8),
- new FileJournal(this, h));
- }
-
- // advance to next entry in the file
- ro.position((ro.position() + 0x1ff) & ~0x1ff);
- } else if (magic == PADDING_MAGIC) {
- return true;
- } else {
- // still space for more segments: position the write
- // buffer at this point and return false to stop looking
- // for more entries
- rw.position(ro.position());
- return false;
+ private void loadSegments(MappedByteBuffer ro) throws IOException {
+ while (ro.remaining() >= 4 * SEGMENT_SIZE) {
+ // skip tar header and get the magic bytes; TODO: verify?
+ long magic = ro.getLong(ro.position() + SEGMENT_SIZE);
+ if (magic == SEGMENT_MAGIC) {
+ int position = ro.position() + SEGMENT_SIZE + 8;
+ int length = ro.getInt(position);
+ int count = ro.getInt(position + 4);
+ UUID id = new UUID(
+ ro.getLong(position + 8),
+ ro.getLong(position + 16));
+ references.put(id, new SegmentReference(id, ro, position));
+
+ // advance to next entry in the file
+ position += 24 + count * 16 + length;
+ ro.position((position + 0x1ff) & ~0x1ff);
+ } else if (magic == JOURNAL_MAGIC) {
+ ro.position(ro.position() + SEGMENT_SIZE + 8);
+
+ int count = ro.getInt();
+ for (int i = 0; i < count; i++) {
+ byte[] n = new byte[ro.getInt()];
+ ro.get(n);
+ SegmentReference reference = references.get(
+ new UUID(ro.getLong(), ro.getLong()));
+ checkState(reference != null);
+ SegmentNodeState h = new SegmentNodeState(this, new RecordId(
+ reference.getSegmentId(), ro.getInt()));
+ journals.put(
+ new String(n, UTF_8),
+ new FileJournal(this, h));
}
+
+ // advance to next entry in the file
+ ro.position((ro.position() + 0x1ff) & ~0x1ff);
+ } else {
+ // last entry encountered
+ return;
}
- return true;
- } finally {
- f.close();
}
}
@@ -205,12 +237,18 @@ public class FileStore implements Segmen
}
@Override
- public Segment readSegment(UUID id) {
- Segment segment = segments.get(id);
- if (segment != null) {
- return segment;
- } else {
- throw new IllegalArgumentException("Segment not found: " + id);
+ public synchronized Segment readSegment(final UUID id) {
+ try {
+ return segments.get(id, new Callable<Segment>() {
+ @Override
+ public Segment call() throws Exception {
+ SegmentReference reference = references.get(id);
+ checkState(reference != null);
+ return reference.getSegment();
+ }
+ });
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Failed to load segment " + id, e);
}
}
@@ -221,11 +259,12 @@ public class FileStore implements Segmen
Map<String, RecordId> strings, Map<Template, RecordId> templates) {
int size = 8 + 4 + 4 + 16 + 16 * referencedSegmentIds.size() + length;
- prepare(size);
+ MappedByteBuffer rw = prepare(size);
rw.put(createTarHeader(segmentId.toString(), size));
-
rw.putLong(SEGMENT_MAGIC);
+ int position = rw.position();
+
rw.putInt(length);
rw.putInt(referencedSegmentIds.size());
rw.putLong(segmentId.getMostSignificantBits());
@@ -235,28 +274,31 @@ public class FileStore implements Segmen
rw.putLong(referencedSegmentId.getLeastSignificantBits());
}
- ro.position(rw.position());
+ ByteBuffer ro = rw.asReadOnlyBuffer();
+ ro.limit(ro.position() + length);
+ ro = ro.slice();
+
rw.put(data, offset, length);
- ro.limit(rw.position());
- ByteBuffer buffer = ro.slice();
- ro.limit(rw.limit());
int n = rw.position() % SEGMENT_SIZE;
if (n > 0) {
rw.put(PADDING_BYTES, 0, SEGMENT_SIZE - n);
}
- Segment segment = new Segment(
- this, segmentId, buffer,
- referencedSegmentIds, strings, templates);
- checkState(segments.put(segmentId, segment) == null);
+ SegmentReference previous = references.put(
+ segmentId, new SegmentReference(segmentId, rw, position));
+ checkState(previous == null);
+
+ segments.put(segmentId, new Segment(
+ this, segmentId, ro, referencedSegmentIds, strings, templates));
}
@Override
- public void deleteSegment(UUID segmentId) {
- if (segments.remove(segmentId) == null) {
+ public synchronized void deleteSegment(UUID segmentId) {
+ if (references.remove(segmentId) == null) {
throw new IllegalStateException("Missing segment: " + segmentId);
}
+ segments.invalidate(segmentId);
}
synchronized void writeJournals() {
@@ -265,7 +307,7 @@ public class FileStore implements Segmen
size += 4 + name.getBytes(UTF_8).length + 16 + 4;
}
- prepare(size);
+ MappedByteBuffer rw = prepare(size);
rw.put(createTarHeader(JOURNALS_UUID, size));
@@ -287,37 +329,48 @@ public class FileStore implements Segmen
}
}
- private void prepare(int size) {
- int segments = (size + SEGMENT_SIZE - 1) / SEGMENT_SIZE;
- if ((1 + segments + 2) * SEGMENT_SIZE > rw.remaining()) {
- if (rw.remaining() >= 3 * SEGMENT_SIZE) {
+ private synchronized MappedByteBuffer prepare(int size) {
+ if (!files.isEmpty()) {
+ MappedByteBuffer last = files.getLast();
+
+ // Check if there's still enough room in the last open file
+ int segments = (size + SEGMENT_SIZE - 1) / SEGMENT_SIZE;
+ if ((1 + segments + 2) * SEGMENT_SIZE <= last.remaining()) {
+ return last;
+ }
+
+ // No more room, so finish the last file with padding as needed
+ if (last.remaining() >= 3 * SEGMENT_SIZE) {
// Add a padding entry to avoid problems during reopening
- rw.put(createTarHeader(
+ last.put(createTarHeader(
PADDING_UUID,
- rw.remaining() - 3 * SEGMENT_SIZE));
- if (rw.remaining() > 2 * SEGMENT_SIZE) {
- rw.putLong(PADDING_MAGIC);
- rw.put(PADDING_BYTES, 0, SEGMENT_SIZE - 8);
+ last.remaining() - 3 * SEGMENT_SIZE));
+ if (last.remaining() > 2 * SEGMENT_SIZE) {
+ last.putLong(PADDING_MAGIC);
+ last.put(PADDING_BYTES, 0, SEGMENT_SIZE - 8);
}
}
- while (rw.remaining() > 0) {
- rw.put(PADDING_BYTES);
+ while (last.remaining() > 0) {
+ last.put(PADDING_BYTES);
}
- rw.force();
+ last.force();
+ }
- String name = String.format(FILE_NAME_FORMAT, ++index);
+ // Need to start a new file
+ try {
+ String name = String.format(FILE_NAME_FORMAT, files.size());
File file = new File(directory, name);
+ RandomAccessFile f = new RandomAccessFile(file, "rw");
try {
- RandomAccessFile f = new RandomAccessFile(file, "rw");
- try {
- rw = f.getChannel().map(READ_WRITE, 0, FILE_SIZE);
- ro = rw.asReadOnlyBuffer();
- } finally {
- f.close();
- }
- } catch (IOException e) {
- throw new RuntimeException("Unable to create a new segment", e);
+ MappedByteBuffer buffer =
+ f.getChannel().map(READ_WRITE, 0, FILE_SIZE);
+ files.add(buffer);
+ return buffer;
+ } finally {
+ f.close();
}
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to create a new segment", e);
}
}