You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2019/03/22 12:15:35 UTC

[jena] branch master updated: JENA-1685: Make code safer for multithreaded use. Add comments.

This is an automated email from the ASF dual-hosted git repository.

andy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/jena.git


The following commit(s) were added to refs/heads/master by this push:
     new dc546ac  JENA-1685: Make code safer for multithreaded use. Add comments.
     new f48c1f1  Merge pull request #544 from afs/tdb2-bad-term
dc546ac is described below

commit dc546ac419c07bd49281b5b4953c3673fad00077
Author: Andy Seaborne <an...@apache.org>
AuthorDate: Wed Mar 20 00:01:25 2019 +0000

    JENA-1685: Make code safer for multithreaded use. Add comments.
---
 .../base/file/BinaryDataFileWriteBuffered.java     | 46 +++++++++-----
 .../jena/dboe/trans/data/TransBinaryDataFile.java  | 72 +++++++++++++---------
 .../jena/tdb2/store/nodetable/NodeTableTRDF.java   |  4 +-
 3 files changed, 76 insertions(+), 46 deletions(-)

diff --git a/jena-db/jena-dboe-base/src/main/java/org/apache/jena/dboe/base/file/BinaryDataFileWriteBuffered.java b/jena-db/jena-dboe-base/src/main/java/org/apache/jena/dboe/base/file/BinaryDataFileWriteBuffered.java
index cf51597..c4efb3e 100644
--- a/jena-db/jena-dboe-base/src/main/java/org/apache/jena/dboe/base/file/BinaryDataFileWriteBuffered.java
+++ b/jena-db/jena-dboe-base/src/main/java/org/apache/jena/dboe/base/file/BinaryDataFileWriteBuffered.java
@@ -21,7 +21,7 @@ package org.apache.jena.dboe.base.file;
 import org.apache.jena.atlas.RuntimeIOException;
 
 /** Implementation of {@link BinaryDataFile} adding write buffering to another
- * {@link BinaryDataFile} file such as a {@link BinaryDataFileRandomAccess}.
+ * {@link BinaryDataFile} file, such as a {@link BinaryDataFileRandomAccess}.
  *  <li>Thread-safe.
  *  <li>No read buffering provided.
  *  <li>The write buffer is flushed when switching to read.
@@ -30,9 +30,8 @@ import org.apache.jena.atlas.RuntimeIOException;
 public class BinaryDataFileWriteBuffered implements BinaryDataFile {
     private static final int SIZE = 128*1024;
     private final Object sync = new Object();
-    private byte[] buffer;
-    private int bufferLength;
-    private boolean pendingOutput;
+    private final byte[] buffer;
+    private volatile int bufferLength;
     private final BinaryDataFile other;
 
     public BinaryDataFileWriteBuffered(BinaryDataFile other) {
@@ -42,6 +41,7 @@ public class BinaryDataFileWriteBuffered implements BinaryDataFile {
     public BinaryDataFileWriteBuffered(BinaryDataFile other, int bufferSize) {
         this.other = other;
         buffer = new byte[bufferSize];
+        bufferLength = 0;
     }
 
     @Override
@@ -49,7 +49,6 @@ public class BinaryDataFileWriteBuffered implements BinaryDataFile {
         synchronized(sync) {
             other.open();
             bufferLength = 0;
-            pendingOutput = false;
         }
     }
 
@@ -81,8 +80,26 @@ public class BinaryDataFileWriteBuffered implements BinaryDataFile {
     public void truncate(long posn) {
         synchronized(sync) {
             checkOpen();
-            if ( pendingOutput && posn >= other.length() )
-                writeBuffer();
+            long otherLen = other.length();
+            if ( bufferLength > 0) { 
+                if ( posn >= otherLen ) {
+                    long bufLen = posn-otherLen;
+                    if ( bufLen < bufferLength ) {
+                        // If truncate is in the buffer area, just truncate the write buffer....
+                        bufferLength = (int)bufLen;
+                        return;
+                    }
+                    // Off the top end.
+                    // Write and do a real truncate so the underlying meaning of "truncate
+                    // above the current end" is used.
+                    writeBuffer();
+                    // and truncate "other".
+                } else {
+                    // Forget buffer.
+                    bufferLength = 0;
+                    // and truncate "other".
+                }
+            }
             other.truncate(posn);
         }
     }
@@ -112,21 +129,23 @@ public class BinaryDataFileWriteBuffered implements BinaryDataFile {
 
 //        if ( false ) {
 //            // No buffering
-//            try { file.write(buf, off, len); }
+//            try { other.write(buf, off, len); }
 //            catch (IOException e) { IO.exception(e); }
 //            bufferLength = 0;
 //            return;
 //        }
 
-            // No room.
-            if ( bufferLength + len >= SIZE )
+            // If no room, flush buffer.
+            if ( bufferLength + len > SIZE ) {
                 writeBuffer();
+                // bufferLength set to zero.
+            }
 
-            if ( bufferLength + len < SIZE ) {
+            // Is there room now?
+            if ( bufferLength + len <= SIZE ) {
                 // Room to buffer
                 System.arraycopy(buf, off, buffer, bufferLength, len);
                 bufferLength += len;
-                pendingOutput = true;
                 return x;
             }
             // Larger than the buffer space.  Write directly.
@@ -144,8 +163,7 @@ public class BinaryDataFileWriteBuffered implements BinaryDataFile {
     }
 
     private void writeBuffer() {
-        if ( pendingOutput ) {
-            pendingOutput = false;
+        if ( bufferLength > 0 ) {
             other.write(buffer, 0, bufferLength);
             bufferLength = 0;
         }
diff --git a/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/data/TransBinaryDataFile.java b/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/data/TransBinaryDataFile.java
index ef0507a..cc44a32 100644
--- a/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/data/TransBinaryDataFile.java
+++ b/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/data/TransBinaryDataFile.java
@@ -18,14 +18,20 @@
 
 package org.apache.jena.dboe.trans.data;
 
+import static java.lang.String.format;
+
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.jena.atlas.RuntimeIOException;
 import org.apache.jena.atlas.io.IO;
+import org.apache.jena.atlas.logging.Log;
 import org.apache.jena.dboe.base.file.BinaryDataFile;
 import org.apache.jena.dboe.base.file.BufferChannel;
-import org.apache.jena.dboe.transaction.txn.*;
+import org.apache.jena.dboe.transaction.txn.ComponentId;
+import org.apache.jena.dboe.transaction.txn.StateMgrData;
+import org.apache.jena.dboe.transaction.txn.TransactionalComponentLifecycle;
+import org.apache.jena.dboe.transaction.txn.TxnId;
 import org.apache.jena.query.ReadWrite;
 
 /** Transactional {@link BinaryDataFile}.
@@ -38,18 +44,19 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
     implements BinaryDataFile {
 
     /*
-     * The file is written to as we go along so abort requires some action. We
-     * can't recover from just the file, without any redo or undo recovery
-     * action.
+     * The file is written to as we go along, but we might need to abort
+     * and reset to the starting state of a transaction, which is TxnBinFile,
+     * or set a new prepare/commit intended state, which is FileState
+     * as a ByteBuffer.
      */
