You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by to...@apache.org on 2018/02/13 11:17:43 UTC
svn commit: r1824115 [2/3] - in /jackrabbit/oak/trunk:
oak-run/src/main/java/org/apache/jackrabbit/oak/explorer/
oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/
oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ oak-...
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarManager.java?rev=1824115&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarManager.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarManager.java Tue Feb 13 11:17:42 2018
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.file.tar;
+
+import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.file.tar.index.Index;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.CRC32;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static java.nio.ByteBuffer.wrap;
+import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE;
+
+public class SegmentTarManager implements SegmentArchiveManager {
+
+ /**
+ * Pattern of the segment entry names. Note the trailing (\\..*)? group
+ * that's included for compatibility with possible future extensions.
+ */
+ private static final Pattern NAME_PATTERN = Pattern.compile(
+ "([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
+ + "(\\.([0-9a-f]{8}))?(\\..*)?");
+
+ private static final Logger log = LoggerFactory.getLogger(SegmentTarManager.class);
+
+ private final File segmentstoreDir;
+
+ private final FileStoreMonitor fileStoreMonitor;
+
+ private final IOMonitor ioMonitor;
+
+ private final boolean memoryMapping;
+
+ public SegmentTarManager(File segmentstoreDir, FileStoreMonitor fileStoreMonitor, IOMonitor ioMonitor, boolean memoryMapping) {
+ this.segmentstoreDir = segmentstoreDir;
+ this.fileStoreMonitor = fileStoreMonitor;
+ this.ioMonitor = ioMonitor;
+ this.memoryMapping = memoryMapping;
+ }
+
+ @Override
+ public List<String> listArchives() {
+ return Arrays.asList(segmentstoreDir.list());
+ }
+
+ @Override
+ public SegmentArchiveReader open(String name) throws IOException {
+ File file = new File(segmentstoreDir, name);
+ RandomAccessFile access = new RandomAccessFile(file, "r");
+ try {
+ Index index = SegmentTarReader.loadAndValidateIndex(access, name);
+ if (index == null) {
+ log.info("No index found in tar file {}, skipping...", name);
+ return null;
+ } else {
+ if (memoryMapping) {
+ try {
+ FileAccess mapped = new FileAccess.Mapped(access);
+ return new SegmentTarReader(file, mapped, index, ioMonitor);
+ } catch (IOException e) {
+ log.warn("Failed to mmap tar file {}. Falling back to normal file " +
+ "IO, which will negatively impact repository performance. " +
+ "This problem may have been caused by restrictions on the " +
+ "amount of virtual memory available to the JVM. Please make " +
+ "sure that a 64-bit JVM is being used and that the process " +
+ "has access to unlimited virtual memory (ulimit option -v).",
+ name, e);
+ }
+ }
+
+ FileAccess random = new FileAccess.Random(access);
+ // prevent the finally block from closing the file
+ // as the returned TarReader will take care of that
+ access = null;
+ return new SegmentTarReader(file, random, index, ioMonitor);
+ }
+ } finally {
+ if (access != null) {
+ access.close();
+ }
+ }
+ }
+
+ @Override
+ public SegmentArchiveWriter create(String archiveName) {
+ return new SegmentTarWriter(new File(segmentstoreDir, archiveName), fileStoreMonitor, ioMonitor);
+ }
+
+ @Override
+ public boolean delete(String archiveName) {
+ try {
+ return Files.deleteIfExists(new File(segmentstoreDir, archiveName).toPath());
+ } catch (IOException e) {
+ log.error("Can't remove archive {}", archiveName, e);
+ return false;
+ }
+ }
+
+ @Override
+ public boolean renameTo(String from, String to) {
+ try {
+ Files.move(new File(segmentstoreDir, from).toPath(), new File(segmentstoreDir, to).toPath());
+ return true;
+ } catch (IOException e) {
+ log.error("Can't move archive {} to {}", from, to, e);
+ return false;
+ }
+ }
+
+ @Override
+ public void copyFile(String from, String to) throws IOException {
+ Files.copy(new File(segmentstoreDir, from).toPath(), new File(segmentstoreDir, to).toPath());
+ }
+
+ @Override
+ public boolean exists(String archiveName) {
+ return new File(segmentstoreDir, archiveName).exists();
+ }
+
+ @Override
+ public void recoverEntries(String archiveName, LinkedHashMap<UUID, byte[]> entries) throws IOException {
+ File file = new File(segmentstoreDir, archiveName);
+ RandomAccessFile access = new RandomAccessFile(file, "r");
+ try {
+ recoverEntries(file, access, entries);
+ } finally {
+ access.close();
+ }
+ }
+
+ /**
+ * Scans through the tar file, looking for all segment entries.
+ *
+ * @param file The path of the TAR file.
+ * @param access The contents of the TAR file.
+ * @param entries The map that will contain the recovered entries. The
+ * entries are inserted in the {@link LinkedHashMap} in the
+ * order they appear in the TAR file.
+ */
+ private static void recoverEntries(File file, RandomAccessFile access, LinkedHashMap<UUID, byte[]> entries) throws IOException {
+ byte[] header = new byte[BLOCK_SIZE];
+ while (access.getFilePointer() + BLOCK_SIZE <= access.length()) {
+ // read the tar header block
+ access.readFully(header);
+
+ // compute the header checksum
+ int sum = 0;
+ for (int i = 0; i < BLOCK_SIZE; i++) {
+ sum += header[i] & 0xff;
+ }
+
+
+ // identify possible zero block
+ if (sum == 0 && access.getFilePointer() + 2 * BLOCK_SIZE == access.length()) {
+ return; // found the zero blocks at the end of the file
+ }
+
+ // replace the actual stored checksum with spaces for comparison
+ for (int i = 148; i < 148 + 8; i++) {
+ sum -= header[i] & 0xff;
+ sum += ' ';
+ }
+
+ byte[] checkbytes = String.format("%06o\0 ", sum).getBytes(UTF_8);
+ for (int i = 0; i < checkbytes.length; i++) {
+ if (checkbytes[i] != header[148 + i]) {
+ log.warn("Invalid entry checksum at offset {} in tar file {}, skipping...",
+ access.getFilePointer() - BLOCK_SIZE, file);
+ }
+ }
+
+ // The header checksum passes, so read the entry name and size
+ ByteBuffer buffer = wrap(header);
+ String name = readString(buffer, 100);
+ buffer.position(124);
+ int size = readNumber(buffer, 12);
+ if (access.getFilePointer() + size > access.length()) {
+ // checksum was correct, so the size field should be accurate
+ log.warn("Partial entry {} in tar file {}, ignoring...", name, file);
+ return;
+ }
+
+ Matcher matcher = NAME_PATTERN.matcher(name);
+ if (matcher.matches()) {
+ UUID id = UUID.fromString(matcher.group(1));
+
+ String checksum = matcher.group(3);
+ if (checksum != null || !entries.containsKey(id)) {
+ byte[] data = new byte[size];
+ access.readFully(data);
+
+ // skip possible padding to stay at block boundaries
+ long position = access.getFilePointer();
+ long remainder = position % BLOCK_SIZE;
+ if (remainder != 0) {
+ access.seek(position + (BLOCK_SIZE - remainder));
+ }
+
+ if (checksum != null) {
+ CRC32 crc = new CRC32();
+ crc.update(data);
+ if (crc.getValue() != Long.parseLong(checksum, 16)) {
+ log.warn("Checksum mismatch in entry {} of tar file {}, skipping...",
+ name, file);
+ continue;
+ }
+ }
+
+ entries.put(id, data);
+ }
+ } else if (!name.equals(file.getName() + ".idx")) {
+ log.warn("Unexpected entry {} in tar file {}, skipping...",
+ name, file);
+ long position = access.getFilePointer() + size;
+ long remainder = position % BLOCK_SIZE;
+ if (remainder != 0) {
+ position += BLOCK_SIZE - remainder;
+ }
+ access.seek(position);
+ }
+ }
+ }
+
+ private static String readString(ByteBuffer buffer, int fieldSize) {
+ byte[] b = new byte[fieldSize];
+ buffer.get(b);
+ int n = 0;
+ while (n < fieldSize && b[n] != 0) {
+ n++;
+ }
+ return new String(b, 0, n, UTF_8);
+ }
+
+ private static int readNumber(ByteBuffer buffer, int fieldSize) {
+ byte[] b = new byte[fieldSize];
+ buffer.get(b);
+ int number = 0;
+ for (int i = 0; i < fieldSize; i++) {
+ int digit = b[i] & 0xff;
+ if ('0' <= digit && digit <= '7') {
+ number = number * 8 + digit - '0';
+ } else {
+ break;
+ }
+ }
+ return number;
+ }
+
+}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarReader.java?rev=1824115&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarReader.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarReader.java Tue Feb 13 11:17:42 2018
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.file.tar;
+
+import com.google.common.base.Stopwatch;
+import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndex;
+import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndexLoader;
+import org.apache.jackrabbit.oak.segment.file.tar.binaries.InvalidBinaryReferencesIndexException;
+import org.apache.jackrabbit.oak.segment.file.tar.index.Index;
+import org.apache.jackrabbit.oak.segment.file.tar.index.IndexEntry;
+import org.apache.jackrabbit.oak.segment.file.tar.index.IndexLoader;
+import org.apache.jackrabbit.oak.segment.file.tar.index.InvalidIndexException;
+import org.apache.jackrabbit.oak.segment.util.ReaderAtEnd;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.jackrabbit.oak.segment.file.tar.SegmentTarWriter.getPaddingSize;
+import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE;
+import static org.apache.jackrabbit.oak.segment.file.tar.index.IndexLoader.newIndexLoader;
+
+public class SegmentTarReader implements SegmentArchiveManager.SegmentArchiveReader {
+
+ private static final Logger log = LoggerFactory.getLogger(SegmentTarReader.class);
+
+ private static final IndexLoader indexLoader = newIndexLoader(BLOCK_SIZE);
+
+ private final FileAccess access;
+
+ private final File file;
+
+ private final IOMonitor ioMonitor;
+
+ private final String name;
+
+ private final Index index;
+
+ private volatile Boolean hasGraph;
+
+ public SegmentTarReader(File file, FileAccess access, Index index, IOMonitor ioMonitor) {
+ this.access = access;
+ this.file = file;
+ this.index = index;
+ this.name = file.getName();
+ this.ioMonitor = ioMonitor;
+ }
+
+ @Override
+ public ByteBuffer readSegment(long msb, long lsb) throws IOException {
+ int i = index.findEntry(msb, lsb);
+ if (i == -1) {
+ return null;
+ }
+ IndexEntry indexEntry = index.entry(i);
+ ioMonitor.beforeSegmentRead(file, msb, lsb, indexEntry.getLength());
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ ByteBuffer buffer = access.read(indexEntry.getPosition(), indexEntry.getLength());
+ long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
+ ioMonitor.afterSegmentRead(file, msb, lsb, indexEntry.getLength(), elapsed);
+ return buffer;
+ }
+
+ @Override
+ public Index getIndex() {
+ return index;
+ }
+
+ public static Index loadAndValidateIndex(RandomAccessFile file, String name) throws IOException {
+ long length = file.length();
+ if (length % BLOCK_SIZE != 0) {
+ log.warn("Unable to load index of file {}: Invalid alignment", name);
+ return null;
+ }
+ if (length < 6 * BLOCK_SIZE) {
+ log.warn("Unable to load index of file {}: File too short", name);
+ return null;
+ }
+ if (length > Integer.MAX_VALUE) {
+ log.warn("Unable to load index of file {}: File too long", name);
+ return null;
+ }
+ ReaderAtEnd r = (whence, size) -> {
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+ file.seek(length - 2 * BLOCK_SIZE - whence);
+ file.readFully(buffer.array());
+ return buffer;
+ };
+ try {
+ return indexLoader.loadIndex(r);
+ } catch (InvalidIndexException e) {
+ log.warn("Unable to load index of file {}: {}", name, e.getMessage());
+ }
+ return null;
+ }
+
+ @Override
+ public Map<UUID, List<UUID>> getGraph() throws IOException {
+ ByteBuffer graph = loadGraph();
+ if (graph == null) {
+ return null;
+ } else {
+ return GraphLoader.parseGraph(graph);
+ }
+ }
+
+ @Override
+ public boolean hasGraph() {
+ if (hasGraph == null) {
+ try {
+ loadGraph();
+ } catch (IOException ignore) { }
+ }
+ return hasGraph;
+ }
+
+ private ByteBuffer loadGraph() throws IOException {
+ int end = access.length() - 2 * BLOCK_SIZE - getIndexEntrySize();
+ ByteBuffer graph = GraphLoader.loadGraph((whence, amount) -> access.read(end - whence, amount));
+ hasGraph = graph != null;
+ return graph;
+ }
+
+ @Override
+ public BinaryReferencesIndex getBinaryReferences() throws IOException, InvalidBinaryReferencesIndexException {
+ int end = access.length() - 2 * BLOCK_SIZE - getIndexEntrySize() - getGraphEntrySize();
+ return BinaryReferencesIndexLoader.loadBinaryReferencesIndex((whence, size) -> access.read(end - whence, size));
+ }
+
+ @Override
+ public long length() {
+ return file.length();
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public void close() throws IOException {
+ access.close();
+ }
+
+ @Override
+ public int getEntrySize(int size) {
+ return BLOCK_SIZE + size + getPaddingSize(size);
+ }
+
+ private int getIndexEntrySize() {
+ return getEntrySize(index.size());
+ }
+
+ private int getGraphEntrySize() {
+ ByteBuffer buffer;
+
+ try {
+ buffer = loadGraph();
+ } catch (IOException e) {
+ log.warn("Exception while loading pre-compiled tar graph", e);
+ return 0;
+ }
+
+ if (buffer == null) {
+ return 0;
+ }
+
+ return getEntrySize(buffer.getInt(buffer.limit() - 8));
+ }
+
+
+}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarWriter.java?rev=1824115&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarWriter.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/SegmentTarWriter.java Tue Feb 13 11:17:42 2018
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.file.tar;
+
+import com.google.common.base.Stopwatch;
+import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.CRC32;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE;
+
+public class SegmentTarWriter implements SegmentArchiveManager.SegmentArchiveWriter {
+
+ private static final Logger log = LoggerFactory.getLogger(SegmentTarWriter.class);
+
+ private static final byte[] ZERO_BYTES = new byte[BLOCK_SIZE];
+
+ private final FileStoreMonitor monitor;
+
+ /**
+ * The file being written. This instance is also used as an additional
+ * synchronization point by {@link #flush()} and {@link #close()} to
+ * allow {@link #flush()} to work concurrently with normal reads and
+ * writes, but not with a concurrent {@link #close()}.
+ */
+ private final File file;
+
+ private final IOMonitor ioMonitor;
+
+ /**
+ * File handle. Initialized lazily in {@link #writeSegment(long, long, byte[], int, int, GCGeneration)}
+ * to avoid creating an extra empty file when just reading from the repository.
+ * Should only be accessed from synchronized code.
+ */
+ private RandomAccessFile access = null;
+
+ private FileChannel channel = null;
+
+ private volatile long length;
+
+ public SegmentTarWriter(File file, FileStoreMonitor monitor, IOMonitor ioMonitor) {
+ this.file = file;
+ this.monitor = monitor;
+ this.ioMonitor = ioMonitor;
+ }
+
+ @Override
+ public TarEntry writeSegment(long msb, long lsb, byte[] data, int offset, int size, GCGeneration generation) throws IOException {
+ UUID uuid = new UUID(msb, lsb);
+ CRC32 checksum = new CRC32();
+ checksum.update(data, offset, size);
+ String entryName = String.format("%s.%08x", uuid, checksum.getValue());
+ byte[] header = newEntryHeader(entryName, size);
+
+ log.debug("Writing segment {} to {}", uuid, file);
+
+ if (access == null) {
+ access = new RandomAccessFile(file, "rw");
+ channel = access.getChannel();
+ }
+
+ int padding = getPaddingSize(size);
+
+ long initialLength = access.getFilePointer();
+
+ access.write(header);
+
+ long dataOffset = access.getFilePointer();
+
+ ioMonitor.beforeSegmentWrite(file, msb, lsb, size);
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ access.write(data, offset, size);
+ ioMonitor.afterSegmentWrite(file, msb, lsb, size, stopwatch.elapsed(TimeUnit.NANOSECONDS));
+
+ if (padding > 0) {
+ access.write(ZERO_BYTES, 0, padding);
+ }
+
+ long currentLength = access.getFilePointer();
+ monitor.written(currentLength - initialLength);
+
+ length = currentLength;
+
+ return new TarEntry(msb, lsb, (int) dataOffset, size, generation);
+ }
+
+ @Override
+ public ByteBuffer readSegment(TarEntry tarEntry) throws IOException {
+ checkState(channel != null); // implied by entry != null
+ ByteBuffer data = ByteBuffer.allocate(tarEntry.size());
+ channel.read(data, tarEntry.offset());
+ data.rewind();
+ return data;
+ }
+
+ @Override
+ public void writeIndex(byte[] data) throws IOException {
+ byte[] header = newEntryHeader(file.getName() + ".idx", data.length);
+ access.write(header);
+ access.write(data);
+ monitor.written(header.length + data.length);
+
+ length = access.getFilePointer();
+ }
+
+ @Override
+ public void writeGraph(byte[] data) throws IOException {
+ int paddingSize = getPaddingSize(data.length);
+ byte[] header = newEntryHeader(file.getName() + ".gph", data.length + paddingSize);
+ access.write(header);
+ if (paddingSize > 0) {
+ access.write(ZERO_BYTES, 0, paddingSize);
+ }
+ access.write(data);
+ monitor.written(header.length + paddingSize + data.length);
+
+ length = access.getFilePointer();
+ }
+
+ @Override
+ public void writeBinaryReferences(byte[] data) throws IOException {
+ int paddingSize = getPaddingSize(data.length);
+ byte[] header = newEntryHeader(file.getName() + ".brf", data.length + paddingSize);
+ access.write(header);
+ if (paddingSize > 0) {
+ access.write(ZERO_BYTES, 0, paddingSize);
+ }
+ access.write(data);
+ monitor.written(header.length + paddingSize + data.length);
+
+ length = access.getFilePointer();
+ }
+
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ @Override
+ public void close() throws IOException {
+ access.write(ZERO_BYTES);
+ access.write(ZERO_BYTES);
+ access.close();
+
+ monitor.written(BLOCK_SIZE * 2);
+ }
+
+ @Override
+ public boolean isCreated() {
+ return access != null;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ access.getFD().sync();
+ }
+
+ @Override
+ public String getName() {
+ return file.getName();
+ }
+
+ private static byte[] newEntryHeader(String name, int size) {
+ byte[] header = new byte[BLOCK_SIZE];
+
+ // File name
+ byte[] nameBytes = name.getBytes(UTF_8);
+ System.arraycopy(
+ nameBytes, 0, header, 0, Math.min(nameBytes.length, 100));
+
+ // File mode
+ System.arraycopy(
+ String.format("%07o", 0400).getBytes(UTF_8), 0,
+ header, 100, 7);
+
+ // User's numeric user ID
+ System.arraycopy(
+ String.format("%07o", 0).getBytes(UTF_8), 0,
+ header, 108, 7);
+
+ // Group's numeric user ID
+ System.arraycopy(
+ String.format("%07o", 0).getBytes(UTF_8), 0,
+ header, 116, 7);
+
+ // File size in bytes (octal basis)
+ System.arraycopy(
+ String.format("%011o", size).getBytes(UTF_8), 0,
+ header, 124, 11);
+
+ // Last modification time in numeric Unix time format (octal)
+ long time = System.currentTimeMillis() / 1000;
+ System.arraycopy(
+ String.format("%011o", time).getBytes(UTF_8), 0,
+ header, 136, 11);
+
+ // Checksum for header record
+ System.arraycopy(
+ new byte[] {' ', ' ', ' ', ' ', ' ', ' ', ' ', ' '}, 0,
+ header, 148, 8);
+
+ // Type flag
+ header[156] = '0';
+
+ // Compute checksum
+ int checksum = 0;
+ for (byte aHeader : header) {
+ checksum += aHeader & 0xff;
+ }
+ System.arraycopy(
+ String.format("%06o\0 ", checksum).getBytes(UTF_8), 0,
+ header, 148, 8);
+
+ return header;
+ }
+
+ static int getPaddingSize(int size) {
+ int remainder = size % BLOCK_SIZE;
+ if (remainder > 0) {
+ return BLOCK_SIZE - remainder;
+ } else {
+ return 0;
+ }
+ }
+}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarConstants.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarConstants.java?rev=1824115&r1=1824114&r2=1824115&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarConstants.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarConstants.java Tue Feb 13 11:17:42 2018
@@ -17,7 +17,7 @@
package org.apache.jackrabbit.oak.segment.file.tar;
-class TarConstants {
+public class TarConstants {
private TarConstants() {
// Prevent instantiation.
@@ -42,11 +42,11 @@ class TarConstants {
* (size, checksum, the number of UUIDs).</li>
* </ul>
*/
- static final int GRAPH_MAGIC = ('\n' << 24) + ('0' << 16) + ('G' << 8) + '\n';
+ public static final int GRAPH_MAGIC = ('\n' << 24) + ('0' << 16) + ('G' << 8) + '\n';
/**
* The tar file block size.
*/
- static final int BLOCK_SIZE = 512;
+ public static final int BLOCK_SIZE = 512;
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarEntry.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarEntry.java?rev=1824115&r1=1824114&r2=1824115&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarEntry.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarEntry.java Tue Feb 13 11:17:42 2018
@@ -24,7 +24,7 @@ import java.util.Comparator;
* A file entry location in a tar file. This is used for the index with a tar
* file.
*/
-class TarEntry {
+public class TarEntry {
/** Size in bytes a tar entry takes up in the tar file */
static final int SIZE = 33;
@@ -52,7 +52,7 @@ class TarEntry {
private final GCGeneration generation;
- TarEntry(long msb, long lsb, int offset, int size, GCGeneration generation) {
+ public TarEntry(long msb, long lsb, int offset, int size, GCGeneration generation) {
this.msb = msb;
this.lsb = lsb;
this.offset = offset;
@@ -60,19 +60,19 @@ class TarEntry {
this.generation = generation;
}
- long msb() {
+ public long msb() {
return msb;
}
- long lsb() {
+ public long lsb() {
return lsb;
}
- int offset() {
+ public int offset() {
return offset;
}
- int size() {
+ public int size() {
return size;
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarFiles.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarFiles.java?rev=1824115&r1=1824114&r2=1824115&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarFiles.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarFiles.java Tue Feb 13 11:17:42 2018
@@ -24,7 +24,6 @@ import static com.google.common.collect.
import static com.google.common.collect.Maps.newHashMap;
import static com.google.common.collect.Sets.newHashSet;
import static java.util.Collections.emptySet;
-import static org.apache.commons.io.FileUtils.listFiles;
import java.io.Closeable;
import java.io.File;
@@ -52,6 +51,9 @@ import javax.annotation.Nonnull;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
+import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.file.FileReaper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,7 +78,7 @@ public class TarFiles implements Closeab
private long reclaimedSize;
- private List<File> removableFiles;
+ private List<String> removableFiles;
private Set<UUID> reclaimedSegmentIds;
@@ -88,7 +90,7 @@ public class TarFiles implements Closeab
return reclaimedSize;
}
- public List<File> getRemovableFiles() {
+ public List<String> getRemovableFiles() {
return removableFiles;
}
@@ -118,6 +120,8 @@ public class TarFiles implements Closeab
private boolean readOnly;
+ private SegmentNodeStorePersistence persistence;
+
private Builder() {
// Prevent external instantiation.
}
@@ -158,15 +162,54 @@ public class TarFiles implements Closeab
return this;
}
+ public Builder withPersistence(SegmentNodeStorePersistence persistence) {
+ this.persistence = persistence;
+ return this;
+ }
+
public TarFiles build() throws IOException {
checkState(directory != null, "Directory not specified");
checkState(tarRecovery != null, "TAR recovery strategy not specified");
checkState(ioMonitor != null, "I/O monitor not specified");
checkState(readOnly || fileStoreMonitor != null, "File store statistics not specified");
checkState(readOnly || maxFileSize != 0, "Max file size not specified");
+ if (persistence == null) {
+ persistence = new TarPersistence(directory);
+ }
return new TarFiles(this);
}
+ public File getDirectory() {
+ return directory;
+ }
+
+ public boolean isMemoryMapping() {
+ return memoryMapping;
+ }
+
+ public TarRecovery getTarRecovery() {
+ return tarRecovery;
+ }
+
+ public IOMonitor getIoMonitor() {
+ return ioMonitor;
+ }
+
+ public FileStoreMonitor getFileStoreMonitor() {
+ return fileStoreMonitor;
+ }
+
+ public long getMaxFileSize() {
+ return maxFileSize;
+ }
+
+ public boolean isReadOnly() {
+ return readOnly;
+ }
+
+ private SegmentArchiveManager buildArchiveManager() throws IOException {
+ return persistence.createArchiveManager(memoryMapping, ioMonitor, readOnly && fileStoreMonitor == null ? new FileStoreMonitorAdapter() : fileStoreMonitor);
+ }
}
private static final Logger log = LoggerFactory.getLogger(TarFiles.class);
@@ -218,13 +261,13 @@ public class TarFiles implements Closeab
};
}
- private static Map<Integer, Map<Character, File>> collectFiles(File directory) {
- Map<Integer, Map<Character, File>> dataFiles = newHashMap();
- for (File file : listFiles(directory, null, false)) {
- Matcher matcher = FILE_NAME_PATTERN.matcher(file.getName());
+ private static Map<Integer, Map<Character, String>> collectFiles(SegmentArchiveManager archiveManager) throws IOException {
+ Map<Integer, Map<Character, String>> dataFiles = newHashMap();
+ for (String file : archiveManager.listArchives()) {
+ Matcher matcher = FILE_NAME_PATTERN.matcher(file);
if (matcher.matches()) {
Integer index = Integer.parseInt(matcher.group(2));
- Map<Character, File> files = dataFiles.get(index);
+ Map<Character, String> files = dataFiles.get(index);
if (files == null) {
files = newHashMap();
dataFiles.put(index, files);
@@ -245,9 +288,7 @@ public class TarFiles implements Closeab
private final long maxFileSize;
- private final boolean memoryMapping;
-
- private final IOMonitor ioMonitor;
+ private SegmentArchiveManager archiveManager;
/**
* Guards access to the {@link #readers} and {@link #writer} references.
@@ -281,9 +322,9 @@ public class TarFiles implements Closeab
private TarFiles(Builder builder) throws IOException {
maxFileSize = builder.maxFileSize;
- memoryMapping = builder.memoryMapping;
- ioMonitor = builder.ioMonitor;
- Map<Integer, Map<Character, File>> map = collectFiles(builder.directory);
+ archiveManager = builder.buildArchiveManager();
+
+ Map<Integer, Map<Character, String>> map = collectFiles(archiveManager);
Integer[] indices = map.keySet().toArray(new Integer[map.size()]);
Arrays.sort(indices);
@@ -295,9 +336,9 @@ public class TarFiles implements Closeab
for (Integer index : indices) {
TarReader r;
if (builder.readOnly) {
- r = TarReader.openRO(map.get(index), memoryMapping, builder.tarRecovery, ioMonitor);
+ r = TarReader.openRO(map.get(index), builder.tarRecovery, archiveManager);
} else {
- r = TarReader.open(map.get(index), memoryMapping, builder.tarRecovery, ioMonitor);
+ r = TarReader.open(map.get(index), builder.tarRecovery, archiveManager);
}
readers = new Node(r, readers);
}
@@ -308,10 +349,9 @@ public class TarFiles implements Closeab
if (indices.length > 0) {
writeNumber = indices[indices.length - 1] + 1;
}
- writer = new TarWriter(builder.directory, builder.fileStoreMonitor, writeNumber, builder.ioMonitor);
+ writer = new TarWriter(archiveManager, writeNumber);
}
- @Override
public void close() throws IOException {
shutdown = true;
@@ -510,7 +550,7 @@ public class TarFiles implements Closeab
if (newWriter == writer) {
return;
}
- readers = new Node(TarReader.open(writer.getFile(), memoryMapping, ioMonitor), readers);
+ readers = new Node(TarReader.open(writer.getFileName(), archiveManager), readers);
writer = newWriter;
}
@@ -657,7 +697,7 @@ public class TarFiles implements Closeab
} catch (IOException e) {
log.warn("Unable to close swept TAR reader", e);
}
- result.removableFiles.add(closeable.getFile());
+ result.removableFiles.add(closeable.getFileName());
}
return result;
@@ -711,7 +751,7 @@ public class TarFiles implements Closeab
Map<UUID, List<UUID>> graph = null;
for (TarReader reader : iterable(head)) {
- if (fileName.equals(reader.getFile().getName())) {
+ if (fileName.equals(reader.getFileName())) {
index = reader.getUUIDs();
graph = reader.getGraph();
break;
@@ -744,9 +784,12 @@ public class TarFiles implements Closeab
Map<String, Set<UUID>> index = new HashMap<>();
for (TarReader reader : iterable(head)) {
- index.put(reader.getFile().getName(), reader.getUUIDs());
+ index.put(reader.getFileName(), reader.getUUIDs());
}
return index;
}
+ public FileReaper createFileReaper() {
+ return new FileReaper(archiveManager);
+ }
}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarPersistence.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarPersistence.java?rev=1824115&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarPersistence.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarPersistence.java Tue Feb 13 11:17:42 2018
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.segment.file.tar;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.file.GCJournal;
+import org.apache.jackrabbit.oak.segment.file.LocalGCJournalFile;
+import org.apache.jackrabbit.oak.segment.file.LocalManifestFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.Collection;
+
+/**
+ * {@link SegmentNodeStorePersistence} backed by a single local directory: the
+ * TAR archives, the journal, the GC journal, the manifest and the repository
+ * lock file are all stored directly inside {@code directory}.
+ */
+public class TarPersistence implements SegmentNodeStorePersistence {
+
+    private static final String LOCK_FILE_NAME = "repo.lock";
+
+    private static final String GC_JOURNAL = "gc.log";
+
+    private static final String MANIFEST_FILE_NAME = "manifest";
+
+    private static final String JOURNAL_FILE_NAME = "journal.log";
+
+    private final File directory;
+
+    /**
+     * @param directory the directory holding the repository files.
+     */
+    public TarPersistence(File directory) {
+        this.directory = directory;
+    }
+
+    @Override
+    public SegmentArchiveManager createArchiveManager(boolean memoryMapping, IOMonitor ioMonitor, FileStoreMonitor fileStoreMonitor) {
+        return new SegmentTarManager(directory, fileStoreMonitor, ioMonitor, memoryMapping);
+    }
+
+    /**
+     * Checks (non-recursively) whether the directory contains at least one
+     * {@code .tar} file. A directory that does not exist yet is reported as
+     * containing no segment files instead of failing.
+     */
+    @Override
+    public boolean segmentFilesExist() {
+        // FileUtils.listFiles rejects a path that is not a directory with an
+        // IllegalArgumentException; "no directory" simply means "no files".
+        if (!directory.isDirectory()) {
+            return false;
+        }
+        Collection<File> entries = FileUtils.listFiles(directory, new String[] {"tar"}, false);
+        return !entries.isEmpty();
+    }
+
+    @Override
+    public JournalFile getJournalFile() {
+        return new LocalJournalFile(directory, JOURNAL_FILE_NAME);
+    }
+
+    @Override
+    public GCJournalFile getGCJournalFile() {
+        return new LocalGCJournalFile(directory, GC_JOURNAL);
+    }
+
+    @Override
+    public ManifestFile getManifestFile() {
+        return new LocalManifestFile(directory, MANIFEST_FILE_NAME);
+    }
+
+    /**
+     * Acquires an exclusive lock on the {@code repo.lock} file in the
+     * directory. {@link java.nio.channels.FileChannel#lock()} blocks until the
+     * lock is available.
+     *
+     * @return a handle whose unlock releases the lock and closes the lock file.
+     * @throws IOException if the lock file cannot be opened or locked.
+     * @throws IllegalStateException if this JVM already holds an overlapping
+     *         lock on the lock file.
+     */
+    @Override
+    public RepositoryLock lockRepository() throws IOException {
+        RandomAccessFile lockFile = new RandomAccessFile(new File(directory, LOCK_FILE_NAME), "rw");
+        try {
+            FileLock lock = lockFile.getChannel().lock();
+            return () -> {
+                // close the file even if releasing the lock fails
+                try {
+                    lock.release();
+                } finally {
+                    lockFile.close();
+                }
+            };
+        } catch (OverlappingFileLockException ex) {
+            IllegalStateException failure = new IllegalStateException(directory.getAbsolutePath() + " is in use by another store.", ex);
+            closeAfterFailure(lockFile, failure);
+            throw failure;
+        } catch (IOException | RuntimeException ex) {
+            // the lock was not acquired: do not leak the open file handle
+            closeAfterFailure(lockFile, ex);
+            throw ex;
+        }
+    }
+
+    /** Closes {@code file}, recording any close failure as suppressed on {@code primary}. */
+    private static void closeAfterFailure(RandomAccessFile file, Exception primary) {
+        try {
+            file.close();
+        } catch (IOException closeFailure) {
+            primary.addSuppressed(closeFailure);
+        }
+    }
+
+}
\ No newline at end of file
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarReader.java?rev=1824115&r1=1824114&r2=1824115&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarReader.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/tar/TarReader.java Tue Feb 13 11:17:42 2018
@@ -19,24 +19,16 @@
package org.apache.jackrabbit.oak.segment.file.tar;
-import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Lists.newArrayListWithCapacity;
-import static com.google.common.collect.Maps.newHashMapWithExpectedSize;
import static com.google.common.collect.Maps.newLinkedHashMap;
import static com.google.common.collect.Maps.newTreeMap;
import static com.google.common.collect.Sets.newHashSet;
-import static java.nio.ByteBuffer.wrap;
import static java.util.Collections.singletonList;
import static org.apache.jackrabbit.oak.segment.file.tar.GCGeneration.newGCGeneration;
-import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.BLOCK_SIZE;
-import static org.apache.jackrabbit.oak.segment.file.tar.TarConstants.GRAPH_MAGIC;
-import static org.apache.jackrabbit.oak.segment.file.tar.index.IndexLoader.newIndexLoader;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
@@ -47,48 +39,25 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.zip.CRC32;
import javax.annotation.Nonnull;
import com.google.common.base.Predicate;
-import com.google.common.base.Stopwatch;
-import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.segment.SegmentArchiveManager;
import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndex;
-import org.apache.jackrabbit.oak.segment.file.tar.binaries.BinaryReferencesIndexLoader;
import org.apache.jackrabbit.oak.segment.file.tar.binaries.InvalidBinaryReferencesIndexException;
import org.apache.jackrabbit.oak.segment.file.tar.index.Index;
import org.apache.jackrabbit.oak.segment.file.tar.index.IndexEntry;
-import org.apache.jackrabbit.oak.segment.file.tar.index.IndexLoader;
-import org.apache.jackrabbit.oak.segment.file.tar.index.InvalidIndexException;
-import org.apache.jackrabbit.oak.segment.util.ReaderAtEnd;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class TarReader implements Closeable {
+public class TarReader implements Closeable {
private static final Logger log = LoggerFactory.getLogger(TarReader.class);
- private static final IndexLoader indexLoader = newIndexLoader(BLOCK_SIZE);
-
- /**
- * Pattern of the segment entry names. Note the trailing (\\..*)? group
- * that's included for compatibility with possible future extensions.
- */
- private static final Pattern NAME_PATTERN = Pattern.compile(
- "([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
- + "(\\.([0-9a-f]{8}))?(\\..*)?");
-
- private static int getEntrySize(int size) {
- return BLOCK_SIZE + size + TarWriter.getPaddingSize(size);
- }
-
- static TarReader open(File file, boolean memoryMapping, IOMonitor ioMonitor) throws IOException {
- TarReader reader = openFirstFileWithValidIndex(singletonList(file), memoryMapping, ioMonitor);
+ static TarReader open(String file, SegmentArchiveManager archiveManager) throws IOException {
+ TarReader reader = openFirstFileWithValidIndex(singletonList(file), archiveManager);
if (reader != null) {
return reader;
} else {
@@ -107,21 +76,17 @@ class TarReader implements Closeable {
* generations.
*
* @param files The generations of the same TAR file.
- * @param memoryMapping If {@code true}, opens the TAR file with memory
- * mapping enabled.
* @param recovery Strategy for recovering a damaged TAR file.
- * @param ioMonitor Callbacks to track internal operations for the open
- * TAR file.
* @return An instance of {@link TarReader}.
*/
- static TarReader open(Map<Character, File> files, boolean memoryMapping, TarRecovery recovery, IOMonitor ioMonitor) throws IOException {
- SortedMap<Character, File> sorted = newTreeMap();
+ static TarReader open(Map<Character, String> files, TarRecovery recovery, SegmentArchiveManager archiveManager) throws IOException {
+ SortedMap<Character, String> sorted = newTreeMap();
sorted.putAll(files);
- List<File> list = newArrayList(sorted.values());
+ List<String> list = newArrayList(sorted.values());
Collections.reverse(list);
- TarReader reader = openFirstFileWithValidIndex(list, memoryMapping, ioMonitor);
+ TarReader reader = openFirstFileWithValidIndex(list, archiveManager);
if (reader != null) {
return reader;
}
@@ -129,15 +94,15 @@ class TarReader implements Closeable {
// no generation has a valid index, so recover as much as we can
log.warn("Could not find a valid tar index in {}, recovering...", list);
LinkedHashMap<UUID, byte[]> entries = newLinkedHashMap();
- for (File file : sorted.values()) {
- collectFileEntries(file, entries, true);
+ for (String file : sorted.values()) {
+ collectFileEntries(file, entries, true, archiveManager);
}
// regenerate the first generation based on the recovered data
- File file = sorted.values().iterator().next();
- generateTarFile(entries, file, recovery, ioMonitor);
+ String file = sorted.values().iterator().next();
+ generateTarFile(entries, file, recovery, archiveManager);
- reader = openFirstFileWithValidIndex(singletonList(file), memoryMapping, ioMonitor);
+ reader = openFirstFileWithValidIndex(singletonList(file), archiveManager);
if (reader != null) {
return reader;
} else {
@@ -145,11 +110,11 @@ class TarReader implements Closeable {
}
}
- static TarReader openRO(Map<Character, File> files, boolean memoryMapping, TarRecovery recovery, IOMonitor ioMonitor) throws IOException {
+ static TarReader openRO(Map<Character, String> files, TarRecovery recovery, SegmentArchiveManager archiveManager) throws IOException {
// for readonly store only try the latest generation of a given
// tar file to prevent any rollback or rewrite
- File file = files.get(Collections.max(files.keySet()));
- TarReader reader = openFirstFileWithValidIndex(singletonList(file), memoryMapping, ioMonitor);
+ String file = files.get(Collections.max(files.keySet()));
+ TarReader reader = openFirstFileWithValidIndex(singletonList(file), archiveManager);
if (reader != null) {
return reader;
}
@@ -157,10 +122,10 @@ class TarReader implements Closeable {
// collecting the entries (without touching the original file) and
// writing them into an artificial tar file '.ro.bak'
LinkedHashMap<UUID, byte[]> entries = newLinkedHashMap();
- collectFileEntries(file, entries, false);
- file = findAvailGen(file, ".ro.bak");
- generateTarFile(entries, file, recovery, ioMonitor);
- reader = openFirstFileWithValidIndex(singletonList(file), memoryMapping, ioMonitor);
+ collectFileEntries(file, entries, false, archiveManager);
+ file = findAvailGen(file, ".ro.bak", archiveManager);
+ generateTarFile(entries, file, recovery, archiveManager);
+ reader = openFirstFileWithValidIndex(singletonList(file), archiveManager);
if (reader != null) {
return reader;
}
@@ -176,21 +141,16 @@ class TarReader implements Closeable {
* into.
* @param backup If {@code true}, performs a backup of the TAR file.
*/
- private static void collectFileEntries(File file, LinkedHashMap<UUID, byte[]> entries, boolean backup) throws IOException {
+ private static void collectFileEntries(String file, LinkedHashMap<UUID, byte[]> entries, boolean backup, SegmentArchiveManager archiveManager) throws IOException {
log.info("Recovering segments from tar file {}", file);
try {
- RandomAccessFile access = new RandomAccessFile(file, "r");
- try {
- recoverEntries(file, access, entries);
- } finally {
- access.close();
- }
+ archiveManager.recoverEntries(file, entries);
} catch (IOException e) {
log.warn("Could not read tar file {}, skipping...", file, e);
}
if (backup) {
- backupSafely(file);
+ backupSafely(archiveManager, file);
}
}
@@ -202,12 +162,11 @@ class TarReader implements Closeable {
* @param file The output file that will contain the recovered
* entries.
* @param recovery The recovery strategy to execute.
- * @param ioMonitor An instance of {@link IOMonitor}.
*/
- private static void generateTarFile(LinkedHashMap<UUID, byte[]> entries, File file, TarRecovery recovery, IOMonitor ioMonitor) throws IOException {
+ private static void generateTarFile(LinkedHashMap<UUID, byte[]> entries, String file, TarRecovery recovery, SegmentArchiveManager archiveManager) throws IOException {
log.info("Regenerating tar file {}", file);
- try (TarWriter writer = new TarWriter(file, ioMonitor)) {
+ try (TarWriter writer = new TarWriter(archiveManager, file)) {
for (Entry<UUID, byte[]> entry : entries.entrySet()) {
try {
recovery.recoverEntry(entry.getKey(), entry.getValue(), new EntryRecovery() {
@@ -242,13 +201,13 @@ class TarReader implements Closeable {
*
* @param file File to backup.
*/
- private static void backupSafely(File file) throws IOException {
- File backup = findAvailGen(file, ".bak");
- log.info("Backing up {} to {}", file, backup.getName());
- if (!file.renameTo(backup)) {
+ private static void backupSafely(SegmentArchiveManager archiveManager, String file) throws IOException {
+ String backup = findAvailGen(file, ".bak", archiveManager);
+ log.info("Backing up {} to {}", file, backup);
+ if (!archiveManager.renameTo(file, backup)) {
log.warn("Renaming failed, so using copy to backup {}", file);
- FileUtils.copyFile(file, backup);
- if (!file.delete()) {
+ archiveManager.copyFile(file, backup);
+ if (!archiveManager.delete(file)) {
throw new IOException(
"Could not remove broken tar file " + file);
}
@@ -259,62 +218,29 @@ class TarReader implements Closeable {
* Fine next available generation number so that a generated file doesn't
* overwrite another existing file.
*
- * @param file The file to backup.
+ * @param name The file to backup.
* @param ext The extension of the backed up file.
*/
- private static File findAvailGen(File file, String ext) {
- File parent = file.getParentFile();
- String name = file.getName();
- File backup = new File(parent, name + ext);
- for (int i = 2; backup.exists(); i++) {
- backup = new File(parent, name + "." + i + ext);
+ private static String findAvailGen(String name, String ext, SegmentArchiveManager archiveManager) {
+ String backup = name + ext;
+ for (int i = 2; archiveManager.exists(backup); i++) {
+ backup = name + "." + i + ext;
}
return backup;
}
- private static TarReader openFirstFileWithValidIndex(List<File> files, boolean memoryMapping, IOMonitor ioMonitor) {
- for (File file : files) {
- String name = file.getName();
+ private static TarReader openFirstFileWithValidIndex(List<String> archives, SegmentArchiveManager archiveManager) {
+ for (String name : archives) {
try {
- RandomAccessFile access = new RandomAccessFile(file, "r");
- try {
- Index index = loadAndValidateIndex(access, name);
- if (index == null) {
- log.info("No index found in tar file {}, skipping...", name);
- } else {
- // found a file with a valid index, drop the others
- for (File other : files) {
- if (other != file) {
- log.info("Removing unused tar file {}", other.getName());
- other.delete();
- }
- }
-
- if (memoryMapping) {
- try {
- FileAccess mapped = new FileAccess.Mapped(access);
- return new TarReader(file, mapped, index, ioMonitor);
- } catch (IOException e) {
- log.warn("Failed to mmap tar file {}. Falling back to normal file " +
- "IO, which will negatively impact repository performance. " +
- "This problem may have been caused by restrictions on the " +
- "amount of virtual memory available to the JVM. Please make " +
- "sure that a 64-bit JVM is being used and that the process " +
- "has access to unlimited virtual memory (ulimit option -v).",
- name, e);
- }
+ SegmentArchiveManager.SegmentArchiveReader reader = archiveManager.open(name);
+ if (reader != null) {
+ for (String other : archives) {
+ if (other != name) {
+ log.info("Removing unused tar file {}", other);
+ archiveManager.delete(other);
}
-
- FileAccess random = new FileAccess.Random(access);
- // prevent the finally block from closing the file
- // as the returned TarReader will take care of that
- access = null;
- return new TarReader(file, random, index, ioMonitor);
- }
- } finally {
- if (access != null) {
- access.close();
}
+ return new TarReader(archiveManager, reader);
}
} catch (IOException e) {
log.warn("Could not read tar file {}, skipping...", name, e);
@@ -324,160 +250,22 @@ class TarReader implements Closeable {
return null;
}
- /**
- * Tries to read an existing index from the given tar file. The index is
- * returned if it is found and looks valid (correct checksum, passes sanity
- * checks).
- *
- * @param file The TAR file.
- * @param name Name of the TAR file, for logging purposes.
- * @return An instance of {@link ByteBuffer} populated with the content of
- * the index. If the TAR doesn't contain any index, {@code null} is returned
- * instead.
- */
- private static Index loadAndValidateIndex(RandomAccessFile file, String name) throws IOException {
- long length = file.length();
-
- if (length % BLOCK_SIZE != 0) {
- log.warn("Unable to load index of file {}: Invalid alignment", name);
- return null;
- }
- if (length < 6 * BLOCK_SIZE) {
- log.warn("Unable to load index of file {}: File too short", name);
- return null;
- }
- if (length > Integer.MAX_VALUE) {
- log.warn("Unable to load index of file {}: File too long", name);
- return null;
- }
-
- ReaderAtEnd r = (whence, size) -> {
- ByteBuffer buffer = ByteBuffer.allocate(size);
- file.seek(length - 2 * BLOCK_SIZE - whence);
- file.readFully(buffer.array());
- return buffer;
- };
-
- try {
- return indexLoader.loadIndex(r);
- } catch (InvalidIndexException e) {
- log.warn("Unable to load index of file {}: {}", name, e.getMessage());
- }
-
- return null;
- }
-
- /**
- * Scans through the tar file, looking for all segment entries.
- *
- * @param file The path of the TAR file.
- * @param access The contents of the TAR file.
- * @param entries The map that will contain the recovered entries. The
- * entries are inserted in the {@link LinkedHashMap} in the
- * order they appear in the TAR file.
- */
- private static void recoverEntries(File file, RandomAccessFile access, LinkedHashMap<UUID, byte[]> entries) throws IOException {
- byte[] header = new byte[BLOCK_SIZE];
- while (access.getFilePointer() + BLOCK_SIZE <= access.length()) {
- // read the tar header block
- access.readFully(header);
-
- // compute the header checksum
- int sum = 0;
- for (int i = 0; i < BLOCK_SIZE; i++) {
- sum += header[i] & 0xff;
- }
-
- // identify possible zero block
- if (sum == 0 && access.getFilePointer() + 2 * BLOCK_SIZE == access.length()) {
- return; // found the zero blocks at the end of the file
- }
-
- // replace the actual stored checksum with spaces for comparison
- for (int i = 148; i < 148 + 8; i++) {
- sum -= header[i] & 0xff;
- sum += ' ';
- }
-
- byte[] checkbytes = String.format("%06o\0 ", sum).getBytes(UTF_8);
- for (int i = 0; i < checkbytes.length; i++) {
- if (checkbytes[i] != header[148 + i]) {
- log.warn("Invalid entry checksum at offset {} in tar file {}, skipping...",
- access.getFilePointer() - BLOCK_SIZE, file);
- }
- }
-
- // The header checksum passes, so read the entry name and size
- ByteBuffer buffer = wrap(header);
- String name = readString(buffer, 100);
- buffer.position(124);
- int size = readNumber(buffer, 12);
- if (access.getFilePointer() + size > access.length()) {
- // checksum was correct, so the size field should be accurate
- log.warn("Partial entry {} in tar file {}, ignoring...", name, file);
- return;
- }
-
- Matcher matcher = NAME_PATTERN.matcher(name);
- if (matcher.matches()) {
- UUID id = UUID.fromString(matcher.group(1));
-
- String checksum = matcher.group(3);
- if (checksum != null || !entries.containsKey(id)) {
- byte[] data = new byte[size];
- access.readFully(data);
-
- // skip possible padding to stay at block boundaries
- long position = access.getFilePointer();
- long remainder = position % BLOCK_SIZE;
- if (remainder != 0) {
- access.seek(position + (BLOCK_SIZE - remainder));
- }
-
- if (checksum != null) {
- CRC32 crc = new CRC32();
- crc.update(data);
- if (crc.getValue() != Long.parseLong(checksum, 16)) {
- log.warn("Checksum mismatch in entry {} of tar file {}, skipping...",
- name, file);
- continue;
- }
- }
-
- entries.put(id, data);
- }
- } else if (!name.equals(file.getName() + ".idx")) {
- log.warn("Unexpected entry {} in tar file {}, skipping...",
- name, file);
- long position = access.getFilePointer() + size;
- long remainder = position % BLOCK_SIZE;
- if (remainder != 0) {
- position += BLOCK_SIZE - remainder;
- }
- access.seek(position);
- }
- }
- }
-
- private final File file;
+ private final SegmentArchiveManager archiveManager;
- private final FileAccess access;
+ private final SegmentArchiveManager.SegmentArchiveReader archive;
private final Index index;
private volatile boolean hasGraph;
- private final IOMonitor ioMonitor;
-
- private TarReader(File file, FileAccess access, Index index, IOMonitor ioMonitor) {
- this.file = file;
- this.access = access;
- this.index = index;
- this.ioMonitor = ioMonitor;
+ private TarReader(SegmentArchiveManager archiveManager, SegmentArchiveManager.SegmentArchiveReader archive) {
+ this.archiveManager = archiveManager;
+ this.archive = archive;
+ this.index = archive.getIndex();
}
long size() {
- return file.length();
+ return archive.length();
}
/**
@@ -514,15 +302,7 @@ class TarReader implements Closeable {
* @return the byte buffer, or null if not in this file.
*/
ByteBuffer readEntry(long msb, long lsb) throws IOException {
- int idx = findEntry(msb, lsb);
- if (idx == -1) {
- return null;
- }
- return readEntry(msb, lsb, index.entry(idx));
- }
-
- private ByteBuffer readEntry(long msb, long lsb, IndexEntry entry) throws IOException {
- return readSegment(msb, lsb, entry.getPosition(), entry.getLength());
+ return archive.readSegment(msb, lsb);
}
/**
@@ -705,7 +485,7 @@ class TarReader implements Closeable {
* TarReader}, or {@code null}.
*/
TarReader sweep(@Nonnull Set<UUID> reclaim, @Nonnull Set<UUID> reclaimed) throws IOException {
- String name = file.getName();
+ String name = archive.getName();
log.debug("Cleaning up {}", name);
Set<UUID> cleaned = newHashSet();
@@ -716,13 +496,13 @@ class TarReader implements Closeable {
TarEntry[] entries = getEntries();
for (int i = 0; i < entries.length; i++) {
TarEntry entry = entries[i];
- beforeSize += getEntrySize(entry.size());
+ beforeSize += archive.getEntrySize(entry.size());
UUID id = new UUID(entry.msb(), entry.lsb());
if (reclaim.contains(id)) {
cleaned.add(id);
entries[i] = null;
} else {
- afterSize += getEntrySize(entry.size());
+ afterSize += archive.getEntrySize(entry.size());
afterCount += 1;
}
}
@@ -737,7 +517,7 @@ class TarReader implements Closeable {
// in which case we'll always generate a new tar file with
// the graph to speed up future garbage collection runs.
log.debug("Not enough space savings. ({}/{}). Skipping clean up of {}",
- access.length() - afterSize, access.length(), name);
+ archive.length() - afterSize, archive.length(), name);
return this;
}
if (!hasGraph()) {
@@ -751,21 +531,18 @@ class TarReader implements Closeable {
return this;
}
- File newFile = new File(
- file.getParentFile(),
- name.substring(0, pos) + (char) (generation + 1) + ".tar");
+ String newFile = name.substring(0, pos) + (char) (generation + 1) + ".tar";
- log.debug("Writing new generation {}", newFile.getName());
- TarWriter writer = new TarWriter(newFile, ioMonitor);
+ log.debug("Writing new generation {}", newFile);
+ TarWriter writer = new TarWriter(archiveManager, newFile);
for (TarEntry entry : entries) {
if (entry != null) {
long msb = entry.msb();
long lsb = entry.lsb();
- int offset = entry.offset();
int size = entry.size();
GCGeneration gen = entry.generation();
byte[] data = new byte[size];
- readSegment(msb, lsb, offset, size).get(data);
+ archive.readSegment(msb, lsb).get(data);
writer.writeEntry(msb, lsb, data, 0, size, gen);
}
}
@@ -809,19 +586,19 @@ class TarReader implements Closeable {
writer.close();
- TarReader reader = openFirstFileWithValidIndex(singletonList(newFile), access.isMemoryMapped(), ioMonitor);
+ TarReader reader = openFirstFileWithValidIndex(singletonList(newFile), archiveManager);
if (reader != null) {
reclaimed.addAll(cleaned);
return reader;
} else {
- log.warn("Failed to open cleaned up tar file {}", file);
+ log.warn("Failed to open cleaned up tar file {}", getFileName());
return this;
}
}
@Override
public void close() throws IOException {
- access.close();
+ archive.close();
}
/**
@@ -831,42 +608,11 @@ class TarReader implements Closeable {
* @return The parsed graph, or {@code null} if one was not found.
*/
Map<UUID, List<UUID>> getGraph() throws IOException {
- ByteBuffer graph = loadGraph();
- if (graph == null) {
- return null;
- } else {
- return parseGraph(graph);
- }
+ return archive.getGraph();
}
private boolean hasGraph() {
- if (!hasGraph) {
- try {
- loadGraph();
- } catch (IOException ignore) { }
- }
- return hasGraph;
- }
-
- private int getIndexEntrySize() {
- return getEntrySize(index.size());
- }
-
- private int getGraphEntrySize() {
- ByteBuffer buffer;
-
- try {
- buffer = loadGraph();
- } catch (IOException e) {
- log.warn("Exception while loading pre-compiled tar graph", e);
- return 0;
- }
-
- if (buffer == null) {
- return 0;
- }
-
- return getEntrySize(buffer.getInt(buffer.limit() - 8));
+ return archive.hasGraph();
}
/**
@@ -883,142 +629,27 @@ class TarReader implements Closeable {
BinaryReferencesIndex getBinaryReferences() {
BinaryReferencesIndex index = null;
try {
- index = loadBinaryReferences();
+ index = archive.getBinaryReferences();
} catch (InvalidBinaryReferencesIndexException | IOException e) {
log.warn("Exception while loading binary reference", e);
}
return index;
}
- private BinaryReferencesIndex loadBinaryReferences() throws IOException, InvalidBinaryReferencesIndexException {
- int end = access.length() - 2 * BLOCK_SIZE - getIndexEntrySize() - getGraphEntrySize();
- return BinaryReferencesIndexLoader.loadBinaryReferencesIndex((whence, size) -> access.read(end - whence, size));
- }
-
- /**
- * Loads the optional pre-compiled graph entry from the given tar file.
- *
- * @return graph buffer, or {@code null} if one was not found
- * @throws IOException if the tar file could not be read
- */
- private ByteBuffer loadGraph() throws IOException {
- int pos = access.length() - 2 * BLOCK_SIZE - getIndexEntrySize();
-
- ByteBuffer meta = access.read(pos - 16, 16);
-
- int crc32 = meta.getInt();
- int count = meta.getInt();
- int bytes = meta.getInt();
- int magic = meta.getInt();
-
- if (magic != GRAPH_MAGIC) {
- log.warn("Invalid graph magic number in {}", file);
- return null;
- }
-
- if (count < 0) {
- log.warn("Invalid number of entries in {}", file);
- return null;
- }
-
- if (bytes < 4 + count * 34) {
- log.warn("Invalid entry size in {}", file);
- return null;
- }
-
- ByteBuffer graph = access.read(pos - bytes, bytes);
-
- byte[] b = new byte[bytes - 16];
-
- graph.mark();
- graph.get(b);
- graph.reset();
-
- CRC32 checksum = new CRC32();
- checksum.update(b);
-
- if (crc32 != (int) checksum.getValue()) {
- log.warn("Invalid graph checksum in tar file {}", file);
- return null;
- }
-
- hasGraph = true;
-
- return graph;
- }
-
- private ByteBuffer readSegment(long msb, long lsb, int offset, int size) throws IOException {
- ioMonitor.beforeSegmentRead(file, msb, lsb, size);
- Stopwatch stopwatch = Stopwatch.createStarted();
- ByteBuffer buffer = access.read(offset, size);
- long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
- ioMonitor.afterSegmentRead(file, msb, lsb, size, elapsed);
- return buffer;
- }
-
- private static Map<UUID, List<UUID>> parseGraph(ByteBuffer buffer) {
- int nEntries = buffer.getInt(buffer.limit() - 12);
-
- Map<UUID, List<UUID>> graph = newHashMapWithExpectedSize(nEntries);
-
- for (int i = 0; i < nEntries; i++) {
- long msb = buffer.getLong();
- long lsb = buffer.getLong();
- int nVertices = buffer.getInt();
-
- List<UUID> vertices = newArrayListWithCapacity(nVertices);
-
- for (int j = 0; j < nVertices; j++) {
- long vMsb = buffer.getLong();
- long vLsb = buffer.getLong();
- vertices.add(new UUID(vMsb, vLsb));
- }
-
- graph.put(new UUID(msb, lsb), vertices);
- }
-
- return graph;
- }
-
- private static String readString(ByteBuffer buffer, int fieldSize) {
- byte[] b = new byte[fieldSize];
- buffer.get(b);
- int n = 0;
- while (n < fieldSize && b[n] != 0) {
- n++;
- }
- return new String(b, 0, n, UTF_8);
- }
-
- private static int readNumber(ByteBuffer buffer, int fieldSize) {
- byte[] b = new byte[fieldSize];
- buffer.get(b);
- int number = 0;
- for (int i = 0; i < fieldSize; i++) {
- int digit = b[i] & 0xff;
- if ('0' <= digit && digit <= '7') {
- number = number * 8 + digit - '0';
- } else {
- break;
- }
- }
- return number;
- }
-
/**
* Return the path of this TAR file.
*
* @return An instance of {@link File}.
*/
- File getFile() {
- return file;
+ String getFileName() {
+ return archive.getName();
}
//------------------------------------------------------------< Object >--
@Override
public String toString() {
- return file.toString();
+ return getFileName();
}
}