You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/12 22:18:30 UTC
[27/51] [abbrv] [partial] geode git commit: GEODE-2632: change
dependencies on GemFireCacheImpl to InternalCache
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/MemberFunctionStreamingMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/MemberFunctionStreamingMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/MemberFunctionStreamingMessage.java
index d4e7f2a..3a0bf8e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/MemberFunctionStreamingMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/MemberFunctionStreamingMessage.java
@@ -50,10 +50,6 @@ import org.apache.geode.internal.cache.execute.MultiRegionFunctionContextImpl;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
-/**
- *
- *
- */
public class MemberFunctionStreamingMessage extends DistributionMessage
implements TransactionMessage, MessageWithReply {
@@ -72,6 +68,7 @@ public class MemberFunctionStreamingMessage extends DistributionMessage
private int processorId;
private int txUniqId = TXManagerImpl.NOTX;
+
private InternalDistributedMember txMemberId = null;
private boolean isFnSerializationReqd;
@@ -80,8 +77,6 @@ public class MemberFunctionStreamingMessage extends DistributionMessage
private boolean isReExecute;
- // private final Object lastResultLock = new Object();
-
private static final short IS_REEXECUTE = UNRESERVED_FLAGS_START;
public MemberFunctionStreamingMessage() {}
@@ -124,7 +119,7 @@ public class MemberFunctionStreamingMessage extends DistributionMessage
if (this.txUniqId == TXManagerImpl.NOTX) {
return null;
} else {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache == null) {
// ignore and return, we are shutting down!
return null;
@@ -134,9 +129,9 @@ public class MemberFunctionStreamingMessage extends DistributionMessage
}
}
- private void cleanupTransasction(TXStateProxy tx) {
+ private void cleanupTransaction(TXStateProxy tx) {
if (this.txUniqId != TXManagerImpl.NOTX) {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache == null) {
// ignore and return, we are shutting down!
return;
@@ -167,7 +162,7 @@ public class MemberFunctionStreamingMessage extends DistributionMessage
ResultSender resultSender = new MemberFunctionResultSender(dm, this, this.functionObject);
Set<Region> regions = new HashSet<Region>();
if (this.regionPathSet != null) {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
for (String regionPath : this.regionPathSet) {
if (checkCacheClosing(dm) || checkDSClosing(dm)) {
thr =
@@ -181,7 +176,6 @@ public class MemberFunctionStreamingMessage extends DistributionMessage
FunctionContextImpl context = new MultiRegionFunctionContextImpl(this.functionObject.getId(),
this.args, resultSender, regions, isReExecute);
-
long start = stats.startTime();
stats.startFunctionExecution(this.functionObject.hasResult());
if (logger.isDebugEnabled()) {
@@ -235,7 +229,7 @@ public class MemberFunctionStreamingMessage extends DistributionMessage
SystemFailure.checkFailure();
thr = t;
} finally {
- cleanupTransasction(tx);
+ cleanupTransaction(tx);
if (thr != null) {
rex = new ReplyException(thr);
replyWithException(dm, rex);
@@ -268,7 +262,7 @@ public class MemberFunctionStreamingMessage extends DistributionMessage
if ((flags & HAS_TX_ID) != 0)
this.txUniqId = in.readInt();
if ((flags & HAS_TX_MEMBERID) != 0) {
- this.txMemberId = (InternalDistributedMember) DataSerializer.readObject(in);
+ this.txMemberId = DataSerializer.readObject(in);
}
Object object = DataSerializer.readObject(in);
@@ -358,8 +352,8 @@ public class MemberFunctionStreamingMessage extends DistributionMessage
/**
* check to see if the cache is closing
*/
- final public boolean checkCacheClosing(DistributionManager dm) {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ private boolean checkCacheClosing(DistributionManager dm) {
+ InternalCache cache = GemFireCacheImpl.getInstance();
return (cache == null || cache.getCancelCriterion().isCancelInProgress());
}
@@ -368,25 +362,15 @@ public class MemberFunctionStreamingMessage extends DistributionMessage
*
* @return true if the distributed system is closing
*/
- final public boolean checkDSClosing(DistributionManager dm) {
+ private boolean checkDSClosing(DistributionManager dm) {
InternalDistributedSystem ds = dm.getSystem();
return (ds == null || ds.isDisconnecting());
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TransactionMessage#canStartRemoteTransaction()
- */
public boolean canStartRemoteTransaction() {
return true;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TransactionMessage#getTXUniqId()
- */
public int getTXUniqId() {
return this.txUniqId;
}
@@ -400,7 +384,6 @@ public class MemberFunctionStreamingMessage extends DistributionMessage
}
public InternalDistributedMember getTXOriginatorClient() {
- // TODO Auto-generated method stub
return null;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java
index faec43a..bfcf6ff 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java
@@ -349,13 +349,6 @@ public class NonLocalRegionEntry implements RegionEntry, VersionStamp {
.toLocalizedString());
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.RegionEntry#getSerializedValueOnDisk(org.apache.geode.internal.
- * cache.LocalRegion)
- */
public Object getSerializedValueOnDisk(LocalRegion localRegion) {
throw new UnsupportedOperationException(
LocalizedStrings.PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY
http://git-wip-us.apache.org/repos/asf/geode/blob/654d65b5/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
index f21d195..7bf1a9d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
@@ -14,12 +14,49 @@
*/
package org.apache.geode.internal.cache;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.SyncFailedException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.SerializationException;
@@ -78,52 +115,14 @@ import org.apache.geode.internal.util.BlobHelper;
import org.apache.geode.internal.util.IOUtils;
import org.apache.geode.internal.util.TransformUtils;
import org.apache.geode.pdx.internal.PdxWriterImpl;
-import org.apache.logging.log4j.Logger;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.SyncFailedException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
/**
* Implements an operation log to write to disk. As of prPersistSprint2 this file only supports
* persistent regions. For overflow only regions see {@link OverflowOplog}.
*
- *
* @since GemFire 5.1
*/
-
-public final class Oplog implements CompactableOplog, Flushable {
+public class Oplog implements CompactableOplog, Flushable {
private static final Logger logger = LogService.getLogger();
/** Extension of the oplog file * */
@@ -141,8 +140,6 @@ public final class Oplog implements CompactableOplog, Flushable {
private final OplogFile drf = new OplogFile();
private final KRFile krf = new KRFile();
- /** preallocated space available for writing to* */
- // volatile private long opLogSpace = 0L;
/** The stats for this store */
private final DiskStoreStats stats;
@@ -190,6 +187,7 @@ public final class Oplog implements CompactableOplog, Flushable {
* The HighWaterMark of recentValues.
*/
private final AtomicLong totalCount = new AtomicLong(0);
+
/**
* The number of records in this oplog that contain the most recent value of the entry.
*/
@@ -209,13 +207,13 @@ public final class Oplog implements CompactableOplog, Flushable {
* Set to true after the first drf recovery.
*/
private boolean haveRecoveredDrf = true;
+
/**
* Set to true after the first crf recovery.
*/
private boolean haveRecoveredCrf = true;
- private OpState opState;
- /** OPCODES - byte appended before being written to disk* */
+ private OpState opState;
/**
* Written to CRF, and DRF.
@@ -239,7 +237,9 @@ public final class Oplog implements CompactableOplog, Flushable {
* @since GemFire prPersistSprint1
*/
private static final byte OPLOG_NEW_ENTRY_BASE_ID = 63;
+
static final int OPLOG_NEW_ENTRY_BASE_REC_SIZE = 1 + 8 + 1;
+
/**
* Written to CRF. The OplogEntryId is +1 the previous new_entry OplogEntryId. Byte Format: 1:
* userBits RegionId 4: valueLength (optional depending on bits) valueLength: value bytes
@@ -258,6 +258,7 @@ public final class Oplog implements CompactableOplog, Flushable {
* @since GemFire prPersistSprint1
*/
private static final byte OPLOG_MOD_ENTRY_1ID = 65;
+
/**
* Written to CRF. The OplogEntryId is relative to the previous mod_entry OplogEntryId. The signed
* difference is encoded in 2 bytes. Byte Format: 1: userBits 2: OplogEntryId RegionId 4:
@@ -297,6 +298,7 @@ public final class Oplog implements CompactableOplog, Flushable {
* @since GemFire prPersistSprint1
*/
private static final byte OPLOG_MOD_ENTRY_5ID = 69;
+
/**
* Written to CRF. The OplogEntryId is relative to the previous mod_entry OplogEntryId. The signed
* difference is encoded in 6 bytes. Byte Format: 1: userBits 6: OplogEntryId RegionId 4:
@@ -306,6 +308,7 @@ public final class Oplog implements CompactableOplog, Flushable {
* @since GemFire prPersistSprint1
*/
private static final byte OPLOG_MOD_ENTRY_6ID = 70;
+
/**
* Written to CRF. The OplogEntryId is relative to the previous mod_entry OplogEntryId. The signed
* difference is encoded in 7 bytes. Byte Format: 1: userBits 7: OplogEntryId RegionId 4:
@@ -315,6 +318,7 @@ public final class Oplog implements CompactableOplog, Flushable {
* @since GemFire prPersistSprint1
*/
private static final byte OPLOG_MOD_ENTRY_7ID = 71;
+
/**
* Written to CRF. The OplogEntryId is relative to the previous mod_entry OplogEntryId. The signed
* difference is encoded in 8 bytes. Byte Format: 1: userBits 8: OplogEntryId RegionId 4:
@@ -334,6 +338,7 @@ public final class Oplog implements CompactableOplog, Flushable {
* @since GemFire prPersistSprint1
*/
private static final byte OPLOG_MOD_ENTRY_WITH_KEY_1ID = 73;
+
/**
* Written to CRF. The OplogEntryId is relative to the previous mod_entry OplogEntryId. The signed
* difference is encoded in 2 bytes. Byte Format: 1: userBits 2: OplogEntryId RegionId 4:
@@ -373,6 +378,7 @@ public final class Oplog implements CompactableOplog, Flushable {
* @since GemFire prPersistSprint1
*/
private static final byte OPLOG_MOD_ENTRY_WITH_KEY_5ID = 77;
+
/**
* Written to CRF. The OplogEntryId is relative to the previous mod_entry OplogEntryId. The signed
* difference is encoded in 6 bytes. Byte Format: 1: userBits 6: OplogEntryId RegionId 4:
@@ -382,6 +388,7 @@ public final class Oplog implements CompactableOplog, Flushable {
* @since GemFire prPersistSprint1
*/
private static final byte OPLOG_MOD_ENTRY_WITH_KEY_6ID = 78;
+
/**
* Written to CRF. The OplogEntryId is relative to the previous mod_entry OplogEntryId. The signed
* difference is encoded in 7 bytes. Byte Format: 1: userBits 7: OplogEntryId RegionId 4:
@@ -391,6 +398,7 @@ public final class Oplog implements CompactableOplog, Flushable {
* @since GemFire prPersistSprint1
*/
private static final byte OPLOG_MOD_ENTRY_WITH_KEY_7ID = 79;
+
/**
* Written to CRF. The OplogEntryId is relative to the previous mod_entry OplogEntryId. The signed
* difference is encoded in 8 bytes. Byte Format: 1: userBits 8: OplogEntryId RegionId 4:
@@ -439,6 +447,7 @@ public final class Oplog implements CompactableOplog, Flushable {
* @since GemFire prPersistSprint1
*/
private static final byte OPLOG_DEL_ENTRY_5ID = 85;
+
/**
* Written to DRF. The OplogEntryId is relative to the previous del_entry OplogEntryId. The signed
* difference is encoded in 6 bytes. Byte Format: 6: OplogEntryId 1: EndOfRecord
@@ -446,6 +455,7 @@ public final class Oplog implements CompactableOplog, Flushable {
* @since GemFire prPersistSprint1
*/
private static final byte OPLOG_DEL_ENTRY_6ID = 86;
+
/**
* Written to DRF. The OplogEntryId is relative to the previous del_entry OplogEntryId. The signed
* difference is encoded in 7 bytes. Byte Format: 7: OplogEntryId 1: EndOfRecord
@@ -453,6 +463,7 @@ public final class Oplog implements CompactableOplog, Flushable {
* @since GemFire prPersistSprint1
*/
private static final byte OPLOG_DEL_ENTRY_7ID = 87;
+
/**
* Written to DRF. The OplogEntryId is relative to the previous del_entry OplogEntryId. The signed
* difference is encoded in 8 bytes. Byte Format: 8: OplogEntryId 1: EndOfRecord
@@ -488,6 +499,7 @@ public final class Oplog implements CompactableOplog, Flushable {
* such as 7.0.0.beta EndOfRecord
*/
private static final byte OPLOG_GEMFIRE_VERSION = 91;
+
static final int OPLOG_GEMFIRE_VERSION_REC_SIZE = 1 + 3 + 1;
/**
@@ -499,14 +511,14 @@ public final class Oplog implements CompactableOplog, Flushable {
*/
static final byte OPLOG_MAGIC_SEQ_ID = 92;
- public static enum OPLOG_TYPE {
+ public enum OPLOG_TYPE {
CRF(new byte[] {0x47, 0x46, 0x43, 0x52, 0x46, 0x31}), // GFCRF1
DRF(new byte[] {0x47, 0x46, 0x44, 0x52, 0x46, 0x31}), // GFDRF1
IRF(new byte[] {0x47, 0x46, 0x49, 0x52, 0x46, 0x31}), // GFIRF1
KRF(new byte[] {0x47, 0x46, 0x4b, 0x52, 0x46, 0x31}), // GFKRF1
IF(new byte[] {0x47, 0x46, 0x49, 0x46, 0x30, 0x31}); // GFIF01
- private byte[] bytes;
+ private final byte[] bytes;
OPLOG_TYPE(byte[] byteSeq) {
this.bytes = byteSeq;
@@ -527,10 +539,10 @@ public final class Oplog implements CompactableOplog, Flushable {
private final boolean compactOplogs;
/**
- * Asif: This object is used to correctly identify the OpLog size so as to cause a switch of
- * oplogs
+ * This object is used to correctly identify the OpLog size so as to cause a switch of oplogs
*/
final Object lock = new Object();
+
final ByteBuffer[] bbArray = new ByteBuffer[2];
private boolean lockedForKRFcreate = false;
@@ -542,16 +554,7 @@ public final class Oplog implements CompactableOplog, Flushable {
private boolean doneAppending = false;
/**
- * Extra bytes to be skipped before reading value bytes. Value is currently 6 : 1 byte for opcode,
- * 1 byte for userbits and 4 bytes for value length.
- */
- private static final long SKIP_BYTES = 6;
-
- private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);
-
- // ///////////////////// Constructors ////////////////////////
- /**
- * Creates new <code>Oplog</code> for the given region.
+ * Creates new {@code Oplog} for the given region.
*
* @param oplogId int identifying the new oplog
* @param dirHolder The directory in which to create new Oplog
@@ -620,8 +623,8 @@ public final class Oplog implements CompactableOplog, Flushable {
}
/**
- * Asif: A copy constructor used for creating a new oplog based on the previous Oplog. This
- * constructor is invoked only from the function switchOplog
+ * A copy constructor used for creating a new oplog based on the previous Oplog. This constructor
+ * is invoked only from the function switchOplog
*
* @param oplogId integer identifying the new oplog
* @param dirHolder The directory in which to create new Oplog
@@ -773,7 +776,7 @@ public final class Oplog implements CompactableOplog, Flushable {
this.dirHolder.incrementTotalOplogSize(getOpStateSize());
}
- public final Version currentRecoveredGFVersion() {
+ public Version currentRecoveredGFVersion() {
return this.gfversion;
}
@@ -790,7 +793,6 @@ public final class Oplog implements CompactableOplog, Flushable {
* @param olf the oplog to write to
* @param diskRegions the set of disk regions we should write the RVV of
* @param writeGCRVV true to write write the GC RVV
- * @throws IOException
*/
private void writeRVVRecord(OplogFile olf, Map<Long, AbstractDiskRegion> diskRegions,
boolean writeGCRVV) throws IOException {
@@ -835,9 +837,6 @@ public final class Oplog implements CompactableOplog, Flushable {
/**
* This constructor will get invoked only in case of persistent region when it is recovering an
* oplog.
- *
- * @param oplogId
- * @param parent
*/
Oplog(long oplogId, PersistentOplogSet parent) {
// @todo have the crf and drf use different directories.
@@ -850,8 +849,7 @@ public final class Oplog implements CompactableOplog, Flushable {
this.parent = parent.getParent();
this.oplogSet = parent;
this.opState = new OpState();
- long maxOplogSizeParam = getParent().getMaxOplogSizeInBytes();
- this.maxOplogSize = maxOplogSizeParam;
+ this.maxOplogSize = getParent().getMaxOplogSizeInBytes();
setMaxCrfDrfSize();
this.stats = getParent().getStats();
this.compactOplogs = getParent().getAutoCompact();
@@ -1084,8 +1082,6 @@ public final class Oplog implements CompactableOplog, Flushable {
/**
* Creates the crf oplog file
- *
- * @throws IOException
*/
private void createCrf(OplogFile prevOlf) throws IOException {
File f = new File(this.diskFile.getPath() + CRF_FILE_EXT);
@@ -1121,14 +1117,12 @@ public final class Oplog implements CompactableOplog, Flushable {
prevOlf.writeBuf = null;
return result;
} else {
- return ByteBuffer.allocateDirect(Integer.getInteger("WRITE_BUF_SIZE", 32768).intValue());
+ return ByteBuffer.allocateDirect(Integer.getInteger("WRITE_BUF_SIZE", 32768));
}
}
/**
* Creates the drf oplog file
- *
- * @throws IOException
*/
private void createDrf(OplogFile prevOlf) throws IOException {
File f = new File(this.diskFile.getPath() + DRF_FILE_EXT);
@@ -1150,27 +1144,18 @@ public final class Oplog implements CompactableOplog, Flushable {
}
/**
- * Returns the <code>DiskStoreStats</code> for this oplog
+ * Returns the {@code DiskStoreStats} for this oplog
*/
public DiskStoreStats getStats() {
return this.stats;
}
/**
- * Flushes any pending writes to disk.
- *
- * public final void flush() { forceFlush(); }
- */
-
- /**
* Test Method to be used only for testing purposes. Gets the underlying File object for the Oplog
* . Oplog class uses this File object to obtain the RandomAccessFile object. Before returning the
* File object , the dat present in the buffers of the RandomAccessFile object is flushed.
* Otherwise, for windows the actual file length does not match with the File size obtained from
* the File object
- *
- * @throws IOException
- * @throws SyncFailedException
*/
File getOplogFile() throws SyncFailedException, IOException {
// @todo check callers for drf
@@ -1271,7 +1256,7 @@ public final class Oplog implements CompactableOplog, Flushable {
* present. @param faultingIn @param bitOnly boolean indicating whether to extract just the
* UserBit or UserBit with value @return BytesAndBits object wrapping the value & user bit
*/
- public final BytesAndBits getBytesAndBits(DiskRegionView dr, DiskId id, boolean faultingIn,
+ public BytesAndBits getBytesAndBits(DiskRegionView dr, DiskId id, boolean faultingIn,
boolean bitOnly) {
Oplog retryOplog = null;
long offset = 0;
@@ -1292,22 +1277,18 @@ public final class Oplog implements CompactableOplog, Flushable {
BytesAndBits bb = null;
long start = this.stats.startRead();
- // Asif: If the offset happens to be -1, still it is possible that
+ // If the offset happens to be -1, still it is possible that
// the data is present in the current oplog file.
if (offset == -1) {
- // Asif: Since it is given that a get operation has alreadty
+ // Since it is given that a get operation has already
// taken a
// lock on an entry , no put operation could have modified the
// oplog ID
// there fore synchronization is not needed
- // synchronized (id) {
- // if (id.getOplogId() == this.oplogId) {
offset = id.getOffsetInOplog();
- // }
- // }
}
- // Asif :If the current OpLog is not destroyed ( its opLogRaf file
+ // If the current OpLog is not destroyed ( its opLogRaf file
// is still open) we can retrieve the value from this oplog.
try {
bb = basicGet(dr, offset, bitOnly, id.getValueLength(), id.getUserBits());
@@ -1321,7 +1302,7 @@ public final class Oplog implements CompactableOplog, Flushable {
if (bb == null) {
throw new EntryDestroyedException(
LocalizedStrings.Oplog_NO_VALUE_WAS_FOUND_FOR_ENTRY_WITH_DISK_ID_0_ON_A_REGION_WITH_SYNCHRONOUS_WRITING_SET_TO_1
- .toLocalizedString(new Object[] {id, Boolean.valueOf(dr.isSync())}));
+ .toLocalizedString(new Object[] {id, dr.isSync()}));
}
if (bitOnly) {
dr.endRead(start, this.stats.endRead(start, 1), 1);
@@ -1339,17 +1320,14 @@ public final class Oplog implements CompactableOplog, Flushable {
* HTree with the oplog being destroyed
*
* @param id A DiskId object for which the value on disk will be fetched
- *
*/
- public final BytesAndBits getNoBuffer(DiskRegion dr, DiskId id) {
+ public BytesAndBits getNoBuffer(DiskRegion dr, DiskId id) {
if (logger.isDebugEnabled()) {
logger.debug("Oplog::getNoBuffer:Before invoking Oplog.basicGet for DiskID ={}", id);
}
try {
- BytesAndBits bb =
- basicGet(dr, id.getOffsetInOplog(), false, id.getValueLength(), id.getUserBits());
- return bb;
+ return basicGet(dr, id.getOffsetInOplog(), false, id.getValueLength(), id.getUserBits());
} catch (DiskAccessException dae) {
logger.error(LocalizedMessage.create(
LocalizedStrings.Oplog_OPLOGGETNOBUFFEREXCEPTION_IN_RETRIEVING_VALUE_FROM_DISK_FOR_DISKID_0,
@@ -1612,32 +1590,24 @@ public final class Oplog implements CompactableOplog, Flushable {
+ getParent().getInitFile() + "\". Drf did not contain a disk store id.",
getParent());
}
- } catch (EOFException ex) {
+ } catch (EOFException ignore) {
// ignore since a partial record write can be caused by a crash
- // if (byteCount < fileLength) {
- // throw new
- // DiskAccessException(LocalizedStrings.Oplog_FAILED_READING_FILE_DURING_RECOVERY_FROM_0
- // .toLocalizedString(drfFile.getPath()), ex, getParent());
- // }// else do nothing, this is expected in crash scenarios
} catch (IOException ex) {
getParent().getCancelCriterion().checkCancelInProgress(ex);
throw new DiskAccessException(
LocalizedStrings.Oplog_FAILED_READING_FILE_DURING_RECOVERY_FROM_0
.toLocalizedString(drfFile.getPath()),
ex, getParent());
- } catch (CancelException ignore) {
+ } catch (CancelException e) {
if (logger.isDebugEnabled()) {
- logger.debug("Oplog::readOplog:Error in recovery as Cache was closed", ignore);
+ logger.debug("Oplog::readOplog:Error in recovery as Cache was closed", e);
}
- } catch (RegionDestroyedException ignore) {
+ } catch (RegionDestroyedException e) {
if (logger.isDebugEnabled()) {
- logger.debug("Oplog::readOplog:Error in recovery as Region was destroyed", ignore);
+ logger.debug("Oplog::readOplog:Error in recovery as Region was destroyed", e);
}
- } catch (IllegalStateException ex) {
- // @todo
- // if (!rgn.isClosed()) {
- throw ex;
- // }
+ } catch (IllegalStateException e) {
+ throw e;
}
// Add the Oplog size to the Directory Holder which owns this oplog,
// so that available space is correctly calculated & stats updated.
@@ -1711,7 +1681,7 @@ public final class Oplog implements CompactableOplog, Flushable {
FileInputStream fis;
try {
fis = new FileInputStream(f);
- } catch (FileNotFoundException ex) {
+ } catch (FileNotFoundException ignore) {
return false;
}
try {
@@ -1735,7 +1705,7 @@ public final class Oplog implements CompactableOplog, Flushable {
validateOpcode(dis, OPLOG_DISK_STORE_ID);
readDiskStoreRecord(dis, f);
- } catch (DiskAccessException notInNewFormatErr) {
+ } catch (DiskAccessException ignore) {
// Failed to read the file. There are two possibilities. Either this
// file is in old format which does not have a magic seq in the
// beginning or this is not a valid file at all. Try reading it as a
@@ -1744,7 +1714,7 @@ public final class Oplog implements CompactableOplog, Flushable {
fis = new FileInputStream(f);
dis = new DataInputStream(new BufferedInputStream(fis, 1024 * 1024));
readDiskStoreRecord(dis, f);
- } catch (IllegalStateException notOldFileErr) {
+ } catch (IllegalStateException ignore) {
// Failed to read the file. There are two possibilities. Either this
// is in new format which has a magic seq in the beginning or this is
// not a valid file at all
@@ -2023,32 +1993,24 @@ public final class Oplog implements CompactableOplog, Flushable {
+ getParent().getInitFile() + "\". Crf did not contain a disk store id.",
getParent());
}
- } catch (EOFException ex) {
+ } catch (EOFException ignore) {
// ignore since a partial record write can be caused by a crash
- // if (byteCount < fileLength) {
- // throw new
- // DiskAccessException(LocalizedStrings.Oplog_FAILED_READING_FILE_DURING_RECOVERY_FROM_0
- // .toLocalizedString(this.crf.f.getPath()), ex, getParent());
- // }// else do nothing, this is expected in crash scenarios
} catch (IOException ex) {
getParent().getCancelCriterion().checkCancelInProgress(ex);
throw new DiskAccessException(
LocalizedStrings.Oplog_FAILED_READING_FILE_DURING_RECOVERY_FROM_0
.toLocalizedString(this.crf.f.getPath()),
ex, getParent());
- } catch (CancelException ignore) {
+ } catch (CancelException e) {
if (logger.isDebugEnabled()) {
- logger.debug("Oplog::readOplog:Error in recovery as Cache was closed", ignore);
+ logger.debug("Oplog::readOplog:Error in recovery as Cache was closed", e);
}
- } catch (RegionDestroyedException ignore) {
+ } catch (RegionDestroyedException e) {
if (logger.isDebugEnabled()) {
- logger.debug("Oplog::readOplog:Error in recovery as Region was destroyed", ignore);
+ logger.debug("Oplog::readOplog:Error in recovery as Region was destroyed", e);
}
- } catch (IllegalStateException ex) {
- // @todo
- // if (!rgn.isClosed()) {
- throw ex;
- // }
+ } catch (IllegalStateException e) {
+ throw e;
}
// Add the Oplog size to the Directory Holder which owns this oplog,
@@ -2109,7 +2071,7 @@ public final class Oplog implements CompactableOplog, Flushable {
if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY)) {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < OPLOG_TYPE.getLen(); i++) {
- sb.append(" " + seq[i]);
+ sb.append(" ").append(seq[i]);
}
logger.trace(LogMarker.PERSIST_RECOVERY, "oplog magic code: {}", sb);
}
@@ -2222,7 +2184,7 @@ public final class Oplog implements CompactableOplog, Flushable {
}
}
} else {
- boolean rvvTrusted = InternalDataSerializer.readBoolean(dis);
+ boolean rvvTrusted = DataSerializer.readBoolean(dis);
if (drs != null) {
if (latestOplog) {
// only set rvvtrust based on the newest oplog recovered
@@ -2491,8 +2453,6 @@ public final class Oplog implements CompactableOplog, Flushable {
*
* @param dis DataInputStream from which the oplog is being read
* @param opcode byte whether the id is short/int/long
- * @param recoverValue
- * @throws IOException
*/
private void readNewEntry(CountingDataInputStream dis, byte opcode, OplogEntryIdSet deletedIds,
boolean recoverValue, final LocalRegion currentRegion, Version version, ByteArrayDataInput in,
@@ -2679,10 +2639,6 @@ public final class Oplog implements CompactableOplog, Flushable {
*
* @param dis DataInputStream from which the oplog is being read
* @param opcode byte whether the id is short/int/long
- * @param recoverValue
- * @param currentRegion
- * @param keyRequiresRegionContext
- * @throws IOException
*/
private void readModifyEntry(CountingDataInputStream dis, byte opcode, OplogEntryIdSet deletedIds,
boolean recoverValue, LocalRegion currentRegion, Version version, ByteArrayDataInput in,
@@ -2891,7 +2847,7 @@ public final class Oplog implements CompactableOplog, Flushable {
DiskEntry.Helper.readSerializedValue(valueBytes, version, in, true);
} catch (SerializationException ex) {
if (logger.isDebugEnabled()) {
- logger.debug("Could not deserialize recovered value: {}" + ex.getCause(), ex);
+ logger.debug("Could not deserialize recovered value: {}", ex.getCause(), ex);
}
}
}
@@ -2904,9 +2860,6 @@ public final class Oplog implements CompactableOplog, Flushable {
*
* @param dis DataInputStream from which the oplog is being read
* @param opcode byte whether the id is short/int/long
- * @param deletedIds
- * @param recoverValue
- * @throws IOException
*/
private void readModifyEntryWithKey(CountingDataInputStream dis, byte opcode,
OplogEntryIdSet deletedIds, boolean recoverValue, final LocalRegion currentRegion,
@@ -3099,12 +3052,9 @@ public final class Oplog implements CompactableOplog, Flushable {
* @param dis DataInputStream from which the oplog is being read
* @param opcode byte whether the id is short/int/long
* @param parent instance of disk region
- * @throws IOException
*/
private void readDelEntry(CountingDataInputStream dis, byte opcode, OplogEntryIdSet deletedIds,
- DiskStoreImpl parent) throws IOException
-
- {
+ DiskStoreImpl parent) throws IOException {
int idByteCount = (opcode - OPLOG_DEL_ENTRY_1ID) + 1;
// long debugRecoverDelEntryId = this.recoverDelEntryId;
long oplogKeyId = getDelEntryId(dis, idByteCount);
@@ -3161,8 +3111,6 @@ public final class Oplog implements CompactableOplog, Flushable {
* Returns true if it is ok to skip the current modify record which had the given oplogEntryId.
* It is ok to skip if any of the following are true: 1. deletedIds contains the id 2. the last
* modification of the entry was done by a record read from an oplog other than this oplog
- *
- * @param tag
*/
private OkToSkipResult okToSkipModifyRecord(OplogEntryIdSet deletedIds, long drId,
DiskRecoveryStore drs, long oplogEntryId, boolean checkRecoveryMap, VersionTag tag) {
@@ -3230,8 +3178,6 @@ public final class Oplog implements CompactableOplog, Flushable {
/**
* Returns true if the drId region has been destroyed or if oplogKeyId precedes the last clear
* done on the drId region
- *
- * @param tag
*/
private OkToSkipResult okToSkipRegion(DiskRegionView drv, long oplogKeyId, VersionTag tag) {
long lastClearKeyId = drv.getClearOplogEntryId();
@@ -3300,25 +3246,16 @@ public final class Oplog implements CompactableOplog, Flushable {
assert idByteCount >= 1 && idByteCount <= 8 : idByteCount;
long delta;
- byte firstByte = dis.readByte();
- // if (firstByte < 0) {
- // delta = 0xFFFFFFFFFFFFFF00L | firstByte;
- // } else {
- // delta = firstByte;
- // }
- delta = firstByte;
+ delta = dis.readByte();
idByteCount--;
while (idByteCount > 0) {
delta <<= 8;
delta |= (0x00FF & dis.readByte());
idByteCount--;
}
- // this.lastDelta = delta; // HACK DEBUG
return delta;
}
- // private long lastDelta; // HACK DEBUG
-
/**
* Call this when the cache is closed or region is destroyed. Deletes the lock files.
*/
@@ -3501,8 +3438,6 @@ public final class Oplog implements CompactableOplog, Flushable {
* @param opCode The int value identifying whether it is create/modify or delete operation
* @param entry The DiskEntry object being operated upon
* @param value The byte array representing the value
- * @param userBits
- * @throws IOException
*/
private void initOpState(byte opCode, DiskRegionView dr, DiskEntry entry, ValueWrapper value,
byte userBits, boolean notToUseUserBits) throws IOException {
@@ -3545,17 +3480,14 @@ public final class Oplog implements CompactableOplog, Flushable {
}
/**
- * Asif: Modified the code so as to reuse the already created ByteBuffer during transition.
- * Creates a key/value pair from a region entry on disk. Updates all of the necessary
+ * Modified the code so as to reuse the already created ByteBuffer during transition. Creates a
+ * key/value pair from a region entry on disk. Updates all of the necessary
* {@linkplain DiskStoreStats statistics} and invokes basicCreate
*
* @param entry The DiskEntry object for this key/value pair.
* @param value byte array representing the value
- * @throws DiskAccessException
- * @throws IllegalStateException
- *
*/
- public final void create(LocalRegion region, DiskEntry entry, ValueWrapper value, boolean async) {
+ public void create(LocalRegion region, DiskEntry entry, ValueWrapper value, boolean async) {
if (this != getOplogSet().getChild()) {
getOplogSet().getChild().create(region, entry, value, async);
@@ -3612,13 +3544,11 @@ public final class Oplog implements CompactableOplog, Flushable {
}
/**
- * Asif: A helper function which identifies whether to create the entry in the current oplog or to
- * make the switch to the next oplog. This function enables us to reuse the byte buffer which got
+ * A helper function which identifies whether to create the entry in the current oplog or to make
+ * the switch to the next oplog. This function enables us to reuse the byte buffer which got
* created for an oplog which no longer permits us to use itself
*
* @param entry DiskEntry object representing the current Entry
- * @throws IOException
- * @throws InterruptedException
*/
private void basicCreate(DiskRegion dr, DiskEntry entry, ValueWrapper value, byte userBits,
boolean async) throws IOException, InterruptedException {
@@ -3634,7 +3564,7 @@ public final class Oplog implements CompactableOplog, Flushable {
// contention point
// synchronized (this.crf) {
initOpState(OPLOG_NEW_ENTRY_0ID, dr, entry, value, userBits, false);
- // Asif : Check if the current data in ByteBuffer will cause a
+ // Check if the current data in ByteBuffer will cause a
// potential increase in the size greater than the max allowed
long temp = (getOpStateSize() + this.crf.currSize);
if (!this.wroteNewEntryBase) {
@@ -3662,10 +3592,10 @@ public final class Oplog implements CompactableOplog, Flushable {
id.setKeyId(createOplogEntryId);
// startPosForSynchOp = this.crf.currSize;
- // Asif: Allow it to be added to the OpLOg so increase the
+ // Allow it to be added to the OpLOg so increase the
// size of current oplog
int dataLength = getOpStateSize();
- // Asif: It is necessary that we set the
+ // It is necessary that we set the
// Oplog ID here without releasing the lock on object as we are
// writing to the file after releasing the lock. This can cause
// a situation where the
@@ -3705,7 +3635,7 @@ public final class Oplog implements CompactableOplog, Flushable {
if (logger.isTraceEnabled()) {
logger.trace("Oplog::basicCreate:Release dByteBuffer with data for Disk ID = {}", id);
}
- // Asif: As such for any put or get operation , a synch is taken
+ // As such for any put or get operation , a synch is taken
// on the Entry object in the DiskEntry's Helper functions.
// Compactor thread will also take a lock on entry object. Therefore
// we do not require a lock on DiskID, as concurrent access for
@@ -3767,15 +3697,6 @@ public final class Oplog implements CompactableOplog, Flushable {
/**
* This oplog will be forced to switch to a new oplog
- *
- *
- * public void forceRolling() { if (getOplogSet().getChild() == this) { synchronized (this.lock) {
- * if (getOplogSet().getChild() == this) { switchOpLog(0, null); } } if (!this.sync) {
- * this.writer.activateThreadToTerminate(); } } }
- */
-
- /**
- * This oplog will be forced to switch to a new oplog
*/
void forceRolling(DiskRegion dr) {
if (getOplogSet().getChild() == this) {
@@ -3798,11 +3719,11 @@ public final class Oplog implements CompactableOplog, Flushable {
}
/**
- * Asif: This function is used to switch from one op Log to another , when the size of the current
- * oplog has reached the maximum permissible. It is always called from synch block with lock
- * object being the OpLog File object We will reuse the ByteBuffer Pool. We should add the current
- * Oplog for compaction first & then try to get next directory holder as in case there is only a
- * single directory with space being full, compaction has to happen before it can be given a new
+ * This function is used to switch from one op Log to another , when the size of the current oplog
+ * has reached the maximum permissible. It is always called from synch block with lock object
+ * being the OpLog File object We will reuse the ByteBuffer Pool. We should add the current Oplog
+ * for compaction first & then try to get next directory holder as in case there is only a single
+ * directory with space being full, compaction has to happen before it can be given a new
* directory. If the operation causing the switching is on an Entry which already is referencing
* the oplog to be compacted, then the compactor thread will skip compaction that entry & the
* switching thread will roll the entry explicitly.
@@ -3905,7 +3826,7 @@ public final class Oplog implements CompactableOplog, Flushable {
createKrfAsync();
}
} catch (DiskAccessException dae) {
- // Asif: Remove the Oplog which was added in the DiskStoreImpl
+ // Remove the Oplog which was added in the DiskStoreImpl
// for compaction as compaction cannot be done.
// However, it is also possible that compactor
// may have done the compaction of the Oplog but the switching thread
@@ -3919,7 +3840,6 @@ public final class Oplog implements CompactableOplog, Flushable {
/**
* Schedule a task to create a krf asynchronously
- *
*/
protected void createKrfAsync() {
getParent().executeDiskStoreTask(new Runnable() {
@@ -4313,10 +4233,10 @@ public final class Oplog implements CompactableOplog, Flushable {
}
/**
- * Asif:This function retrieves the value for an entry being compacted subject to entry
- * referencing the oplog being compacted. Attempt is made to retrieve the value from in memory ,
- * if available, else from asynch buffers ( if asynch mode is enabled), else from the Oplog being
- * compacted. It is invoked from switchOplog as well as OplogCompactor's compact function.
+ * This function retrieves the value for an entry being compacted subject to entry referencing the
+ * oplog being compacted. Attempt is made to retrieve the value from in memory , if available,
+ * else from asynch buffers ( if asynch mode is enabled), else from the Oplog being compacted. It
+ * is invoked from switchOplog as well as OplogCompactor's compact function.
*
* @param entry DiskEntry being compacted referencing the Oplog being compacted
* @param wrapper Object of type BytesAndBitsForCompactor. The data if found is set in the wrapper
@@ -4335,10 +4255,9 @@ public final class Oplog implements CompactableOplog, Flushable {
@Released
Object value = entry._getValueRetain(dr, true);
ReferenceCountHelper.unskipRefCountTracking();
- // TODO:KIRK:OK Object value = entry.getValueWithContext(dr);
boolean foundData = false;
if (value == null) {
- // Asif: If the mode is synch it is guaranteed to be present in the disk
+ // If the mode is synch it is guaranteed to be present in the disk
foundData = basicGetForCompactor(dr, oplogOffset, false, did.getValueLength(),
did.getUserBits(), wrapper);
// after we have done the get do one more check to see if the
@@ -4423,7 +4342,7 @@ public final class Oplog implements CompactableOplog, Flushable {
}
} else if (value instanceof byte[]) {
byte[] valueBytes = (byte[]) value;
- // Asif: If the value is already a byte array then the user bit
+ // If the value is already a byte array then the user bit
// is 0, which is the default value of the userBits variable,
// indicating that it is non serialized data. Thus it is
// to be used as it is & not to be deserialized to
@@ -4469,20 +4388,16 @@ public final class Oplog implements CompactableOplog, Flushable {
/**
* Modifies a key/value pair from a region entry on disk. Updates all of the necessary
* {@linkplain DiskStoreStats statistics} and invokes basicModify
+ * <p>
+ * Modified the code so as to reuse the already created ByteBuffer during transition. Minimizing
+ * the synchronization allowing multiple put operations for different entries to proceed
+ * concurrently for asynch mode
*
* @param entry DiskEntry object representing the current Entry
*
* @param value byte array representing the value
- * @throws DiskAccessException
- * @throws IllegalStateException
- */
- /*
- * Asif: Modified the code so as to reuse the already created ByteBuffer during transition.
- * Minimizing the synchronization allowing multiple put operations for different entries to
- * proceed concurrently for asynch mode
*/
- public final void modify(LocalRegion region, DiskEntry entry, ValueWrapper value, boolean async) {
-
+ public void modify(LocalRegion region, DiskEntry entry, ValueWrapper value, boolean async) {
if (getOplogSet().getChild() != this) {
getOplogSet().getChild().modify(region, entry, value, async);
} else {
@@ -4559,10 +4474,9 @@ public final class Oplog implements CompactableOplog, Flushable {
.toLocalizedString(this.diskFile.getPath()),
ie, drv.getName());
}
-
}
- public final void saveConflictVersionTag(LocalRegion region, VersionTag tag, boolean async) {
+ public void saveConflictVersionTag(LocalRegion region, VersionTag tag, boolean async) {
if (getOplogSet().getChild() != this) {
getOplogSet().getChild().saveConflictVersionTag(region, tag, async);
} else {
@@ -4581,8 +4495,8 @@ public final class Oplog implements CompactableOplog, Flushable {
}
}
- private final void copyForwardForOfflineCompact(long oplogKeyId, byte[] keyBytes,
- byte[] valueBytes, byte userBits, long drId, VersionTag tag) {
+ private void copyForwardForOfflineCompact(long oplogKeyId, byte[] keyBytes, byte[] valueBytes,
+ byte userBits, long drId, VersionTag tag) {
try {
basicCopyForwardForOfflineCompact(oplogKeyId, keyBytes, valueBytes, userBits, drId, tag);
} catch (IOException ex) {
@@ -4600,7 +4514,7 @@ public final class Oplog implements CompactableOplog, Flushable {
}
}
- private final void copyForwardModifyForCompact(DiskRegionView dr, DiskEntry entry,
+ private void copyForwardModifyForCompact(DiskRegionView dr, DiskEntry entry,
BytesAndBitsForCompactor wrapper) {
if (getOplogSet().getChild() != this) {
getOplogSet().getChild().copyForwardModifyForCompact(dr, entry, wrapper);
@@ -4646,14 +4560,12 @@ public final class Oplog implements CompactableOplog, Flushable {
}
/**
- * Asif: A helper function which identifies whether to modify the entry in the current oplog or to
- * make the switch to the next oplog. This function enables us to reuse the byte buffer which got
+ * A helper function which identifies whether to modify the entry in the current oplog or to make
+ * the switch to the next oplog. This function enables us to reuse the byte buffer which got
* created for an oplog which no longer permits us to use itself. It will also take care of
* compaction if required
*
* @param entry DiskEntry object representing the current Entry
- * @throws IOException
- * @throws InterruptedException
*/
private void basicModify(DiskRegionView dr, DiskEntry entry, ValueWrapper value, byte userBits,
boolean async, boolean calledByCompactor) throws IOException, InterruptedException {
@@ -4813,7 +4725,7 @@ public final class Oplog implements CompactableOplog, Flushable {
this.crf.currSize = temp;
if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES)) {
logger.trace(LogMarker.PERSIST_WRITES,
- "basicSaveConflictVersionTag: drId={} versionStamp={} oplog#", dr.getId(), tag,
+ "basicSaveConflictVersionTag: drId={} versionStamp={} oplog#{}", dr.getId(), tag,
getOplogId());
}
this.dirHolder.incrementTotalOplogSize(adjustment);
@@ -4870,7 +4782,7 @@ public final class Oplog implements CompactableOplog, Flushable {
logger.trace(LogMarker.PERSIST_WRITES,
"basicCopyForwardForOfflineCompact: id=<{}> keyBytes=<{}> valueOffset={} userBits={} valueLen={} valueBytes=<{}> drId={} oplog#{}",
oplogKeyId, baToString(keyBytes), startPosForSynchOp, userBits, valueBytes.length,
- baToString(valueBytes), getOplogId());
+ baToString(valueBytes), drId, getOplogId());
}
this.dirHolder.incrementTotalOplogSize(adjustment);
@@ -4878,7 +4790,6 @@ public final class Oplog implements CompactableOplog, Flushable {
}
clearOpState();
}
- // }
}
if (useNextOplog) {
if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
@@ -4958,7 +4869,7 @@ public final class Oplog implements CompactableOplog, Flushable {
*
* @param entry DiskEntry object on which remove operation is called
*/
- public final void remove(LocalRegion region, DiskEntry entry, boolean async, boolean isClear) {
+ public void remove(LocalRegion region, DiskEntry entry, boolean async, boolean isClear) {
DiskRegion dr = region.getDiskRegion();
if (getOplogSet().getChild() != this) {
getOplogSet().getChild().remove(region, entry, async, isClear);
@@ -4987,16 +4898,14 @@ public final class Oplog implements CompactableOplog, Flushable {
did.setValueLength(len);
did.setUserBits(prevUsrBit);
}
-
}
-
}
}
/**
* Write the GC RVV for a single region to disk
*/
- public final void writeGCRVV(DiskRegion dr) {
+ public void writeGCRVV(DiskRegion dr) {
boolean useNextOplog = false;
synchronized (this.lock) {
if (getOplogSet().getChild() != this) {
@@ -5081,15 +4990,12 @@ public final class Oplog implements CompactableOplog, Flushable {
}
/**
- *
- * Asif: A helper function which identifies whether to record a removal of entry in the current
- * oplog or to make the switch to the next oplog. This function enables us to reuse the byte
- * buffer which got created for an oplog which no longer permits us to use itself. It will also
- * take acre of compaction if required
+ * A helper function which identifies whether to record a removal of entry in the current oplog or
+ * to make the switch to the next oplog. This function enables us to reuse the byte buffer which
+ * got created for an oplog which no longer permits us to use itself. It will also take care of
+ * compaction if required
*
* @param entry DiskEntry object representing the current Entry
- * @throws IOException
- * @throws InterruptedException
*/
private void basicRemove(DiskRegionView dr, DiskEntry entry, boolean async, boolean isClear)
throws IOException, InterruptedException {
@@ -5133,7 +5039,7 @@ public final class Oplog implements CompactableOplog, Flushable {
}
// Write the data to the opLog for the synch mode
- // @todo if we don't sync write destroys what will happen if
+ // TODO: if we don't sync write destroys what will happen if
// we do 1. create k1 2. destroy k1 3. create k1?
// It would be possible for the crf to be flushed but not the drf.
// Then during recovery we will find identical keys with different
@@ -5145,13 +5051,11 @@ public final class Oplog implements CompactableOplog, Flushable {
// because we might be killed right after we do this write.
startPosForSynchOp = writeOpLogBytes(this.drf, async, true);
setHasDeletes(true);
- if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES)) {
+ if (logger.isDebugEnabled(LogMarker.PERSIST_WRITES)) {
logger.debug("basicRemove: id=<{}> key=<{}> drId={} oplog#{}", abs(id.getKeyId()),
entry.getKey(), dr.getId(), getOplogId());
}
- // new RuntimeException("STACK"));
-
if (logger.isTraceEnabled()) {
logger.trace("Oplog::basicRemove:Released ByteBuffer for Disk ID = {}", id);
}
@@ -5161,17 +5065,15 @@ public final class Oplog implements CompactableOplog, Flushable {
id.setOffsetInOplog(-1);
EntryLogger.logPersistDestroy(dr.getName(), entry.getKey(), dr.getDiskStoreID());
- {
- Oplog rmOplog = null;
- if (oldOplogId == getOplogId()) {
- rmOplog = this;
- } else {
- rmOplog = getOplogSet().getChild(oldOplogId);
- }
- if (rmOplog != null) {
- rmOplog.rmLive(dr, entry);
- emptyOplog = rmOplog;
- }
+ Oplog rmOplog = null;
+ if (oldOplogId == getOplogId()) {
+ rmOplog = this;
+ } else {
+ rmOplog = getOplogSet().getChild(oldOplogId);
+ }
+ if (rmOplog != null) {
+ rmOplog.rmLive(dr, entry);
+ emptyOplog = rmOplog;
}
clearOpState();
}
@@ -5193,21 +5095,15 @@ public final class Oplog implements CompactableOplog, Flushable {
}
}
- // /**
- // * This is only used for an assertion check.
- // */
- // private long lastWritePos = -1;
-
/**
* test hook
*/
- public final ByteBuffer getWriteBuf() {
+ ByteBuffer getWriteBuf() {
return this.crf.writeBuf;
}
- private final void flushNoSync(OplogFile olf) throws IOException {
+ private void flushNoSync(OplogFile olf) throws IOException {
flushAllNoSync(false); // @todo
- // flush(olf, false);
}
@Override
@@ -5226,14 +5122,13 @@ public final class Oplog implements CompactableOplog, Flushable {
}
}
- private final void flushAndSync(OplogFile olf) throws IOException {
- flushAll(false); // @todo
- // flush(olf, true);
+ private void flushAndSync(OplogFile olf) throws IOException {
+ flushAll(false);
}
private static final int MAX_CHANNEL_RETRIES = 5;
- private final void flush(OplogFile olf, boolean doSync) throws IOException {
+ private void flush(OplogFile olf, boolean doSync) throws IOException {
try {
synchronized (this.lock/* olf */) {
if (olf.RAFClosed) {
@@ -5290,7 +5185,7 @@ public final class Oplog implements CompactableOplog, Flushable {
}
}
- private final void flush(OplogFile olf, ByteBuffer b1, ByteBuffer b2) throws IOException {
+ private void flush(OplogFile olf, ByteBuffer b1, ByteBuffer b2) throws IOException {
try {
synchronized (this.lock/* olf */) {
if (olf.RAFClosed) {
@@ -5318,24 +5213,22 @@ public final class Oplog implements CompactableOplog, Flushable {
}
}
- public final void flushAll() {
+ public void flushAll() {
flushAll(false);
}
- public final void flushAllNoSync(boolean skipDrf) {
+ public void flushAllNoSync(boolean skipDrf) {
flushAll(skipDrf, false);
}
- public final void flushAll(boolean skipDrf) {
+ public void flushAll(boolean skipDrf) {
flushAll(skipDrf, true/* doSync */);
}
- public final void flushAll(boolean skipDrf, boolean doSync) {
+ public void flushAll(boolean skipDrf, boolean doSync) {
try {
- // if (!skipDrf) {
- // @todo if skipDrf then only need to do drf if crf has flushable data
+ // TODO: if skipDrf then only need to do drf if crf has flushable data
flush(this.drf, doSync);
- // }
flush(this.crf, doSync);
} catch (IOException ex) {
getParent().getCancelCriterion().checkCancelInProgress(ex);
@@ -5346,13 +5239,13 @@ public final class Oplog implements CompactableOplog, Flushable {
}
/**
- * Asif: Since the ByteBuffer being writen to can have additional bytes which are used for
- * extending the size of the file, it is necessary that the ByteBuffer provided should have limit
- * which is set to the position till which it contains the actual bytes. If the mode is synched
- * write then only we will write up to the capacity & opLogSpace variable have any meaning. For
- * asynch mode it will be zero. Also this method must be synchronized on the file , whether we use
- * synch or asynch write because the fault in operations can clash with the asynch writing. Write
- * the specified bytes to the oplog. Note that since extending a file is expensive this code will
+ * Since the ByteBuffer being written to can have additional bytes which are used for extending the
+ * size of the file, it is necessary that the ByteBuffer provided should have limit which is set
+ * to the position till which it contains the actual bytes. If the mode is synched write then only
+ * we will write up to the capacity & opLogSpace variable have any meaning. For asynch mode it
+ * will be zero. Also this method must be synchronized on the file , whether we use synch or
+ * asynch write because the fault in operations can clash with the asynch writing. Write the
+ * specified bytes to the oplog. Note that since extending a file is expensive this code will
* possibly write OPLOG_EXTEND_SIZE zero bytes to reduce the number of times the file is extended.
*
*
@@ -5368,7 +5261,7 @@ public final class Oplog implements CompactableOplog, Flushable {
Assert.assertTrue(false, "The Oplog " + this.oplogId + " for store " + getParent().getName()
+ " has been closed for synch mode while writing is going on. This should not happen");
}
- // Asif : It is assumed that the file pointer is already at the
+ // It is assumed that the file pointer is already at the
// appropriate position in the file so as to allow writing at the end.
// Any fault in operations will set the pointer back to the write
// location.
@@ -5457,10 +5350,9 @@ public final class Oplog implements CompactableOplog, Flushable {
// + " oplog #" + getOplogId(), this.owner);
// }
this.beingRead = true;
- final long readPosition = offsetInOplog;
if (/*
* !getParent().isSync() since compactor groups writes &&
- */(readPosition + valueLength) > this.crf.bytesFlushed && !this.closed) {
+ */(offsetInOplog + valueLength) > this.crf.bytesFlushed && !this.closed) {
flushAllNoSync(true); // fix for bug 41205
}
try {
@@ -5482,20 +5374,19 @@ public final class Oplog implements CompactableOplog, Flushable {
try {
final long writePosition =
(this.doneAppending) ? this.crf.bytesFlushed : myRAF.getFilePointer();
- if ((readPosition + valueLength) > writePosition) {
+ if ((offsetInOplog + valueLength) > writePosition) {
throw new DiskAccessException(
LocalizedStrings.Oplog_TRIED_TO_SEEK_TO_0_BUT_THE_FILE_LENGTH_IS_1_OPLOG_FILE_OBJECT_USED_FOR_READING_2
- .toLocalizedString(
- new Object[] {readPosition + valueLength, writePosition, this.crf.raf}),
+ .toLocalizedString(offsetInOplog + valueLength, writePosition, this.crf.raf),
dr.getName());
- } else if (readPosition < 0) {
+ } else if (offsetInOplog < 0) {
throw new DiskAccessException(
- LocalizedStrings.Oplog_CANNOT_FIND_RECORD_0_WHEN_READING_FROM_1.toLocalizedString(
- new Object[] {offsetInOplog, this.diskFile.getPath()}),
+ LocalizedStrings.Oplog_CANNOT_FIND_RECORD_0_WHEN_READING_FROM_1
+ .toLocalizedString(offsetInOplog, this.diskFile.getPath()),
dr.getName());
}
try {
- myRAF.seek(readPosition);
+ myRAF.seek(offsetInOplog);
this.stats.incOplogSeeks();
byte[] valueBytes = new byte[valueLength];
myRAF.readFully(valueBytes);
@@ -5543,7 +5434,7 @@ public final class Oplog implements CompactableOplog, Flushable {
}
/**
- * Asif: Extracts the Value byte array & UserBit from the OpLog
+ * Extracts the Value byte array & UserBit from the OpLog
*
* @param offsetInOplog The starting position from which to read the data in the opLog
* @param bitOnly boolean indicating whether the value needs to be extracted along with the
@@ -5574,7 +5465,7 @@ public final class Oplog implements CompactableOplog, Flushable {
try {
bb = attemptGet(dr, offsetInOplog, bitOnly, valueLength, userBits);
break;
- } catch (InterruptedIOException e) { // bug 39756
+ } catch (InterruptedIOException ignore) { // bug 39756
// ignore, we'll clear and retry.
} finally {
if (interrupted) {
@@ -5586,10 +5477,8 @@ public final class Oplog implements CompactableOplog, Flushable {
getParent().getCancelCriterion().checkCancelInProgress(ex);
throw new DiskAccessException(
LocalizedStrings.Oplog_FAILED_READING_FROM_0_OPLOGID_1_OFFSET_BEING_READ_2_CURRENT_OPLOG_SIZE_3_ACTUAL_FILE_SIZE_4_IS_ASYNCH_MODE_5_IS_ASYNCH_WRITER_ALIVE_6
- .toLocalizedString(new Object[] {this.diskFile.getPath(),
- Long.valueOf(this.oplogId), Long.valueOf(offsetInOplog),
- Long.valueOf(this.crf.currSize), Long.valueOf(this.crf.bytesFlushed),
- Boolean.valueOf(!dr.isSync()), Boolean.valueOf(false)}),
+ .toLocalizedString(this.diskFile.getPath(), this.oplogId, offsetInOplog,
+ this.crf.currSize, this.crf.bytesFlushed, !dr.isSync(), Boolean.FALSE),
ex, dr.getName());
} catch (IllegalStateException ex) {
checkClosed();
@@ -5600,8 +5489,8 @@ public final class Oplog implements CompactableOplog, Flushable {
}
/**
- * Asif: Extracts the Value byte array & UserBit from the OpLog and inserts it in the wrapper
- * Object of type BytesAndBitsForCompactor which is passed
+ * Extracts the Value byte array & UserBit from the OpLog and inserts it in the wrapper Object of
+ * type BytesAndBitsForCompactor which is passed
*
* @param offsetInOplog The starting position from which to read the data in the opLog
* @param bitOnly boolean indicating whether the value needs to be extracted along with the
@@ -5635,10 +5524,9 @@ public final class Oplog implements CompactableOplog, Flushable {
} else {
try {
synchronized (this.lock/* crf */) {
- final long readPosition = offsetInOplog;
if (/*
* !getParent().isSync() since compactor groups writes &&
- */(readPosition + valueLength) > this.crf.bytesFlushed && !this.closed) {
+ */(offsetInOplog + valueLength) > this.crf.bytesFlushed && !this.closed) {
flushAllNoSync(true); // fix for bug 41205
}
if (!reopenFileIfClosed()) {
@@ -5646,25 +5534,19 @@ public final class Oplog implements CompactableOplog, Flushable {
}
final long writePosition =
(this.doneAppending) ? this.crf.bytesFlushed : this.crf.raf.getFilePointer();
- if ((readPosition + valueLength) > writePosition) {
+ if ((offsetInOplog + valueLength) > writePosition) {
throw new DiskAccessException(
LocalizedStrings.Oplog_TRIED_TO_SEEK_TO_0_BUT_THE_FILE_LENGTH_IS_1_OPLOG_FILE_OBJECT_USED_FOR_READING_2
- .toLocalizedString(
- new Object[] {readPosition + valueLength, writePosition, this.crf.raf}),
+ .toLocalizedString(offsetInOplog + valueLength, writePosition, this.crf.raf),
dr.getName());
- } else if (readPosition < 0) {
+ } else if (offsetInOplog < 0) {
throw new DiskAccessException(
- LocalizedStrings.Oplog_CANNOT_FIND_RECORD_0_WHEN_READING_FROM_1.toLocalizedString(
- new Object[] {Long.valueOf(offsetInOplog), this.diskFile.getPath()}),
+ LocalizedStrings.Oplog_CANNOT_FIND_RECORD_0_WHEN_READING_FROM_1
+ .toLocalizedString(offsetInOplog, this.diskFile.getPath()),
dr.getName());
}
- // if (this.closed || this.deleted.get()) {
- // throw new DiskAccessException("attempting get on "
- // + (this.deleted.get() ? "destroyed" : "closed")
- // + " oplog #" + getOplogId(), this.owner);
- // }
try {
- this.crf.raf.seek(readPosition);
+ this.crf.raf.seek(offsetInOplog);
this.stats.incOplogSeeks();
byte[] valueBytes = null;
if (wrapper.getBytes().length < valueLength) {
@@ -5694,14 +5576,8 @@ public final class Oplog implements CompactableOplog, Flushable {
getParent().getCancelCriterion().checkCancelInProgress(ex);
throw new DiskAccessException(
LocalizedStrings.Oplog_FAILED_READING_FROM_0_OPLOG_DETAILS_1_2_3_4_5_6
- .toLocalizedString(new Object[] {this.diskFile.getPath(),
- Long.valueOf(this.oplogId), Long.valueOf(offsetInOplog),
- Long.valueOf(this.crf.currSize), Long.valueOf(this.crf.bytesFlushed),
- Boolean.valueOf(/*
- * ! dr . isSync ( )
- *
- * @ todo
- */false), Boolean.valueOf(false)}),
+ .toLocalizedString(this.diskFile.getPath(), this.oplogId, offsetInOplog,
+ this.crf.currSize, this.crf.bytesFlushed, Boolean.FALSE, Boolean.FALSE),
ex, dr.getName());
} catch (IllegalStateException ex) {
@@ -5956,8 +5832,7 @@ public final class Oplog implements CompactableOplog, Flushable {
tlc = 0;
}
double rv = tlc;
- double rvHWM = rvHWMtmp;
- if (((rv / rvHWM) * 100) <= parent.getCompactionThreshold()) {
+ if (((rv / (double) rvHWMtmp) * 100) <= parent.getCompactionThreshold()) {
return true;
}
} else {
@@ -6058,7 +5933,7 @@ public final class Oplog implements CompactableOplog, Flushable {
}
}
- private GemFireCacheImpl getGemFireCache() {
+ private InternalCache getInternalCache() {
return getParent().getCache();
}
@@ -6136,7 +6011,7 @@ public final class Oplog implements CompactableOplog, Flushable {
return 0; // do this while holding compactorLock
}
- // Asif:Start with a fresh wrapper on every compaction so that
+ // Start with a fresh wrapper on every compaction so that
// if previous run used some high memory byte array which was
// exceptional, it gets garbage collected.
long opStart = getStats().getStatTime();
@@ -6199,7 +6074,7 @@ public final class Oplog implements CompactableOplog, Flushable {
totalCount++;
getStats().endCompactionUpdate(opStart);
opStart = getStats().getStatTime();
- // Asif: Check if the value byte array happens to be any of the
+ // Check if the value byte array happens to be any of the
// constant
// static byte arrays or references the value byte array of
// underlying RegionEntry.
@@ -6259,8 +6134,6 @@ public final class Oplog implements CompactableOplog, Flushable {
/**
* This method is called by the async value recovery task to recover the values from the crf if
* the keys were recovered from the krf.
- *
- * @param diskRecoveryStores
*/
public void recoverValuesIfNeeded(Map<Long, DiskRecoveryStore> diskRecoveryStores) {
// Early out if we start closing the parent.
@@ -6359,7 +6232,7 @@ public final class Oplog implements CompactableOplog, Flushable {
try {
DiskEntry.Helper.recoverValue(diskEntry, getOplogId(), diskRecoveryStore, in);
- } catch (RegionDestroyedException e) {
+ } catch (RegionDestroyedException ignore) {
// This region has been destroyed, stop recovering from it.
diskRecoveryStores.remove(diskRegionId);
}
@@ -6417,7 +6290,7 @@ public final class Oplog implements CompactableOplog, Flushable {
InternalDataSerializer.writeUnsignedVL(gcVersion, out);
}
} else {
- InternalDataSerializer.writeBoolean(dr.getRVVTrusted(), out);
+ DataSerializer.writeBoolean(dr.getRVVTrusted(), out);
// Otherwise, we will write the version and exception list for each
// member
Map<VersionSource, RegionVersionHolder> memberToVersion = rvv.getMemberToVersion();
@@ -6437,27 +6310,12 @@ public final class Oplog implements CompactableOplog, Flushable {
}
}
}
- byte[] rvvBytes = out.toByteArray();
- return rvvBytes;
+ return out.toByteArray();
}
- // // Comparable code //
- // public int compareTo(Oplog o) {
- // return getOplogId() - o.getOplogId();
- // }
- // public boolean equals(Object o) {
- // if (o instanceof Oplog) {
- // return compareTo((Oplog)o) == 0;
- // } else {
- // return false;
- // }
- // }
- // public int hashCode() {
- // return getOplogId();
- // }
@Override
public String toString() {
- return "oplog#" + getOplogId() /* + "DEBUG" + System.identityHashCode(this) */;
+ return "oplog#" + getOplogId();
}
/**
@@ -6472,10 +6330,6 @@ public final class Oplog implements CompactableOplog, Flushable {
return chPrev;
}
- // //////// Methods used during recovery //////////////
-
- // ////////////////////Inner Classes //////////////////////
-
private static class OplogFile {
public File f;
public UninterruptibleRandomAccessFile raf;
@@ -6501,25 +6355,16 @@ public final class Oplog implements CompactableOplog, Flushable {
}
private static String baToString(byte[] ba, int len) {
- if (ba == null)
+ if (ba == null) {
return "null";
- StringBuffer sb = new StringBuffer();
+ }
+ StringBuilder sb = new StringBuilder();
for (int i = 0; i < len; i++) {
sb.append(ba[i]).append(", ");
}
return sb.toString();
}
- private static String laToString(long[] la) {
- if (la == null)
- return "null";
- StringBuffer sb = new StringBuffer();
- for (int i = 0; i < la.length; i++) {
- sb.append(la[i]).append(", ");
- }
- return sb.toString();
- }
-
void serializeVersionTag(VersionHolder tag, DataOutput out) throws IOException {
int entryVersion = tag.getEntryVersion();
long regionVersion = tag.getRegionVersion();
@@ -6551,8 +6396,7 @@ public final class Oplog implements CompactableOplog, Flushable {
VersionSource versionMember, long timestamp, int dsId) throws IOException {
HeapDataOutputStream out = new HeapDataOutputStream(4 + 8 + 4 + 8 + 4, Version.CURRENT);
serializeVersionTag(entryVersion, regionVersion, versionMember, timestamp, dsId, out);
- byte[] versionsBytes = out.toByteArray();
- return versionsBytes;
+ return out.toByteArray();
}
private void serializeVersionTag(int entryVersion, long regionVersion,
@@ -6592,11 +6436,7 @@ public final class Oplog implements CompactableOplog, Flushable {
private byte[] versionsBytes;
private short gfversion;
- // private int entryVersion;
- // private long regionVersion;
- // private int memberId; // canonicalId of memberID
-
- public final int getSize() {
+ public int getSize() {
return this.size;
}
@@ -6607,17 +6447,16 @@ public final class Oplog implements CompactableOplog, Flushable {
return sb.toString();
}
- private final void write(OplogFile olf, ValueWrapper vw) throws IOException {
+ private void write(OplogFile olf, ValueWrapper vw) throws IOException {
vw.sendTo(olf.writeBuf, Oplog.this);
}
- private final void write(OplogFile olf, byte[] bytes, int byteLength) throws IOException {
+ private void write(OplogFile olf, byte[] bytes, int byteLength) throws IOException {
int offset = 0;
- final int maxOffset = byteLength;
ByteBuffer bb = olf.writeBuf;
- while (offset < maxOffset) {
+ while (offset < byteLength) {
- int bytesThisTime = maxOffset - offset;
+ int bytesThisTime = byteLength - offset;
boolean needsFlush = false;
if (bytesThisTime > bb.remaining()) {
needsFlush = true;
@@ -6631,7 +6470,7 @@ public final class Oplog implements CompactableOplog, Flushable {
}
}
- private final void writeByte(OplogFile olf, byte v) throws IOException {
+ private void writeByte(OplogFile olf, byte v) throws IOException {
ByteBuffer bb = olf.writeBuf;
if (1 > bb.remaining()) {
flushNoSync(olf);
@@ -6639,7 +6478,7 @@ public final class Oplog implements CompactableOplog, Flushable {
bb.put(v);
}
- private final void writeOrdinal(OplogFile olf, short ordinal) throws IOException {
+ private void writeOrdinal(OplogFile olf, short ordinal) throws IOException {
ByteBuffer bb = olf.writeBuf;
if (3 > bb.remaining()) {
flushNoSync(olf);
@@ -6648,7 +6487,7 @@ public final class Oplog implements CompactableOplog, Flushable {
Version.writeOrdinal(bb, ordinal, false);
}
- private final void writeInt(OplogFile olf, int v) throws IOException {
+ private void writeInt(OplogFile olf, int v) throws IOException {
ByteBuffer bb = olf.writeBuf;
if (4 > bb.remaining()) {
flushNoSync(olf);
@@ -6656,7 +6495,7 @@ public final class Oplog implements CompactableOplog, Flushable {
bb.putInt(v);
}
- private final void writeLong(OplogFile olf, long v) throws IOException {
+ private void writeLong(OplogFile olf, long v) throws IOException {
ByteBuffer bb = olf.writeBuf;
if (8 > bb.remaining()) {
flushNoSync(olf);
@@ -7001,9 +6840,6 @@ public final class Oplog implements CompactableOplog, Flushable {
/**
* returns the number of entries cleared
- *
- * @param rvv
- * @param pendingKrfTags
*/
public synchronized int clear(RegionVersionVector rvv,
Map<DiskEntry, VersionHolder> pendingKrfTags) {
@@ -7023,8 +6859,6 @@ public final class Oplog implements CompactableOplog, Flushable {
/**
* Clear using an RVV. Remove live entries that are contained within the clear RVV.
- *
- * @param pendingKrfTags
*/
private int clearWithRVV(RegionVersionVector rvv, Map<DiskEntry, VersionTag> pendingKrfTags) {
// TODO this doesn't work, because we can end up removing entries from
@@ -7033,35 +6867,6 @@ public final class Oplog implements CompactableOplog, Flushable {
// behavior
// until I fix the region map code.
return 0;
- // int result = 0;
- // DiskEntry n = getNext();
- // while (n != this) {
- // DiskEntry nextEntry = n.getNext();
- // VersionSource member = null;
- // long version = -1;
- // if(pendingKrfTags != null) {
- // VersionTag tag = pendingKrfTags.get(n);
- // if(tag != null) {
- // member = tag.getMemberID();
- // version = tag.getRegionVersion();
- // }
- // }
- // if(member == null) {
- // VersionStamp stamp = n.getVersionStamp();
- // member = stamp.getMemberID();
- // version = stamp.getRegionVersion();
- // }
- //
- // if(rvv.contains(member, version)) {
- // result++;
- // remove(n);
- // if(pendingKrfTags != null) {
- // pendingKrfTags.remove(n);
- // }
- // }
- // n = nextEntry;
- // }
- // return result;
}
/**
@@ -7127,23 +6932,6 @@ public final class Oplog implements CompactableOplog, Flushable {
}
}
- // private synchronized void checkForDuplicate(DiskEntry v) {
- // DiskEntry de = getPrev();
- // final long newKeyId = v.getDiskId().getKeyId();
- // while (de != this) {
- // if (de.getDiskId().getKeyId() == newKeyId) {
- // throw new IllegalStateException(
- // "DEBUG: found duplicate for oplogKeyId=" + newKeyId + " de="
- // + System.identityHashCode(v) + " ode="
- // + System.identityHashCode(de) + " deKey=" + v.getKey()
- // + " odeKey=" + de.getKey() + " deOffset="
- // + v.getDiskId().getOffsetInOplog() + " odeOffset="
- // + de.getDiskId().getOffsetInOplog());
- // }
- // de = de.getPrev();
- // }
- // }
-
@Override
public Object getKey() {
throw new IllegalStateException();
@@ -7214,7 +7002,6 @@ public final class Oplog implements CompactableOplog, Flushable {
* @param liveEntries the array to fill with the live entries
* @param idx the first free slot in liveEntries
* @param drv the disk region these entries are on
- * @param pendingKrfTags
* @return the next free slot in liveEntries
*/
public synchronized int addLiveEntriesToList(KRFEntry[] liveEntries, int idx,
@@ -7597,11 +7384,11 @@ public final class Oplog implements CompactableOplog, Flushable {
public abstract long clear(RegionVersionVector rvv);
- final public DiskRegionView getDiskRegion() {
+ public DiskRegionView getDiskRegion() {
return this.dr;
}
- final public void setDiskRegion(DiskRegionView dr) {
+ public void setDiskRegion(DiskRegionView dr) {
this.dr = dr;
}
@@ -7614,11 +7401,11 @@ public final class Oplog implements CompactableOplog, Flushable {
return result;
}
- final synchronized public boolean getUnrecovered() {
+ synchronized public boolean getUnrecovered() {
return this.unrecovered;
}
- final synchronized public boolean testAndSetRecovered(DiskRegionView dr) {
+ synchronized public boolean testAndSetRecovered(DiskRegionView dr) {
boolean result = this.unrecovered;
if (result) {
this.unrecovered = false;
@@ -7773,9 +7560,8 @@ public final class Oplog implements CompactableOplog, Flushable {
public int addLiveEntriesToList(KRFEntry[] liveEntries, int idx) {
synchronized (liveEntries) {
- int result = this.liveEntries.addLiveEntriesToList(liveEntries, idx, getDiskRegion(),
+ return this.liveEntries.addLiveEntriesToList(liveEntries, idx, getDiskRegion(),
pendingKrfTags);
- return result;
}
}
@@ -7816,8 +7602,10 @@ public final class Oplog implements CompactableOplog, Flushable {
* range.
*/
static class OplogEntryIdMap {
+
private final Int2ObjectOpenHashMap ints =
new Int2ObjectOpenHashMap((int) DiskStoreImpl.INVALID_ID);
+
private final Long2ObjectOpenHashMap longs =
new Long2ObjectOpenHashMap((int) DiskStoreImpl.INVALID_ID);