You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by sa...@apache.org on 2013/10/25 03:03:38 UTC

svn commit: r1535599 - in /jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb: ./ base/block/ base/file/ transaction/

Author: sallen
Date: Fri Oct 25 01:03:38 2013
New Revision: 1535599

URL: http://svn.apache.org/r1535599
Log:
JENA-567 Add option to TDB for specifying where the temporary write blocks for the BlockMgrJournal come from.  Also added a memory mapped ByteBuffer allocator to work with the new BlockMgrJournal functionality.

Added:
    jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocator.java
    jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorDirect.java
    jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMapped.java
    jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMem.java
Modified:
    jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDB.java
    jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/block/Block.java
    jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java
    jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java

Modified: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDB.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDB.java?rev=1535599&r1=1535598&r2=1535599&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDB.java (original)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/TDB.java Fri Oct 25 01:03:38 2013
@@ -87,6 +87,18 @@ public class TDB
     /** Symbol to use the union of named graphs as the default graph of a query */ 
     public static final Symbol symUnionDefaultGraph          = SystemTDB.allocSymbol("unionDefaultGraph") ;
     
+    /**
+     * A String enum Symbol that specifies the type of temporary storage for transaction journal write blocks.
+     * <p/>
+     * "mem" = Java heap memory (default)
+     * <br/>
+     * "direct" = Native (off-heap) process memory, allocated with ByteBuffer.allocateDirect
+     * <br/>
+     * "mapped" = Memory mapped temporary file
+     * <br/>
+     */
+    public static final Symbol transactionJournalWriteBlockMode = SystemTDB.allocSymbol("transactionJournalWriteBlockMode") ;
+    
     public static Context getContext()     { return ARQ.getContext() ; }  
     
     // Called on assembler loading.

Modified: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/block/Block.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/block/Block.java?rev=1535599&r1=1535598&r2=1535599&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/block/Block.java (original)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/block/Block.java Fri Oct 25 01:03:38 2013
@@ -123,16 +123,27 @@ public final class Block
         return String.format("Block: %d %s", id, str) ;
     }
     
-    /** Deep copy, including ByteBuffer contents. */
+    /** Deep copy; the ByteBuffer contents are copied into a newly allocated heap ByteBuffer. */
     public Block replicate()
     {
-        ByteBuffer dstBuffer = replicate(getByteBuffer()) ;
+        ByteBuffer dstBuffer = ByteBuffer.allocate(getByteBuffer().capacity());
+        return replicate(dstBuffer);
+    }
+    
+    /**
+     * Deep copy, including ByteBuffer contents, using the supplied ByteBuffer to hold the contents and
+     * to be used when constructing the new Block.  The capacity of the supplied ByteBuffer must be equal
+     * to or greater than this block's capacity.
+     */
+    public Block replicate(ByteBuffer dstBuffer)
+    {
+        replicateByteBuffer(getByteBuffer(), dstBuffer) ;
         Block b = new Block(getId(), dstBuffer) ;
         b.modified = modified ;
         b.readOnly = readOnly ;
 //        b.blockRef = null ;
         return b ;
-    }  
+    }
 
     public static void replicate(Block srcBlock, Block dstBlock)
     {
@@ -141,10 +152,8 @@ public final class Block
         replicate(srcBlock.getByteBuffer(), dstBlock.getByteBuffer()) ;
     }  
 
