You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by dp...@apache.org on 2006/11/10 17:26:58 UTC
svn commit: r473380 - in
/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core:
cluster/ state/
Author: dpfister
Date: Fri Nov 10 08:26:57 2006
New Revision: 473380
URL: http://svn.apache.org/viewvc?view=rev&rev=473380
Log:
JCR-623 Clustering
Added:
jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java
jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java
Modified:
jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java
jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java
jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java
jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java
jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java
jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java
Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java?view=diff&rev=473380&r1=473379&r2=473380
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java (original)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/ClusterNode.java Fri Nov 10 08:26:57 2006
@@ -342,7 +342,7 @@
String msg = "Unable to create log entry: " + e.getMessage();
log.error(msg);
} catch (Throwable e) {
- String msg = "Unexpected error while creating log entry.";
+ String msg = "Unexpected error while preparing log entry.";
log.error(msg, e);
}
}
@@ -357,7 +357,7 @@
String msg = "Unable to create log entry: " + e.getMessage();
log.error(msg);
} catch (Throwable e) {
- String msg = "Unexpected error while creating log entry.";
+ String msg = "Unexpected error while committing log entry.";
log.error(msg, e);
}
}
@@ -372,7 +372,7 @@
String msg = "Unable to create log entry: " + e.getMessage();
log.error(msg);
} catch (Throwable e) {
- String msg = "Unexpected error while creating log entry.";
+ String msg = "Unexpected error while cancelling log entry.";
log.error(msg, e);
}
}
Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java?view=diff&rev=473380&r1=473379&r2=473380
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java (original)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileJournal.java Fri Nov 10 08:26:57 2006
@@ -34,8 +34,6 @@
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
@@ -45,7 +43,25 @@
import EDU.oswego.cs.dl.util.concurrent.Mutex;
/**
- * File-based journal implementation.
+ * File-based journal implementation. A directory specified as <code>directory</code>
+ * bean property will contain log files and a global revision file, containing the
+ * latest revision. When the current log file's size exceeds <code>maximumSize</code>
+ * bytes, it gets renamed to its name with '.1' appended. At the same time, all log files
+ * already having a version counter get their version counter incremented by <code>1</code>.
+ * <p/>
+ * It is configured through the following properties:
+ * <ul>
+ * <li><code>directory</code>: the shared directory where journal logs are read from
+ * and written to; this is a required property with no default value</li>
+ * <li><code>revision</code>: the filename where the parent cluster node's revision
+ * file should be written to; this is a required property with no default value</li>
+ * <li><code>basename</code>: this is the basename of the journal logs created in
+ * the shared directory; its default value is <code>journal</code></li>
+ * <li><code>maximumSize</code>: this is the maximum size in bytes of a journal log
+ * before a new log will be created; its default value is <code>1048576</code> (1MB)</li>
+ * </ul>
+ *
+ * todo after some iterations, old files should be automatically compressed to save space
*/
public class FileJournal implements Journal {
@@ -55,6 +71,21 @@
private static final String REVISION_NAME = "revision";
/**
+ * Log extension.
+ */
+ private static final String LOG_EXTENSION = ".log";
+
+ /**
+ * Default base name for journal files.
+ */
+ private static final String DEFAULT_BASENAME = "journal";
+
+ /**
+ * Default max size of a journal file (1MB).
+ */
+ private static final int DEFAULT_MAXSIZE = 1048576;
+
+ /**
* Logger.
*/
private static Logger log = LoggerFactory.getLogger(FileJournal.class);
@@ -70,7 +101,7 @@
private NamespaceResolver resolver;
/**
- * Callback.
+ * Record processor.
*/
private RecordProcessor processor;
@@ -85,6 +116,16 @@
private String revision;
/**
+ * Journal file base name, bean property.
+ */
+ private String basename;
+
+ /**
+ * Maximum size of a journal file before a rotation takes place, bean property.
+ */
+ private int maximumSize;
+
+ /**
* Journal root directory.
*/
private File root;
@@ -152,6 +193,38 @@
}
/**
+ * Bean getter for base name.
+ * @return base name
+ */
+ public String getBasename() {
+ return basename;
+ }
+
+ /**
+ * Bean setter for basename.
+ * @param basename base name
+ */
+ public void setBasename(String basename) {
+ this.basename = basename;
+ }
+
+ /**
+ * Bean getter for maximum size.
+ * @return maximum size
+ */
+ public int getMaximumSize() {
+ return maximumSize;
+ }
+
+ /**
+ * Bean setter for maximum size.
+ * @param maximumSize maximum size
+ */
+ public void setMaximumSize(int maximumSize) {
+ this.maximumSize = maximumSize;
+ }
+
+ /**
* {@inheritDoc}
*/
public void init(String id, RecordProcessor processor, NamespaceResolver resolver) throws JournalException {
@@ -167,6 +240,12 @@
String msg = "Revision not specified.";
throw new JournalException(msg);
}
+ if (basename == null) {
+ basename = DEFAULT_BASENAME;
+ }
+ if (maximumSize == 0) {
+ maximumSize = DEFAULT_MAXSIZE;
+ }
root = new File(directory);
if (!root.exists() || !root.isDirectory()) {
String msg = "Directory specified does either not exist or is not a directory: " + directory;
@@ -182,49 +261,48 @@
* {@inheritDoc}
*/
public void sync() throws JournalException {
- final long instanceValue = instanceRevision.get();
- final long globalValue = globalRevision.get();
-
- File[] files = root.listFiles(new FilenameFilter() {
+ File[] logFiles = root.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
- if (name.endsWith(FileRecord.EXTENSION)) {
- int sep = name.indexOf('.');
- if (sep > 0) {
- try {
- long counter = Long.parseLong(name.substring(0, sep), 16);
- return counter > instanceValue && counter <= globalValue;
- } catch (NumberFormatException e) {
- String msg = "Skipping bogusly named journal file '" + name + "': " + e.getMessage();
- log.warn(msg);
- }
- }
- }
- return false;
+ return name.startsWith(basename + ".");
}
});
- Arrays.sort(files, new Comparator() {
+ Arrays.sort(logFiles, new Comparator() {
public int compare(Object o1, Object o2) {
File f1 = (File) o1;
File f2 = (File) o2;
- return f1.getName().compareTo(f2.getName());
+ return f1.compareTo(f2);
}
});
- if (files.length > 0) {
- for (int i = 0; i < files.length; i++) {
- try {
- FileRecord record = new FileRecord(files[i]);
- if (!record.getJournalId().equals(id)) {
+
+ long instanceValue = instanceRevision.get();
+ long globalValue = globalRevision.get();
+
+ if (instanceValue < globalValue) {
+ FileRecordCursor cursor = new FileRecordCursor(logFiles,
+ instanceValue, globalValue);
+ try {
+ while (cursor.hasNext()) {
+ FileRecord record = cursor.next();
+ if (!record.getCreator().equals(id)) {
process(record);
} else {
- log.info("Log entry matches journal id, skipped: " + files[i]);
+ log.info("Log entry matches journal id, skipped: " + record.getRevision());
}
- instanceRevision.set(record.getCounter());
- } catch (IllegalArgumentException e) {
- String msg = "Skipping bogusly named journal file '" + files[i] + ": " + e.getMessage();
+ instanceRevision.set(record.getNextRevision());
+ }
+ } catch (IOException e) {
+ String msg = "Unable to iterate over modified records: " + e.getMessage();
+ throw new JournalException(msg);
+
+ } finally {
+ try {
+ cursor.close();
+ } catch (IOException e) {
+ String msg = "I/O error while closing record cursor: " + e.getMessage();
log.warn(msg);
}
}
- log.info("Sync finished, instance revision is: " + FileRecord.toHexString(instanceRevision.get()));
+ log.info("Sync finished, instance revision is: " + instanceRevision.get());
}
}
@@ -235,16 +313,12 @@
* @throws JournalException if an error occurs
*/
void process(FileRecord record) throws JournalException {
- File file = record.getFile();
-
- log.info("Processing: " + file);
+ log.info("Processing revision: " + record.getRevision());
- FileRecordInput in = null;
+ FileRecordInput in = record.getInput(resolver);
String workspace = null;
try {
- in = new FileRecordInput(new FileInputStream(file), resolver);
-
workspace = in.readString();
if (workspace.equals("")) {
workspace = null;
@@ -296,23 +370,19 @@
processor.end();
} catch (NameException e) {
- String msg = "Unable to read journal entry " + file + ": " + e.getMessage();
+ String msg = "Unable to read revision " + record.getRevision() +
+ ": " + e.getMessage();
throw new JournalException(msg);
} catch (IOException e) {
- String msg = "Unable to read journal entry " + file + ": " + e.getMessage();
+ String msg = "Unable to read revision " + record.getRevision() +
+ ": " + e.getMessage();
throw new JournalException(msg);
} catch (IllegalArgumentException e) {
- String msg = "Error while processing journal file " + file + ": " + e.getMessage();
+ String msg = "Error while processing revision " +
+ record.getRevision() + ": " + e.getMessage();
throw new JournalException(msg);
} finally {
- if (in != null) {
- try {
- in.close();
- } catch (IOException e) {
- String msg = "I/O error while closing " + file + ": " + e.getMessage();
- log.warn(msg);
- }
- }
+ in.close();
}
}
@@ -333,7 +403,9 @@
sync();
tempLog = File.createTempFile("journal", ".tmp", root);
- out = new FileRecordOutput(new FileOutputStream(tempLog), resolver);
+
+ record = new FileRecord(id, tempLog);
+ out = record.getOutput(resolver);
out.writeString(workspace != null ? workspace : "");
succeeded = true;
@@ -490,7 +562,8 @@
try {
sync();
- record = new FileRecord(root, globalRevision.get() + 1, id);
+
+ record.setRevision(globalRevision.get());
prepared = true;
} finally {
@@ -508,10 +581,22 @@
out.writeChar('\0');
out.close();
- if (!tempLog.renameTo(record.getFile())) {
- throw new JournalException("Unable to rename " + tempLog + " to " + record.getFile());
+ long nextRevision = record.getNextRevision();
+
+ File journalFile = new File(root, basename + LOG_EXTENSION);
+
+ FileRecordLog recordLog = new FileRecordLog(journalFile);
+ if (!recordLog.isNew()) {
+ if (nextRevision - recordLog.getFirstRevision() > maximumSize) {
+ switchLogs();
+ recordLog = new FileRecordLog(journalFile);
+ }
}
- globalRevision.set(record.getCounter());
+ recordLog.append(record);
+
+ tempLog.delete();
+ globalRevision.set(nextRevision);
+ instanceRevision.set(nextRevision);
} catch (IOException e) {
String msg = "Unable to close journal log " + tempLog + ": " + e.getMessage();
@@ -535,6 +620,48 @@
} finally {
globalRevision.unlock();
writeMutex.release();
+ }
+ }
+
+    /**
+     * Return the rotation version encoded in a journal file name.
+     *
+     * @param name file name, without directory
+     * @return <code>0</code> for the current journal file
+     *         (<code>basename + LOG_EXTENSION</code>), <code>N</code> for a
+     *         rotated file carrying the suffix <code>.N</code> (N &gt; 0),
+     *         or <code>-1</code> if the name matches neither pattern
+     */
+    private int getLogVersion(String name) {
+        String current = basename + LOG_EXTENSION;
+        if (name.equals(current)) {
+            return 0;
+        }
+        if (name.startsWith(current + ".")) {
+            try {
+                int version = Integer.parseInt(name.substring(current.length() + 1));
+                if (version > 0) {
+                    return version;
+                }
+            } catch (NumberFormatException e) {
+                // not a version suffix, reported as bogus below
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Move away current journal file (and all other files), incrementing their
+     * version counter. A file named <code>journal.log.N</code> gets renamed to
+     * <code>journal.log.(N+1)</code>, whereas the main journal file
+     * <code>journal.log</code> gets renamed to <code>journal.log.1</code>.
+     * Files are processed in descending numeric version order, so that a
+     * rename never targets a file that still exists. Failures are logged,
+     * not thrown.
+     */
+    private void switchLogs() {
+        File[] files = root.listFiles(new FilenameFilter() {
+            public boolean accept(File dir, String name) {
+                return name.startsWith(basename + ".");
+            }
+        });
+        if (files == null) {
+            // listFiles() returns null if the directory cannot be read
+            log.warn("Unable to list journal files in: " + root);
+            return;
+        }
+        // Compare versions numerically: a plain name comparison would put
+        // ".9" before ".10" and rename onto an existing file.
+        Arrays.sort(files, new Comparator() {
+            public int compare(Object o1, Object o2) {
+                int v1 = getLogVersion(((File) o1).getName());
+                int v2 = getLogVersion(((File) o2).getName());
+                return v1 < v2 ? 1 : (v1 > v2 ? -1 : 0);
+            }
+        });
+        for (int i = 0; i < files.length; i++) {
+            File file = files[i];
+            int version = getLogVersion(file.getName());
+            if (version < 0) {
+                log.warn("Bogusly named journal file, skipped: " + file);
+                continue;
+            }
+            // Always resolve the target against the journal directory.
+            File target = new File(root, basename + LOG_EXTENSION + "." + (version + 1));
+            if (!file.renameTo(target)) {
+                log.warn("Unable to rename " + file + " to " + target);
+            }
         }
+    }
}
Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java?view=diff&rev=473380&r1=473379&r2=473380
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java (original)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecord.java Fri Nov 10 08:26:57 2006
@@ -16,134 +16,364 @@
*/
package org.apache.jackrabbit.core.cluster;
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
+import org.apache.jackrabbit.name.NamespaceResolver;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.DataInput;
import java.io.File;
+import java.io.FileOutputStream;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
/**
- * Represents a file-based record.
+ * Represents a file-based record. Physically, a file record contains its length in the
+ * first 4 bytes, immediately followed by its creator in a length-prefixed, UTF-encoded
+ * string. All further fields are record-specific.
*/
class FileRecord {
/**
- * File record extension.
+ * Indicator for a literal UUID.
*/
- static final String EXTENSION = ".log";
+ static final byte UUID_LITERAL = 'L';
/**
- * Indicator for a literal UUID.
+ * Indicator for a UUID index.
*/
- static final byte UUID_LITERAL = 0x00;
+ static final byte UUID_INDEX = 'I';
/**
- * Indicator for a UUID index.
+ * Revision.
+ */
+ private long revision;
+
+ /**
+ * Underlying input stream.
+ */
+ private DataInputStream in;
+
+ /**
+ * File used when creating a new record.
+ */
+ private File file;
+
+ /**
+ * Underlying output stream.
*/
- static final byte UUID_INDEX = 0x01;
+ private DataOutputStream out;
/**
- * Used for padding long string representations.
+ * Record length.
*/
- private static final String LONG_PADDING = "0000000000000000";
+ private int length;
/**
- * Underlying file.
+ * Creator of a record.
*/
- private final File file;
+ private String creator;
/**
- * Counter.
+ * Bytes used by creator when written in UTF encoding and length-prefixed.
*/
- private final long counter;
+ private int creatorLength;
/**
- * Journal id.
+ * Flag indicating whether an input was handed out on this record; if not, remaining bytes are skipped in {@link #skip()}.
*/
- private final String journalId;
+ private boolean consumed;
+
+ /**
+ * Creates a new file record. Used when opening an existing record.
+ *
+ * @param revision revision this record represents
+ * @param in underlying input stream
+ * @throws IOException if reading the creator fails
+ */
+ public FileRecord(long revision, InputStream in)
+ throws IOException {
+
+ this.revision = revision;
+ if (in instanceof DataInputStream) {
+ this.in = (DataInputStream) in;
+ } else {
+ this.in = new DataInputStream(in);
+ }
+ this.length = this.in.readInt();
+
+ readCreator();
+ }
/**
- * Creates a new file record from an existing file. Retrieves meta data by parsing the file's name.
+ * Creates a new file record. Used when creating a new record.
*
- * @param file file to use as record
- * @throws IllegalArgumentException if file name is bogus
+ * @param creator creator of this record
+ * @param file underlying (temporary) file
+ * @throws IOException if writing the creator fails
*/
- public FileRecord(File file) throws IllegalArgumentException {
+ public FileRecord(String creator, File file) throws IOException {
+
+ this.creator = creator;
this.file = file;
- String name = file.getName();
+ this.out = new DataOutputStream(new FileOutputStream(file));
- int sep1 = name.indexOf('.');
- if (sep1 == -1) {
- throw new IllegalArgumentException("Missing first . separator.");
- }
- try {
- counter = Long.parseLong(name.substring(0, sep1), 16);
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("Unable to decompose long: " + e.getMessage());
- }
- int sep2 = name.lastIndexOf('.');
- if (sep2 == -1) {
- throw new IllegalArgumentException("Missing second . separator.");
- }
- journalId = name.substring(sep1 + 1, sep2);
+ writeCreator();
}
/**
- * Creates a new file record from a counter and instance ID.
+ * Return the journal revision associated with this record.
*
- * @param parent parent directory
- * @param counter counter to use
- * @param journalId journal id to use
+ * @return revision
*/
- public FileRecord(File parent, long counter, String journalId) {
- StringBuffer name = new StringBuffer();
- name.append(toHexString(counter));
- name.append('.');
- name.append(journalId);
+ public long getRevision() {
+ return revision;
+ }
- name.append(EXTENSION);
+ /**
+ * Set the journal revision associated with this record.
+ *
+ * @param revision journal revision
+ */
+ public void setRevision(long revision) {
+ this.revision = revision;
+ }
- this.file = new File(parent, name.toString());
- this.counter = counter;
- this.journalId = journalId;
+ /**
+ * Return the journal revision associated with the next record.
+ *
+ * @return next revision
+ */
+ public long getNextRevision() {
+ return revision + length + 4;
}
/**
- * Return the journal counter associated with this record.
+ * Return the creator of this record.
*
- * @return counter
+ * @return creator
*/
- public long getCounter() {
- return counter;
+ public String getCreator() {
+ return creator;
}
/**
- * Return the id of the journal that created this record.
+ * Return an input on this record.
*
- * @return journal id
+ * @param resolver resolver to use when mapping prefixes to full names
+ * @return record input
*/
- public String getJournalId() {
- return journalId;
+ public FileRecordInput getInput(NamespaceResolver resolver) {
+ consumed = true;
+ return new FileRecordInput(in, resolver);
}
/**
- * Return this record's file.
+ * Return an output on this record.
*
- * @return file
+ * @param resolver resolver to use when mapping full names to prefixes
+ * @return record output
*/
- public File getFile() {
- return file;
+ public FileRecordOutput getOutput(NamespaceResolver resolver) {
+ return new FileRecordOutput(this, out, resolver);
+ }
+
+ /**
+ * Append this record to some output stream.
+ *
+ * @param out outputstream to append to
+ */
+ void append(DataOutputStream out) throws IOException {
+ out.writeInt(length);
+
+ byte[] buffer = new byte[8192];
+ int len;
+
+ InputStream in = new BufferedInputStream(new FileInputStream(file));
+ try {
+ while ((len = in.read(buffer)) > 0) {
+ out.write(buffer, 0, len);
+ }
+ out.flush();
+ } finally {
+ in.close();
+ }
+ }
+
+ /**
+ * Skip over this record, positioning the underlying input stream
+ * on the next available record.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ void skip() throws IOException {
+ if (!consumed) {
+ long skiplen = length - creatorLength;
+ while (skiplen > 0) {
+ long skipped = in.skip(skiplen);
+ if (skipped <= 0) {
+ break;
+ }
+ skiplen -= skipped;
+ }
+ if (skiplen != 0) {
+ String msg = "Unable to skip remaining bytes.";
+ throw new IOException(msg);
+ }
+ }
}
/**
- * Return a zero-padded long string representation.
+ * Invoked when output has been closed.
*/
- public static String toHexString(long l) {
- String s = Long.toHexString(l);
- int padlen = LONG_PADDING.length() - s.length();
- if (padlen > 0) {
- s = LONG_PADDING.substring(0, padlen) + s;
+ void closed() {
+ length = (int) file.length();
+ }
+
+ /**
+ * Read creator from the underlying data input stream.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ private void readCreator() throws IOException {
+ UTFByteCounter counter = new UTFByteCounter(in);
+ creator = DataInputStream.readUTF(counter);
+ creatorLength = counter.getBytes();
+ }
+
+ /**
+ * Write creator to the underlying data output stream.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ private void writeCreator() throws IOException {
+ out.writeUTF(creator);
+ }
+
+ /**
+ * UTF byte counter. Counts the bytes actually read from a given
+ * <code>DataInputStream</code> that make up a UTF-encoded string.
+ */
+ static class UTFByteCounter implements DataInput {
+
+ /**
+ * Underlying input stream.
+ */
+ private final DataInputStream in;
+
+ /**
+ * UTF length.
+ */
+ private int bytes;
+
+ /**
+ * Create a new instance of this class.
+ *
+ * @param in underlying data input stream
+ */
+ public UTFByteCounter(DataInputStream in) {
+ this.in = in;
+ }
+
+ /**
+ * Return the number of bytes read from the underlying input stream.
+ *
+ * @return number of bytes
+ */
+ public int getBytes() {
+ return bytes;
+ }
+
+ /**
+ * @see java.io.DataInputStream#readUnsignedShort()
+ *
+ * Remember number of bytes read.
+ */
+ public int readUnsignedShort() throws IOException {
+ try {
+ return in.readUnsignedShort();
+ } finally {
+ bytes += 2;
+ }
+ }
+
+ /**
+ * @see java.io.DataInputStream#readFully(byte[])
+ *
+ * Remember number of bytes read.
+ */
+ public void readFully(byte b[]) throws IOException {
+ try {
+ in.readFully(b);
+ } finally {
+ bytes += b.length;
+ }
+ }
+
+        /**
+         * @see java.io.DataInputStream#readFully(byte[], int, int)
+         *
+         * Remember number of bytes read.
+         */
+        public void readFully(byte b[], int off, int len) throws IOException {
+            try {
+                in.readFully(b, off, len);
+            } finally {
+                // only len bytes are consumed from the stream; the buffer
+                // passed in may be larger than the requested range
+                bytes += len;
+            }
+        }
+
+ /**
+ * Methods not implemented.
+ */
+ public byte readByte() throws IOException {
+ throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+ }
+
+ public char readChar() throws IOException {
+ throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+ }
+
+ public double readDouble() throws IOException {
+ throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+ }
+
+ public float readFloat() throws IOException {
+ throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+ }
+
+ public int readInt() throws IOException {
+ throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+ }
+
+ public int readUnsignedByte() throws IOException {
+ throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+ }
+
+ public long readLong() throws IOException {
+ throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+ }
+
+ public short readShort() throws IOException {
+ throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+ }
+
+ public boolean readBoolean() throws IOException {
+ throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+ }
+
+ public int skipBytes(int n) throws IOException {
+ throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+ }
+
+ public String readLine() throws IOException {
+ throw new IllegalStateException("Unexpected call, deliberately not implemented.");
+ }
+
+ public String readUTF() throws IOException {
+ throw new IllegalStateException("Unexpected call, deliberately not implemented.");
}
- return s;
}
}
Added: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java?view=auto&rev=473380
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java (added)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordCursor.java Fri Nov 10 08:26:57 2006
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.cluster;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Record cursor that returns unseen revisions in ascending order on every
+ * iteration. When iterating, a record must either be completely processed
+ * or its {@link FileRecord#skip()} method must be invoked to guarantee
+ * that this cursor is pointing at the next record.
+ */
+class FileRecordCursor {
+
+ /**
+ * Log files to scan for revisions.
+ */
+ private File[] logFiles;
+
+ /**
+ * Next revision to visit.
+ */
+ private long nextRevision;
+
+ /**
+ * Last revision to visit.
+ */
+ private long lastRevision;
+
+ /**
+ * Current record log, containing file records.
+ */
+ private FileRecordLog recordLog;
+
+ /**
+ * Current record.
+ */
+ private FileRecord record;
+
+ /**
+ * Creates a new instance of this class.
+ *
+ * @param logFiles available log files, sorted ascending by age
+ * @param firstRevision first revision to return
+ * @param lastRevision last revision to return
+ */
+ public FileRecordCursor(File[] logFiles, long firstRevision, long lastRevision) {
+ this.logFiles = logFiles;
+ this.nextRevision = firstRevision;
+ this.lastRevision = lastRevision;
+ }
+
+
+ /**
+ * Return a flag indicating whether there are next records.
+ */
+ public boolean hasNext() {
+ return nextRevision < lastRevision;
+ }
+
+ /**
+ * Returns the next record.
+ *
+ * @throws IllegalStateException if no next revision exists
+ * @throws IOException if an I/O error occurs
+ */
+ public FileRecord next() throws IOException {
+ if (!hasNext()) {
+ String msg = "No next revision.";
+ throw new IllegalStateException(msg);
+ }
+ if (record != null) {
+ record.skip();
+ record = null;
+ }
+ if (recordLog != null) {
+ if (!recordLog.contains(nextRevision)) {
+ recordLog.close();
+ recordLog = null;
+ }
+ }
+ if (recordLog == null) {
+ recordLog = getRecordLog(nextRevision);
+ recordLog.seek(nextRevision);
+ }
+ record = new FileRecord(nextRevision, recordLog.getInputStream());
+ nextRevision = record.getNextRevision();
+ return record;
+ }
+
+ /**
+ * Return record log containing a given revision.
+ *
+ * @param revision revision to locate
+ * @return record log containing that revision
+ * @throws IOException if an I/O error occurs
+ */
+ private FileRecordLog getRecordLog(long revision) throws IOException {
+ for (int i = 0; i < logFiles.length; i++) {
+ FileRecordLog recordLog = new FileRecordLog(logFiles[i]);
+ if (recordLog.contains(revision)) {
+ return recordLog;
+ }
+ }
+ String msg = "No log file found containing revision: " + revision;
+ throw new IOException(msg);
+ }
+
+ /**
+ * Close this cursor, releasing its resources.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ public void close() throws IOException {
+ if (recordLog != null) {
+ recordLog.close();
+ }
+ }
+}
\ No newline at end of file
Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java?view=diff&rev=473380&r1=473379&r2=473380
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java (original)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordInput.java Fri Nov 10 08:26:57 2006
@@ -26,16 +26,15 @@
import org.apache.jackrabbit.name.Path;
import org.apache.jackrabbit.name.PathFormat;
import org.apache.jackrabbit.name.MalformedPathException;
+import org.apache.jackrabbit.uuid.Constants;
+import org.apache.jackrabbit.uuid.UUID;
-import java.io.File;
-import java.io.RandomAccessFile;
import java.io.IOException;
-import java.io.InputStream;
import java.io.DataInputStream;
import java.util.ArrayList;
/**
- * Defines methods to read members out of a file record.
+ * Allows reading data from a <code>FileRecord</code>.
*/
class FileRecordInput {
@@ -65,8 +64,8 @@
* @param in underlying input stream
* @param resolver namespace resolver
*/
- public FileRecordInput(InputStream in, NamespaceResolver resolver) {
- this.in = new DataInputStream(in);
+ public FileRecordInput(DataInputStream in, NamespaceResolver resolver) {
+ this.in = in;
this.resolver = resolver;
}
@@ -186,20 +185,22 @@
public NodeId readNodeId() throws IOException {
checkOpen();
- byte b = readByte();
- if (b == FileRecord.UUID_INDEX) {
+ byte uuidType = readByte();
+ if (uuidType == FileRecord.UUID_INDEX) {
int index = readInt();
if (index == -1) {
return null;
} else {
return (NodeId) uuidIndex.get(index);
}
- } else if (b == FileRecord.UUID_LITERAL) {
- NodeId nodeId = NodeId.valueOf(readString());
+ } else if (uuidType == FileRecord.UUID_LITERAL) {
+ byte[] b = new byte[Constants.UUID_BYTE_LENGTH];
+ in.readFully(b);
+ NodeId nodeId = new NodeId(new UUID(b));
uuidIndex.add(nodeId);
return nodeId;
} else {
- String msg = "UUID indicator unknown: " + b;
+ String msg = "UUID type unknown: " + uuidType;
throw new IOException(msg);
}
}
@@ -219,18 +220,12 @@
}
/**
- * Close this input.
- *
- * @throws IOException if an I/O error occurs
+ * Close this input. Does not close underlying stream as this is a shared resource.
*/
- public void close() throws IOException {
+ public void close() {
checkOpen();
- try {
- in.close();
- } finally {
- closed = true;
- }
+ closed = true;
}
/**
Added: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java?view=auto&rev=473380
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java (added)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordLog.java Fri Nov 10 08:26:57 2006
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.cluster;
+
+import java.io.File;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.FileInputStream;
+import java.io.BufferedInputStream;
+import java.io.DataOutputStream;
+import java.io.FileOutputStream;
+
+/**
+ * A file record log is a single file holding {@link FileRecord}s appended one after another. Its first 8 bytes
+ * contain the revision this log starts with; a revision is located at offset (revision - first revision) + 8.
+ */
+class FileRecordLog {
+
+ /**
+ * Underlying file: read by the constructor and by {@link #seek(long)}, appended to by {@link #append(FileRecord)}.
+ */
+ private File file;
+
+ /**
+ * Flag indicating whether this is a new log, i.e. the file did not exist when this instance was constructed.
+ */
+ private boolean isNew;
+
+ /**
+ * Input stream used when seeking a specific record; <code>null</code> until {@link #seek(long)} has opened it.
+ */
+ private DataInputStream in;
+
+ /**
+ * First revision available in this log, as read from the 8-byte file header; left at 0 for a new log.
+ */
+ private long minRevision;
+
+ /**
+ * First revision that is not available in this, but in the next log: minRevision + file length - 8.
+ */
+ private long maxRevision;
+
+ /**
+ * Create a new instance. An existing file has its header read to derive the revision range; a missing file only marks the log as new.
+ *
+ * @param file file containing record log
+ * @throws IOException if an I/O error occurs while reading the header of an existing file
+ */
+ public FileRecordLog(File file) throws IOException {
+ this.file = file;
+
+ if (file.exists()) {
+ DataInputStream in = new DataInputStream(new FileInputStream(file));
+
+ try {
+ minRevision = in.readLong();
+ maxRevision = minRevision + file.length() - 8;
+ } finally {
+ in.close();
+ }
+ } else {
+ isNew = true;
+ }
+ }
+
+ /**
+ * Return the first revision, i.e. the value read from the file header (0 if this log is new).
+ *
+ * @return first revision
+ */
+ public long getFirstRevision() {
+ return minRevision;
+ }
+
+ /**
+ * Return a flag indicating whether this record log contains a certain revision (lower bound inclusive, upper bound exclusive).
+ *
+ * @param revision revision to look for
+ * @return <code>true</code> if <code>minRevision &lt;= revision &lt; maxRevision</code>;
+ * <code>false</code> otherwise, which is always the case for a new log
+ */
+ public boolean contains(long revision) {
+ return (revision >= minRevision && revision < maxRevision);
+ }
+
+ /**
+ * Return a flag indicating whether this record log is new, i.e. its file did not exist at construction time.
+ *
+ * @return <code>true</code> if this record log is new;
+ * <code>false</code> otherwise
+ */
+ public boolean isNew() {
+ return isNew;
+ }
+
+ /**
+ * Seek an entry. Opens the underlying input stream and skips the 8-byte header plus all bytes preceding the given
+ * revision, leaving it ready to be sequentially scanned; a second call throws <code>IllegalStateException</code>.
+ *
+ * @param revision revision to seek
+ * @throws IOException if an I/O error occurs or the bytes cannot be skipped; the stream then stays open until {@link #close()}
+ */
+ public void seek(long revision) throws IOException {
+ if (in != null) {
+ String msg = "Seek allowed exactly once.";
+ throw new IllegalStateException(msg);
+ }
+ open();
+
+ long skiplen = revision - minRevision + 8;
+ while (skiplen > 0) {
+ long skipped = in.skip(skiplen);
+ if (skipped <= 0) {
+ break;
+ }
+ skiplen -= skipped;
+ }
+ if (skiplen != 0) {
+ String msg = "Unable to skip remaining bytes.";
+ throw new IOException(msg);
+ }
+ }
+
+ /**
+ * Append a record to this log. If this log is new, the record's revision is written first as the 8-byte header.
+ * NOTE(review): isNew is never cleared here, so a second append on the same new instance would repeat the header - confirm callers.
+ * @param record record to add
+ * @throws IOException if an I/O error occurs
+ */
+ public void append(FileRecord record) throws IOException {
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(file, true));
+ try {
+ if (isNew) {
+ out.writeLong(record.getRevision());
+ }
+ record.append(out);
+ } finally {
+ out.close();
+ }
+ }
+
+ /**
+ * Open this log by creating a buffered data input stream on the file and storing it in <code>in</code>.
+ *
+ * @throws IOException if an I/O error occurs, e.g. the file cannot be opened for reading
+ */
+ private void open() throws IOException {
+ in = new DataInputStream(new BufferedInputStream(
+ new FileInputStream(file)));
+ }
+
+ /**
+ * Return the underlying input stream; throws <code>IllegalStateException</code> if {@link #seek(long)} has not opened it yet.
+ *
+ * @return underlying input stream, never <code>null</code>
+ */
+ protected DataInputStream getInputStream() {
+ if (in == null) {
+ String msg = "Input stream not open.";
+ throw new IllegalStateException(msg);
+ }
+ return in;
+ }
+
+ /**
+ * Close this log by closing the input stream, if one was opened. The reference is kept, so {@link #seek(long)} stays disallowed afterwards.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ public void close() throws IOException {
+ if (in != null) {
+ in.close();
+ }
+ }
+}
\ No newline at end of file
Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java?view=diff&rev=473380&r1=473379&r2=473380
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java (original)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/cluster/FileRecordOutput.java Fri Nov 10 08:26:57 2006
@@ -27,15 +27,19 @@
import java.io.IOException;
import java.io.DataOutputStream;
-import java.io.OutputStream;
import java.util.ArrayList;
/**
- * Defines methods to write members to a file record.
+ * Allows writing data to a <code>FileRecord</code>.
*/
class FileRecordOutput {
/**
+ * File record.
+ */
+ private final FileRecord record;
+
+ /**
* Underlying output stream.
*/
private final DataOutputStream out;
@@ -58,11 +62,13 @@
/**
* Create a new file record.
*
+ * @param record file record
* @param out outputstream to write to
* @param resolver namespace resolver
*/
- public FileRecordOutput(OutputStream out, NamespaceResolver resolver) {
- this.out = new DataOutputStream(out);
+ public FileRecordOutput(FileRecord record, DataOutputStream out, NamespaceResolver resolver) {
+ this.record = record;
+ this.out = out;
this.resolver = resolver;
}
@@ -187,7 +193,7 @@
writeInt(index);
} else {
writeByte(FileRecord.UUID_LITERAL);
- writeString(nodeId.toString());
+ out.write(nodeId.getUUID().getRawBytes());
}
}
}
@@ -218,6 +224,7 @@
out.close();
} finally {
closed = true;
+ record.closed();
}
}
Modified: jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java?view=diff&rev=473380&r1=473379&r2=473380
==============================================================================
--- jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java (original)
+++ jackrabbit/trunk/jackrabbit/src/main/java/org/apache/jackrabbit/core/state/SharedItemStateManager.java Fri Nov 10 08:26:57 2006
@@ -834,7 +834,7 @@
shared.modified(state);
} catch (ItemStateException e) {
String msg = "Unable to retrieve state: " + state.getId();
- log.warn(msg, e);
+ log.warn(msg);
state.discard();
}
}