You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by sa...@apache.org on 2013/10/25 03:03:38 UTC
svn commit: r1535599 - in
/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb: ./ base/block/
base/file/ transaction/
Author: sallen
Date: Fri Oct 25 01:03:38 2013
New Revision: 1535599
URL: http://svn.apache.org/r1535599
Log:
JENA-567 Add option to TDB for specifying where the temporary write blocks for the BlockMgrJournal come from. Also added a memory mapped ByteBuffer allocator to work with the new BlockMgrJournal functionality.
Added:
jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocator.java
jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorDirect.java
jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMapped.java
jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMem.java
Modified:
jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDB.java
jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/block/Block.java
jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java
jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java
Modified: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDB.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDB.java?rev=1535599&r1=1535598&r2=1535599&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDB.java (original)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDB.java Fri Oct 25 01:03:38 2013
@@ -87,6 +87,18 @@ public class TDB
/** Symbol to use the union of named graphs as the default graph of a query */
public static final Symbol symUnionDefaultGraph = SystemTDB.allocSymbol("unionDefaultGraph") ;
+ /**
+ * A Symbol whose String value selects the type of temporary storage used for transaction journal write blocks.
+ * <p/>
+ * "mem" = Java heap memory (default)
+ * <br/>
+ * "direct" = Direct ByteBuffer memory, allocated outside the Java heap
+ * <br/>
+ * "mapped" = Memory mapped temporary file
+ * <br/>
+ */
+ public static final Symbol transactionJournalWriteBlockMode = SystemTDB.allocSymbol("transactionJournalWriteBlockMode") ;
+
public static Context getContext() { return ARQ.getContext() ; }
// Called on assembler loading.
Modified: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/block/Block.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/block/Block.java?rev=1535599&r1=1535598&r2=1535599&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/block/Block.java (original)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/block/Block.java Fri Oct 25 01:03:38 2013
@@ -123,16 +123,27 @@ public final class Block
return String.format("Block: %d %s", id, str) ;
}
- /** Deep copy, including ByteBuffer contents. */
+ /** Deep copy, including ByteBuffer contents into a HeapByteBuffer. */
public Block replicate()
{
- ByteBuffer dstBuffer = replicate(getByteBuffer()) ;
+ ByteBuffer dstBuffer = ByteBuffer.allocate(getByteBuffer().capacity());
+ return replicate(dstBuffer);
+ }
+
+ /**
+ * Deep copy, including ByteBuffer contents, using the supplied ByteBuffer to hold the contents and
+ * to be used when constructing the new Block. The capacity of the supplied ByteBuffer must be equal
+ * to or greater than this block's capacity.
+ */
+ public Block replicate(ByteBuffer dstBuffer)
+ {
+ replicateByteBuffer(getByteBuffer(), dstBuffer) ;
Block b = new Block(getId(), dstBuffer) ;
b.modified = modified ;
b.readOnly = readOnly ;
// b.blockRef = null ;
return b ;
- }
+ }
public static void replicate(Block srcBlock, Block dstBlock)
{
@@ -141,10 +152,8 @@ public final class Block
replicate(srcBlock.getByteBuffer(), dstBlock.getByteBuffer()) ;
}
- private static ByteBuffer replicate(ByteBuffer srcBlk)
+ private static ByteBuffer replicateByteBuffer(ByteBuffer srcBlk, ByteBuffer dstBlk)
{
- ByteBuffer dstBlk = ByteBuffer.allocate(srcBlk.capacity()) ;
-
int x = srcBlk.position() ;
int y = srcBlk.limit() ;
srcBlk.clear() ;
Added: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocator.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocator.java?rev=1535599&view=auto
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocator.java (added)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocator.java Fri Oct 25 01:03:38 2013
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.tdb.base.file;
+
+import java.nio.ByteBuffer ;
+
+import org.apache.jena.atlas.lib.Closeable ;
+
+/**
+ * A source of ByteBuffers of a requested size.
+ */
+public interface BufferAllocator extends Closeable
+{
+    /**
+     * Obtain a ByteBuffer of the requested size.
+     *
+     * @param capacity the size wanted for the ByteBuffer
+     * @return a ByteBuffer whose capacity is the requested size
+     */
+    ByteBuffer allocate(int capacity);
+
+    /**
+     * Signal that every ByteBuffer previously obtained from {@link #allocate(int)}
+     * is no longer in use, which permits the allocator to hand the same memory
+     * out again.
+     */
+    void clear();
+}
Added: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorDirect.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorDirect.java?rev=1535599&view=auto
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorDirect.java (added)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorDirect.java Fri Oct 25 01:03:38 2013
@@ -0,0 +1,28 @@
+package com.hp.hpl.jena.tdb.base.file;
+
+import java.nio.ByteBuffer ;
+
+/**
+ * A {@link BufferAllocator} that forwards every request to
+ * {@link ByteBuffer#allocateDirect(int)}.
+ */
+public class BufferAllocatorDirect implements BufferAllocator
+{
+    /** Hands back a newly allocated direct ByteBuffer of the requested capacity. */
+    @Override
+    public ByteBuffer allocate(int capacity)
+    {
+        final ByteBuffer direct = ByteBuffer.allocateDirect(capacity);
+        return direct;
+    }
+
+    /** No-op: this allocator holds no state describing the buffers it handed out. */
+    @Override
+    public void clear()
+    {
+    }
+
+    /** No-op: this allocator holds nothing that needs releasing. */
+    @Override
+    public void close()
+    {
+    }
+
+}
Added: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMapped.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMapped.java?rev=1535599&view=auto
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMapped.java (added)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMapped.java Fri Oct 25 01:03:38 2013
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.tdb.base.file;
+
+import java.io.File ;
+import java.io.IOException ;
+import java.nio.ByteBuffer ;
+import java.nio.MappedByteBuffer ;
+import java.nio.channels.FileChannel.MapMode ;
+import java.util.ArrayList ;
+import java.util.List ;
+import java.util.UUID ;
+
+import com.hp.hpl.jena.tdb.sys.SystemTDB ;
+
+/**
+ * ByteBuffer access to a temporary file, using memory mapped I/O. The file will
+ * grow in chunks as necessary by the SystemTDB.SegmentSize.
+ * <p/>
+ * This class is not thread-safe.
+ */
+final public class BufferAllocatorMapped implements BufferAllocator
+{
+ private final List<MappedByteBuffer> segments;
+
+ private final int segmentSize = SystemTDB.SegmentSize;
+ private final int blockSize;
+ private final int blocksPerSegment;
+
+ private final File tmpFile;
+ private FileBase file;
+ private int seq = 0;
+
+ public BufferAllocatorMapped(int blockSize)
+ {
+ if (blockSize == 0 || blockSize > segmentSize)
+ throw new IllegalArgumentException("Illegal block size: " + blockSize);
+ if (segmentSize % blockSize != 0)
+ throw new IllegalArgumentException(String.format("BufferAllocatorMapped: Segement size(%d) not a multiple of blocksize (%d)", segmentSize, blockSize)) ;
+
+ this.blockSize = blockSize;
+ blocksPerSegment = segmentSize/blockSize ;
+ segments = new ArrayList<MappedByteBuffer>();
+
+ tmpFile = getNewTemporaryFile();
+ tmpFile.deleteOnExit();
+ }
+
+ /**
+ * Returns a handle to a temporary file. Does not actually create the file on disk.
+ */
+ private final File getNewTemporaryFile()
+ {
+ File sysTempDir = new File(System.getProperty("java.io.tmpdir")) ;
+ File tmpFile = new File(sysTempDir, "JenaTempByteBuffer-" + UUID.randomUUID().toString() + ".tmp") ;
+ return tmpFile ;
+ }
+
+ private final int segment(int id) { return id/blocksPerSegment ; }
+ private final int byteOffset(int id) { return (id%blocksPerSegment)*blockSize ; }
+ private final long fileLocation(long segmentNumber) { return segmentNumber*segmentSize ; }
+
+ @Override
+ public ByteBuffer allocate(int blkSize)
+ {
+ if ( blkSize != this.blockSize )
+ throw new FileException("Fixed blocksize only: request= "+blkSize+"fixed size="+this.blockSize) ;
+
+ // Create the file lazily
+ if (null == file)
+ file = FileBase.create(tmpFile.getPath());
+
+ // Get and increment the id
+ int id = seq++;
+ int seg = segment(id);
+ int segOff = byteOffset(id);
+
+ MappedByteBuffer segBuffer;
+ // See if we need to grow the file
+ if (seg >= segments.size())
+ {
+ try
+ {
+ long offset = fileLocation(seg);
+ segBuffer = file.channel().map(MapMode.READ_WRITE, offset, segmentSize) ;
+ segments.add(segBuffer);
+ }
+ catch (IOException e)
+ {
+ throw new FileException("MappedFile.allocate: Segment= " + seg, e);
+ }
+ }
+ else
+ {
+ segBuffer = segments.get(seg);
+ }
+
+ segBuffer.position(segOff);
+ segBuffer.limit(segOff + blockSize);
+
+ ByteBuffer toReturn = segBuffer.slice();
+
+ segBuffer.limit(segBuffer.capacity());
+
+ return toReturn;
+ }
+
+ @Override
+ public void clear()
+ {
+ // Just reset to the start of the file, we'll allocate overtop of the old memory
+ seq = 0;
+ }
+
+ @Override
+ public void close()
+ {
+ // There is no unmap operation for MappedByteBuffers.
+ // Sun Bug id bug_id=4724038
+ // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4724038
+ clear();
+ segments.clear();
+ file.close();
+ file = null;
+
+ // May not delete on Windows :/
+ tmpFile.delete();
+ }
+}
Added: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMem.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMem.java?rev=1535599&view=auto
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMem.java (added)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMem.java Fri Oct 25 01:03:38 2013
@@ -0,0 +1,28 @@
+package com.hp.hpl.jena.tdb.base.file;
+
+import java.nio.ByteBuffer ;
+
+/**
+ * A {@link BufferAllocator} that forwards every request to
+ * {@link ByteBuffer#allocate(int)}.
+ */
+public class BufferAllocatorMem implements BufferAllocator
+{
+    /** Hands back a newly allocated ByteBuffer of the requested capacity. */
+    @Override
+    public ByteBuffer allocate(int capacity)
+    {
+        final ByteBuffer onHeap = ByteBuffer.allocate(capacity);
+        return onHeap;
+    }
+
+    /** No-op: this allocator holds no state describing the buffers it handed out. */
+    @Override
+    public void clear()
+    {
+    }
+
+    /** No-op: this allocator holds nothing that needs releasing. */
+    @Override
+    public void close()
+    {
+    }
+
+}
Modified: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java?rev=1535599&r1=1535598&r2=1535599&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java (original)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java Fri Oct 25 01:03:38 2013
@@ -18,6 +18,7 @@
package com.hp.hpl.jena.tdb.transaction;
+import java.nio.ByteBuffer ;
import java.util.HashMap ;
import java.util.HashSet ;
import java.util.Iterator ;
@@ -29,10 +30,17 @@ import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import com.hp.hpl.jena.query.ReadWrite ;
+import com.hp.hpl.jena.sparql.util.Context ;
+import com.hp.hpl.jena.tdb.TDB ;
import com.hp.hpl.jena.tdb.base.block.Block ;
import com.hp.hpl.jena.tdb.base.block.BlockException ;
import com.hp.hpl.jena.tdb.base.block.BlockMgr ;
+import com.hp.hpl.jena.tdb.base.file.BufferAllocator ;
+import com.hp.hpl.jena.tdb.base.file.BufferAllocatorDirect ;
+import com.hp.hpl.jena.tdb.base.file.BufferAllocatorMapped ;
+import com.hp.hpl.jena.tdb.base.file.BufferAllocatorMem ;
import com.hp.hpl.jena.tdb.sys.FileRef ;
+import com.hp.hpl.jena.tdb.sys.SystemTDB ;
public class BlockMgrJournal implements BlockMgr, TransactionLifecycle
{
@@ -41,15 +49,32 @@ public class BlockMgrJournal implements
private Transaction transaction ;
private FileRef fileRef ;
- final private Set<Long> readBlocks = new HashSet<Long>() ;
- final private Set<Long> iteratorBlocks = new HashSet<Long>() ;
- final private Map<Long, Block> writeBlocks = new HashMap<Long, Block>() ;
- final private Map<Long, Block> freedBlocks = new HashMap<Long, Block>() ;
+ private final BufferAllocator writeBlockBufferAllocator ;
+
+ private final Set<Long> readBlocks = new HashSet<Long>() ;
+ private final Set<Long> iteratorBlocks = new HashSet<Long>() ;
+ private final Map<Long, Block> writeBlocks = new HashMap<Long, Block>() ;
+ private final Map<Long, Block> freedBlocks = new HashMap<Long, Block>() ;
private boolean closed = false ;
private boolean active = false ; // In a transaction, or preparing.
public BlockMgrJournal(Transaction txn, FileRef fileRef, BlockMgr underlyingBlockMgr)
{
+ Context context = txn.getBaseDataset().getContext() ;
+ String mode = (null != context) ? (String) context.get(TDB.transactionJournalWriteBlockMode, "") : "" ;
+ if ("direct".equalsIgnoreCase(mode))
+ {
+ writeBlockBufferAllocator = new BufferAllocatorDirect() ;
+ }
+ else if ("mapped".equalsIgnoreCase(mode))
+ {
+ writeBlockBufferAllocator = new BufferAllocatorMapped(SystemTDB.BlockSize) ;
+ }
+ else
+ {
+ writeBlockBufferAllocator = new BufferAllocatorMem() ;
+ }
+
reset(txn, fileRef, underlyingBlockMgr) ;
if ( txn.getMode() == ReadWrite.READ && underlyingBlockMgr instanceof BlockMgrJournal )
System.err.println("Two level BlockMgrJournal") ;
@@ -108,8 +133,9 @@ public class BlockMgrJournal implements
this.iteratorBlocks.clear() ;
this.writeBlocks.clear() ;
this.freedBlocks.clear() ;
+ this.writeBlockBufferAllocator.clear() ;
}
-
+
@Override
public Block allocate(int blockSize)
{
@@ -121,7 +147,7 @@ public class BlockMgrJournal implements
// But we "copy" it by allocating ByteBuffer space.
if ( active )
{
- block = block.replicate( ) ;
+ block = replicate(block) ;
writeBlocks.put(block.getId(), block) ;
}
return block ;
@@ -192,7 +218,7 @@ public class BlockMgrJournal implements
private Block _promote(Block block)
{
checkActive() ;
- block = block.replicate() ;
+ block = replicate(block) ;
writeBlocks.put(block.getId(), block) ;
return block ;
}
@@ -256,6 +282,7 @@ public class BlockMgrJournal implements
@Override
public void close()
{
+ writeBlockBufferAllocator.close() ;
closed = true ;
}
@@ -348,4 +375,11 @@ public class BlockMgrJournal implements
@Override
public String getLabel() { return fileRef.getFilename() ; }
+
+    // Local variant of replicate: the destination ByteBuffer is requested from
+    // writeBlockBufferAllocator rather than always being taken from the heap.
+    private Block replicate(Block srcBlock)
+    {
+        int capacity = srcBlock.getByteBuffer().capacity() ;
+        ByteBuffer copyTarget = writeBlockBufferAllocator.allocate(capacity) ;
+        return srcBlock.replicate(copyTarget) ;
+    }
}
Modified: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java?rev=1535599&r1=1535598&r2=1535599&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java (original)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java Fri Oct 25 01:03:38 2013
@@ -168,7 +168,18 @@ class Journal implements Sync, Closeable
// Write all bytes
channel.write(buffer) ;
- adler.update(buffer.array()) ;
+ if (buffer.hasArray())
+ {
+ adler.update(buffer.array()) ;
+ }
+ else
+ {
+ byte[] data = new byte[bufferCapacity] ;
+ buffer.position(0) ;
+ buffer.limit(bufferCapacity) ;
+ buffer.get(data) ;
+ adler.update(data) ;
+ }
buffer.position(bufferPosition) ;
buffer.limit(bufferLimit) ;