-    private static ByteBuffer replicate(ByteBuffer srcBlk)
+    private static ByteBuffer replicateByteBuffer(ByteBuffer srcBlk, ByteBuffer dstBlk)
     {
-        ByteBuffer dstBlk = ByteBuffer.allocate(srcBlk.capacity()) ;
-        
         int x = srcBlk.position() ;
         int y = srcBlk.limit() ;
         srcBlk.clear() ;

Added: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocator.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocator.java?rev=1535599&view=auto
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocator.java (added)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocator.java Fri Oct 25 01:03:38 2013
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.tdb.base.file;
+
+import java.nio.ByteBuffer ;
+
+import org.apache.jena.atlas.lib.Closeable ;
+
+/**
+ * An allocator for retrieving ByteBuffers of a given size.
+ */
+public interface BufferAllocator extends Closeable
+{
+    /**
+     * Obtain a ByteBuffer from this allocator.
+     * @param capacity the number of bytes the returned buffer must hold
+     * @return a ByteBuffer whose capacity equals the requested size
+     */
+    ByteBuffer allocate(int capacity);
+    
+    /**
+     * Signal that every ByteBuffer previously returned by allocate is no
+     * longer in use.  After this call the allocator may recycle the memory
+     * behind those buffers for later allocations.
+     */
+    void clear();
+}

Added: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorDirect.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorDirect.java?rev=1535599&view=auto
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorDirect.java (added)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorDirect.java Fri Oct 25 01:03:38 2013
@@ -0,0 +1,28 @@
+package com.hp.hpl.jena.tdb.base.file;
+
+import java.nio.ByteBuffer ;
+
+/**
+ * Delegates to {@link ByteBuffer#allocateDirect(int)}.
+ */
+public class BufferAllocatorDirect implements BufferAllocator
+{
+    @Override
+    public ByteBuffer allocate(int capacity)
+    {
+        ByteBuffer direct = ByteBuffer.allocateDirect(capacity);
+        return direct;
+    }
+
+    /** No-op: this allocator keeps no state, so there is nothing to reset. */
+    @Override
+    public void clear()
+    {
+    }
+    
+    /** No-op: this allocator keeps no state, so there is nothing to release. */
+    @Override
+    public void close()
+    {
+    }
+}

Added: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMapped.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMapped.java?rev=1535599&view=auto
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMapped.java (added)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMapped.java Fri Oct 25 01:03:38 2013
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.tdb.base.file;
+
+import java.io.File ;
+import java.io.IOException ;
+import java.nio.ByteBuffer ;
+import java.nio.MappedByteBuffer ;
+import java.nio.channels.FileChannel.MapMode ;
+import java.util.ArrayList ;
+import java.util.List ;
+import java.util.UUID ;
+
+import com.hp.hpl.jena.tdb.sys.SystemTDB ;
+
+/**
+ * ByteBuffer access to a temporary file, using memory mapped I/O.  The file
+ * grows as needed, one SystemTDB.SegmentSize chunk at a time.
+ * <p/>
+ * This class is not thread-safe.
+ */
+final public class BufferAllocatorMapped implements BufferAllocator
+{
+    // Mapped segments of the temporary file, indexed by segment number
+    private final List<MappedByteBuffer> segments;
+    private final int segmentSize = SystemTDB.SegmentSize;
+    private final int blockSize;
+    private final int blocksPerSegment;
+    
+    private final File tmpFile;
+    private FileBase file;      // Opened lazily by allocate(); null until then and after close()
+    private int seq = 0;        // Id of the next block to hand out; reset by clear()
+    
+    /**
+     * @param blockSize the fixed size of every buffer handed out
+     * @throws IllegalArgumentException if blockSize is not a positive divisor of the segment size
+     */
+    public BufferAllocatorMapped(int blockSize)
+    {
+        if (blockSize <= 0 || blockSize > segmentSize)
+            throw new IllegalArgumentException("Illegal block size: " + blockSize);
+        if (segmentSize % blockSize != 0)
+            throw new IllegalArgumentException(String.format("BufferAllocatorMapped: Segment size(%d) not a multiple of blocksize (%d)", segmentSize, blockSize)) ;
+        
+        this.blockSize = blockSize;
+        blocksPerSegment = segmentSize/blockSize ;
+        segments = new ArrayList<MappedByteBuffer>();
+        
+        tmpFile = getNewTemporaryFile();
+        tmpFile.deleteOnExit();
+    }
+    
+    /**
+     * Returns a handle to a temporary file.  Does not actually create the file on disk.
+     */
+    private final File getNewTemporaryFile()
+    {
+        File sysTempDir = new File(System.getProperty("java.io.tmpdir")) ;
+        return new File(sysTempDir, "JenaTempByteBuffer-" + UUID.randomUUID().toString() + ".tmp") ;
+    }
+    
+    private final int segment(int id)                   { return id/blocksPerSegment ; }
+    private final int byteOffset(int id)                { return (id%blocksPerSegment)*blockSize ; }
+    private final long fileLocation(long segmentNumber) { return segmentNumber*segmentSize ; }
+    
+    /** Returns the next block-sized slice of the temporary file, mapping a new segment if needed. */
+    @Override
+    public ByteBuffer allocate(int blkSize)
+    {
+        if ( blkSize != this.blockSize )
+            throw new FileException("Fixed blocksize only: request= "+blkSize+" fixed size="+this.blockSize) ;
+        
+        // Create the file lazily
+        if (null == file)
+            file = FileBase.create(tmpFile.getPath());
+        
+        // Get and increment the id
+        int id = seq++;
+        int seg = segment(id);
+        int segOff = byteOffset(id);
+        
+        MappedByteBuffer segBuffer;
+        // See if we need to grow the file
+        if (seg >= segments.size())
+        {
+            try
+            {
+                segBuffer = file.channel().map(MapMode.READ_WRITE, fileLocation(seg), segmentSize) ;
+                segments.add(segBuffer);
+            }
+            catch (IOException e)
+            {
+                throw new FileException("MappedFile.allocate: Segment= " + seg, e);
+            }
+        }
+        else
+        {
+            segBuffer = segments.get(seg);
+        }
+        
+        // Window the segment onto this block, slice it, then restore the full limit
+        segBuffer.position(segOff);
+        segBuffer.limit(segOff + blockSize);
+        ByteBuffer toReturn = segBuffer.slice();
+        segBuffer.limit(segBuffer.capacity());
+        return toReturn;
+    }
+    
+    @Override
+    public void clear()
+    {
+        // Just reset to the start of the file, we'll allocate overtop of the old memory
+        seq = 0;
+    }
+    
+    @Override
+    public void close()
+    {
+        // MappedByteBuffers cannot be unmapped: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4724038
+        clear();
+        segments.clear();
+        // The file is only created by allocate(), so it may never have been opened
+        if (null != file)
+            file.close();
+        file = null;
+        // May not delete on Windows :/
+        tmpFile.delete();
+    }
+}

Added: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMem.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMem.java?rev=1535599&view=auto
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMem.java (added)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/base/file/BufferAllocatorMem.java Fri Oct 25 01:03:38 2013
@@ -0,0 +1,28 @@
+package com.hp.hpl.jena.tdb.base.file;
+
+import java.nio.ByteBuffer ;
+
+/**
+ * Delegates to {@link ByteBuffer#allocate(int)}.
+ */
+public class BufferAllocatorMem implements BufferAllocator
+{
+    @Override
+    public ByteBuffer allocate(int capacity)
+    {
+        ByteBuffer heap = ByteBuffer.allocate(capacity);
+        return heap;
+    }
+
+    /** No-op: this allocator keeps no state, so there is nothing to reset. */
+    @Override
+    public void clear()
+    {
+    }
+    
+    /** No-op: this allocator keeps no state, so there is nothing to release. */
+    @Override
+    public void close()
+    {
+    }
+}

Modified: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java?rev=1535599&r1=1535598&r2=1535599&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java (original)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/BlockMgrJournal.java Fri Oct 25 01:03:38 2013
@@ -18,6 +18,7 @@
 
 package com.hp.hpl.jena.tdb.transaction;
 
+import java.nio.ByteBuffer ;
 import java.util.HashMap ;
 import java.util.HashSet ;
 import java.util.Iterator ;
@@ -29,10 +30,17 @@ import org.slf4j.Logger ;
 import org.slf4j.LoggerFactory ;
 
 import com.hp.hpl.jena.query.ReadWrite ;
+import com.hp.hpl.jena.sparql.util.Context ;
+import com.hp.hpl.jena.tdb.TDB ;
 import com.hp.hpl.jena.tdb.base.block.Block ;
 import com.hp.hpl.jena.tdb.base.block.BlockException ;
 import com.hp.hpl.jena.tdb.base.block.BlockMgr ;
+import com.hp.hpl.jena.tdb.base.file.BufferAllocator ;
+import com.hp.hpl.jena.tdb.base.file.BufferAllocatorDirect ;
+import com.hp.hpl.jena.tdb.base.file.BufferAllocatorMapped ;
+import com.hp.hpl.jena.tdb.base.file.BufferAllocatorMem ;
 import com.hp.hpl.jena.tdb.sys.FileRef ;
+import com.hp.hpl.jena.tdb.sys.SystemTDB ;
 
 public class BlockMgrJournal implements BlockMgr, TransactionLifecycle
 {
@@ -41,15 +49,32 @@ public class BlockMgrJournal implements 
     private Transaction transaction ;
     private FileRef fileRef ;
     
-    final private Set<Long> readBlocks = new HashSet<Long>() ;
-    final private Set<Long> iteratorBlocks = new HashSet<Long>() ;
-    final private Map<Long, Block> writeBlocks = new HashMap<Long, Block>() ;
-    final private Map<Long, Block> freedBlocks = new HashMap<Long, Block>() ;
+    private final BufferAllocator writeBlockBufferAllocator ;
+    
+    private final Set<Long> readBlocks = new HashSet<Long>() ;
+    private final Set<Long> iteratorBlocks = new HashSet<Long>() ;
+    private final Map<Long, Block> writeBlocks = new HashMap<Long, Block>() ;
+    private final Map<Long, Block> freedBlocks = new HashMap<Long, Block>() ;
     private boolean closed  = false ;
     private boolean active  = false ;   // In a transaction, or preparing.
     
     public BlockMgrJournal(Transaction txn, FileRef fileRef, BlockMgr underlyingBlockMgr)
     {
+        Context context = txn.getBaseDataset().getContext() ;
+        String mode = (null != context) ? (String) context.get(TDB.transactionJournalWriteBlockMode, "") : "" ;
+        if ("direct".equalsIgnoreCase(mode))
+        {
+            writeBlockBufferAllocator = new BufferAllocatorDirect() ;
+        }
+        else if ("mapped".equalsIgnoreCase(mode))
+        {
+            writeBlockBufferAllocator = new BufferAllocatorMapped(SystemTDB.BlockSize) ;
+        }
+        else
+        {
+            writeBlockBufferAllocator = new BufferAllocatorMem() ;
+        }
+        
         reset(txn, fileRef, underlyingBlockMgr) ;
         if ( txn.getMode() == ReadWrite.READ &&  underlyingBlockMgr instanceof BlockMgrJournal )
             System.err.println("Two level BlockMgrJournal") ;
@@ -108,8 +133,9 @@ public class BlockMgrJournal implements 
         this.iteratorBlocks.clear() ;
         this.writeBlocks.clear() ;
         this.freedBlocks.clear() ;
+        this.writeBlockBufferAllocator.clear() ;
     }
-                       
+    
     @Override
     public Block allocate(int blockSize)
     {
@@ -121,7 +147,7 @@ public class BlockMgrJournal implements 
         // But we "copy" it by allocating ByteBuffer space.
         if ( active ) 
         {
-            block = block.replicate( ) ;
+            block = replicate(block) ;
             writeBlocks.put(block.getId(), block) ;
         }
         return block ;
@@ -192,7 +218,7 @@ public class BlockMgrJournal implements 
     private Block _promote(Block block)
     {
         checkActive() ; 
-        block = block.replicate() ;
+        block = replicate(block) ;
         writeBlocks.put(block.getId(), block) ;
         return block ;
     }
@@ -256,6 +282,7 @@ public class BlockMgrJournal implements 
     @Override
     public void close()
     {
+        writeBlockBufferAllocator.close() ;
         closed = true ;
     }
 
@@ -348,4 +375,11 @@ public class BlockMgrJournal implements 
 
     @Override
     public String getLabel() { return fileRef.getFilename() ; }
+    
+    // Copies srcBlock into a ByteBuffer obtained from writeBlockBufferAllocator, so the copy is not always heap-backed
+    private Block replicate(Block srcBlock)
+    {
+        int capacity = srcBlock.getByteBuffer().capacity() ;
+        return srcBlock.replicate(writeBlockBufferAllocator.allocate(capacity)) ;
+    }
 }

Modified: jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java
URL: http://svn.apache.org/viewvc/jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java?rev=1535599&r1=1535598&r2=1535599&view=diff
==============================================================================
--- jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java (original)
+++ jena/trunk/jena-tdb/src/main/java/com/hp/hpl/jena/tdb/transaction/Journal.java Fri Oct 25 01:03:38 2013
@@ -168,7 +168,18 @@ class Journal implements Sync, Closeable
             
             // Write all bytes
             channel.write(buffer) ;
-            adler.update(buffer.array()) ;
+            if (buffer.hasArray())
+            {
+                adler.update(buffer.array()) ;
+            }
+            else
+            {
+                byte[] data = new byte[bufferCapacity] ;
+                buffer.position(0) ;
+                buffer.limit(bufferCapacity) ;
+                buffer.get(data) ;
+                adler.update(data) ;
+            }
             
             buffer.position(bufferPosition) ;
             buffer.limit(bufferLimit) ;