-
-    private final FileState stateMgr;
+    private final FileState fileState;
 
     // The current committed position and the limit as seen by readers.
     // This is also the abort point.
+    // Global.
     private final AtomicLong committedLength;
 
-    // The per thread runtime state
+    // The state of the file visible outside the transaction.
     static class TxnBinFile {
         final long length;
 
@@ -58,7 +65,9 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
         }
     }
 
-    // Prepare record
+    // This is the state of the file ahead of time at prepare/commit.
+    // It is set on prepare.
+    // It is only valid on the transaction thread in the transaction lifecycle.
     static class FileState extends StateMgrData {
         FileState(BufferChannel bufferChannel, long length, long position) {
             super(bufferChannel, length, position);
@@ -75,7 +84,7 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
      */
     public TransBinaryDataFile(BinaryDataFile binFile, ComponentId cid, BufferChannel bufferChannel) {
         super(cid);
-        stateMgr = new FileState(bufferChannel, 0L, 0L);
+        fileState = new FileState(bufferChannel, 0L, 0L);
         this.binFile = binFile;
         if ( ! binFile.isOpen() )
             binFile.open();
@@ -93,8 +102,8 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
 
     @Override
     public void recover(ByteBuffer ref) {
-        stateMgr.setState(ref);
-        committedLength.set(stateMgr.length());
+        fileState.setState(ref);
+        committedLength.set(fileState.length());
         recoveryAction = true;
     }
 
@@ -104,7 +113,8 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
             long length = committedLength.get();
             binFile.truncate(length);
             binFile.sync();
-            committedLength.set(length);
+            committedLength.set(binFile.length());
+            recoveryAction = false;
         }
     }
 
@@ -113,8 +123,6 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
 
     @Override
     protected TxnBinFile _begin(ReadWrite readWrite, TxnId txnId) {
-        // Atomic read across the two because it's called from within
-        // TransactionCoordinator.begin$ where there is a lock.
         return createState();
     }
 
@@ -124,36 +132,42 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
     }
 
     @Override
