You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jena.apache.org by an...@apache.org on 2019/03/22 12:15:35 UTC
[jena] branch master updated: JENA-1685: Make code safer for
multithreaded use. Add comments.
This is an automated email from the ASF dual-hosted git repository.
andy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/jena.git
The following commit(s) were added to refs/heads/master by this push:
new dc546ac JENA-1685: Make code safer for multithreaded use. Add comments.
new f48c1f1 Merge pull request #544 from afs/tdb2-bad-term
dc546ac is described below
commit dc546ac419c07bd49281b5b4953c3673fad00077
Author: Andy Seaborne <an...@apache.org>
AuthorDate: Wed Mar 20 00:01:25 2019 +0000
JENA-1685: Make code safer for multithreaded use. Add comments.
---
.../base/file/BinaryDataFileWriteBuffered.java | 46 +++++++++-----
.../jena/dboe/trans/data/TransBinaryDataFile.java | 72 +++++++++++++---------
.../jena/tdb2/store/nodetable/NodeTableTRDF.java | 4 +-
3 files changed, 76 insertions(+), 46 deletions(-)
diff --git a/jena-db/jena-dboe-base/src/main/java/org/apache/jena/dboe/base/file/BinaryDataFileWriteBuffered.java b/jena-db/jena-dboe-base/src/main/java/org/apache/jena/dboe/base/file/BinaryDataFileWriteBuffered.java
index cf51597..c4efb3e 100644
--- a/jena-db/jena-dboe-base/src/main/java/org/apache/jena/dboe/base/file/BinaryDataFileWriteBuffered.java
+++ b/jena-db/jena-dboe-base/src/main/java/org/apache/jena/dboe/base/file/BinaryDataFileWriteBuffered.java
@@ -21,7 +21,7 @@ package org.apache.jena.dboe.base.file;
import org.apache.jena.atlas.RuntimeIOException;
/** Implementation of {@link BinaryDataFile} adding write buffering to another
- * {@link BinaryDataFile} file such as a {@link BinaryDataFileRandomAccess}.
+ * {@link BinaryDataFile} file, such as a {@link BinaryDataFileRandomAccess}.
* <li>Thread-safe.
* <li>No read buffering provided.
* <li>The write buffer is flushed when switching to read.
@@ -30,9 +30,8 @@ import org.apache.jena.atlas.RuntimeIOException;
public class BinaryDataFileWriteBuffered implements BinaryDataFile {
private static final int SIZE = 128*1024;
private final Object sync = new Object();
- private byte[] buffer;
- private int bufferLength;
- private boolean pendingOutput;
+ private final byte[] buffer;
+ private volatile int bufferLength;
private final BinaryDataFile other;
public BinaryDataFileWriteBuffered(BinaryDataFile other) {
@@ -42,6 +41,7 @@ public class BinaryDataFileWriteBuffered implements BinaryDataFile {
public BinaryDataFileWriteBuffered(BinaryDataFile other, int bufferSize) {
this.other = other;
buffer = new byte[bufferSize];
+ bufferLength = 0;
}
@Override
@@ -49,7 +49,6 @@ public class BinaryDataFileWriteBuffered implements BinaryDataFile {
synchronized(sync) {
other.open();
bufferLength = 0;
- pendingOutput = false;
}
}
@@ -81,8 +80,26 @@ public class BinaryDataFileWriteBuffered implements BinaryDataFile {
public void truncate(long posn) {
synchronized(sync) {
checkOpen();
- if ( pendingOutput && posn >= other.length() )
- writeBuffer();
+ long otherLen = other.length();
+ if ( bufferLength > 0) {
+ if ( posn >= otherLen ) {
+ long bufLen = posn-otherLen;
+ if ( bufLen < bufferLength ) {
+ // If truncate is in the buffer area, just truncate the write buffer....
+ bufferLength = (int)bufLen;
+ return;
+ }
+ // Off the top end.
+ // Write and do a real truncate so the underlying meaning of "truncate
+ // above the current end" is used.
+ writeBuffer();
+ // and truncate "other".
+ } else {
+ // Forget buffer.
+ bufferLength = 0;
+ // and truncate "other".
+ }
+ }
other.truncate(posn);
}
}
@@ -112,21 +129,23 @@ public class BinaryDataFileWriteBuffered implements BinaryDataFile {
// if ( false ) {
// // No buffering
-// try { file.write(buf, off, len); }
+// try { other.write(buf, off, len); }
// catch (IOException e) { IO.exception(e); }
// bufferLength = 0;
// return;
// }
- // No room.
- if ( bufferLength + len >= SIZE )
+ // If no room, flush buffer.
+ if ( bufferLength + len > SIZE ) {
writeBuffer();
+ // bufferLength set to zero.
+ }
- if ( bufferLength + len < SIZE ) {
+ // Is there room now?
+ if ( bufferLength + len <= SIZE ) {
// Room to buffer
System.arraycopy(buf, off, buffer, bufferLength, len);
bufferLength += len;
- pendingOutput = true;
return x;
}
// Larger than the buffer space. Write directly.
@@ -144,8 +163,7 @@ public class BinaryDataFileWriteBuffered implements BinaryDataFile {
}
private void writeBuffer() {
- if ( pendingOutput ) {
- pendingOutput = false;
+ if ( bufferLength > 0 ) {
other.write(buffer, 0, bufferLength);
bufferLength = 0;
}
diff --git a/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/data/TransBinaryDataFile.java b/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/data/TransBinaryDataFile.java
index ef0507a..cc44a32 100644
--- a/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/data/TransBinaryDataFile.java
+++ b/jena-db/jena-dboe-trans-data/src/main/java/org/apache/jena/dboe/trans/data/TransBinaryDataFile.java
@@ -18,14 +18,20 @@
package org.apache.jena.dboe.trans.data;
+import static java.lang.String.format;
+
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.jena.atlas.RuntimeIOException;
import org.apache.jena.atlas.io.IO;
+import org.apache.jena.atlas.logging.Log;
import org.apache.jena.dboe.base.file.BinaryDataFile;
import org.apache.jena.dboe.base.file.BufferChannel;
-import org.apache.jena.dboe.transaction.txn.*;
+import org.apache.jena.dboe.transaction.txn.ComponentId;
+import org.apache.jena.dboe.transaction.txn.StateMgrData;
+import org.apache.jena.dboe.transaction.txn.TransactionalComponentLifecycle;
+import org.apache.jena.dboe.transaction.txn.TxnId;
import org.apache.jena.query.ReadWrite;
/** Transactional {@link BinaryDataFile}.
@@ -38,18 +44,19 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
implements BinaryDataFile {
/*
- * The file is written to as we go along so abort requires some action. We
- * can't recover from just the file, without any redo or undo recovery
- * action.
+ * The file is written to as we go along but we might need to abort
+ * and reset to the starting state of a transaction, which is TxnBinFile
+ * or set a new prepare/commit intended state which is FileState
+ * as a ByteBuffer.
*/
-
- private final FileState stateMgr;
+ private final FileState fileState;
// The current committed position and the limit as seen by readers.
// This is also the abort point.
+ // Global.s
private final AtomicLong committedLength;
- // The per thread runtime state
+ // The state of the file visible outside the transaction.
static class TxnBinFile {
final long length;
@@ -58,7 +65,9 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
}
}
- // Prepare record
+ // This is the state of the file ahead of time at prepare/commit.
+ // It is set on prepare.
+ // It is only valid on the transaction thread in the transaction lifecycle.
static class FileState extends StateMgrData {
FileState(BufferChannel bufferChannel, long length, long position) {
super(bufferChannel, length, position);
@@ -75,7 +84,7 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
*/
public TransBinaryDataFile(BinaryDataFile binFile, ComponentId cid, BufferChannel bufferChannel) {
super(cid);
- stateMgr = new FileState(bufferChannel, 0L, 0L);
+ fileState = new FileState(bufferChannel, 0L, 0L);
this.binFile = binFile;
if ( ! binFile.isOpen() )
binFile.open();
@@ -93,8 +102,8 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
@Override
public void recover(ByteBuffer ref) {
- stateMgr.setState(ref);
- committedLength.set(stateMgr.length());
+ fileState.setState(ref);
+ committedLength.set(fileState.length());
recoveryAction = true;
}
@@ -104,7 +113,8 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
long length = committedLength.get();
binFile.truncate(length);
binFile.sync();
- committedLength.set(length);
+ committedLength.set(binFile.length());
+ recoveryAction = false;
}
}
@@ -113,8 +123,6 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
@Override
protected TxnBinFile _begin(ReadWrite readWrite, TxnId txnId) {
- // Atomic read across the two because it's called from within
- // TransactionCoordinator.begin$ where there is a lock.
return createState();
}
@@ -124,36 +132,42 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
}
@Override
- protected TxnBinFile _promote(TxnId txnId, TxnBinFile state) {
+ protected TxnBinFile _promote(TxnId txnId, TxnBinFile txnResetState) {
+ // New state object (may be PROMOTE-READ-COMMITTED and not the same as passed in).
return createState();
}
@Override
- protected ByteBuffer _commitPrepare(TxnId txnId, TxnBinFile state) {
- // Force to disk but do not set the on disk state to record that.
+ protected ByteBuffer _commitPrepare(TxnId txnId, TxnBinFile txnResetState) {
+ // Force to disk but do not set the on-disk state to record that.
binFile.sync();
- stateMgr.length(binFile.length());
- return stateMgr.getState();
+ fileState.length(binFile.length());
+ return fileState.getState();
}
@Override
- protected void _commit(TxnId txnId, TxnBinFile state) {
+ protected void _commit(TxnId txnId, TxnBinFile txnResetState) {
if ( isWriteTxn() ) {
- // Force to disk happens in _commitPrepare
- stateMgr.writeState();
- // Move visible commit point forward (not strictly necessary - transaction is ending.
+ // Force data to disk happens in _commitPrepare
+ fileState.writeState();
committedLength.set(binFile.length());
}
}
@Override
protected void _commitEnd(TxnId txnId, TxnBinFile state) {
+ // No clearup needed.
}
@Override
- protected void _abort(TxnId txnId, TxnBinFile state) {
+ protected void _abort(TxnId txnId, TxnBinFile txnResetState) {
if ( isWriteTxn() ) {
- binFile.truncate(committedLength.get());
+ long x = committedLength.get();
+ // Internal consistency check.
+ // (Abort after commit would trigger the warning.)
+ if ( txnResetState.length != x )
+ Log.warn(this, format("Mismatch: state.length = %d, committedLength = %d", txnResetState.length != x));
+ binFile.truncate(x);
binFile.sync();
}
}
@@ -164,8 +178,6 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
@Override
protected void _shutdown() {}
- private void checkBoundsReader(long requestedPoint, TxnBinFile state) { }
-
@Override
public void open() {
if ( ! binFile.isOpen() )
@@ -187,7 +199,7 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
private void checkRead(long posn) {
if ( posn > getDataState().length )
- IO.exception("Out of bounds: (limit "+getDataState().length+")"+posn);
+ IO.exception("Out of bounds: (limit "+getDataState().length+") "+posn);
}
@Override
@@ -198,7 +210,7 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
/**
* Truncate only supported for an abort - this transactional version of
- * BinaryDataFile will not truncate to earlier than the committed length.
+ * {@link BinaryDataFile} will not truncate to earlier than the committed length.
*/
@Override
public void truncate(long size) {
@@ -217,7 +229,7 @@ public class TransBinaryDataFile extends TransactionalComponentLifecycle<TransBi
@Override
public void close() {
- stateMgr.close();
+ fileState.close();
binFile.close();
}
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableTRDF.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableTRDF.java
index 6dec310..1e8ccb6 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableTRDF.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/nodetable/NodeTableTRDF.java
@@ -36,8 +36,8 @@ import org.apache.thrift.protocol.TProtocol ;
public class NodeTableTRDF extends NodeTableNative {
// Write buffering is done in the underlying BinaryDataFile
- BinaryDataFile diskFile ;
- private TReadAppendFileTransport transport ;
+ private final BinaryDataFile diskFile ;
+ private final TReadAppendFileTransport transport ;
private final TProtocol protocol ;
public NodeTableTRDF(Index nodeToId, BinaryDataFile objectFile) {