You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2014/04/02 02:39:34 UTC
svn commit: r1583839 - in /jackrabbit/oak/trunk/oak-core/src:
main/java/org/apache/jackrabbit/oak/plugins/segment/
main/java/org/apache/jackrabbit/oak/plugins/segment/file/
test/java/org/apache/jackrabbit/oak/plugins/segment/file/
Author: jukka
Date: Wed Apr 2 00:39:33 2014
New Revision: 1583839
URL: http://svn.apache.org/r1583839
Log:
OAK-631: SegmentMK: Implement garbage collection
Split TarFile into TarReader/Writer
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java (with props)
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java (with props)
Removed:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/MappedAccess.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/RandomAccess.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileAccess.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarEntry.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1583839&r1=1583838&r2=1583839&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java Wed Apr 2 00:39:33 2014
@@ -124,6 +124,10 @@ public class Segment {
this.data = checkNotNull(data);
if (id.isDataSegmentId()) {
+ checkState(data.get(0) == '0'
+ && data.get(1) == 'a'
+ && data.get(2) == 'K'
+ && data.get(3) == '\n');
this.refids = new SegmentId[getRefCount()];
refids[0] = id;
} else {
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileAccess.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileAccess.java?rev=1583839&r1=1583838&r2=1583839&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileAccess.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileAccess.java Wed Apr 2 00:39:33 2014
@@ -16,22 +16,130 @@
*/
package org.apache.jackrabbit.oak.plugins.segment.file;
+import static com.google.common.base.Preconditions.checkState;
+import static java.nio.channels.FileChannel.MapMode.READ_ONLY;
+
+import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.zip.CRC32;
-interface FileAccess {
-
- int length() throws IOException;
-
- long crc32(int position, int size) throws IOException;
-
- ByteBuffer read(int position, int length) throws IOException;
-
- void write(int position, byte[] b, int offset, int length)
- throws IOException;
+abstract class FileAccess {
- void flush() throws IOException;
+ /**
+ * Opens the given file for read-only access.
+ *
+ * @param file file to open (opened in mode {@code "r"})
+ * @param memoryMapping {@code true} to return a memory-mapped
+ * accessor, {@code false} to return one that
+ * reads through the file handle
+ * @return accessor for the contents of the file
+ * @throws IOException if the file could not be opened or mapped
+ */
+ static FileAccess open(File file, boolean memoryMapping)
+ throws IOException {
+ RandomAccessFile access = new RandomAccessFile(file, "r");
+ if (memoryMapping) {
+ return new Mapped(access);
+ } else {
+ return new Random(access);
+ }
+ }
+
+ abstract boolean isMemoryMapped();
+
+ abstract int length() throws IOException;
+
+ abstract long crc32(int position, int size) throws IOException;
+
+ abstract ByteBuffer read(int position, int length) throws IOException;
+
+ abstract void close() throws IOException;
+
+ //-----------------------------------------------------------< private >--
+
+ /**
+ * File access backed by a read-only memory mapping that covers the
+ * length the file had when the mapping was created.
+ */
+ private static class Mapped extends FileAccess {
+
+ private final MappedByteBuffer buffer;
+
+ /**
+ * Maps the whole file read-only and then closes the given file
+ * handle, whether or not the mapping succeeded.
+ */
+ Mapped(RandomAccessFile file) throws IOException {
+ try {
+ buffer = file.getChannel().map(READ_ONLY, 0, file.length());
+ } finally {
+ file.close();
+ }
+ }
+
+ @Override
+ boolean isMemoryMapped() {
+ return true;
+ }
+
+ /** Returns the number of bytes remaining in the mapped buffer. */
+ @Override
+ public int length() {
+ return buffer.remaining();
+ }
+
+ /**
+ * Copies {@code length} bytes starting at {@code position} into a
+ * temporary array and returns their CRC32 checksum.
+ */
+ @Override
+ public long crc32(int position, int length) {
+ // work on a duplicate view so the shared buffer state is untouched
+ ByteBuffer entry = buffer.asReadOnlyBuffer();
+ entry.position(entry.position() + position);
+
+ byte[] data = new byte[length];
+ entry.get(data);
+
+ CRC32 checksum = new CRC32();
+ checksum.update(data);
+ return checksum.getValue();
+ }
+
+ /**
+ * Returns a read-only slice of the mapping that covers the requested
+ * range; the bytes themselves are not copied.
+ */
+ @Override
+ public ByteBuffer read(int position, int length) {
+ ByteBuffer entry = buffer.asReadOnlyBuffer();
+ entry.position(entry.position() + position);
+ entry.limit(entry.position() + length);
+ return entry.slice();
+ }
+
+ /**
+ * Does nothing: the file handle was already closed by the
+ * constructor.
+ */
+ @Override
+ public void close() {
+ }
+
+ }
+
+ /**
+ * File access that reads through a {@link RandomAccessFile} handle,
+ * copying the requested bytes into a newly allocated buffer.
+ */
+ private static class Random extends FileAccess {
+
+ private final RandomAccessFile file;
+
+ Random(RandomAccessFile file) {
+ this.file = file;
+ }
+
+ @Override
+ boolean isMemoryMapped() {
+ return false;
+ }
+
+ /**
+ * Returns the current file length; fails with an
+ * {@link IllegalStateException} if it does not fit in an int.
+ */
+ @Override
+ public int length() throws IOException {
+ long length = file.length();
+ checkState(length < Integer.MAX_VALUE);
+ return (int) length;
+ }
+
+ /** Reads the given range and returns the CRC32 checksum of it. */
+ @Override
+ public long crc32(int position, int length) throws IOException {
+ CRC32 checksum = new CRC32();
+ checksum.update(read(position, length).array());
+ return checksum.getValue();
+ }
+
+ /**
+ * Reads exactly {@code length} bytes starting at {@code position}
+ * into a new array-backed buffer.
+ */
+ @Override
+ public synchronized ByteBuffer read(int position, int length)
+ throws IOException {
+ ByteBuffer entry = ByteBuffer.allocate(length);
+ // seek and readFully use the handle's single file pointer;
+ // presumably that is why this method is synchronized
+ file.seek(position);
+ file.readFully(entry.array());
+ return entry;
+ }
+
+ /** Closes the underlying file handle. */
+ @Override
+ public synchronized void close() throws IOException {
+ file.close();
+ }
- void close() throws IOException;
+ }
}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1583839&r1=1583838&r2=1583839&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Wed Apr 2 00:39:33 2014
@@ -20,11 +20,11 @@ import static com.google.common.base.Pre
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Lists.newArrayListWithCapacity;
-import static com.google.common.collect.Lists.newCopyOnWriteArrayList;
import static com.google.common.collect.Lists.newLinkedList;
import static com.google.common.collect.Maps.newHashMap;
import static com.google.common.collect.Maps.newTreeMap;
import static java.lang.String.format;
+import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
@@ -84,7 +84,13 @@ public class FileStore implements Segmen
private final boolean memoryMapping;
- private final List<TarFile> files;
+ private volatile List<TarReader> readers;
+
+ private int writeNumber;
+
+ private File writeFile;
+
+ private TarWriter writer;
private final RandomAccessFile journalFile;
@@ -147,14 +153,23 @@ public class FileStore implements Segmen
this.memoryMapping = memoryMapping;
Map<Integer, File> map = collectFiles(directory);
- List<TarFile> list = newArrayListWithCapacity(map.size());
+ this.readers = newArrayListWithCapacity(map.size());
Integer[] indices = map.keySet().toArray(new Integer[map.size()]);
Arrays.sort(indices);
- for (Integer index : indices) {
- File file = map.get(index);
- list.add(new TarFile(file, maxFileSize, memoryMapping));
+ for (int i = indices.length - 1; i >= 0; i--) {
+ readers.add(TarReader.open(
+ singletonMap('a', map.get(indices[i])), memoryMapping));
}
- this.files = newCopyOnWriteArrayList(list);
+
+ if (indices.length > 0) {
+ this.writeNumber = indices[indices.length - 1] + 1;
+ } else {
+ this.writeNumber = 0;
+ }
+ this.writeFile = new File(
+ directory,
+ String.format(FILE_NAME_FORMAT, writeNumber, "a"));
+ this.writer = new TarWriter(writeFile);
journalFile = new RandomAccessFile(
new File(directory, JOURNAL_FILE_NAME), "rw");
@@ -284,16 +299,7 @@ public class FileStore implements Segmen
tracker.getWriter().flush();
synchronized (this) {
- boolean success = true;
- for (TarFile file : files) {
- success = success && file.flush();
- }
- if (!success) {
- log.warn("Failed to sync one ore more tar files with"
- + " the underlying file system, possibly because of"
- + " http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6539707."
- + " Will retry later.");
- }
+ writer.flush();
journalFile.writeBytes(after + " root\n");
journalFile.getChannel().force(false);
persistedHead.set(after);
@@ -302,10 +308,15 @@ public class FileStore implements Segmen
}
}
- public Iterable<SegmentId> getSegmentIds() {
+ public synchronized Iterable<SegmentId> getSegmentIds() {
List<SegmentId> ids = newArrayList();
- for (TarFile file : files) {
- for (UUID uuid : file.getUUIDs()) {
+ for (UUID uuid : writer.getUUIDs()) {
+ ids.add(tracker.getSegmentId(
+ uuid.getMostSignificantBits(),
+ uuid.getLeastSignificantBits()));
+ }
+ for (TarReader reader : readers) {
+ for (UUID uuid : reader.getUUIDs()) {
ids.add(tracker.getSegmentId(
uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits()));
@@ -346,12 +357,14 @@ public class FileStore implements Segmen
synchronized (this) {
flush();
+ writer.close();
journalFile.close();
- for (TarFile file : files) {
- file.close();
+ List<TarReader> list = readers;
+ readers = newArrayList();
+ for (TarReader reader : list) {
+ reader.close();
}
- files.clear();
System.gc(); // for any memory-mappings that are no longer used
}
@@ -373,17 +386,14 @@ public class FileStore implements Segmen
}
private boolean containsSegment(long msb, long lsb) {
- for (TarFile file : files.toArray(new TarFile[0])) {
- try {
- ByteBuffer buffer = file.readEntry(msb, lsb);
- if (buffer != null) {
- return true;
- }
- } catch (IOException e) {
- log.warn("Failed to access file " + file, e);
+ for (TarReader reader : readers) {
+ if (reader.containsEntry(msb, lsb)) {
+ return true;
}
}
- return false;
+ synchronized (this) {
+ return writer.containsEntry(msb, lsb);
+ }
}
@Override
@@ -391,14 +401,32 @@ public class FileStore implements Segmen
long msb = id.getMostSignificantBits();
long lsb = id.getLeastSignificantBits();
- for (TarFile file : files) {
+ for (TarReader reader : readers) {
+ try {
+ ByteBuffer buffer = reader.readEntry(msb, lsb);
+ if (buffer != null) {
+ return new Segment(tracker, id, buffer);
+ }
+ } catch (IOException e) {
+ log.warn("Failed to read from tar file " + reader, e);
+ }
+ }
+
+ synchronized (this) {
+ ByteBuffer buffer = writer.readEntry(msb, lsb);
+ if (buffer != null) {
+ return new Segment(tracker, id, buffer);
+ }
+ }
+
+ for (TarReader reader : readers) {
try {
- ByteBuffer buffer = file.readEntry(msb, lsb);
+ ByteBuffer buffer = reader.readEntry(msb, lsb);
if (buffer != null) {
return new Segment(tracker, id, buffer);
}
} catch (IOException e) {
- log.warn("Failed to access file " + file, e);
+ log.warn("Failed to read from tar file " + reader, e);
}
}
@@ -409,16 +437,24 @@ public class FileStore implements Segmen
public synchronized void writeSegment(
SegmentId id, byte[] data, int offset, int length) {
try {
- UUID uuid = new UUID(
+ long size = writer.writeEntry(
id.getMostSignificantBits(),
- id.getLeastSignificantBits());
- if (files.isEmpty() || !files.get(files.size() - 1).writeEntry(
- uuid, data, offset, length)) {
- String name = format(FILE_NAME_FORMAT, files.size(), "a");
- File file = new File(directory, name);
- TarFile last = new TarFile(file, maxFileSize, memoryMapping);
- checkState(last.writeEntry(uuid, data, offset, length));
- files.add(last);
+ id.getLeastSignificantBits(),
+ data, offset, length);
+ if (size >= maxFileSize) {
+ writer.close();
+
+ List<TarReader> list =
+ newArrayListWithCapacity(1 + readers.size());
+ list.add(new TarReader(writeFile, memoryMapping));
+ list.addAll(readers);
+ readers = list;
+
+ writeNumber++;
+ writeFile = new File(
+ directory,
+ String.format(FILE_NAME_FORMAT, writeNumber, "a"));
+ writer = new TarWriter(writeFile);
}
} catch (IOException e) {
throw new RuntimeException(e);
Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarEntry.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarEntry.java?rev=1583839&r1=1583838&r2=1583839&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarEntry.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarEntry.java Wed Apr 2 00:39:33 2014
@@ -20,20 +20,20 @@ import java.util.Comparator;
class TarEntry {
- static final Comparator<TarEntry> REVERSE_OFFSET = new Comparator<TarEntry>() {
+ static final Comparator<TarEntry> OFFSET_ORDER = new Comparator<TarEntry>() {
@Override
public int compare(TarEntry a, TarEntry b) {
if (a.offset > b.offset) {
- return -1;
- } else if (a.offset < b.offset) {
return 1;
+ } else if (a.offset < b.offset) {
+ return -1;
} else {
return 0;
}
}
};
- static final Comparator<TarEntry> IDENTIFIER = new Comparator<TarEntry>() {
+ static final Comparator<TarEntry> IDENTIFIER_ORDER = new Comparator<TarEntry>() {
@Override
public int compare(TarEntry a, TarEntry b) {
if (a.msb > b.msb) {
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java?rev=1583839&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java Wed Apr 2 00:39:33 2014
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.file;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.collect.Maps.newLinkedHashMap;
+import static com.google.common.collect.Sets.newHashSetWithExpectedSize;
+import static org.apache.jackrabbit.oak.plugins.segment.Segment.REF_COUNT_OFFSET;
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentId.isDataSegmentId;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.CRC32;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class TarReader {
+
+ /** Logger instance */
+ private static final Logger log = LoggerFactory.getLogger(TarReader.class);
+
+ /** Magic byte sequence at the end of the index block. */
+ private static final int INDEX_MAGIC = TarWriter.INDEX_MAGIC;
+
+ /** Pattern of the segment entry names */
+ private static final Pattern NAME_PATTERN = Pattern.compile(
+ "([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
+ + "(\\.([0-9a-f]{8}))?");
+
+ /** The tar file block size. */
+ private static final int BLOCK_SIZE = TarWriter.BLOCK_SIZE;
+
+ private static final int getEntrySize(int size) {
+ return BLOCK_SIZE + size + TarWriter.getPaddingSize(size);
+ }
+
+ /**
+ * Opens a reader for one tar file out of a set of generations of the
+ * same file. Generations are tried from the highest key to the lowest;
+ * the first one with a valid index is returned and all other files in
+ * the map are deleted. If no generation has a valid index, the segment
+ * entries of all files are recovered, the file of the lowest generation
+ * is rewritten from the recovered data, the other files are deleted and
+ * a reader for the regenerated file is returned.
+ *
+ * @param files tar files keyed by generation character
+ * @param memoryMapping whether the files should be memory mapped
+ * @return reader for the selected or regenerated tar file
+ * @throws IOException if the lowest generation could not be renamed to
+ * its backup name, or the regenerated file could
+ * not be written or opened
+ */
+ static TarReader open(Map<Character, File> files, boolean memoryMapping)
+ throws IOException {
+ Character[] generations =
+ files.keySet().toArray(new Character[files.size()]);
+ Arrays.sort(generations);
+ // try the generations from the highest key down to the lowest
+ for (int i = generations.length - 1; i >= 0; i--) {
+ File file = files.get(generations[i]);
+ try {
+ TarReader reader = new TarReader(file, memoryMapping);
+ if (reader.index != null) {
+ // found a generation with a valid index, drop the others
+ for (File other : files.values()) {
+ if (other != file) {
+ log.info("Removing unused tar file {}", other);
+ // NOTE(review): the result of delete() is not checked
+ other.delete();
+ }
+ }
+ return reader;
+ } else {
+ reader.close();
+ }
+ } catch (IOException e) {
+ log.warn("Failed to access tar file " + file, e);
+ }
+ }
+
+ // no generation has a valid index, so recover as much as we can
+ LinkedHashMap<UUID, byte[]> entries = newLinkedHashMap();
+ for (File file : files.values()) {
+ try {
+ FileAccess access = FileAccess.open(file, memoryMapping);
+ try {
+ recoverEntries(file, access, entries);
+ } finally {
+ access.close();
+ }
+ } catch (IOException e) {
+ log.warn("Failed to access tar file " + file, e);
+ }
+ }
+
+ // regenerate the first generation based on the recovered data
+ // NOTE(review): generations[0] throws if the map is empty
+ File file = files.get(generations[0]);
+ File backup = new File(file.getParentFile(), file.getName() + ".bak");
+ if (backup.exists()) {
+ log.info("Removing old backup file " + backup);
+ backup.delete();
+ }
+ if (!file.renameTo(backup)) {
+ throw new IOException("Could not backup tar file " + file);
+ }
+
+ log.info("Regenerating tar file " + file);
+ TarWriter writer = new TarWriter(file);
+ for (Map.Entry<UUID, byte[]> entry : entries.entrySet()) {
+ UUID uuid = entry.getKey();
+ byte[] data = entry.getValue();
+ writer.writeEntry(
+ uuid.getMostSignificantBits(),
+ uuid.getLeastSignificantBits(),
+ data, 0, data.length);
+ }
+ writer.close();
+
+ log.info("Tar file regenerated, removing backup file " + backup);
+ backup.delete();
+ for (File other : files.values()) {
+ if (other != file) {
+ log.info("Removing unused tar file {}", other);
+ other.delete();
+ }
+ }
+
+ return new TarReader(file, memoryMapping);
+ }
+
+ /**
+ * Scans through the tar file from the beginning, looking for segment
+ * entries, and collects their contents into the given map. Scanning
+ * stops at the first empty header block or at the first entry that
+ * would extend past the end of the file.
+ *
+ * @param file the tar file, used for log messages and for recognizing
+ * the index entry by name
+ * @param access accessor for the contents of the tar file
+ * @param entries map from segment identifier to segment data that the
+ * recovered entries are added to
+ * @throws IOException if the tar file could not be read
+ */
+ private static void recoverEntries(
+ File file, FileAccess access, LinkedHashMap<UUID, byte[]> entries)
+ throws IOException {
+ int position = 0;
+ int length = access.length();
+ while (position + TarWriter.BLOCK_SIZE <= length) {
+ // read the tar header block
+ ByteBuffer header = access.read(position, BLOCK_SIZE);
+ int pos = header.position();
+ // the entry name is in the first 100 bytes of the header
+ String name = readString(header, 100);
+ // the entry size is a 12-byte octal number at header offset 124
+ header.position(pos + 124);
+ int size = readNumber(header, 12);
+
+ if (name.isEmpty() && size == 0) {
+ return; // no more entries in this file
+ } else if (position + BLOCK_SIZE + size > length) {
+ log.warn("Invalid entry {} in tar file {}", name, file);
+ return; // invalid entry, stop here
+ }
+
+ Matcher matcher = NAME_PATTERN.matcher(name);
+ if (matcher.matches()) {
+ UUID id = UUID.fromString(matcher.group(1));
+
+ // optional hexadecimal checksum suffix of the entry name
+ String checksum = matcher.group(3);
+ if (checksum == null && entries.containsKey(id)) {
+ // entry already loaded, so skip
+ } else {
+ byte[] data = new byte[size];
+ access.read(position + BLOCK_SIZE, size).get(data);
+
+ if (checksum == null) {
+ entries.put(id, data);
+ } else {
+ // only keep the data if it matches the checksum
+ // recorded in the entry name
+ CRC32 crc = new CRC32();
+ crc.update(data);
+ if (crc.getValue() == Long.parseLong(checksum, 16)) {
+ entries.put(id, data);
+ } else {
+ log.warn("Checksum mismatch in entry {} of tar file {}", name, file);
+ }
+ }
+ }
+ } else if (!name.equals(file.getName() + ".idx")) {
+ log.warn("Ignoring unexpected entry {} in tar file {}",
+ name, file);
+ }
+
+ // skip over the header, the data and the padding of this entry
+ position += getEntrySize(size);
+ }
+ }
+
+ /** The tar file that this reader was opened on. */
+ private final File file;
+
+ /** Accessor for the contents of the tar file. */
+ private final FileAccess access;
+
+ /**
+ * The validated index of the tar file, or {@code null} if no valid
+ * index could be loaded.
+ */
+ private final ByteBuffer index;
+
+ /**
+ * Opens the given tar file and tries to load its index. A missing,
+ * invalid or unreadable index does not make the constructor fail;
+ * the index is simply left as {@code null}.
+ * NOTE(review): other methods of this class use the index without a
+ * null check, so callers should verify it first, as open() does.
+ *
+ * @param file the tar file to open
+ * @param memoryMapping whether the file should be memory mapped
+ * @throws IOException if the file could not be opened
+ */
+ TarReader(File file, boolean memoryMapping) throws IOException {
+ this.file = file;
+ this.access = FileAccess.open(file, memoryMapping);
+
+ ByteBuffer index = null;
+ try {
+ index = loadAndValidateIndex();
+ } catch (IOException e) {
+ log.warn("Unable to access tar file " + file, e);
+ }
+ this.index = index;
+ }
+
+ /**
+ * Returns the identifiers of all entries listed in the index. Each
+ * index entry is 24 bytes long and starts with the two 64-bit halves
+ * of the identifier.
+ */
+ Set<UUID> getUUIDs() {
+ Set<UUID> result = newHashSetWithExpectedSize(index.remaining() / 24);
+ int end = index.limit();
+ for (int offset = index.position(); offset < end; offset += 24) {
+ long msb = index.getLong(offset);
+ long lsb = index.getLong(offset + 8);
+ result.add(new UUID(msb, lsb));
+ }
+ return result;
+ }
+
+ /**
+ * Checks whether the index lists an entry with the given identifier.
+ */
+ boolean containsEntry(long msb, long lsb) {
+ int position = findEntry(msb, lsb);
+ boolean found = position != -1;
+ return found;
+ }
+
+ /**
+ * Reads the data of the entry with the given identifier.
+ *
+ * @return the entry data, or {@code null} if the index does not list
+ * the identifier
+ * @throws IOException if the tar file could not be read
+ */
+ ByteBuffer readEntry(long msb, long lsb) throws IOException {
+ int entry = findEntry(msb, lsb);
+ if (entry == -1) {
+ return null;
+ }
+ // the offset and size of the data follow the 16-byte identifier
+ int offset = index.getInt(entry + 16);
+ int size = index.getInt(entry + 20);
+ return access.read(offset, size);
+ }
+
+ /**
+ * Looks up the index entry with the given identifier.
+ *
+ * @param msb most significant bits of the identifier
+ * @param lsb least significant bits of the identifier
+ * @return absolute position of the matching 24-byte entry within the
+ * index buffer, or {@code -1} if there is no such entry
+ */
+ private int findEntry(long msb, long lsb) {
+ // The segment identifiers are randomly generated with uniform
+ // distribution, so we can use interpolation search to find the
+ // matching entry in the index. The average runtime is O(log log n).
+
+ int lowIndex = 0;
+ int highIndex = index.remaining() / 24 - 1;
+ // the value bounds are floats, so they only approximate the long
+ // values; they steer the guess, the comparisons below are exact
+ float lowValue = Long.MIN_VALUE;
+ float highValue = Long.MAX_VALUE;
+ float targetValue = msb;
+
+ while (lowIndex <= highIndex) {
+ // guess the entry by interpolating the target between the bounds
+ int guessIndex = lowIndex + Math.round(
+ (highIndex - lowIndex)
+ * (targetValue - lowValue)
+ / (highValue - lowValue));
+ int position = index.position() + guessIndex * 24;
+ long m = index.getLong(position);
+ if (msb < m) {
+ highIndex = guessIndex - 1;
+ highValue = m;
+ } else if (msb > m) {
+ lowIndex = guessIndex + 1;
+ lowValue = m;
+ } else {
+ // getting close...
+ long l = index.getLong(position + 8);
+ if (lsb < l) {
+ highIndex = guessIndex - 1;
+ highValue = m;
+ } else if (lsb > l) {
+ lowIndex = guessIndex + 1;
+ lowValue = m;
+ } else {
+ // found it!
+ return position;
+ }
+ }
+ }
+
+ // not found
+ return -1;
+ }
+
+ /**
+ * Garbage collects this tar file. The entries are visited from the
+ * highest offset to the lowest. An entry is kept only if its
+ * identifier is found in the given set, from which it is then removed;
+ * for each kept data segment the identifiers it references are added
+ * to the set, so that they are seen when earlier entries (and other
+ * tar files processed afterwards with the same set) are visited.
+ * <p>
+ * If the kept entries would take up less than three quarters of the
+ * current file, they are copied to a new tar file of the next
+ * generation and a reader for that file is returned. Otherwise, or if
+ * this file is already at generation {@code z}, nothing is written and
+ * this reader is returned. This reader is not closed and its file is
+ * not deleted by this method.
+ *
+ * @param referencedIds identifiers known to be referenced; modified
+ * by this method as described above
+ * @return this reader, or a reader for the newly written generation
+ * @throws IOException if a tar file could not be read or written
+ */
+ synchronized TarReader cleanup(Set<UUID> referencedIds) throws IOException {
+ TarEntry[] sorted = new TarEntry[index.remaining() / 24];
+ int position = index.position();
+ for (int i = 0; position < index.limit(); i++) {
+ sorted[i] = new TarEntry(
+ index.getLong(position),
+ index.getLong(position + 8),
+ index.getInt(position + 16),
+ index.getInt(position + 20));
+ position += 24;
+ }
+ Arrays.sort(sorted, TarEntry.OFFSET_ORDER);
+
+ // estimate the size of the cleaned up file while marking entries
+ int size = 0;
+ int count = 0;
+ for (int i = sorted.length - 1; i >= 0; i--) {
+ TarEntry entry = sorted[i];
+ UUID id = new UUID(entry.msb(), entry.lsb());
+ if (!referencedIds.remove(id)) {
+ // this segment is not referenced anywhere
+ sorted[i] = null;
+ } else {
+ size += getEntrySize(entry.size());
+ count += 1;
+
+ if (isDataSegmentId(entry.lsb())) {
+ // this is a referenced data segment, so follow the graph
+ ByteBuffer segment = access.read(
+ entry.offset(),
+ Math.min(entry.size(), 16 * 256));
+ int pos = segment.position();
+ int refcount = segment.get(pos + REF_COUNT_OFFSET) & 0xff;
+ int refend = pos + 16 * (refcount + 1);
+ for (int refpos = pos + 16; refpos < refend; refpos += 16) {
+ referencedIds.add(new UUID(
+ segment.getLong(refpos),
+ segment.getLong(refpos + 8)));
+ }
+ }
+ }
+ }
+ // account for the index entry and the two final zero blocks
+ size += getEntrySize(24 * count + 16);
+ size += 2 * BLOCK_SIZE;
+
+ // Compare using long arithmetic: in int arithmetic length * 3
+ // overflows for files larger than Integer.MAX_VALUE / 3 bytes,
+ // which made this check always succeed and disabled the cleanup.
+ if (size >= access.length() * 3L / 4) {
+ // the space savings are not worth it at less than 25%
+ return this;
+ }
+
+ // the generation is the letter just before the ".tar" suffix
+ String name = file.getName();
+ int pos = name.length() - "a.tar".length();
+ char generation = name.charAt(pos);
+ if (generation == 'z') {
+ // no garbage collection after reaching generation z
+ return this;
+ }
+
+ File newFile = new File(
+ file.getParentFile(),
+ name.substring(0, pos) + (char) (generation + 1) + ".tar");
+ TarWriter writer = new TarWriter(newFile);
+ // copy the surviving entries in their original offset order
+ for (int i = 0; i < sorted.length; i++) {
+ TarEntry entry = sorted[i];
+ if (entry != null) {
+ byte[] data = new byte[entry.size()];
+ access.read(entry.offset(), entry.size()).get(data);
+ writer.writeEntry(
+ entry.msb(), entry.lsb(), data, 0, entry.size());
+ }
+ }
+ writer.close();
+
+ return new TarReader(newFile, access.isMemoryMapped());
+ }
+
+ /**
+ * Closes the underlying file accessor.
+ *
+ * @throws IOException if the accessor could not be closed
+ */
+ void close() throws IOException {
+ access.close();
+ }
+
+ //-----------------------------------------------------------< private >--
+
+ /**
+ * Tries to read an existing index from the tar file. The index is
+ * returned if it is found and looks valid (correct checksum, passes
+ * sanity checks).
+ * <p>
+ * The last 16 bytes before the two final zero blocks hold the index
+ * metadata: the CRC32 checksum of the index entries, the number of
+ * entries, the size in bytes of the index entry data, and the index
+ * magic. The 24-byte index entries (identifier, offset, size) are
+ * stored directly before the metadata. All size computations on these
+ * untrusted values are done in long arithmetic, and the declared index
+ * size is checked against the file length before the index is read,
+ * so that corrupt metadata leads to a {@code null} result rather than
+ * to an overflowed check or an out-of-range read.
+ *
+ * @return tar index, or {@code null} if not found or not valid
+ * @throws IOException if the tar file could not be read
+ */
+ private ByteBuffer loadAndValidateIndex() throws IOException {
+ long length = file.length();
+ if (length % BLOCK_SIZE != 0
+ || length < 6 * BLOCK_SIZE
+ || length > Integer.MAX_VALUE) {
+ log.warn("Unexpected size {} of tar file {}", length, file);
+ return null; // unexpected file size
+ }
+
+ // read the index metadata just before the two final zero blocks
+ ByteBuffer meta = access.read((int) (length - 2 * BLOCK_SIZE - 16), 16);
+ int crc32 = meta.getInt();
+ int count = meta.getInt();
+ int bytes = meta.getInt();
+ int magic = meta.getInt();
+
+ if (magic != INDEX_MAGIC) {
+ log.warn("No index found in tar file {}", file);
+ return null; // magic byte mismatch
+ }
+
+ // size of the index entries; long, since count is untrusted
+ long indexSize = 24L * count;
+ // end of the area that is left for the segment entries once the
+ // index entry (header block and data) and the two final zero
+ // blocks are taken off; negative if the index cannot fit
+ long limit = length - 2 * BLOCK_SIZE - bytes - BLOCK_SIZE;
+
+ if (count < 1
+ || bytes < indexSize + 16
+ || bytes % BLOCK_SIZE != 0
+ || limit < 0) {
+ log.warn("Invalid index metadata in tar file {}", file);
+ return null; // impossible entry and/or byte counts
+ }
+
+ // the checks above guarantee that this range lies within the file
+ ByteBuffer index = access.read(
+ (int) (length - 2 * BLOCK_SIZE - 16 - indexSize),
+ (int) indexSize);
+ index.mark();
+
+ CRC32 checksum = new CRC32();
+ long lastmsb = Long.MIN_VALUE;
+ long lastlsb = Long.MIN_VALUE;
+ byte[] entry = new byte[24];
+ for (int i = 0; i < count; i++) {
+ index.get(entry);
+ checksum.update(entry);
+
+ ByteBuffer buffer = ByteBuffer.wrap(entry);
+ long msb = buffer.getLong();
+ long lsb = buffer.getLong();
+ int offset = buffer.getInt();
+ int size = buffer.getInt();
+
+ if (lastmsb > msb || (lastmsb == msb && lastlsb > lsb)) {
+ log.warn("Incorrect index ordering in tar file {}", file);
+ return null;
+ } else if (lastmsb == msb && lastlsb == lsb && i > 0) {
+ log.warn("Duplicate index entry in tar file {}", file);
+ return null;
+ } else if (offset < 0 || offset % BLOCK_SIZE != 0) {
+ log.warn("Invalid index entry offset in tar file {}", file);
+ return null;
+ } else if (size < 1 || (long) offset + size > limit) {
+ // long addition, so a huge offset cannot wrap around
+ log.warn("Invalid index entry size in tar file {}", file);
+ return null;
+ }
+
+ lastmsb = msb;
+ lastlsb = lsb;
+ }
+
+ if (crc32 != (int) checksum.getValue()) {
+ log.warn("Invalid index checksum in tar file {}", file);
+ return null; // checksum mismatch
+ }
+
+ // rewind to the first entry before handing out the index
+ index.reset();
+ return index;
+ }
+
+ /**
+ * Reads a fixed-size field from the buffer and decodes the bytes up
+ * to the first zero byte (or the whole field if there is none) as a
+ * UTF-8 string. The buffer position advances by the full field size.
+ */
+ private static String readString(ByteBuffer buffer, int fieldSize) {
+ byte[] field = new byte[fieldSize];
+ buffer.get(field);
+ int end = 0;
+ for (; end < fieldSize; end++) {
+ if (field[end] == 0) {
+ break;
+ }
+ }
+ return new String(field, 0, end, UTF_8);
+ }
+
+ /**
+ * Reads a fixed-size field from the buffer and parses its leading
+ * octal digits as a number. Parsing stops at the first byte that is
+ * not an octal digit; a field without leading digits yields zero.
+ * The buffer position advances by the full field size.
+ */
+ private static int readNumber(ByteBuffer buffer, int fieldSize) {
+ byte[] field = new byte[fieldSize];
+ buffer.get(field);
+ int value = 0;
+ for (int i = 0; i < fieldSize; i++) {
+ int c = field[i] & 0xff;
+ if (c < '0' || c > '7') {
+ return value;
+ }
+ value = value * 8 + c - '0';
+ }
+ return value;
+ }
+
+ //------------------------------------------------------------< Object >--
+
+ @Override
+ public String toString() {
+ return file.toString();
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java?rev=1583839&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java Wed Apr 2 00:39:33 2014
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.file;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Maps.newHashMap;
+import static com.google.common.collect.Sets.newHashSet;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.zip.CRC32;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class TarWriter {
+
+ /** Logger instance */
+ private static final Logger log = LoggerFactory.getLogger(TarWriter.class);
+
+ /** Magic byte sequence at the end of the index block. */
+ static final int INDEX_MAGIC =
+ ('\n' << 24) + ('0' << 16) + ('K' << 8) + '\n';
+
+ /** The tar file block size. */
+ static final int BLOCK_SIZE = 512;
+
+ private static final byte[] ZERO_BYTES = new byte[BLOCK_SIZE];
+
+ static final int getPaddingSize(int size) {
+ int remainder = size % BLOCK_SIZE;
+ if (remainder > 0) {
+ return BLOCK_SIZE - remainder;
+ } else {
+ return 0;
+ }
+ }
+
+ private final File file;
+
+ private RandomAccessFile access = null;
+
+ private final Map<UUID, TarEntry> index = newHashMap();
+
+ TarWriter(File file) {
+ this.file = file;
+ }
+
+ synchronized Set<UUID> getUUIDs() {
+ return newHashSet(index.keySet());
+ }
+
+ synchronized boolean containsEntry(long msb, long lsb) {
+ return index.containsKey(new UUID(msb, lsb));
+ }
+
+ synchronized ByteBuffer readEntry(long msb, long lsb) {
+ TarEntry entry = index.get(new UUID(msb, lsb));
+ if (entry != null) {
+ checkState(access != null); // implied by entry != null
+ try {
+ try {
+ byte[] data = new byte[entry.size()];
+ access.seek(entry.offset());
+ access.readFully(data);
+ return ByteBuffer.wrap(data);
+ } finally {
+ access.seek(access.length());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Unable to read from tar file " + file, e);
+ }
+ } else {
+ return null;
+ }
+ }
+
+ long writeEntry(
+ long msb, long lsb, byte[] data, int offset, int size)
+ throws IOException {
+ checkNotNull(data);
+ checkPositionIndexes(offset, offset + size, data.length);
+
+ UUID uuid = new UUID(msb, lsb);
+ CRC32 checksum = new CRC32();
+ checksum.update(data, offset, size);
+ String entryName = String.format("%s.%08x", uuid, checksum.getValue());
+ byte[] header = newEntryHeader(entryName, size);
+
+ log.debug("Writing segment {} to {}", uuid, file);
+ return writeEntry(uuid, header, data, offset, size);
+ }
+
+ private synchronized long writeEntry(
+ UUID uuid, byte[] header, byte[] data, int offset, int size)
+ throws IOException {
+ if (access == null) {
+ access = new RandomAccessFile(file, "rw");
+ }
+
+ access.write(header);
+ access.write(data, offset, size);
+ int padding = getPaddingSize(size);
+ if (padding > 0) {
+ access.write(ZERO_BYTES, 0, padding);
+ }
+
+ long length = access.getFilePointer();
+ checkState(length <= Integer.MAX_VALUE);
+ TarEntry entry = new TarEntry(
+ uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(),
+ (int) (length - size - padding), size);
+ index.put(uuid, entry);
+
+ return length;
+ }
+
+ synchronized void flush() throws IOException {
+ if (access != null) {
+ access.getFD().sync();
+ }
+ }
+
+ synchronized void close() throws IOException {
+ if (access != null) {
+ int indexSize = index.size() * 24 + 16;
+ int padding = getPaddingSize(indexSize);
+
+ String indexName = file.getName() + ".idx";
+ byte[] header = newEntryHeader(indexName, indexSize);
+
+ ByteBuffer buffer = ByteBuffer.allocate(indexSize);
+ TarEntry[] sorted = index.values().toArray(new TarEntry[index.size()]);
+ Arrays.sort(sorted, TarEntry.IDENTIFIER_ORDER);
+ for (TarEntry entry : sorted) {
+ buffer.putLong(entry.msb());
+ buffer.putLong(entry.lsb());
+ buffer.putInt(entry.offset());
+ buffer.putInt(entry.size());
+ }
+
+ CRC32 checksum = new CRC32();
+ checksum.update(buffer.array(), 0, buffer.position());
+ buffer.putInt((int) checksum.getValue());
+ buffer.putInt(index.size());
+ buffer.putInt(padding + indexSize);
+ buffer.putInt(INDEX_MAGIC);
+
+ access.write(header);
+ if (padding > 0) {
+ // padding comes *before* the index!
+ access.write(ZERO_BYTES, 0, padding);
+ }
+ access.write(buffer.array());
+ access.write(ZERO_BYTES);
+ access.write(ZERO_BYTES);
+ access.close();
+
+ access = null;
+ }
+ }
+
+ private byte[] newEntryHeader(String name, int size) throws IOException {
+ byte[] header = new byte[BLOCK_SIZE];
+
+ // File name
+ byte[] nameBytes = name.getBytes(UTF_8);
+ System.arraycopy(
+ nameBytes, 0, header, 0, Math.min(nameBytes.length, 100));
+
+ // File mode
+ System.arraycopy(
+ String.format("%07o", 0400).getBytes(UTF_8), 0,
+ header, 100, 7);
+
+ // User's numeric user ID
+ System.arraycopy(
+ String.format("%07o", 0).getBytes(UTF_8), 0,
+ header, 108, 7);
+
+ // Group's numeric user ID
+ System.arraycopy(
+ String.format("%07o", 0).getBytes(UTF_8), 0,
+ header, 116, 7);
+
+ // File size in bytes (octal basis)
+ System.arraycopy(
+ String.format("%011o", size).getBytes(UTF_8), 0,
+ header, 124, 11);
+
+ // Last modification time in numeric Unix time format (octal)
+ long time = System.currentTimeMillis() / 1000;
+ System.arraycopy(
+ String.format("%011o", time).getBytes(UTF_8), 0,
+ header, 136, 11);
+
+ // Checksum for header record
+ System.arraycopy(
+ new byte[] { ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ' }, 0,
+ header, 148, 8);
+
+ // Type flag
+ header[156] = '0';
+
+ // Compute checksum
+ int checksum = 0;
+ for (int i = 0; i < header.length; i++) {
+ checksum += header[i] & 0xff;
+ }
+ System.arraycopy(
+ String.format("%06o", checksum).getBytes(UTF_8), 0,
+ header, 148, 6);
+ header[154] = 0;
+
+ return header;
+ }
+
+ //------------------------------------------------------------< Object >--
+
+ @Override
+ public String toString() {
+ return file.toString();
+ }
+
+}
Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java?rev=1583839&r1=1583838&r2=1583839&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java Wed Apr 2 00:39:33 2014
@@ -46,7 +46,7 @@ public class FileStoreTest {
@Test
public void testRecovery() throws IOException {
- FileStore store = new FileStore(directory, 1);
+ FileStore store = new FileStore(directory, 1, false);
store.flush(); // first 1kB
SegmentNodeState base = store.getHead();
@@ -61,27 +61,25 @@ public class FileStoreTest {
store.setHead(base, builder.getNodeState());
store.close(); // third 1kB
- store = new FileStore(directory, 1);
+ store = new FileStore(directory, 1, false);
assertEquals("b", store.getHead().getString("step"));
store.close();
RandomAccessFile file = new RandomAccessFile(
new File(directory, "data00000a.tar"), "rw");
- file.seek(2048);
- file.write(new byte[1024], 0, 1024);
+ file.setLength(2048);
file.close();
- store = new FileStore(directory, 1);
+ store = new FileStore(directory, 1, false);
assertEquals("a", store.getHead().getString("step"));
store.close();
file = new RandomAccessFile(
new File(directory, "data00000a.tar"), "rw");
- file.seek(1024);
- file.write(new byte[1024], 0, 1024);
+ file.setLength(1024);
file.close();
- store = new FileStore(directory, 1);
+ store = new FileStore(directory, 1, false);
assertFalse(store.getHead().hasProperty("step"));
store.close();
}
Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java?rev=1583839&r1=1583838&r2=1583839&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java Wed Apr 2 00:39:33 2014
@@ -34,18 +34,7 @@ public class TarFileTest {
@Before
public void setUp() throws IOException {
- file = File.createTempFile("TarFileTest", ".tar");
- }
-
- @After
- public void tearDown() {
- file.delete();
- }
-
- @Test
- public void testOpenClose() throws IOException {
- new TarFile(file, 10240, true).close();
- new TarFile(file, 10240, false).close();
+ file = File.createTempFile("TarFileTest", ".tar", new File("target"));
}
@Test
@@ -55,21 +44,31 @@ public class TarFileTest {
long lsb = id.getLeastSignificantBits();
byte[] data = "Hello, World!".getBytes(UTF_8);
- TarFile tar = new TarFile(file, 10240, false);
+ TarWriter writer = new TarWriter(file);
try {
- tar.writeEntry(id, data, 0, data.length);
- assertEquals(ByteBuffer.wrap(data), tar.readEntry(msb, lsb));
+ writer.writeEntry(
+ id.getMostSignificantBits(),
+ id.getLeastSignificantBits(),
+ data, 0, data.length);
+ assertEquals(ByteBuffer.wrap(data), writer.readEntry(msb, lsb));
} finally {
- tar.close();
+ writer.close();
}
- assertEquals(10240, file.length());
+ assertEquals(3072, file.length());
+
+ TarReader reader = new TarReader(file, false);
+ try {
+ assertEquals(ByteBuffer.wrap(data), reader.readEntry(msb, lsb));
+ } finally {
+ reader.close();
+ }
- tar = new TarFile(file, 10240, false);
+ reader = new TarReader(file, false);
try {
- assertEquals(ByteBuffer.wrap(data), tar.readEntry(msb, lsb));
+ assertEquals(ByteBuffer.wrap(data), reader.readEntry(msb, lsb));
} finally {
- tar.close();
+ reader.close();
}
}