You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/09/18 20:18:26 UTC
svn commit: r1524513 - in
/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file:
FileJournal.java FileStore.java TarFile.java
Author: jukka
Date: Wed Sep 18 18:18:25 2013
New Revision: 1524513
URL: http://svn.apache.org/r1524513
Log:
OAK-1001: SegmentMK: 32bit support for the file backend
Enabled the new 32bit code
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileJournal.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileJournal.java?rev=1524513&r1=1524512&r2=1524513&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileJournal.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileJournal.java Wed Sep 18 18:18:25 2013
@@ -16,6 +16,8 @@
*/
package org.apache.jackrabbit.oak.plugins.segment.file;
+import java.io.IOException;
+
import org.apache.jackrabbit.oak.plugins.segment.RecordId;
import org.apache.jackrabbit.oak.plugins.segment.memory.MemoryJournal;
import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -37,8 +39,12 @@ class FileJournal extends MemoryJournal
@Override
public synchronized boolean setHead(RecordId base, RecordId head) {
if (super.setHead(base, head)) {
- store.writeJournals();
- return true;
+ try {
+ store.writeJournals();
+ return true;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
} else {
return false;
}
@@ -47,6 +53,10 @@ class FileJournal extends MemoryJournal
@Override
public synchronized void merge() {
super.merge();
- store.writeJournals();
+ try {
+ store.writeJournals();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1524513&r1=1524512&r2=1524513&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Wed Sep 18 18:18:25 2013
@@ -21,16 +21,12 @@ import static com.google.common.base.Pre
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Lists.newArrayListWithCapacity;
import static com.google.common.collect.Lists.newLinkedList;
-import static com.google.common.collect.Maps.newConcurrentMap;
import static com.google.common.collect.Maps.newHashMap;
-import static java.nio.channels.FileChannel.MapMode.READ_WRITE;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
@@ -47,96 +43,32 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.segment.Template;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
public class FileStore implements SegmentStore {
- /** Logger instance */
- private static final Logger log = LoggerFactory.getLogger(FileStore.class);
-
private static final long SEGMENT_MAGIC = 0x4f616b0a527845ddL;
private static final long JOURNAL_MAGIC = 0xdf36544212c0cb24L;
- private static final long PADDING_MAGIC = 0x786da7779516c12L;
-
- private static final String JOURNALS_UUID = new UUID(0, 0).toString();
-
- private static final String PADDING_UUID = new UUID(-1, -1).toString();
+ static final UUID JOURNALS_UUID = new UUID(0, 0);
- private static final long FILE_SIZE = 256 * 1024 * 1024;
+ private static final int FILE_SIZE = 256 * 1024 * 1024;
private static final String FILE_NAME_FORMAT = "data%05d.tar";
- private static final int SEGMENT_SIZE = 0x200; // 512
-
- private static final byte[] PADDING_BYTES = new byte[SEGMENT_SIZE];
-
- private class SegmentReference {
-
- private final UUID id;
-
- private final ByteBuffer buffer;
-
- private final int position;
-
- SegmentReference(UUID id, ByteBuffer buffer, int position) {
- this.id = id;
- this.buffer = checkNotNull(buffer);
- this.position = position;
- }
-
- UUID getSegmentId() {
- return id;
- }
-
- Segment getSegment() {
- ByteBuffer ro = buffer.asReadOnlyBuffer();
- ro.position(position);
-
- int length = ro.getInt();
- int count = ro.getInt();
-
- checkState(id.equals(new UUID(ro.getLong(), ro.getLong())));
-
- Collection<UUID> referencedIds = newArrayListWithCapacity(count);
- for (int i = 0; i < count; i++) {
- SegmentReference reference =
- references.get(new UUID(ro.getLong(), ro.getLong()));
- checkState(reference != null);
- referencedIds.add(reference.getSegmentId());
- }
-
- ro.limit(ro.position() + length);
- return new Segment(FileStore.this, id, ro.slice(), referencedIds,
- Collections.<String, RecordId>emptyMap(),
- Collections.<Template, RecordId>emptyMap());
- }
-
- }
-
private final File directory;
- private final LinkedList<MappedByteBuffer> files = newLinkedList();
+ private final LinkedList<TarFile> files = newLinkedList();
private final Map<String, Journal> journals = newHashMap();
- private final Map<UUID, SegmentReference> references = newConcurrentMap();
-
private final Cache<UUID, Segment> segments =
CacheBuilder.newBuilder().maximumSize(1000).build();
public FileStore(File directory, NodeState root) throws IOException {
- // http://www.oracle.com/technetwork/java/hotspotfaq-138619.html#64bit_detection
- if ("32".equals(System.getProperty("sun.arch.data.model"))) {
- log.warn("TarMK will only work with small repositories"
- + " in a 32 bit JVM. Consider switching to a 64 bit JVM.");
- }
-
checkNotNull(directory).mkdirs();
this.directory = directory;
@@ -144,19 +76,28 @@ public class FileStore implements Segmen
String name = String.format(FILE_NAME_FORMAT, i);
File file = new File(directory, name);
if (file.isFile()) {
- RandomAccessFile f = new RandomAccessFile(file, "rw");
- try {
- files.add(f.getChannel().map(READ_WRITE, 0, FILE_SIZE));
- } finally {
- f.close();
- }
+ files.add(new TarFile(file, FILE_SIZE));
} else {
break;
}
}
- for (MappedByteBuffer file : files) {
- loadSegments(file);
+ for (TarFile tar : files) {
+ ByteBuffer buffer = tar.readEntry(JOURNALS_UUID);
+ if (buffer != null) {
+ checkState(JOURNAL_MAGIC == buffer.getLong());
+ int count = buffer.getInt();
+ for (int i = 0; i < count; i++) {
+ byte[] b = new byte[buffer.getInt()];
+ buffer.get(b);
+ String name = new String(b, UTF_8);
+ RecordId recordId = new RecordId(
+ new UUID(buffer.getLong(), buffer.getLong()),
+ buffer.getInt());
+ journals.put(name, new FileJournal(
+ this, new SegmentNodeState(this, recordId)));
+ }
+ }
}
if (!journals.containsKey("root")) {
@@ -175,56 +116,17 @@ public class FileStore implements Segmen
}
public synchronized void close() {
- if (!files.isEmpty()) {
- files.getLast().force();
+ for (TarFile file : files) {
+ try {
+ file.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
files.clear();
- references.clear();
segments.invalidateAll();
segments.cleanUp();
- System.gc();
- }
-
- private void loadSegments(MappedByteBuffer ro) throws IOException {
- while (ro.remaining() >= 4 * SEGMENT_SIZE) {
- // skip tar header and get the magic bytes; TODO: verify?
- long magic = ro.getLong(ro.position() + SEGMENT_SIZE);
- if (magic == SEGMENT_MAGIC) {
- int position = ro.position() + SEGMENT_SIZE + 8;
- int length = ro.getInt(position);
- int count = ro.getInt(position + 4);
- UUID id = new UUID(
- ro.getLong(position + 8),
- ro.getLong(position + 16));
- references.put(id, new SegmentReference(id, ro, position));
-
- // advance to next entry in the file
- position += 24 + count * 16 + length;
- ro.position((position + 0x1ff) & ~0x1ff);
- } else if (magic == JOURNAL_MAGIC) {
- ro.position(ro.position() + SEGMENT_SIZE + 8);
-
- int count = ro.getInt();
- for (int i = 0; i < count; i++) {
- byte[] n = new byte[ro.getInt()];
- ro.get(n);
- SegmentReference reference = references.get(
- new UUID(ro.getLong(), ro.getLong()));
- checkState(reference != null);
- SegmentNodeState h = new SegmentNodeState(this, new RecordId(
- reference.getSegmentId(), ro.getInt()));
- journals.put(
- new String(n, UTF_8),
- new FileJournal(this, h));
- }
-
- // advance to next entry in the file
- ro.position((ro.position() + 0x1ff) & ~0x1ff);
- } else {
- // last entry encountered
- return;
- }
- }
+ System.gc(); // for any memory-mappings that are no longer used
}
@Override
@@ -243,9 +145,33 @@ public class FileStore implements Segmen
return segments.get(id, new Callable<Segment>() {
@Override
public Segment call() throws Exception {
- SegmentReference reference = references.get(id);
- checkState(reference != null);
- return reference.getSegment();
+ for (TarFile file : files) {
+ ByteBuffer buffer = file.readEntry(id);
+ if (buffer != null) {
+ checkState(SEGMENT_MAGIC == buffer.getLong());
+ int length = buffer.getInt();
+ int count = buffer.getInt();
+
+ checkState(id.equals(new UUID(
+ buffer.getLong(), buffer.getLong())));
+
+ Collection<UUID> referencedIds =
+ newArrayListWithCapacity(count);
+ for (int i = 0; i < count; i++) {
+ referencedIds.add(new UUID(
+ buffer.getLong(), buffer.getLong()));
+ }
+
+ buffer.limit(buffer.position() + length);
+ return new Segment(
+ FileStore.this, id,
+ buffer.slice(), referencedIds,
+ Collections.<String, RecordId>emptyMap(),
+ Collections.<Template, RecordId>emptyMap());
+ }
+ }
+ throw new IllegalStateException(
+ "Segment " + id + " not found");
}
});
} catch (ExecutionException e) {
@@ -259,174 +185,69 @@ public class FileStore implements Segmen
Collection<UUID> referencedSegmentIds,
Map<String, RecordId> strings, Map<Template, RecordId> templates) {
int size = 8 + 4 + 4 + 16 + 16 * referencedSegmentIds.size() + length;
+ ByteBuffer buffer = ByteBuffer.allocate(size);
- MappedByteBuffer rw = prepare(size);
-
- rw.put(createTarHeader(segmentId.toString(), size));
- rw.putLong(SEGMENT_MAGIC);
- int position = rw.position();
-
- rw.putInt(length);
- rw.putInt(referencedSegmentIds.size());
- rw.putLong(segmentId.getMostSignificantBits());
- rw.putLong(segmentId.getLeastSignificantBits());
+ buffer.putLong(SEGMENT_MAGIC);
+ buffer.putInt(length);
+ buffer.putInt(referencedSegmentIds.size());
+ buffer.putLong(segmentId.getMostSignificantBits());
+ buffer.putLong(segmentId.getLeastSignificantBits());
for (UUID referencedSegmentId : referencedSegmentIds) {
- rw.putLong(referencedSegmentId.getMostSignificantBits());
- rw.putLong(referencedSegmentId.getLeastSignificantBits());
+ buffer.putLong(referencedSegmentId.getMostSignificantBits());
+ buffer.putLong(referencedSegmentId.getLeastSignificantBits());
}
- ByteBuffer ro = rw.asReadOnlyBuffer();
- ro.limit(ro.position() + length);
- ro = ro.slice();
-
- rw.put(data, offset, length);
+ int pos = buffer.position();
+ buffer.put(data, offset, length);
- int n = rw.position() % SEGMENT_SIZE;
- if (n > 0) {
- rw.put(PADDING_BYTES, 0, SEGMENT_SIZE - n);
+ try {
+ writeEntry(segmentId, buffer.array());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- SegmentReference previous = references.put(
- segmentId, new SegmentReference(segmentId, rw, position));
- checkState(previous == null);
-
+ buffer.position(pos);
segments.put(segmentId, new Segment(
- this, segmentId, ro, referencedSegmentIds, strings, templates));
+ this, segmentId, buffer.slice(),
+ referencedSegmentIds, strings, templates));
+ }
+
+ private void writeEntry(UUID segmentId, byte[] buffer)
+ throws IOException {
+ if (files.isEmpty() || !files.getLast().writeEntry(
+ segmentId, buffer, 0, buffer.length)) {
+ String name = String.format(FILE_NAME_FORMAT, files.size());
+ TarFile last = new TarFile(new File(directory, name), FILE_SIZE);
+ checkState(last.writeEntry(segmentId, buffer, 0, buffer.length));
+ files.add(last);
+ }
}
@Override
public void deleteSegment(UUID segmentId) {
- if (references.remove(segmentId) == null) {
- throw new IllegalStateException("Missing segment: " + segmentId);
- }
- segments.invalidate(segmentId);
+ // TODO: implement
}
- synchronized void writeJournals() {
+ synchronized void writeJournals() throws IOException {
int size = 8 + 4;
for (String name : journals.keySet()) {
size += 4 + name.getBytes(UTF_8).length + 16 + 4;
}
- MappedByteBuffer rw = prepare(size);
-
- rw.put(createTarHeader(JOURNALS_UUID, size));
-
- rw.putLong(JOURNAL_MAGIC);
- rw.putInt(journals.size());
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+ buffer.putLong(JOURNAL_MAGIC);
+ buffer.putInt(journals.size());
for (Map.Entry<String, Journal> entry : journals.entrySet()) {
byte[] name = entry.getKey().getBytes(UTF_8);
- rw.putInt(name.length);
- rw.put(name);
+ buffer.putInt(name.length);
+ buffer.put(name);
RecordId head = entry.getValue().getHead();
- rw.putLong(head.getSegmentId().getMostSignificantBits());
- rw.putLong(head.getSegmentId().getLeastSignificantBits());
- rw.putInt(head.getOffset());
- }
-
- int n = rw.position() % SEGMENT_SIZE;
- if (n > 0) {
- rw.put(PADDING_BYTES, 0, SEGMENT_SIZE - n);
- }
- }
-
- private synchronized MappedByteBuffer prepare(int size) {
- if (!files.isEmpty()) {
- MappedByteBuffer last = files.getLast();
-
- // Check if there's still enough room in the last open file
- int segments = (size + SEGMENT_SIZE - 1) / SEGMENT_SIZE;
- if ((1 + segments + 2) * SEGMENT_SIZE <= last.remaining()) {
- return last;
- }
-
- // No more room, so finish the last file with padding as needed
- if (last.remaining() >= 3 * SEGMENT_SIZE) {
- // Add a padding entry to avoid problems during reopening
- last.put(createTarHeader(
- PADDING_UUID,
- last.remaining() - 3 * SEGMENT_SIZE));
- if (last.remaining() > 2 * SEGMENT_SIZE) {
- last.putLong(PADDING_MAGIC);
- last.put(PADDING_BYTES, 0, SEGMENT_SIZE - 8);
- }
- }
- while (last.remaining() > 0) {
- last.put(PADDING_BYTES);
- }
- last.force();
- }
-
- // Need to start a new file
- try {
- String name = String.format(FILE_NAME_FORMAT, files.size());
- File file = new File(directory, name);
- RandomAccessFile f = new RandomAccessFile(file, "rw");
- try {
- MappedByteBuffer buffer =
- f.getChannel().map(READ_WRITE, 0, FILE_SIZE);
- files.add(buffer);
- return buffer;
- } finally {
- f.close();
- }
- } catch (IOException e) {
- throw new RuntimeException("Unable to create a new segment", e);
+ buffer.putLong(head.getSegmentId().getMostSignificantBits());
+ buffer.putLong(head.getSegmentId().getLeastSignificantBits());
+ buffer.putInt(head.getOffset());
}
- }
-
- private static byte[] createTarHeader(String name, int length) {
- byte[] header = new byte[SEGMENT_SIZE];
-
- // File name
- byte[] n = name.getBytes(UTF_8);
- System.arraycopy(n, 0, header, 0, n.length);
-
- // File mode
- System.arraycopy(
- String.format("%07o", 0400).getBytes(UTF_8), 0,
- header, 100, 7);
-
- // User's numeric user ID
- System.arraycopy(
- String.format("%07o", 0).getBytes(UTF_8), 0,
- header, 108, 7);
-
- // Group's numeric user ID
- System.arraycopy(
- String.format("%07o", 0).getBytes(UTF_8), 0,
- header, 116, 7);
-
- // File size in bytes (octal basis)
- System.arraycopy(
- String.format("%011o", length).getBytes(UTF_8), 0,
- header, 124, 11);
-
- // Last modification time in numeric Unix time format (octal)
- long time = System.currentTimeMillis() / 1000;
- System.arraycopy(
- String.format("%011o", time).getBytes(UTF_8), 0,
- header, 136, 11);
-
- // Checksum for header record
- System.arraycopy(
- new byte[] { ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ' }, 0,
- header, 148, 8);
-
- // Type flag
- header[156] = '0';
-
- // Compute checksum
- int checksum = 0;
- for (int i = 0; i < header.length; i++) {
- checksum += header[i] & 0xff;
- }
- System.arraycopy(
- String.format("%06o", checksum).getBytes(UTF_8), 0,
- header, 148, 6);
- header[154] = 0;
- return header;
+ writeEntry(JOURNALS_UUID, buffer.array());
}
}
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java?rev=1524513&r1=1524512&r2=1524513&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java Wed Sep 18 18:18:25 2013
@@ -18,6 +18,9 @@ package org.apache.jackrabbit.oak.plugin
import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Predicates.equalTo;
+import static com.google.common.base.Predicates.not;
+import static com.google.common.collect.Maps.filterKeys;
import java.io.File;
import java.io.IOException;
@@ -73,6 +76,7 @@ class TarFile {
ImmutableMap.Builder<UUID, Location> builder = ImmutableMap.builder();
+ Location journals = null;
this.position = 0;
while (position + BLOCK_SIZE <= len) {
// read the tar header block
@@ -87,14 +91,22 @@ class TarFile {
}
try {
+ Location location = new Location(position + BLOCK_SIZE, size);
UUID id = UUID.fromString(name);
- builder.put(id, new Location(position + BLOCK_SIZE, size));
+ if (FileStore.JOURNALS_UUID.equals(id)) {
+ journals = location;
+ } else {
+ builder.put(id, location);
+ }
} catch (IllegalArgumentException e) {
throw new IOException("Unexpected tar entry: " + name);
}
position += (1 + (size + BLOCK_SIZE - 1) / BLOCK_SIZE) * BLOCK_SIZE;
}
+ if (journals != null) {
+ builder.put(FileStore.JOURNALS_UUID, journals);
+ }
this.entries = builder.build();
}
@@ -169,7 +181,7 @@ class TarFile {
file.write(position, b, offset, size);
entries = ImmutableMap.<UUID, Location>builder()
- .putAll(entries)
+ .putAll(filterKeys(entries, not(equalTo(id))))
.put(id, new Location(position, size))
.build();
position += size;