You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2018/04/05 09:08:51 UTC
[2/4] jena git commit: JENA-1516: Simplify write. Sync writer buffer;
protect length read.
JENA-1516: Simplify write. Sync writer buffer; protect length read.
Remove alloc-write.
Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/ab8d0f4e
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/ab8d0f4e
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/ab8d0f4e
Branch: refs/heads/master
Commit: ab8d0f4e71039e1049d355ad37109c2bb515b803
Parents: d0662ca
Author: Andy Seaborne <an...@apache.org>
Authored: Tue Apr 3 22:52:24 2018 +0100
Committer: Andy Seaborne <an...@apache.org>
Committed: Tue Apr 3 22:52:24 2018 +0100
----------------------------------------------------------------------
.../tdb/base/objectfile/ObjectFileStorage.java | 242 +++++--------------
.../java/org/apache/jena/tdb/lib/NodeLib.java | 35 +--
2 files changed, 79 insertions(+), 198 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/jena/blob/ab8d0f4e/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java
----------------------------------------------------------------------
diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java
index 8fcd06b..3ad126b 100644
--- a/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java
+++ b/jena-tdb/src/main/java/org/apache/jena/tdb/base/objectfile/ObjectFileStorage.java
@@ -27,8 +27,6 @@ import java.util.Iterator ;
import org.apache.jena.atlas.iterator.Iter ;
import org.apache.jena.atlas.iterator.IteratorSlotted ;
import org.apache.jena.atlas.lib.Pair ;
-import org.apache.jena.atlas.logging.Log ;
-import org.apache.jena.tdb.base.block.Block ;
import org.apache.jena.tdb.base.file.BufferChannel ;
import org.apache.jena.tdb.base.file.FileException ;
import org.apache.jena.tdb.sys.SystemTDB ;
@@ -50,7 +48,8 @@ public class ObjectFileStorage implements ObjectFile
}
/*
- * No synchronization - assumes that the caller has some appropriate lock
+ * No synchronization except for the write buffer.
+ * This code assumes that the caller has some appropriate lock
* because the combination of file and cache operations needs to be thread safe.
*
* The position of the channel is assumed to be the end of the file always.
@@ -60,25 +59,12 @@ public class ObjectFileStorage implements ObjectFile
* Writing is buffered.
*/
- // The object length slot.
- private ByteBuffer lengthBuffer = ByteBuffer.allocate(SizeOfInt) ;
-
- // Delayed write buffer.
+ private final Object lockWriteBuffer = new Object();
private final ByteBuffer writeBuffer ;
private final BufferChannel file ; // Access to storage
- private long filesize ; // Size of on-disk.
+ private volatile long filesize ; // Size of on-disk.
- // Two-step write - alloc, write
- private boolean inAllocWrite = false ;
- private Block allocBlock = null ;
- private long allocLocation = -1 ;
-
- // Old values for abort.
- int oldBufferPosn = -1 ;
- int oldBufferLimit = -1 ;
-
-
public ObjectFileStorage(BufferChannel file)
{
this(file, ObjectFileWriteCacheSize) ;
@@ -94,13 +80,11 @@ public class ObjectFileStorage implements ObjectFile
}
@Override
+ synchronized
public long write(ByteBuffer bb)
{
log("W") ;
- if ( inAllocWrite )
- Log.error(this, "In the middle of an alloc-write") ;
- inAllocWrite = false ;
if ( writeBuffer == null )
{
long x = rawWrite(bb) ;
@@ -111,35 +95,40 @@ public class ObjectFileStorage implements ObjectFile
int len = bb.limit() - bb.position() ;
int spaceNeeded = len + SizeOfInt ;
- if ( writeBuffer.position()+spaceNeeded > writeBuffer.capacity() )
- // No room - flush.
- flushOutputBuffer() ;
- if ( writeBuffer.position()+spaceNeeded > writeBuffer.capacity() )
- {
- long x = rawWrite(bb) ;
+ synchronized(lockWriteBuffer) {
+ if ( writeBuffer.position()+spaceNeeded > writeBuffer.capacity() )
+ // No room - flush.
+ flushOutputBuffer() ;
+ if ( writeBuffer.position()+spaceNeeded > writeBuffer.capacity() )
+ {
+ long x = rawWrite(bb) ;
+ if ( logging )
+ log("W -> 0x%X", x);
+ return x ;
+ }
+
+ long loc = writeBuffer.position()+filesize ;
+ writeBuffer.putInt(len) ;
+ writeBuffer.put(bb) ;
if ( logging )
- log("W -> 0x%X", x);
- return x ;
+ log("W -> 0x%X", loc);
+ return loc ;
}
-
- long loc = writeBuffer.position()+filesize ;
- writeBuffer.putInt(len) ;
- writeBuffer.put(bb) ;
- if ( logging )
- log("W -> 0x%X", loc);
- return loc ;
}
+ // The object length slot.
+ private ByteBuffer writeLengthBuffer = ByteBuffer.allocate(SizeOfInt) ;
+
private long rawWrite(ByteBuffer bb)
{
if ( logging )
log("RW %s", bb) ;
int len = bb.limit() - bb.position() ;
- lengthBuffer.rewind() ;
- lengthBuffer.putInt(len) ;
- lengthBuffer.flip() ;
+ writeLengthBuffer.rewind() ;
+ writeLengthBuffer.putInt(len) ;
+ writeLengthBuffer.flip() ;
long location = file.position() ;
- file.write(lengthBuffer) ;
+ file.write(writeLengthBuffer) ;
int x = file.write(bb) ;
if ( x != len )
throw new FileException() ;
@@ -153,140 +142,25 @@ public class ObjectFileStorage implements ObjectFile
return location ;
}
- @Override
- public Block allocWrite(int bytesSpace)
- {
- //log.info("AW("+bytesSpace+"):"+state()) ;
- if ( inAllocWrite )
- Log.error(this, "In the middle of an alloc-write") ;
-
- // Include space for length.
- int spaceRequired = bytesSpace + SizeOfInt ;
-
- // Find space.
- if ( writeBuffer != null && spaceRequired > writeBuffer.remaining() )
- flushOutputBuffer() ;
-
- if ( writeBuffer == null || spaceRequired > writeBuffer.remaining() )
- {
- // Too big. Have flushed buffering if buffering.
- inAllocWrite = true ;
- ByteBuffer bb = ByteBuffer.allocate(bytesSpace) ;
- allocBlock = new Block(filesize, bb) ;
- allocLocation = -1 ;
- //log.info("AW:"+state()+"-> ----") ;
- return allocBlock ;
- }
-
- // Will fit.
- inAllocWrite = true ;
- int start = writeBuffer.position() ;
- // Old values for restoration
- oldBufferPosn = start ;
- oldBufferLimit = writeBuffer.limit() ;
-
- // id (but don't tell the caller yet).
- allocLocation = filesize+start ;
-
- // Slice it.
- writeBuffer.putInt(bytesSpace) ;
- writeBuffer.position(start + SizeOfInt) ;
- writeBuffer.limit(start+spaceRequired) ;
- ByteBuffer bb = writeBuffer.slice() ;
-
- allocBlock = new Block(allocLocation, bb) ;
-
- if ( logging )
- log("AW: %s->0x%X", state(), allocLocation) ;
- return allocBlock ;
- }
-
- @Override
- public void completeWrite(Block block)
- {
- if ( logging )
- log("CW: %s @0x%X",block, allocLocation) ;
- if ( ! inAllocWrite )
- throw new FileException("Not in the process of an allocated write operation pair") ;
- if ( allocBlock != null && ( allocBlock.getByteBuffer() != block.getByteBuffer() ) )
- throw new FileException("Wrong byte buffer in an allocated write operation pair") ;
-
- inAllocWrite = false ;
-
- ByteBuffer buffer = block.getByteBuffer() ;
-
- if ( allocLocation == -1 )
- {
- // It was too big to use the buffering.
- rawWrite(buffer) ;
- return ;
- }
- // Write area is 0 -> limit
- if ( 0 != buffer.position() )
- log.warn("ObjectFleStorage: position != 0") ;
- buffer.position(0) ;
- int actualLength = buffer.limit()-buffer.position() ;
- // Insert object length
- int idx = (int)(allocLocation-filesize) ;
- writeBuffer.putInt(idx, actualLength) ;
- // And bytes to idx+actualLength+4 are used
- allocBlock = null ;
- int newLen = idx+actualLength+4 ;
- writeBuffer.position(newLen);
- writeBuffer.limit(writeBuffer.capacity()) ;
- allocLocation = -1 ;
- oldBufferPosn = -1 ;
- oldBufferLimit = -1 ;
- }
-
- @Override
- public void abortWrite(Block block)
- {
- allocBlock = null ;
- int oldstart = (int)(allocLocation-filesize) ;
- if ( oldstart != oldBufferPosn)
- throw new FileException("Wrong reset point: calc="+oldstart+" : expected="+oldBufferPosn) ;
-
- writeBuffer.position(oldstart) ;
- writeBuffer.limit(oldBufferLimit) ;
- allocLocation = -1 ;
- oldBufferPosn = -1 ;
- oldBufferLimit = -1 ;
- inAllocWrite = false ;
- }
-
private void flushOutputBuffer()
{
if ( logging )
log("Flush") ;
- if ( writeBuffer == null ) return ;
- if ( writeBuffer.position() == 0 ) return ;
-
- if ( false )
- {
- String x = getLabel() ;
- if ( x.contains("nodes") )
- {
- long x1 = filesize ;
- long x2 = writeBuffer.position() ;
- long x3 = x1 + x2 ;
- System.out.printf("Flush(%s) : %d/0x%04X (%d/0x%04X) %d/0x%04X\n", getLabel(), x1, x1, x2, x2, x3, x3) ;
- }
- }
-
- long location = filesize ;
+ if ( writeBuffer == null )
+ return;
+ if ( writeBuffer.position() == 0 )
+ return;
+ long location = filesize;
writeBuffer.flip();
- int x = file.write(writeBuffer) ;
- filesize += x ;
- writeBuffer.clear() ;
+ int x = file.write(writeBuffer);
+ filesize += x;
+ writeBuffer.clear();
}
@Override
public void reposition(long posn)
{
- if ( inAllocWrite )
- throw new FileException("In the middle of an alloc-write") ;
if ( posn < 0 || posn > length() )
throw new IllegalArgumentException("reposition: Bad location: "+posn) ;
flushOutputBuffer() ;
@@ -307,38 +181,40 @@ public class ObjectFileStorage implements ObjectFile
if ( logging )
log("R(0x%X)", loc) ;
- if ( inAllocWrite )
- throw new FileException("In the middle of an alloc-write") ;
if ( loc < 0 )
throw new IllegalArgumentException("ObjectFile.read["+file.getLabel()+"]: Bad read: "+loc) ;
// Maybe it's in the write buffer.
- // Maybe the write buffer should keep more structure?
if ( loc >= filesize )
{
- if ( loc >= filesize+writeBuffer.position() )
- throw new IllegalArgumentException("ObjectFileStorage.read["+file.getLabel()+"]: Bad read: location="+loc+" >= max="+(filesize+writeBuffer.position())) ;
-
- int x = writeBuffer.position() ;
- int y = writeBuffer.limit() ;
-
- int offset = (int)(loc-filesize) ;
- int len = writeBuffer.getInt(offset) ;
- int posn = offset + SizeOfInt ;
- // Slice the data bytes,
- writeBuffer.position(posn) ;
- writeBuffer.limit(posn+len) ;
- ByteBuffer bb = writeBuffer.slice() ;
- writeBuffer.limit(y) ;
- writeBuffer.position(x) ;
- return bb ;
+ // This path should be uncommon.
+ synchronized(lockWriteBuffer) {
+ if ( loc >= filesize+writeBuffer.position() )
+ throw new IllegalArgumentException("ObjectFileStorage.read["+file.getLabel()+"]: Bad read: location="+loc+" >= max="+(filesize+writeBuffer.position())) ;
+ int offset = (int)(loc-filesize) ;
+ int len = writeBuffer.getInt(offset) ;
+ int posn = offset + SizeOfInt ;
+ ByteBuffer bb1 = ByteBuffer.allocate(len) ;
+ for (int i = 0; i < len; i++)
+ bb1.put(i, writeBuffer.get(posn+i));
+ return bb1 ;
+ }
}
// No - it's in the underlying file storage.
+ // XXX Need to make this safe.
+ // XXX Length buffer
+ ByteBuffer lengthBuffer = ByteBuffer.allocate(SizeOfInt) ;
+
lengthBuffer.clear() ;
int x = file.read(lengthBuffer, loc) ;
- if ( x != 4 )
+ if ( x != 4 ) {
+ String msg = "ObjectFileStorage.read["+file.getLabel()+"]("+loc+")[filesize="+filesize+"][file.size()="+file.size()+"]: Failed to read the length : got "+x+" bytes";
+ System.err.println(msg) ;
+ lengthBuffer.clear() ;
+ int x1 = file.read(lengthBuffer, loc) ;
throw new FileException("ObjectFileStorage.read["+file.getLabel()+"]("+loc+")[filesize="+filesize+"][file.size()="+file.size()+"]: Failed to read the length : got "+x+" bytes") ;
+ }
int len = lengthBuffer.getInt(0) ;
// Sanity check.
if ( len > filesize-(loc+SizeOfInt) )
http://git-wip-us.apache.org/repos/asf/jena/blob/ab8d0f4e/jena-tdb/src/main/java/org/apache/jena/tdb/lib/NodeLib.java
----------------------------------------------------------------------
diff --git a/jena-tdb/src/main/java/org/apache/jena/tdb/lib/NodeLib.java b/jena-tdb/src/main/java/org/apache/jena/tdb/lib/NodeLib.java
index a4f6939..240a12e 100644
--- a/jena-tdb/src/main/java/org/apache/jena/tdb/lib/NodeLib.java
+++ b/jena-tdb/src/main/java/org/apache/jena/tdb/lib/NodeLib.java
@@ -36,7 +36,6 @@ import org.apache.jena.graph.Node ;
import org.apache.jena.riot.out.NodeFmtLib ;
import org.apache.jena.sparql.util.NodeUtils ;
import org.apache.jena.tdb.TDBException ;
-import org.apache.jena.tdb.base.block.Block ;
import org.apache.jena.tdb.base.objectfile.ObjectFile ;
import org.apache.jena.tdb.base.record.Record ;
import org.apache.jena.tdb.store.Hash ;
@@ -53,26 +52,32 @@ public class NodeLib
// Characters in IRIs that are illegal and cause SSE problems, but we wish to keep.
final private static char MarkerChar = '_' ;
final private static char[] invalidIRIChars = { MarkerChar , ' ' } ;
+ final private static int SIZE = 1024;
+ // Marshalling space.
+ final private static ByteBuffer workspace = ByteBuffer.allocate(SIZE);
+ /** Encode and write a {@link Node} to the {@link ObjectFile}.
+ * Returns the location, suitable for use with {@link #fetchDecode}.
+ * <p>
+ * Callers must synchronize to ensure writing is not concurrent.
+ */
public static long encodeStore(Node node, ObjectFile file)
{
- // Buffer pool?
-
- // Nodes can be writtern during reads.
- // Make sure this operation is sync'ed.
int maxSize = nodec.maxSize(node) ;
- Block block = file.allocWrite(maxSize) ;
- try {
- int len = nodec.encode(node, block.getByteBuffer(), null) ;
- file.completeWrite(block) ;
- return block.getId() ;
- } catch (TDBException ex)
- {
- file.abortWrite(block) ;
- throw ex ;
- }
+ ByteBuffer bb = workspace;
+ if ( maxSize >= SIZE )
+ // Large object. Special buffer.
+ bb = ByteBuffer.allocate(maxSize);
+ else
+ bb.clear();
+ int len = nodec.encode(node, bb, null) ;
+ long x = file.write(bb);
+ return x;
}
+ /** Read and decode a {@link Node} from the {@link ObjectFile}.
+ * The {@code id} must have originally been generated by {@link #encodeStore}.
+ */
public static Node fetchDecode(long id, ObjectFile file)
{
ByteBuffer bb = file.read(id) ;