-    protected TxnBinFile _promote(TxnId txnId, TxnBinFile state) {
+    protected TxnBinFile _promote(TxnId txnId, TxnBinFile txnResetState) {
+        // New state object (may be PROMOTE-READ-COMMITTED and not the same as passed in).
         return createState();
     }
 
     @Override
-    protected ByteBuffer _commitPrepare(TxnId txnId, TxnBinFile state) {
-        // Force to disk but do not set the on disk state to record that.
+    protected ByteBuffer _commitPrepare(TxnId txnId, TxnBinFile txnResetState) {
+        // Force to disk but do not set the on-disk state to record that.
         binFile.sync();
-        stateMgr.length(binFile.length());
-        return stateMgr.getState();
+        fileState.length(binFile.length());
+        return fileState.getState();
     }
 
     @Override
-    protected void _commit(TxnId txnId, TxnBinFile state) {
+    protected void _commit(TxnId txnId, TxnBinFile txnResetState) {
         if ( isWriteTxn() ) {
-            // Force to disk happens in _commitPrepare
-            stateMgr.writeState();
-            // Move visible commit point forward (not strictly necessary - transaction is ending.
+            // Force data to disk happens in _commitPrepare
+            fileState.writeState();
             committedLength.set(binFile.length());
         }
     }
 
     @Override
     protected void _commitEnd(TxnId txnId, TxnBinFile state) {
+        // No cleanup needed.
     }
 
     @Override
-    protected void _abort(TxnId txnId, TxnBinFile state) {
+    protected void _abort(TxnId txnId, TxnBinFile txnResetState) {
         if ( isWriteTxn() ) {
-            binFile.truncate(committedLength.get());
+            long x = committedLength.get();
+            // Internal consistency check.
+            // (Abort after commit would trigger the warning.) 
+            if ( txnResetState.length != x )
+                Log.warn(this, format("Mismatch: state.length = %d,  committedLength = %d", txnResetState.length, x));
+            binFile.truncate(x);
             binFile.sync();
         }
     }
@@ -164,8 +178,6 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
     @Override
     protected void _shutdown() {}
 
-    private void checkBoundsReader(long requestedPoint, TxnBinFile state) { }
-
     @Override
     public void open() {
         if ( ! binFile.isOpen() )
@@ -187,7 +199,7 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
 
     private void checkRead(long posn) {
         if ( posn > getDataState().length )
-            IO.exception("Out of bounds: (limit "+getDataState().length+")"+posn);
+            IO.exception("Out of bounds: (limit "+getDataState().length+") "+posn);
     }
 
     @Override
@@ -198,7 +210,7 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
 
     /**
      * Truncate only supported for an abort - this transactional version of
-     * BinaryDataFile will not truncate to earlier than the committed length.
+     * {@link BinaryDataFile} will not truncate to earlier than the committed length.
      */
     @Override
     public void truncate(long size) {
@@ -217,7 +229,7 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
 
     @Override
     public void close() {
-        stateMgr.close();
+        fileState.close();
         binFile.close();
     }
 
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableTRDF.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableTRDF.java
index 6dec310..1e8ccb6 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableTRDF.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableTRDF.java
@@ -36,8 +36,8 @@ import org.apache.thrift.protocol.TProtocol ;
 
 public class NodeTableTRDF extends NodeTableNative {
     // Write buffering is done in the underlying BinaryDataFile
-    BinaryDataFile diskFile ;
-    private TReadAppendFileTransport transport ;
+    private final BinaryDataFile diskFile ;
+    private final TReadAppendFileTransport transport ;
     private final TProtocol protocol ;
 
     public NodeTableTRDF(Index nodeToId, BinaryDataFile objectFile) {