Posted to commits@geode.apache.org by kl...@apache.org on 2016/02/23 21:24:13 UTC
[77/94] [abbrv] incubator-geode git commit: GEODE-917: Merge branch
'feature/GEODE-917' into develop
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BytesAndBitsForCompactor.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BytesAndBitsForCompactor.java
index 0000000,3a3b5a1..cc358f5
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BytesAndBitsForCompactor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/BytesAndBitsForCompactor.java
@@@ -1,0 -1,94 +1,94 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package com.gemstone.gemfire.internal.cache;
+
-import com.gemstone.gemfire.internal.offheap.Chunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+
+ /**
+ * Used to fetch a record's raw bytes and user bits.
+ * The actual data length in the byte array may be less than
+ * the size of the byte array itself; an integer field holds
+ * the valid length. This class is used exclusively by the Oplog Compactor
+ * for rolling entries. Its purpose is to reuse the
+ * underlying byte array across multiple entries, thereby
+ * reducing garbage.
+ * @author Asif
+ * @since 5.5
+ */
+ public class BytesAndBitsForCompactor {
+ /**
+ * If dataChunk is set then ignore the "data" and "validLength" fields.
+ * The dataChunk field is unretained so it can only be used while the RegionEntry is still synced.
+ * When done with the dataChunk, null it out if you want to reuse the byte[] later.
+ */
- private @Unretained Chunk dataChunk;
++ private @Unretained ObjectChunk dataChunk;
+ private byte[] data;
+ private byte userBits=0;
+ // length of the data present in the byte array
+ private int validLength;
+ private static final byte[] INIT_FOR_WRAPPER = new byte[0];
+ // boolean indicating if the object can be reused.
+ // Typically, if "data" holds a reference to a value byte[] taken directly
+ // from the RegionEntry, then this byte array cannot be reused for
+ // storing another entry's data.
+ private boolean isReusable;
+
+ public BytesAndBitsForCompactor() {
+ this.data = INIT_FOR_WRAPPER;
+ //this.userBits = userBits;
+ this.validLength = INIT_FOR_WRAPPER.length;
+ this.isReusable = true;
+ }
+
+
- public final Chunk getDataChunk() {
++ public final ObjectChunk getDataChunk() {
+ return this.dataChunk;
+ }
+ public final byte[] getBytes() {
+ return this.data;
+ }
+ public final byte getBits() {
+ return this.userBits;
+ }
+
+ public final int getValidLength() {
+ return this.validLength;
+ }
+
+ public boolean isReusable() {
+ return this.isReusable;
+ }
+
+ /**
+ *
+ * @param data byte array storing the data
+ * @param userBits byte with appropriate bits set
+ * @param validLength the number of bytes of valid data, starting at offset 0
+ * @param isReusable true if this object is safe for reuse as a data holder
+ */
+ public void setData(byte[] data, byte userBits, int validLength, boolean isReusable) {
+ this.data = data;
+ this.userBits = userBits;
+ this.validLength = validLength;
+ this.isReusable = isReusable;
+ }
- public void setChunkData(Chunk c, byte userBits) {
++ public void setChunkData(ObjectChunk c, byte userBits) {
+ this.dataChunk = c;
+ this.userBits = userBits;
+ }
+ }
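
[Editor's note] The reuse contract described in the class Javadoc above is
easiest to see in a short sketch. This is illustrative only and not part of
the commit; fetchRawBytesAndBits() and roll() are hypothetical stand-ins for
the Oplog compactor's real calls:

    // One wrapper is allocated up front and refilled for each live entry,
    // so the backing byte[] is recycled instead of becoming garbage.
    BytesAndBitsForCompactor wrapper = new BytesAndBitsForCompactor();
    for (long entryId : liveEntryIds) {           // hypothetical iteration
      fetchRawBytesAndBits(entryId, wrapper);     // fills data/userBits/validLength
      roll(wrapper.getBytes(), wrapper.getValidLength(), wrapper.getBits());
      // If isReusable() returned false, the byte[] belongs to a RegionEntry
      // and must not be refilled; the compactor switches to a fresh array.
    }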
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
index 0000000,c855cca..327279b
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
@@@ -1,0 -1,2028 +1,2028 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package com.gemstone.gemfire.internal.cache;
+
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+
+ import org.apache.logging.log4j.Logger;
+
+ import com.gemstone.gemfire.cache.CacheClosedException;
+ import com.gemstone.gemfire.cache.DiskAccessException;
+ import com.gemstone.gemfire.distributed.internal.DM;
+ import com.gemstone.gemfire.internal.Assert;
+ import com.gemstone.gemfire.internal.ByteArrayDataInput;
+ import com.gemstone.gemfire.internal.HeapDataOutputStream;
+ import com.gemstone.gemfire.internal.Version;
+ import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.ValueWrapper;
+ import com.gemstone.gemfire.internal.cache.DiskStoreImpl.AsyncDiskEntry;
+ import com.gemstone.gemfire.internal.cache.lru.EnableLRU;
+ import com.gemstone.gemfire.internal.cache.lru.LRUClockNode;
+ import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
+ import com.gemstone.gemfire.internal.cache.persistence.BytesAndBits;
+ import com.gemstone.gemfire.internal.cache.persistence.DiskRecoveryStore;
+ import com.gemstone.gemfire.internal.cache.persistence.DiskRegionView;
+ import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
+ import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+ import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.offheap.Chunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+ import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+ import com.gemstone.gemfire.internal.offheap.ReferenceCountHelper;
+ import com.gemstone.gemfire.internal.offheap.Releasable;
+ import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk;
+ import com.gemstone.gemfire.internal.offheap.StoredObject;
+ import com.gemstone.gemfire.internal.offheap.annotations.Released;
+ import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+ import com.gemstone.gemfire.internal.util.BlobHelper;
+
+ /**
+ * Represents an entry in a {@link RegionMap} whose value may be
+ * stored on disk. This interface provides accessor and mutator
+ * methods for a disk entry's state. This allows us to abstract all
+ * of the interesting behavior into a {@linkplain DiskEntry.Helper
+ * helper class} that we only need to implement once.
+ *
+ * <P>
+ *
+ * Each <code>DiskEntry</code> has a unique <code>id</code> that is
+ * used by the {@link DiskRegion} to identify the key/value pair.
+ * Before the disk entry is written to disk, the value of the
+ * <code>id</code> is {@link DiskRegion#INVALID_ID invalid}. Once the
+ * object has been written to disk, the <code>id</code> is a positive
+ * number. If the value is {@linkplain Helper#update updated}, then the
+ * <code>id</code> is negated to signify that the value on disk is
+ * dirty.
+ *
+ * @see DiskRegion
+ *
+ * @author David Whitlock
+ *
+ * @since 3.2
+ */
+ public interface DiskEntry extends RegionEntry {
+ /**
+ * Sets the value with a {@link RegionEntryContext}.
+ * @param context the value's context.
+ * @param value an entry value.
+ */
+ public void setValueWithContext(RegionEntryContext context,Object value);
+
+ /**
+ * In some cases we need to do something just before we drop the value
+ * from a DiskEntry that is being moved (i.e. overflowed) to disk.
+ * @param context
+ */
+ public void handleValueOverflow(RegionEntryContext context);
+
+ /**
+ * In some cases we need to do something just after we unset the value
+ * from a DiskEntry that has been moved (i.e. overflowed) to disk.
+ * @param context
+ */
+ public void afterValueOverflow(RegionEntryContext context);
+
+ /**
+ * Returns true if the DiskEntry value is equal to {@link Token#DESTROYED}, {@link Token#REMOVED_PHASE1}, or {@link Token#REMOVED_PHASE2}.
+ */
+ public boolean isRemovedFromDisk();
+
+ /**
+ * Returns the id of this <code>DiskEntry</code>
+ */
+ public DiskId getDiskId();
+
+ public void _removePhase1();
+
+ public int updateAsyncEntrySize(EnableLRU capacityController);
+
+ public DiskEntry getPrev();
+ public DiskEntry getNext();
+ public void setPrev(DiskEntry v);
+ public void setNext(DiskEntry v);
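
[Editor's note] The id life cycle described in the interface Javadoc above
amounts to a three-way check. A minimal sketch, not part of the commit,
assuming "entry" is a DiskEntry:

    long id = entry.getDiskId().getKeyId();
    if (id == DiskRegion.INVALID_ID) {
      // never written to disk
    } else if (id < 0) {
      // written, but the in-memory value is dirty; the on-disk copy is stale
    } else {
      // the on-disk copy is current
    }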
+
+ /**
+ * Used as the entry value if it was invalidated.
+ */
+ public static final byte[] INVALID_BYTES = new byte[0];
+ /**
+ * Used as the entry value if it was locally invalidated.
+ */
+ public static final byte[] LOCAL_INVALID_BYTES = new byte[0];
+ /**
+ * Used as the entry value if it was tombstone.
+ */
+ public static final byte[] TOMBSTONE_BYTES = new byte[0];
+
+ /////////////////////// Inner Classes //////////////////////
+
+ /**
+ * A Helper class for performing functions common to all
+ * <code>DiskEntry</code>s.
+ */
+ public static class Helper {
+ private static final Logger logger = LogService.getLogger();
+
+ /**
+ * For testing purposes only.
+ * Gets the value of an entry that is on disk without faulting
+ * it in and without looking in the I/O buffer.
+ * @since 3.2.1
+ */
+ static Object getValueOnDisk(DiskEntry entry, DiskRegion dr) {
+ DiskId id = entry.getDiskId();
+ if (id == null) {
+ return null;
+ }
+ dr.acquireReadLock();
+ try {
+ synchronized (id) {
+ if (id == null
+ || (dr.isBackup() && id.getKeyId() == DiskRegion.INVALID_ID)
+ || (!entry.isValueNull() && id.needsToBeWritten() && !EntryBits.isRecoveredFromDisk(id.getUserBits()))/*fix for bug 41942*/) {
+ return null;
+ }
+
+ return dr.getNoBuffer(id);
+ }
+ } finally {
+ dr.releaseReadLock();
+ }
+ }
+
+ /**
+ * Get the serialized value directly from disk. Returned object may be
+ * a {@link CachedDeserializable}. Goes straight to disk without faulting
+ * into memory. Only looks at the disk storage, not at heap storage.
+ * @param entry the entry used to identify the value to fetch
+ * @param dr the persistent storage from which to fetch the value
+ * @return either null, a byte array, or a CachedDeserializable
+ * @since gemfire57_hotfix
+ */
+ public static Object getSerializedValueOnDisk(
+ DiskEntry entry, DiskRegion dr) {
+ DiskId did = entry.getDiskId();
+ if (did == null) {
+ return null;
+ }
+ dr.acquireReadLock();
+ try {
+ synchronized (did) {
+ if (did == null
+ || (dr.isBackup() && did.getKeyId() == DiskRegion.INVALID_ID)) {
+ return null;
+ } else if (!entry.isValueNull() && did.needsToBeWritten() && !EntryBits.isRecoveredFromDisk(did.getUserBits())/*fix for bug 41942*/) {
+ return null;
+ }
+ return dr.getSerializedData(did);
+ }
+ } finally {
+ dr.releaseReadLock();
+ }
+ }
+
+
+ /**
+ * Gets the value of an entry that is on disk without
+ * faulting it in. It also checks for the value's presence in the buffer.
+ * This method is used for concurrent map operations, SQLFabric, and CQ processing.
+ *
+ * @throws DiskAccessException
+ * @since 5.1
+ */
+ static Object getValueOnDiskOrBuffer(DiskEntry entry, DiskRegion dr, RegionEntryContext context) {
+ @Released Object v = getOffHeapValueOnDiskOrBuffer(entry, dr, context);
+ if (v instanceof CachedDeserializable) {
- if (v instanceof Chunk) {
- @Released Chunk ohv = (Chunk) v;
++ if (v instanceof ObjectChunk) {
++ @Released ObjectChunk ohv = (ObjectChunk) v;
+ try {
+ v = ohv.getDeserializedValue(null, null);
+ if (v == ohv) {
+ throw new IllegalStateException("sqlf tried to use getValueOnDiskOrBuffer");
+ }
+ } finally {
+ ohv.release(); // OFFHEAP the offheap ref is decremented here
+ }
+ } else {
+ v = ((CachedDeserializable)v).getDeserializedValue(null, null);
+ }
+ }
+ return v;
+ }
+
+ @Retained
+ static Object getOffHeapValueOnDiskOrBuffer(DiskEntry entry, DiskRegion dr, RegionEntryContext context) {
+ DiskId did = entry.getDiskId();
+ Object syncObj = did;
+ if (syncObj == null) {
+ syncObj = entry;
+ }
+ if (syncObj == did) {
+ dr.acquireReadLock();
+ }
+ try {
+ synchronized (syncObj) {
+ if (did != null && did.isPendingAsync()) {
+ @Retained Object v = entry._getValueRetain(context, true); // TODO:KIRK:OK Rusty had Object v = entry.getValueWithContext(context);
+ if (Token.isRemovedFromDisk(v)) {
+ v = null;
+ }
+ return v;
+ }
+ if (did == null
+ || ( dr.isBackup() && did.getKeyId() == DiskRegion.INVALID_ID)
+ || (!entry.isValueNull() && did.needsToBeWritten() && !EntryBits.isRecoveredFromDisk(did.getUserBits()))/*fix for bug 41942*/) {
+ return null;
+ }
+
+ return dr.getSerializedData(did);
+ }
+ } finally {
+ if (syncObj == did) {
+ dr.releaseReadLock();
+ }
+ }
+ }
+
+ /**
+ * Returns false if the entry is INVALID (or LOCAL_INVALID). Determines this
+ * without faulting in the value from disk.
+ *
+ * @since 3.2.1
+ */
+ /* TODO prpersist - Do we need this method? It was added by the sqlf merge
+ static boolean isValid(DiskEntry entry, DiskRegion dr) {
+ synchronized (entry) {
+ if (entry.isRecovered()) {
+ // We have a recovered entry whose value is still on disk.
+ // So take a peek at it without faulting it in.
+ //long id = entry.getDiskId().getKeyId();
+ //entry.getDiskId().setKeyId(-id);
+ byte bits = dr.getBits(entry.getDiskId());
+ //TODO Asif:Check if resetting is needed
+ return !EntryBits.isInvalid(bits) && !EntryBits.isLocalInvalid(bits);
+ }
+ }
+ }*/
+
+ static boolean isOverflowedToDisk(DiskEntry de, DiskRegion dr, DistributedRegion.DiskPosition dp,RegionEntryContext context) {
+ Object v = null;
+ DiskId did;
+ synchronized (de) {
+ did = de.getDiskId();
+ }
+ Object syncObj = did;
+ if (syncObj == null) {
+ syncObj = de;
+ }
+ if (syncObj == did) {
+ dr.acquireReadLock();
+ }
+ try {
+ synchronized (syncObj) {
+ if (de.isValueNull()) {
+ if (did == null) {
+ synchronized (de) {
+ did = de.getDiskId();
+ }
+ assert did != null;
+ return isOverflowedToDisk(de, dr, dp, context);
+ } else {
+ dp.setPosition(did.getOplogId(), did.getOffsetInOplog());
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+ } finally {
+ if (syncObj == did) {
+ dr.releaseReadLock();
+ }
+ }
+ }
+
+ /**
+ * Get the value of an entry that is on disk without faulting
+ * it in.
+ * @since 3.2.1
+ */
+ static boolean fillInValue(DiskEntry de, InitialImageOperation.Entry entry,
+ DiskRegion dr, DM mgr, ByteArrayDataInput in, RegionEntryContext context) {
+ @Retained @Released Object v = null;
+ DiskId did;
+ synchronized (de) {
+ did = de.getDiskId();
+ }
+ Object syncObj = did;
+ if (syncObj == null) {
+ syncObj = de;
+ }
+ if (syncObj == did) {
+ dr.acquireReadLock();
+ }
+ try {
+ synchronized (syncObj) {
+ entry.setLastModified(mgr, de.getLastModified());
+
+ ReferenceCountHelper.setReferenceCountOwner(entry);
+ v = de._getValueRetain(context, true); // OFFHEAP copied to heap entry; todo allow entry to refer to offheap since it will be copied to network.
+ ReferenceCountHelper.setReferenceCountOwner(null);
+ if (v == null) {
+ if (did == null) {
+ // fix for bug 41449
+ synchronized (de) {
+ did = de.getDiskId();
+ }
+ assert did != null;
+ // do recursive call to get readLock on did
+ return fillInValue(de, entry, dr, mgr, in, context);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("DiskEntry.Helper.fillInValue, key={}; getting value from disk, disk id={}", entry.key, did);
+ }
+ BytesAndBits bb = null;
+ try {
+ bb = dr.getBytesAndBits(did, false);
+ }catch(DiskAccessException dae){
+ return false;
+ }
+ if (EntryBits.isInvalid(bb.getBits())) {
+ entry.setInvalid();
+ }
+ else if (EntryBits.isLocalInvalid(bb.getBits())) {
+ entry.setLocalInvalid();
+ }
+ else if (EntryBits.isTombstone(bb.getBits())) {
+ entry.setTombstone();
+ }
+ else {
+ entry.value = bb.getBytes();
+ entry.setSerialized(EntryBits.isSerialized(bb.getBits()));
+ }
+ return true;
+ }
+ }
+ } finally {
+ if (syncObj == did) {
+ dr.releaseReadLock();
+ }
+ }
+ final boolean isEagerDeserialize = entry.isEagerDeserialize();
+ if (isEagerDeserialize) {
+ entry.clearEagerDeserialize();
+ }
+ if (Token.isRemovedFromDisk(v)) {
+ // fix for bug 31757
+ return false;
+ } else if (v instanceof CachedDeserializable) {
+ try {
+ if (v instanceof StoredObject && !((StoredObject) v).isSerialized()) {
+ entry.setSerialized(false);
+ entry.value = ((StoredObject) v).getDeserializedForReading();
+
+ //For SQLFire we prefer eager deserialized
+ // if(v instanceof ByteSource) {
+ // entry.setEagerDeserialize();
+ // }
+ } else {
+ // don't serialize here if it is not already serialized
+
+ Object tmp = ((CachedDeserializable)v).getValue();
+ //For SQLFire we prefer eager deserialized
+ // if(v instanceof ByteSource) {
+ // entry.setEagerDeserialize();
+ // }
+ if (tmp instanceof byte[]) {
+ byte[] bb = (byte[])tmp;
+ entry.value = bb;
+ entry.setSerialized(true);
+ }
+ else if (isEagerDeserialize && tmp instanceof byte[][]) {
+ // optimize for byte[][] since it will need to be eagerly deserialized
+ // for SQLFabric
+ entry.value = tmp;
+ entry.setEagerDeserialize();
+ entry.setSerialized(true);
+ }
+ else {
+ try {
+ HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
+ BlobHelper.serializeTo(tmp, hdos);
+ hdos.trim();
+ entry.value = hdos;
+ entry.setSerialized(true);
+ } catch (IOException e) {
+ RuntimeException e2 = new IllegalArgumentException(LocalizedStrings.DiskEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING.toLocalizedString());
+ e2.initCause(e);
+ throw e2;
+ }
+ }
+ }
+ } finally {
+ // If v == entry.value then v is assumed to be an OffHeapByteSource
+ // and release() will be called on v after the bytes have been read from
+ // off-heap.
+ if (v != entry.value) {
+ OffHeapHelper.releaseWithNoTracking(v);
+ }
+ }
+ }
+ else if (v instanceof byte[]) {
+ entry.value = v;
+ entry.setSerialized(false);
+ }
+ else if (isEagerDeserialize && v instanceof byte[][]) {
+ // optimize for byte[][] since it will need to be eagerly deserialized
+ // for SQLFabric
+ entry.value = v;
+ entry.setEagerDeserialize();
+ }
+ else if (v == Token.INVALID) {
+ entry.setInvalid();
+ }
+ else if (v == Token.LOCAL_INVALID) {
+ // fix for bug 31107
+ entry.setLocalInvalid();
+ } else if (v == Token.TOMBSTONE) {
+ entry.setTombstone();
+ }
+ else {
+ Object preparedValue = v;
+ if (preparedValue != null) {
+ preparedValue = AbstractRegionEntry.prepareValueForGII(preparedValue);
+ if (preparedValue == null) {
+ return false;
+ }
+ }
+ if (CachedDeserializableFactory.preferObject()) {
+ entry.value = preparedValue;
+ entry.setEagerDeserialize();
+ }
+ else {
+ try {
+ HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
+ BlobHelper.serializeTo(preparedValue, hdos);
+ hdos.trim();
+ entry.value = hdos;
+ entry.setSerialized(true);
+ } catch (IOException e) {
+ RuntimeException e2 = new IllegalArgumentException(LocalizedStrings.DiskEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING.toLocalizedString());
+ e2.initCause(e);
+ throw e2;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Used to initialize a new disk entry
+ */
+ static void initialize(DiskEntry entry, DiskRecoveryStore r, Object newValue) {
+ DiskRegionView drv = null;
+ if (r instanceof LocalRegion) {
+ drv = ((LocalRegion)r).getDiskRegion();
+ } else if (r instanceof DiskRegionView) {
+ drv = (DiskRegionView)r;
+ }
+ if (drv == null) {
+ throw new IllegalArgumentException(LocalizedStrings.DiskEntry_DISK_REGION_IS_NULL.toLocalizedString());
+ }
+
+ if (newValue == null || Token.isRemovedFromDisk(newValue)) {
+ // it is not in vm and it is not on disk
+ DiskId did = entry.getDiskId();
+ if (did != null) {
+ did.setKeyId(DiskRegion.INVALID_ID);
+ }
+ }
+ else if (newValue instanceof RecoveredEntry) {
+ // Set the id directly, the value will also be set if RECOVER_VALUES
+ RecoveredEntry re = (RecoveredEntry)newValue;
+ DiskId did = entry.getDiskId();
+ did.setOplogId(re.getOplogId());
+ did.setOffsetInOplog(re.getOffsetInOplog());
+ did.setKeyId(re.getRecoveredKeyId());
+ did.setUserBits(re.getUserBits());
+ did.setValueLength(re.getValueLength());
+ if (re.getRecoveredKeyId() < 0) {
+ drv.incNumOverflowOnDisk(1L);
+ drv.incNumOverflowBytesOnDisk(did.getValueLength());
+ incrementBucketStats(r, 0/*InVM*/, 1/*OnDisk*/, did.getValueLength());
+ }
+ else {
+ entry.setValueWithContext(drv, entry.prepareValueForCache((RegionEntryContext) r,
+ re.getValue(), false));
+ drv.incNumEntriesInVM(1L);
+ incrementBucketStats(r, 1/*InVM*/, 0/*OnDisk*/, 0);
+ }
+ }
+ else {
+ DiskId did = entry.getDiskId();
+ if (did != null) {
+ did.setKeyId(DiskRegion.INVALID_ID);
+ }
+ drv.incNumEntriesInVM(1L);
+ incrementBucketStats(r, 1/*InVM*/, 0/*OnDisk*/, 0);
+ }
+ }
+
+ private static final ValueWrapper INVALID_VW = new ByteArrayValueWrapper(true, INVALID_BYTES);
+ private static final ValueWrapper LOCAL_INVALID_VW = new ByteArrayValueWrapper(true, LOCAL_INVALID_BYTES);
+ private static final ValueWrapper TOMBSTONE_VW = new ByteArrayValueWrapper(true, TOMBSTONE_BYTES);
+
+ public static interface ValueWrapper {
+ public boolean isSerialized();
+ public int getLength();
+ public byte getUserBits();
+ public void sendTo(ByteBuffer bb, Flushable flushable) throws IOException;
+ public String getBytesAsString();
+ }
+ public static interface Flushable {
+ public void flush() throws IOException;
+
+ public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException;
+ }
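
[Editor's note] The sendTo/Flushable contract: a ValueWrapper fills the
supplied ByteBuffer and calls flush() whenever the buffer runs out of room,
while flush(bb, chunkbb) hands off a whole off-heap chunk buffer alongside
the partially filled primary buffer. A minimal sketch of an implementation,
not part of the commit, assuming the buffers drain to a java.nio FileChannel
(the real implementation lives in the oplog writer):

    class ChannelFlushable implements Flushable {
      private final java.nio.channels.FileChannel channel;
      private final java.nio.ByteBuffer buf;   // the buffer passed to sendTo()
      ChannelFlushable(java.nio.channels.FileChannel channel, java.nio.ByteBuffer buf) {
        this.channel = channel;
        this.buf = buf;
      }
      public void flush() throws java.io.IOException {
        buf.flip();                 // drain the primary buffer...
        channel.write(buf);
        buf.clear();                // ...so sendTo() can keep filling it
      }
      public void flush(java.nio.ByteBuffer bb, java.nio.ByteBuffer chunkbb) throws java.io.IOException {
        bb.flip();                  // write the partial buffer, then the chunk
        channel.write(new java.nio.ByteBuffer[] { bb, chunkbb });
        bb.clear();
      }
    }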
+ public static class ByteArrayValueWrapper implements ValueWrapper {
+ public final boolean isSerializedObject;
+ public final byte[] bytes;
+
+ public ByteArrayValueWrapper(boolean isSerializedObject, byte[] bytes) {
+ this.isSerializedObject = isSerializedObject;
+ this.bytes = bytes;
+ }
+
+ @Override
+ public boolean isSerialized() {
+ return this.isSerializedObject;
+ }
+
+ @Override
+ public int getLength() {
+ return (this.bytes != null) ? this.bytes.length : 0;
+ }
+
+ private boolean isInvalidToken() {
+ return this == INVALID_VW;
+ }
+
+ private boolean isLocalInvalidToken() {
+ return this == LOCAL_INVALID_VW;
+ }
+
+ private boolean isTombstoneToken() {
+ return this == TOMBSTONE_VW;
+ }
+
+ @Override
+ public byte getUserBits() {
+ byte userBits = 0x0;
+ if (isSerialized()) {
+ if (isTombstoneToken()) {
+ userBits = EntryBits.setTombstone(userBits, true);
+ } else if (isInvalidToken()) {
+ userBits = EntryBits.setInvalid(userBits, true);
+ } else if (isLocalInvalidToken()) {
+ userBits = EntryBits.setLocalInvalid(userBits, true);
+ } else {
+ if (this.bytes == null) {
+ throw new IllegalStateException("userBits==1 and value is null");
+ } else if (this.bytes.length == 0) {
+ throw new IllegalStateException("userBits==1 and value is zero length");
+ }
+ userBits = EntryBits.setSerialized(userBits, true);
+ }
+ }
+ return userBits;
+ }
+
+ @Override
+ public void sendTo(ByteBuffer bb, Flushable flushable) throws IOException {
+ int offset = 0;
+ final int maxOffset = getLength();
+ while (offset < maxOffset) {
+ int bytesThisTime = maxOffset - offset;
+ boolean needsFlush = false;
+ if (bytesThisTime > bb.remaining()) {
+ needsFlush = true;
+ bytesThisTime = bb.remaining();
+ }
+ bb.put(this.bytes, offset, bytesThisTime);
+ offset += bytesThisTime;
+ if (needsFlush) {
+ flushable.flush();
+ }
+ }
+ }
+
+ @Override
+ public String getBytesAsString() {
+ if (this.bytes == null) {
+ return "null";
+ }
+ StringBuffer sb = new StringBuffer();
+ int len = getLength();
+ for (int i = 0; i < len; i++) {
+ sb.append(this.bytes[i]).append(", ");
+ }
+ return sb.toString();
+ }
+ }
+
+ /**
+ * This class is a bit of a hack used by the compactor.
+ * Because the compactor always copies to a byte[],
+ * this class is just a simple wrapper.
+ * It is possible that the length of the byte array is greater
+ * than the actual length of the wrapped data.
+ * By the time we create this we are done with isSerialized
+ * and userBits, so those methods are not supported.
+ */
+ public static class CompactorValueWrapper extends ByteArrayValueWrapper {
+ private final int length;
+
+ public CompactorValueWrapper(byte[] bytes, int length) {
+ super(false, bytes);
+ this.length = length;
+ }
+
+ @Override
+ public boolean isSerialized() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getLength() {
+ return this.length;
+ }
+
+ @Override
+ public byte getUserBits() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * Note that the Chunk this ValueWrapper is created with
+ * is unretained so it must be used before the owner of
+ * the chunk releases it.
+ * Since the RegionEntry that has the value we are writing to
+ * disk has it retained we are ok as long as this ValueWrapper's
+ * life ends before the RegionEntry sync is released.
+ * Note that this class is only used with uncompressed chunks.
+ */
+ public static class ChunkValueWrapper implements ValueWrapper {
- private final @Unretained Chunk chunk;
- public ChunkValueWrapper(Chunk c) {
++ private final @Unretained ObjectChunk chunk;
++ public ChunkValueWrapper(ObjectChunk c) {
+ assert !c.isCompressed();
+ this.chunk = c;
+ }
+ @Override
+ public boolean isSerialized() {
+ return this.chunk.isSerialized();
+ }
+ @Override
+ public int getLength() {
+ return this.chunk.getDataSize();
+ }
+ @Override
+ public byte getUserBits() {
+ byte userBits = 0x0;
+ if (isSerialized()) {
+ userBits = EntryBits.setSerialized(userBits, true);
+ }
+ return userBits;
+ }
+ @Override
+ public void sendTo(ByteBuffer bb, Flushable flushable) throws IOException {
+ final int maxOffset = getLength();
+ if (maxOffset == 0) {
+ return;
+ }
+ if (maxOffset > bb.capacity()) {
+ ByteBuffer chunkbb = this.chunk.createDirectByteBuffer();
+ if (chunkbb != null) {
+ flushable.flush(bb, chunkbb);
+ return;
+ }
+ }
- final long bbAddress = Chunk.getDirectByteBufferAddress(bb);
++ final long bbAddress = ObjectChunk.getDirectByteBufferAddress(bb);
+ if (bbAddress != 0L) {
+ int bytesRemaining = maxOffset;
+ int availableSpace = bb.remaining();
+ long addrToWrite = bbAddress + bb.position();
+ long addrToRead = this.chunk.getAddressForReading(0, maxOffset);
+ if (bytesRemaining > availableSpace) {
+ do {
+ UnsafeMemoryChunk.copyMemory(addrToRead, addrToWrite, availableSpace);
+ bb.position(bb.position()+availableSpace);
+ addrToRead += availableSpace;
+ bytesRemaining -= availableSpace;
+ flushable.flush();
+ addrToWrite = bbAddress + bb.position();
+ availableSpace = bb.remaining();
+ } while (bytesRemaining > availableSpace);
+ }
+ UnsafeMemoryChunk.copyMemory(addrToRead, addrToWrite, bytesRemaining);
+ bb.position(bb.position()+bytesRemaining);
+ } else {
+ long addr = this.chunk.getAddressForReading(0, maxOffset);
+ final long endAddr = addr + maxOffset;
+ while (addr != endAddr) {
+ bb.put(UnsafeMemoryChunk.readAbsoluteByte(addr));
+ addr++;
+ if (!bb.hasRemaining()) {
+ flushable.flush();
+ }
+ }
+ }
+ }
+ @Override
+ public String getBytesAsString() {
+ return this.chunk.getStringForm();
+ }
+ }
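
[Editor's note] Because the wrapped chunk is @Unretained, the safe usage
pattern is to create, use, and drop a ChunkValueWrapper while still holding
the sync that keeps the RegionEntry's value retained. A sketch, not part of
the commit:

    synchronized (regionEntry) {             // value stays retained while synced
      ChunkValueWrapper vw = new ChunkValueWrapper(objectChunk);
      vw.sendTo(byteBuffer, flushable);      // must complete inside the sync
    }                                        // the wrapper must not outlive this block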
+
+ public static ValueWrapper createValueWrapper(Object value, EntryEventImpl event) {
+ if (value == Token.INVALID) {
+ // Even though it is not serialized we say it is, because
+ // bytes will never be an empty array when a value is serialized;
+ // that gives us a way to encode the invalid value
+ // given only a byte array and a boolean flag.
+ return INVALID_VW;
+ }
+ else if (value == Token.LOCAL_INVALID) {
+ // Even though it is not serialized we say it is, because
+ // bytes will never be an empty array when a value is serialized;
+ // that gives us a way to encode the local-invalid value
+ // given only a byte array and a boolean flag.
+ return LOCAL_INVALID_VW;
+ }
+ else if (value == Token.TOMBSTONE) {
+ return TOMBSTONE_VW;
+ }
+ else {
+ boolean isSerializedObject = true;
+ byte[] bytes;
+ if (value instanceof CachedDeserializable) {
+ CachedDeserializable proxy = (CachedDeserializable)value;
- if (proxy instanceof Chunk) {
- return new ChunkValueWrapper((Chunk) proxy);
++ if (proxy instanceof ObjectChunk) {
++ return new ChunkValueWrapper((ObjectChunk) proxy);
+ }
+ if (proxy instanceof StoredObject) {
+ StoredObject ohproxy = (StoredObject) proxy;
+ isSerializedObject = ohproxy.isSerialized();
+ if (isSerializedObject) {
+ bytes = ohproxy.getSerializedValue();
+ } else {
+ bytes = (byte[]) ohproxy.getDeserializedForReading();
+ }
+ } else {
+ bytes = proxy.getSerializedValue();
+ }
+ if (event != null && isSerializedObject) {
+ event.setCachedSerializedNewValue(bytes);
+ }
+ }
+ else if (value instanceof byte[]) {
+ isSerializedObject = false;
+ bytes = (byte[])value;
+ }
+ else {
+ Assert.assertTrue(!Token.isRemovedFromDisk(value));
+ if (event != null && event.getCachedSerializedNewValue() != null) {
+ bytes = event.getCachedSerializedNewValue();
+ } else {
+ bytes = EntryEventImpl.serialize(value);
+ if (bytes.length == 0) {
+ throw new IllegalStateException("serializing <" + value + "> produced empty byte array");
+ }
+ if (event != null) {
+ event.setCachedSerializedNewValue(bytes);
+ }
+ }
+ }
+ return new ByteArrayValueWrapper(isSerializedObject, bytes);
+ }
+ }
+ public static ValueWrapper createValueWrapperFromEntry(DiskEntry entry, LocalRegion region, EntryEventImpl event) {
+ if (event != null) {
+ // For off-heap it should be faster to pass a reference to the
+ // StoredObject instead of using the cached byte[] (unless it is also compressed).
+ // Since NIO is used, if the chunk of memory is large we can write it
+ // to the file using the off-heap memory with no extra copying.
+ // So we give preference to getRawNewValue over getCachedSerializedNewValue.
+ Object rawValue = null;
+ if (!event.hasDelta()) {
+ // We don't do this for the delta case because getRawNewValue returns delta
+ // and we want to write the entire new value to disk.
+ rawValue = event.getRawNewValue();
- if (rawValue instanceof Chunk) {
- return new ChunkValueWrapper((Chunk) rawValue);
++ if (rawValue instanceof ObjectChunk) {
++ return new ChunkValueWrapper((ObjectChunk) rawValue);
+ }
+ }
+ if (event.getCachedSerializedNewValue() != null) {
+ return new ByteArrayValueWrapper(true, event.getCachedSerializedNewValue());
+ }
+ if (rawValue != null) {
+ return createValueWrapper(rawValue, event);
+ }
+ }
+ // TODO OFFHEAP: No need to retain since we hold the sync on entry but we need a flavor of _getValue that will decompress
+ @Retained Object value = entry._getValueRetain(region, true);
+ try {
+ return createValueWrapper(value, event);
+ } finally {
+ OffHeapHelper.release(value);
+ }
+ }
+
+ private static void writeToDisk(DiskEntry entry, LocalRegion region, boolean async) throws RegionClearedException {
+ writeToDisk(entry, region, async, null);
+ }
+
+ /**
+ * Writes the key/value object stored in the given entry to disk
+ * @throws RegionClearedException
+ *
+ * @see DiskRegion#put
+ */
+ private static void writeToDisk(DiskEntry entry, LocalRegion region, boolean async, EntryEventImpl event) throws RegionClearedException {
+ writeBytesToDisk(entry, region, async, createValueWrapperFromEntry(entry, region, event));
+ }
+
+ private static void writeBytesToDisk(DiskEntry entry, LocalRegion region, boolean async, ValueWrapper vw) throws RegionClearedException {
+ // @todo does the following unmark need to be called when an async
+ // write is scheduled or is it ok for doAsyncFlush to do it?
+ entry.getDiskId().unmarkForWriting();
+ region.getDiskRegion().put(entry, region, vw, async);
+ }
+
+ public static void update(DiskEntry entry, LocalRegion region, Object newValue) throws RegionClearedException {
+ update(entry, region, newValue, null);
+ }
+ /**
+ * Updates the value of the disk entry with a new value. This allows us to
+ * free up disk space in the non-backup case.
+ *
+ * @throws RegionClearedException
+ */
+ public static void update(DiskEntry entry, LocalRegion region, Object newValue, EntryEventImpl event) throws RegionClearedException {
+ DiskRegion dr = region.getDiskRegion();
+ if (newValue == null) {
+ throw new NullPointerException(LocalizedStrings.DiskEntry_ENTRYS_VALUE_SHOULD_NOT_BE_NULL.toLocalizedString());
+ }
+
+ //If we have concurrency checks enabled for a persistent region, we need
+ //to add an entry to the async queue for every update to maintain the RVV
+ boolean maintainRVV = region.concurrencyChecksEnabled && dr.isBackup();
+
+ Token oldValue = null;
+ int oldValueLength = 0;
+ boolean scheduleAsync = false;
+ boolean callRemoveFromDisk = false;
+ DiskId did = entry.getDiskId();
+ VersionTag tag = null;
+ Object syncObj = did;
+ if (syncObj == null) {
+ syncObj = entry;
+ }
+ if (syncObj == did) {
+ dr.acquireReadLock();
+ }
+ try {
+ synchronized (syncObj) {
+ oldValue = entry.getValueAsToken();
+ if (Token.isRemovedFromDisk(newValue)) {
+ if (dr.isBackup()) {
+ dr.testIsRecoveredAndClear(did); // fixes bug 41409
+ }
+ RuntimeException rte = null;
+ try {
+ if (!Token.isRemovedFromDisk(oldValue)) {
+ // removeFromDisk takes care of oldValueLength
+ if (dr.isSync()) {
+ removeFromDisk(entry, region, false);
+ } else {
+ callRemoveFromDisk = true; // do it outside the sync
+ }
+ }
+ } catch (RuntimeException e) {
+ rte = e;
+ throw e;
+ }
+ finally {
+ if (rte != null && (rte instanceof CacheClosedException)) {
+ // 47616: not to set the value to be removedFromDisk since it failed to persist
+ } else {
+ // Asif: Ensure that the value is correctly set despite the clear so
+ // that it can be distributed correctly.
+ entry.setValueWithContext(region, newValue); // OFFHEAP newValue was already preparedForCache
+ }
+ }
+ }
+ else if (newValue instanceof RecoveredEntry) {
+ // Now that oplog creates are immediately put in the cache,
+ // a later oplog modify will get us here.
+ RecoveredEntry re = (RecoveredEntry)newValue;
+ long oldKeyId = did.getKeyId();
+ long oldOplogId = did.getOplogId();
+ long newOplogId = re.getOplogId();
+ if (newOplogId != oldOplogId) {
+ did.setOplogId(newOplogId);
+ re.setOplogId(oldOplogId); // so caller knows oldoplog id
+ }
+ did.setOffsetInOplog(re.getOffsetInOplog());
+ // id already set
+ did.setUserBits(re.getUserBits());
+ oldValueLength = did.getValueLength();
+ did.setValueLength(re.getValueLength());
+ // The following undo and then do fixes bug 41849
+ // First, undo the stats done for the previous recovered value
+ if (oldKeyId < 0) {
+ dr.incNumOverflowOnDisk(-1L);
+ dr.incNumOverflowBytesOnDisk(-oldValueLength);
+ incrementBucketStats(region, 0/*InVM*/, -1/*OnDisk*/, -oldValueLength);
+ } else {
+ dr.incNumEntriesInVM(-1L);
+ incrementBucketStats(region, -1/*InVM*/, 0/*OnDisk*/, 0);
+ }
+ // Second, do the stats done for the current recovered value
+ if (re.getRecoveredKeyId() < 0) {
+ if (!entry.isValueNull()) {
+ try {
+ entry.handleValueOverflow(region);
+ entry.setValueWithContext(region, null); // fixes bug 41119
+ }finally {
+ entry.afterValueOverflow(region);
+ }
+
+ }
+ dr.incNumOverflowOnDisk(1L);
+ dr.incNumOverflowBytesOnDisk(did.getValueLength());
+ incrementBucketStats(region, 0/*InVM*/, 1/*OnDisk*/,
+ did.getValueLength());
+ } else {
+ entry.setValueWithContext(region, entry.prepareValueForCache(region, re.getValue(), false));
+ dr.incNumEntriesInVM(1L);
+ incrementBucketStats(region, 1/*InVM*/, 0/*OnDisk*/, 0);
+ }
+ }
+ else {
+ // The new value in the entry needs to be set after the disk write
+ // has succeeded. If not, for GemFireXD, it is possible that another thread
+ // may pick up this transient value from the region entry (which for
+ // off-heap will eventually be released) as an index key,
+ // given that this operation is bound to fail in the case of a
+ // disk access exception.
+
+ //entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+
+ if(did != null && did.isPendingAsync()) {
+ //if the entry was not yet written to disk, we didn't update
+ //the bytes on disk.
+ oldValueLength = 0;
+ } else {
+ oldValueLength = getValueLength(did);
+ }
+
+ if (dr.isBackup()) {
+ dr.testIsRecoveredAndClear(did); // fixes bug 41409
+ if (dr.isSync()) {
+ // In the case of compression the value is set first
+ // because, at least for now, GemFireXD does not support compression.
+ // If and when it does, this needs to be taken care of, else
+ // we risk Bug 48965.
+ if (AbstractRegionEntry.isCompressible(dr, newValue)) {
+ entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+
+ // newValue is prepared and compressed. We can't write compressed values to disk.
+ writeToDisk(entry, region, false, event);
+ } else {
+ writeBytesToDisk(entry, region, false, createValueWrapper(newValue, event));
+ entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+ }
+
+ } else if (did.isPendingAsync() && !maintainRVV) {
+ entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+
+ // nothing needs to be done except
+ // fixing up LRU stats
+ // @todo fixup LRU stats if needed
+ // I'm not sure anything needs to be done here.
+ // If we have overflow and it decided to evict this entry
+ // how do we handle that case when we are async?
+ // Seems like the eviction code needs to leave the value
+ // in memory until the pendingAsync is done.
+ } else {
+ //if the entry is not async, we need to schedule it
+ //for regions with concurrency checks enabled, we add an entry
+ //to the queue for every entry.
+ scheduleAsync = true;
+ did.setPendingAsync(true);
+ VersionStamp stamp = entry.getVersionStamp();
+ if(stamp != null) {
+ tag = stamp.asVersionTag();
+ }
+ entry.setValueWithContext(region, newValue);
+ }
+ } else if (did != null) {
+ entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+
+ // Mark the id as needing to be written
+ // The disk remove that this section used to do caused bug 30961
+ // @todo this seems wrong. How does leaving it on disk fix the bug?
+ did.markForWriting();
+ //did.setValueSerializedSize(0);
+ }else {
+ entry.setValueWithContext(region, newValue);
+ }
+
+ if (Token.isRemovedFromDisk(oldValue)) {
+ // Note we now initialize entries as removed and then set their
+ // value once we find no existing entry.
+ // So this is the normal path for a brand new entry.
+ dr.incNumEntriesInVM(1L);
+ incrementBucketStats(region, 1/*InVM*/, 0/*OnDisk*/, 0);
+ }
+ }
+ if (entry instanceof LRUEntry) {
+ LRUEntry le = (LRUEntry)entry;
+ boolean wasEvicted = le.testEvicted();
+ le.unsetEvicted();
+ if (!Token.isRemovedFromDisk(newValue)) {
+ if (oldValue == null
+ // added null check for bug 41759
+ || wasEvicted && did != null && did.isPendingAsync()) {
+ // Note we do not append this entry because that will be
+ // done by lruEntryUpdate
+ dr.incNumEntriesInVM(1L);
+ dr.incNumOverflowOnDisk(-1L);
+ dr.incNumOverflowBytesOnDisk(-oldValueLength);
+ incrementBucketStats(region, 1/*InVM*/, -1/*OnDisk*/, -oldValueLength);
+ }
+ }
+ }
+ }
+ } finally {
+ if (syncObj == did) {
+ dr.releaseReadLock();
+ }
+ }
+ if (callRemoveFromDisk) {
+ removeFromDisk(entry, region, false, oldValue == null, false);
+ } else if (scheduleAsync && did.isPendingAsync()) {
+ // this needs to be done outside the above sync
+ scheduleAsyncWrite(new AsyncDiskEntry(region, entry, tag));
+ }
+ }
+
+ private static int getValueLength(DiskId did) {
+ int result = 0;
+ if (did != null) {
+ synchronized (did) {
+ result = did.getValueLength();
+ }
+ }
+ return result;
+ }
+
+ public static void updateRecoveredEntry(PlaceHolderDiskRegion drv,
+ DiskEntry entry,
+ RecoveredEntry newValue,RegionEntryContext context)
+ {
+ if (newValue == null) {
+ throw new NullPointerException(LocalizedStrings.DiskEntry_ENTRYS_VALUE_SHOULD_NOT_BE_NULL.toLocalizedString());
+ }
+ DiskId did = entry.getDiskId();
+ synchronized (did) {
+ boolean oldValueWasNull = entry.isValueNull();
+ int oldValueLength = did.getValueLength();
+ // Now that oplog creates are immediately put in the cache,
+ // a later oplog modify will get us here.
+ long oldOplogId = did.getOplogId();
+ long newOplogId = newValue.getOplogId();
+ if (newOplogId != oldOplogId) {
+ did.setOplogId(newOplogId);
+ newValue.setOplogId(oldOplogId); // so caller knows oldoplog id
+ }
+ did.setOffsetInOplog(newValue.getOffsetInOplog());
+ // id already set
+ did.setUserBits(newValue.getUserBits());
+ did.setValueLength(newValue.getValueLength());
+ if (newValue.getRecoveredKeyId() >= 0) {
+ entry.setValueWithContext(context, entry.prepareValueForCache(drv, newValue.getValue(),
+ false));
+ } else {
+ if (!oldValueWasNull) {
+ try {
+ entry.handleValueOverflow(context);
+ entry.setValueWithContext(context,null); // fixes bug 41119
+ }finally {
+ entry.afterValueOverflow(context);
+ }
+ }
+ }
+ if (entry instanceof LRUEntry) {
+ LRUEntry le = (LRUEntry)entry;
+ assert !le.testEvicted();
+ // we don't allow eviction during recovery
+ if (oldValueWasNull) {
+ // Note we do not append this entry because that will be
+ // done by lruEntryUpdate
+ drv.incNumEntriesInVM(1L);
+ drv.incNumOverflowOnDisk(-1L);
+ drv.incNumOverflowBytesOnDisk(-oldValueLength);
+ //No need to call incrementBucketStats here because we don't have
+ //a real bucket region, this is during recovery from disk.
+ }
+ }
+ }
+ }
+
+ public static Object getValueInVMOrDiskWithoutFaultIn(DiskEntry entry, LocalRegion region) {
+ Object result = OffHeapHelper.copyAndReleaseIfNeeded(getValueOffHeapOrDiskWithoutFaultIn(entry, region));
+ if (result instanceof CachedDeserializable) {
+ result = ((CachedDeserializable)result).getDeserializedValue(null, null);
+ }
+ if (result instanceof StoredObject) {
+ ((StoredObject) result).release();
+ throw new IllegalStateException("sqlf tried to use getValueInVMOrDiskWithoutFaultIn");
+ }
+ return result;
+ }
+
+ @Retained
+ public static Object getValueOffHeapOrDiskWithoutFaultIn(DiskEntry entry, LocalRegion region) {
+ @Retained Object v = entry._getValueRetain(region, true); // TODO:KIRK:OK Object v = entry.getValueWithContext(region);
+ if (v == null || Token.isRemovedFromDisk(v)
+ && !region.isIndexCreationThread()) {
+ synchronized (entry) {
+ v = entry._getValueRetain(region, true); // TODO:KIRK:OK v = entry.getValueWithContext(region);
+ if (v == null) {
+ v = Helper.getOffHeapValueOnDiskOrBuffer(entry, region.getDiskRegion(),region);
+ }
+ }
+ }
+ if (Token.isRemovedFromDisk(v)) {
+ // fix for bug 31800
+ v = null;
+ // } else if (v instanceof ByteSource) {
+ // // If the ByteSource contains a Delta or ListOfDelta then we want to deserialize it
+ // Object deserVal = ((CachedDeserializable)v).getDeserializedForReading();
+ // if (deserVal != v) {
+ // OffHeapHelper.release(v);
+ // v = deserVal;
+ // }
+ }
+ return v;
+ }
+
+ /**
+ *
+ * @param entry
+ * @param region
+ * @return Value
+ * @throws DiskAccessException
+ */
+ public static Object faultInValue(DiskEntry entry, LocalRegion region) {
+ return faultInValue(entry, region, false);
+ }
+ @Retained
+ public static Object faultInValueRetain(DiskEntry entry, LocalRegion region) {
+ return faultInValue(entry, region, true);
+ }
+ /**
+ * @param retainResult if true then the result may be a retained off-heap reference
+ */
+ @Retained
+ private static Object faultInValue(DiskEntry entry, LocalRegion region, boolean retainResult)
+ {
+ DiskRegion dr = region.getDiskRegion();
+ @Retained Object v = entry._getValueRetain(region, true); // TODO:KIRK:OK Object v = entry.getValueWithContext(region);
+ boolean lruFaultedIn = false;
+ boolean done = false;
+ try {
+ // Asif: If the entry is an instance of LRU then the DiskRegion cannot be null.
+ // Since SqlFabric accesses this method directly and passes the owning region,
+ // if the region happens to be of persistent PR type, the owning region passed is the PR,
+ // but it will have a null DiskRegion. SqlFabric takes care of passing the owning region
+ // as a BucketRegion in the case of an overflow-type entry. This is the fix for Bug #41804.
+ if ( entry instanceof LRUEntry && !dr.isSync() ) {
+ synchronized (entry) {
+ DiskId did = entry.getDiskId();
+ if (did != null && did.isPendingAsync()) {
+ done = true;
+ // See if it is pending async because of a faultOut.
+ // If so then if we are not a backup then we can unschedule the pending async.
+ // In either case we need to do the lruFaultIn logic.
+ boolean evicted = ((LRUEntry)entry).testEvicted();
+ if (evicted) {
+ if (!dr.isBackup()) {
+ // @todo do we also need a bit that tells us if it is in the async queue?
+ // Seems like we could end up adding it to the queue multiple times.
+ did.setPendingAsync(false);
+ }
+ // since it was evicted fix the stats here
+ dr.incNumEntriesInVM(1L);
+ dr.incNumOverflowOnDisk(-1L);
+ // no need to dec overflowBytesOnDisk because it was not inced in this case.
+ incrementBucketStats(region, 1/*InVM*/, -1/*OnDisk*/, 0);
+ }
+ lruEntryFaultIn((LRUEntry) entry, region);
+ lruFaultedIn = true;
+ }
+ }
+ }
+ if (!done
+ && (v == null || Token.isRemovedFromDisk(v) && !region.isIndexCreationThread())) {
+ synchronized (entry) {
+ v = entry._getValueRetain(region, true); // TODO:KIRK:OK v = entry.getValueWithContext(region);
+ if (v == null) {
+ v = readValueFromDisk(entry, region);
+ if (entry instanceof LRUEntry) {
+ if (v != null && !Token.isInvalid(v)) {
+ lruEntryFaultIn((LRUEntry) entry, region);
+
+ lruFaultedIn = true;
+ }
+ }
+ }
+ }
+ }
+ } finally {
+ if (!retainResult) {
+ v = OffHeapHelper.copyAndReleaseIfNeeded(v);
+ // At this point v should be either a heap object or null.
+ }
+ }
+ if (Token.isRemoved(v)) {
+ // fix for bug 31800
+ v = null;
+ } else {
+ ((RegionEntry)entry).setRecentlyUsed();
+ }
+ if (lruFaultedIn) {
+ lruUpdateCallback(region);
+ }
+ return v; // OFFHEAP: the value ends up being returned by RegionEntry.getValue
+ }
+
+ public static void recoverValue(DiskEntry entry, long oplogId, DiskRecoveryStore recoveryStore, ByteArrayDataInput in) {
+ boolean lruFaultedIn = false;
+ synchronized (entry) {
+ if (entry.isValueNull()) {
+ DiskId did = entry.getDiskId();
+ if (did != null) {
+ Object value = null;
+ DiskRecoveryStore region = recoveryStore;
+ DiskRegionView dr = region.getDiskRegionView();
+ dr.acquireReadLock();
+ try {
+ synchronized (did) {
+ // don't read if the oplog has changed.
+ if (oplogId == did.getOplogId()) {
+ value = getValueFromDisk(dr, did, in);
+ if (value != null) {
+ setValueOnFaultIn(value, did, entry, dr, region);
+ }
+ }
+ }
+ } finally {
+ dr.releaseReadLock();
+ }
+ if (entry instanceof LRUEntry) {
+ if (value != null && !Token.isInvalid(value)) {
+ lruEntryFaultIn((LRUEntry) entry, recoveryStore);
+ lruFaultedIn = true;
+ }
+ }
+ }
+ }
+ }
+ if (lruFaultedIn) {
+ lruUpdateCallback(recoveryStore);
+ }
+ }
+
+ /**
+ * Caller must have "did" synced.
+ */
+ private static Object getValueFromDisk(DiskRegionView dr, DiskId did, ByteArrayDataInput in) {
+ Object value;
+ if (dr.isBackup() && did.getKeyId() == DiskRegion.INVALID_ID) {
+ // must have been destroyed
+ value = null;
+ } else {
+ if (did.isKeyIdNegative()) {
+ did.setKeyId(- did.getKeyId());
+ }
+ // if a bucket region then create a CachedDeserializable here instead of object
+ value = dr.getRaw(did); // fix bug 40192
+ if (value instanceof BytesAndBits) {
+ BytesAndBits bb = (BytesAndBits)value;
+ if (EntryBits.isInvalid(bb.getBits())) {
+ value = Token.INVALID;
+ } else if (EntryBits.isLocalInvalid(bb.getBits())) {
+ value = Token.LOCAL_INVALID;
+ } else if (EntryBits.isTombstone(bb.getBits())) {
+ value = Token.TOMBSTONE;
+ } else if (EntryBits.isSerialized(bb.getBits())) {
+ value = readSerializedValue(bb.getBytes(), bb.getVersion(), in, false);
+ } else {
+ value = readRawValue(bb.getBytes(), bb.getVersion(), in);
+ }
+ }
+ }
+ return value;
+ }
+
+ private static void lruUpdateCallback(DiskRecoveryStore recoveryStore) {
+ /*
+ * Conditional check: if LIFO eviction is enabled,
+ * skip lruUpdateCallback() and call updateStats() instead;
+ * updateStats() keeps track of the actual entries
+ * present in memory, which is useful when
+ * checking the capacity constraint.
+ */
+ try {
+ if (recoveryStore.getEvictionAttributes() != null
+ && recoveryStore.getEvictionAttributes().getAlgorithm().isLIFO()) {
+ ((VMLRURegionMap) recoveryStore.getRegionMap()).updateStats();
+ return;
+ }
+ // this must be done after releasing synchronization
+ recoveryStore.getRegionMap().lruUpdateCallback();
+ }catch( DiskAccessException dae) {
+ recoveryStore.handleDiskAccessException(dae);
+ throw dae;
+ }
+ }
+
+ private static void lruEntryFaultIn(LRUEntry entry, DiskRecoveryStore recoveryStore) {
+ RegionMap rm = (RegionMap)recoveryStore.getRegionMap();
+ try {
+ rm.lruEntryFaultIn((LRUEntry) entry);
+ }catch(DiskAccessException dae) {
+ recoveryStore.handleDiskAccessException(dae);
+ throw dae;
+ }
+ }
+
+ /**
+ * Returns the value of this map entry, reading it from disk, if necessary.
+ * Sets the value in the entry.
+ * This is only called by the faultIn code once it has determined that
+ * the value is no longer in memory.
+ * The returned result will only be off-heap if the value is a sqlf ByteSource; otherwise the result will be on-heap.
+ * Caller must have "entry" synced.
+ */
+ @Retained
+ private static Object readValueFromDisk(DiskEntry entry, DiskRecoveryStore region) {
+
+ DiskRegionView dr = region.getDiskRegionView();
+ DiskId did = entry.getDiskId();
+ if (did == null) {
+ return null;
+ }
+ dr.acquireReadLock();
+ try {
+ synchronized (did) {
+ Object value = getValueFromDisk(dr, did, null);
+ if (value == null) return null;
+ @Unretained Object preparedValue = setValueOnFaultIn(value, did, entry, dr, region);
+ // For Sqlfire we want to return the offheap representation.
+ // So we need to retain it for the caller to release.
+ /*if (preparedValue instanceof ByteSource) {
+ // This is the only case in which we return a retained off-heap ref.
+ ((ByteSource)preparedValue).retain();
+ return preparedValue;
+ } else */{
+ return value;
+ }
+ }
+ } finally {
+ dr.releaseReadLock();
+ }
+ }
+
+ /**
+ * Caller must have "entry" and "did" synced and "dr" readLocked.
+ * @return the unretained result; it must be used by the caller before it releases the sync on "entry".
+ */
+ @Unretained
+ private static Object setValueOnFaultIn(Object value, DiskId did, DiskEntry entry, DiskRegionView dr, DiskRecoveryStore region) {
+ // dr.getOwner().getCache().getLogger().info("DEBUG: faulting in entry with key " + entry.getKey());
+ int bytesOnDisk = getValueLength(did);
+ // Retained by the prepareValueForCache call for the region entry.
+ // NOTE that we return this value unretained because the retain is owned by the region entry not the caller.
+ @Retained Object preparedValue = entry.prepareValueForCache((RegionEntryContext) region, value,
+ false);
+ region.updateSizeOnFaultIn(entry.getKey(), region.calculateValueSize(preparedValue), bytesOnDisk);
+ //did.setValueSerializedSize(0);
+ // I think the following assertion is true but need to run
+ // a regression with it. Reenable this post 6.5
+ //Assert.assertTrue(entry._getValue() == null);
+ entry.setValueWithContext((RegionEntryContext) region, preparedValue);
+ dr.incNumEntriesInVM(1L);
+ dr.incNumOverflowOnDisk(-1L);
+ dr.incNumOverflowBytesOnDisk(-bytesOnDisk);
+ incrementBucketStats(region, 1/*InVM*/, -1/*OnDisk*/, -bytesOnDisk);
+ return preparedValue;
+ }
+
+ static Object readSerializedValue(byte[] valueBytes, Version version,
+ ByteArrayDataInput in, boolean forceDeserialize) {
+ if (forceDeserialize) {
+ // deserialize checking for product version change
+ return EntryEventImpl.deserialize(valueBytes, version, in);
+ }
+ else {
+ // TODO: upgrades: is there a case where GemFire values are internal
+ // ones that need to be upgraded transparently; probably messages
+ // being persisted (gateway events?)
+ return CachedDeserializableFactory.create(valueBytes);
+ }
+ }
+
+ static Object readRawValue(byte[] valueBytes, Version version,
+ ByteArrayDataInput in) {
+ /*
+ final StaticSystemCallbacks sysCb;
+ if (version != null && (sysCb = GemFireCacheImpl.FactoryStatics
+ .systemCallbacks) != null) {
+ // may need to change serialized shape for SQLFire
+ return sysCb.fromVersion(valueBytes, false, version, in);
+ }
+ else */ {
+ return valueBytes;
+ }
+ }
+
+ public static void incrementBucketStats(Object owner,
+ int entriesInVmDelta,
+ int overflowOnDiskDelta,
+ int overflowBytesOnDiskDelta) {
+ if (owner instanceof BucketRegion) {
+ ((BucketRegion)owner).incNumEntriesInVM(entriesInVmDelta);
+ ((BucketRegion)owner).incNumOverflowOnDisk(overflowOnDiskDelta);
+ ((BucketRegion)owner).incNumOverflowBytesOnDisk(overflowBytesOnDiskDelta);
+ } else if (owner instanceof DiskRegionView) {
+ ((DiskRegionView)owner).incNumOverflowBytesOnDisk(overflowBytesOnDiskDelta);
+ }
+ }
+
+ /**
+ * Writes the value of this <code>DiskEntry</code> to disk and
+ * <code>null</code> s out the reference to the value to free up VM space.
+ * <p>
+ * Note that if the value had already been written to disk, it is not
+ * written again.
+ * <p>
+ * Caller must synchronize on entry and it is assumed the entry is evicted
+ *
+ * see #writeToDisk
+ * @throws RegionClearedException
+ */
+ public static int overflowToDisk(DiskEntry entry, LocalRegion region, EnableLRU ccHelper) throws RegionClearedException {
+ {
+ Token entryVal = entry.getValueAsToken();
+ if (entryVal == null || Token.isRemovedFromDisk(entryVal)) {
+ // Note it could be the removed token now because
+ // freeAllEntriesOnDisk is not able to sync on the entry.
+ return 0;
+ }
+ }
+ DiskRegion dr = region.getDiskRegion();
+ final int oldSize = region.calculateRegionEntryValueSize(entry);
+ // Asif: Get the DiskId. If it is null, it implies overflow-only mode.
+ //long id = entry.getDiskId().getKeyId();
+ DiskId did = entry.getDiskId();
+ if (did == null) {
+ ((LRUEntry)entry).setDelayedDiskId(region);
+ did = entry.getDiskId();
+ }
+
+ // Notify the SQLFire IndexManager if present
+ /* final IndexUpdater indexUpdater = region.getIndexUpdater();
+ if(indexUpdater != null && dr.isSync()) {
+ indexUpdater.onOverflowToDisk(entry);
+ }*/
+
+ int change = 0;
+ boolean scheduledAsyncHere = false;
+ dr.acquireReadLock();
+ try {
+ synchronized (did) {
+ // check for a concurrent freeAllEntriesOnDisk
+ if (entry.isRemovedFromDisk()) {
+ return 0;
+ }
+
+ //TODO:Asif: Check if we need to overflow even when id is = 0
+ boolean wasAlreadyPendingAsync = did.isPendingAsync();
+ if (did.needsToBeWritten()) {
+ if (dr.isSync()) {
+ writeToDisk(entry, region, false);
+ } else if (!wasAlreadyPendingAsync) {
+ scheduledAsyncHere = true;
+ did.setPendingAsync(true);
+ } else {
+ // it may have been scheduled to be written (isBackup==true)
+ // and now we are faulting it out
+ }
+ }
+
+ boolean movedValueToDisk = false; // added for bug 41849
+
+ // If async then if it does not need to be written (because it already was)
+ // then treat it like the sync case. This fixes bug 41310
+ if (scheduledAsyncHere || wasAlreadyPendingAsync) {
+ // we call _setValue(null) after it is actually written to disk
+ change = entry.updateAsyncEntrySize(ccHelper);
+ // do the stats when it is actually written to disk
+ } else {
+ region.updateSizeOnEvict(entry.getKey(), oldSize);
+ //did.setValueSerializedSize(byteSizeOnDisk);
+ try {
+ entry.handleValueOverflow(region);
+ entry.setValueWithContext(region,null);
+ }finally {
+ entry.afterValueOverflow(region);
+ }
+ movedValueToDisk = true;
+ change = ((LRUClockNode)entry).updateEntrySize(ccHelper);
+ }
+ int valueLength = 0;
+ if (movedValueToDisk) {
+ valueLength = getValueLength(did);
+ }
+ dr.incNumEntriesInVM(-1L);
+ dr.incNumOverflowOnDisk(1L);
+ dr.incNumOverflowBytesOnDisk(valueLength);
+ incrementBucketStats(region, -1/*InVM*/, 1/*OnDisk*/, valueLength);
+ }
+ } finally {
+ dr.releaseReadLock();
+ }
+ if (scheduledAsyncHere && did.isPendingAsync()) {
+ // this needs to be done outside the above sync
+ // the version tag is null here because this method only needs
+ // to write to disk for overflow only regions, which do not need
+ // to maintain an RVV on disk.
+ scheduleAsyncWrite(new AsyncDiskEntry(region, entry, null));
+ }
+ return change;
+ }
+
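+ // Caller-side sketch of the contract above (illustrative only): the
+ // entry monitor is held by the caller, while the DiskRegion read lock
+ // and the DiskId monitor are acquired internally by overflowToDisk.
+ //
+ //   synchronized (entry) {
+ //     int delta = overflowToDisk(entry, region, ccHelper);
+ //     // delta is the entry-size change reported by the LRU helper
+ //   }
+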
+ private static void scheduleAsyncWrite(AsyncDiskEntry ade) {
+ DiskRegion dr = ade.region.getDiskRegion();
+ dr.scheduleAsyncWrite(ade);
+ }
+
+
+ public static void handleFullAsyncQueue(DiskEntry entry, LocalRegion region, VersionTag tag) {
+ DiskRegion dr = region.getDiskRegion();
+ DiskId did = entry.getDiskId();
+ synchronized (entry) {
+ dr.acquireReadLock();
+ try {
+ synchronized (did) {
+ if (did.isPendingAsync()) {
+ did.setPendingAsync(false);
+ final Token entryVal = entry.getValueAsToken();
+ final int entryValSize = region.calculateRegionEntryValueSize(entry);
+ boolean remove = false;
+ try {
+ if (Token.isRemovedFromDisk(entryVal)) {
+ // onDisk was already decremented so just adjust the value length here
+ dr.incNumOverflowBytesOnDisk(-did.getValueLength());
+ incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
+ -did.getValueLength());
+ dr.remove(region, entry, true, false);
+ if (dr.isBackup()) {
+ did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
+ }
+ remove = true;
+ } else if (Token.isInvalid(entryVal) && !dr.isBackup()) {
+ // no need to write invalid to disk if overflow only
+ } else if (entryVal != null) {
+ writeToDisk(entry, region, true);
+ } else {
+ //if we have a version tag we need to record the operation
+ //to update the RVV
+ if(tag != null) {
+ DiskEntry.Helper.doAsyncFlush(tag, region);
+ }
+ return;
+ }
+ assert !dr.isSync();
+ // Only setValue to null if this was an eviction.
+ // We could just be a backup that is writing async.
+ if (!remove
+ && !Token.isInvalid(entryVal)
+ && entry instanceof LRUEntry
+ && ((LRUEntry)entry).testEvicted()) {
+ // Moved this here to fix bug 40116.
+ region.updateSizeOnEvict(entry.getKey(), entryValSize);
+ // note the old size was already accounted for
+ // onDisk was already incremented so just adjust the value length here
+ dr.incNumOverflowBytesOnDisk(did.getValueLength());
+ incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
+ did.getValueLength());
+ try {
+ entry.handleValueOverflow(region);
+ entry.setValueWithContext(region, null);
+ } finally {
+ entry.afterValueOverflow(region);
+ }
+ }
+
+ // See if the entry we wrote to disk has the same tag as this entry.
+ // If not, write the tag as a conflicting operation to update the RVV.
+ VersionStamp stamp = entry.getVersionStamp();
+ if(tag != null && stamp != null
+ && (stamp.getMemberID() != tag.getMemberID()
+ || stamp.getRegionVersion() != tag.getRegionVersion())) {
+ DiskEntry.Helper.doAsyncFlush(tag, region);
+ }
+ } catch (RegionClearedException ignore) {
+ // no need to do the op since it was clobbered by a region clear
+ }
+ } else {
+ //if we have a version tag we need to record the operation
+ //to update the RVV, even if we don't write the entry
+ if(tag != null) {
+ DiskEntry.Helper.doAsyncFlush(tag, region);
+ }
+ }
+ }
+ } finally {
+ dr.releaseReadLock();
+ }
+ } // sync entry
+ }
+
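+ // Lock-nesting sketch for the async paths in this class, as used by
+ // the method above (an illustrative summary, not additional behavior):
+ //
+ //   synchronized (entry) {
+ //     dr.acquireReadLock();
+ //     try {
+ //       synchronized (did) { /* examine and write the entry */ }
+ //     } finally {
+ //       dr.releaseReadLock();
+ //     }
+ //   }
+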
+ public static void doAsyncFlush(VersionTag tag, LocalRegion region) {
+ if (region.isThisRegionBeingClosedOrDestroyed()) return;
+ DiskRegion dr = region.getDiskRegion();
+ if (!dr.isBackup()) {
+ return;
+ }
+ assert !dr.isSync();
+ dr.acquireReadLock();
+ try {
+ dr.getDiskStore().putVersionTagOnly(region, tag, true);
+ } finally {
+ dr.releaseReadLock();
+ }
+ }
+
+ /**
+ * Flushes an entry that was previously scheduled to be written to disk.
+ * @param tag the version tag to record in the RVV, or null if none
+ * @since prPersistSprint1
+ */
+ public static void doAsyncFlush(DiskEntry entry, LocalRegion region, VersionTag tag) {
+ if (region.isThisRegionBeingClosedOrDestroyed()) return;
+ DiskRegion dr = region.getDiskRegion();
+ dr.setClearCountReference();
+ synchronized (entry) { // fixes 40116
+ // If we do not sync the entry, and this method ends up doing an eviction
+ // (thus setting the value to null), some other thread is free to fetch
+ // the value while the entry is synced and think it has removed or
+ // replaced it. That results in updateSizeOn* being called twice for the
+ // same value (once when it is evicted and once when it is
+ // removed/updated).
+ try {
+ dr.acquireReadLock();
+ try {
+ DiskId did = entry.getDiskId();
+ synchronized (did) {
+ if (did.isPendingAsync()) {
+ did.setPendingAsync(false);
+ final Token entryVal = entry.getValueAsToken();
+ final int entryValSize = region.calculateRegionEntryValueSize(entry);
+ boolean remove = false;
+ try {
+ if (Token.isRemovedFromDisk(entryVal)) {
+ if (region.isThisRegionBeingClosedOrDestroyed()) return;
+ // onDisk was already decremented so just adjust the value length here
+ dr.incNumOverflowBytesOnDisk(-did.getValueLength());
+ incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
+ -did.getValueLength());
+ dr.remove(region, entry, true, false);
+ if (dr.isBackup()) {
+ did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
+ }
+ remove = true;
+ } else if ((Token.isInvalid(entryVal) || entryVal == Token.TOMBSTONE) && !dr.isBackup()) {
+ // no need to write invalid or tombstones to disk if overflow only
+ } else if (entryVal != null) {
+ writeToDisk(entry, region, true);
+ } else {
+ // @todo why would we have a null value here?
+ // I'm seeing it show up in tests:
+ // java.lang.IllegalArgumentException: Must not serialize null in this context.
+ // at com.gemstone.gemfire.internal.cache.EntryEventImpl.serialize(EntryEventImpl.java:1024)
+ // at com.gemstone.gemfire.internal.cache.DiskEntry$Helper.writeToDisk(DiskEntry.java:351)
+ // at com.gemstone.gemfire.internal.cache.DiskEntry$Helper.doAsyncFlush(DiskEntry.java:683)
+ // at com.gemstone.gemfire.internal.cache.DiskRegion$FlusherThread.run(DiskRegion.java:1055)
+ //if we have a version tag we need to record the operation
+ //to update the RVV
+ if(tag != null) {
+ DiskEntry.Helper.doAsyncFlush(tag, region);
+ }
+ return;
+ }
+ assert !dr.isSync();
+ // Only setValue to null if this was an eviction.
+ // We could just be a backup that is writing async.
+ if (!remove
+ && !Token.isInvalid(entryVal)
+ && (entryVal != Token.TOMBSTONE)
+ && entry instanceof LRUEntry
+ && ((LRUEntry)entry).testEvicted()) {
+ // Moved this here to fix bug 40116.
+ region.updateSizeOnEvict(entry.getKey(), entryValSize);
+ // note the old size was already accounted for
+ // onDisk was already incremented so just adjust the value length here
+ dr.incNumOverflowBytesOnDisk(did.getValueLength());
+ incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
+ did.getValueLength());
+ try {
+ entry.handleValueOverflow(region);
+ entry.setValueWithContext(region, null);
+ } finally {
+ entry.afterValueOverflow(region);
+ }
+ }
+ } catch (RegionClearedException ignore) {
+ // no need to do the op since it was clobbered by a region clear
+ }
+
+ // See if the entry we wrote to disk has the same tag as this entry.
+ // If not, write the tag as a conflicting operation to update the RVV.
+ VersionStamp stamp = entry.getVersionStamp();
+ if(tag != null && stamp != null
+ && (stamp.getMemberID() != tag.getMemberID()
+ || stamp.getRegionVersion() != tag.getRegionVersion())) {
+ DiskEntry.Helper.doAsyncFlush(tag, region);
+ }
+ } else {
+ //if we have a version tag we need to record the operation
+ //to update the RVV
+ if(tag != null) {
+ DiskEntry.Helper.doAsyncFlush(tag, region);
+ }
+ }
+ }
+ } finally {
+ dr.releaseReadLock();
+ }
+ } finally {
+ dr.removeClearCountReference();
+ }
+ } // sync entry
+ }
+
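+ // Flusher-side sketch (illustrative; it assumes AsyncDiskEntry exposes
+ // the region/de/tag/versionOnly fields it is constructed with): the
+ // flusher drains queued items, dispatching tag-only entries to
+ // doAsyncFlush(tag, region) and full entries to the method above.
+ //
+ //   if (ade.versionOnly) {
+ //     doAsyncFlush(ade.tag, ade.region);
+ //   } else {
+ //     doAsyncFlush(ade.de, ade.region, ade.tag);
+ //   }
+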
+ /**
+ * Removes the key/value pair in the given entry from disk
+ *
+ * @throws RegionClearedException If the operation is aborted due to a clear
+ * @see DiskRegion#remove
+ */
+ public static void removeFromDisk(DiskEntry entry, LocalRegion region, boolean isClear) throws RegionClearedException {
+ removeFromDisk(entry, region, true, false, isClear);
+ }
+ private static void removeFromDisk(DiskEntry entry, LocalRegion region,
+ boolean checkValue, boolean valueWasNull, boolean isClear) throws RegionClearedException {
+ DiskRegion dr = region.getDiskRegion();
+
+ //If we have concurrency checks enabled for a persistent region, we need
+ //to add an entry to the async queue for every update to maintain the RVV
+ boolean maintainRVV = region.concurrencyChecksEnabled && dr.isBackup();
+
+ DiskId did = entry.getDiskId();
+ VersionTag tag = null;
+ Object syncObj = did;
+ if (did == null) {
+ syncObj = entry;
+ }
+ boolean scheduledAsyncHere = false;
+ if (syncObj == did) {
+ dr.acquireReadLock();
+ }
+ try {
+ synchronized (syncObj) {
+
+ if (did == null || (dr.isBackup() && did.getKeyId()== DiskRegion.INVALID_ID)) {
+ // Not on disk yet
+ dr.incNumEntriesInVM(-1L);
+ incrementBucketStats(region, -1/*InVM*/, 0/*OnDisk*/, 0);
+ dr.unscheduleAsyncWrite(did);
+ return;
+ }
+ // Asif: this converts the negative OplogKeyId to positive, as part of
+ // fixing bug 39989.
+ did.unmarkForWriting();
+
+ int oldValueLength = 0;
+ if (dr.isSync() || isClear) {
+ oldValueLength = did.getValueLength();
+ dr.remove(region, entry, false, isClear);
+ if (dr.isBackup()) {
+ did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
+ }
+ //If this is a clear, we should unschedule the async write for this
+ //entry
+ did.setPendingAsync(false);
+ } else {
+ if (!did.isPendingAsync() || maintainRVV) {
+ scheduledAsyncHere = true;
+ did.setPendingAsync(true);
+ VersionStamp stamp = entry.getVersionStamp();
+ if(stamp != null) {
+ tag = stamp.asVersionTag();
+ }
+ }
+ }
+ if (checkValue) {
+ valueWasNull = entry.isValueNull();
+ entry._removePhase1();
+ }
+ if (valueWasNull) {
+ dr.incNumOverflowOnDisk(-1L);
+ dr.incNumOverflowBytesOnDisk(-oldValueLength);
+ incrementBucketStats(region, 0/*InVM*/, -1/*OnDisk*/, -oldValueLength);
+ }
+ else {
+ dr.incNumEntriesInVM(-1L);
+ incrementBucketStats(region, -1/*InVM*/, 0/*OnDisk*/, 0);
+ if (!dr.isSync()) {
+ // we are going to do an async remove of an entry that is not currently
+ // overflowed to disk so we don't want to count its value length as being
+ // on disk when we finally do the async op. So we clear it here.
+ did.setValueLength(0);
+ }
+ }
+ }
+ } finally {
+ if (syncObj == did) {
+ dr.releaseReadLock();
+ }
+ }
+ if (scheduledAsyncHere && did.isPendingAsync()) {
+ // do this outside the sync
+ scheduleAsyncWrite(new AsyncDiskEntry(region, entry, tag));
+ }
+ }
+
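+ // Usage sketch (illustrative only): a normal destroy removes the
+ // key/value pair from disk, while a region clear passes isClear=true
+ // so a pending async write is unscheduled rather than flushed.
+ //
+ //   removeFromDisk(entry, region, false); // normal destroy
+ //   removeFromDisk(entry, region, true);  // during a region clear
+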
+ /**
+ * Writes only the given version tag to disk, for backup (persistent)
+ * regions. If the region writes asynchronously, the tag is queued for
+ * the flusher instead.
+ * @param entry the disk entry whose DiskId is used for synchronization
+ * @param region the region the entry belongs to
+ * @param tag the version tag to record; must not be null
+ */
+ public static void updateVersionOnly(DiskEntry entry, LocalRegion region,
+ VersionTag tag) {
+ DiskRegion dr = region.getDiskRegion();
+ if (!dr.isBackup()) {
+ return;
+ }
+
+ assert tag != null && tag.getMemberID() != null;
+ boolean scheduleAsync = false;
+ DiskId did = entry.getDiskId();
+ Object syncObj = did;
+ if (syncObj == null) {
+ syncObj = entry;
+ }
+ if (syncObj == did) {
+ dr.acquireReadLock();
+ }
+ try {
+ synchronized (syncObj) {
+ if (dr.isSync()) {
+ dr.getDiskStore().putVersionTagOnly(region, tag, false);
+ } else {
+ scheduleAsync = true;
+ }
+ }
+ } finally {
+ if (syncObj == did) {
+ dr.releaseReadLock();
+ }
+ }
+ if (scheduleAsync) {
+ // this needs to be done outside the above sync
+ scheduleAsyncWrite(new AsyncDiskEntry(region, tag));
+ }
+ }
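+
+ // Sketch (illustrative): recording a version-only operation, as the
+ // async-flush paths above do when the entry's stamp no longer matches
+ // the tag being flushed:
+ //
+ //   VersionStamp stamp = entry.getVersionStamp();
+ //   if (stamp != null) {
+ //     updateVersionOnly(entry, region, stamp.asVersionTag());
+ //   }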
+
+ }
+
+ /**
+ * A marker object for an entry that has been recovered from disk.
+ * It is handled specially when it is placed in a region.
+ */
+ public static class RecoveredEntry {
+
+ /** The disk id of the entry being recovered */
+ private final long recoveredKeyId;
+
+ /** The value of the recovered entry */
+ private final Object value;
+
+ private final long offsetInOplog;
+ private final byte userBits;
+ private final int valueLength;
+
+ private long oplogId;
+ private VersionTag tag;
+
+ /**
+ * For this constructor only, the value is not loaded into the region; it
+ * remains in the oplogs. Since oplogs rely on the DiskId to furnish the
+ * user bits needed to interpret the bytes correctly, the user bits must
+ * be set correctly here.
+ */
+ public RecoveredEntry(long keyId, long oplogId, long offsetInOplog,
+ byte userBits, int valueLength) {
+ this(-keyId, oplogId, offsetInOplog, userBits, valueLength, null);
+ }
+
+ public RecoveredEntry(long keyId, long oplogId, long offsetInOplog,
+ byte userBits, int valueLength, Object value) {
+ this.recoveredKeyId = keyId;
+ this.value = value;
+ this.oplogId = oplogId;
+ this.offsetInOplog = offsetInOplog;
+ this.userBits = EntryBits.setRecoveredFromDisk(userBits, true);
+ this.valueLength = valueLength;
+ }
+
+ /**
+ * Returns the disk id of the entry being recovered
+ */
+ public long getRecoveredKeyId() {
+ return this.recoveredKeyId;
+ }
+ /**
+ * Returns the value of the recovered entry. Note that if the
+ * disk id is < 0 then the value has not been faulted in and
+ * this method will return null.
+ */
+ public Object getValue() {
+ return this.value;
+ }
+ /**
+ * @return a byte indicating the user bits. The correct value is returned
+ * only in the specific case of an entry recovered from an oplog (and not
+ * rolled to the Htree) with the RECOVER_VALUES flag false. In other
+ * cases the exact value is not needed.
+ */
+ public byte getUserBits() {
+ return this.userBits;
+ }
+ public int getValueLength() {
+ return this.valueLength;
+ }
+ public long getOffsetInOplog() {
+ return offsetInOplog;
+ }
+ public long getOplogId() {
+ return this.oplogId;
+ }
+
+ public void setOplogId(long v) {
+ this.oplogId = v;
+ }
+ public VersionTag getVersionTag() {
+ return this.tag;
+ }
+ public void setVersionTag(VersionTag tag) {
+ this.tag = tag;
+ }
+ }
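+
+ // Recovery sketch (illustrative): when values are not recovered from
+ // disk, the five-argument constructor is used and the key id is stored
+ // negated, so a negative getRecoveredKeyId() signals that getValue()
+ // will return null:
+ //
+ //   RecoveredEntry re = new RecoveredEntry(keyId, oplogId, offsetInOplog,
+ //                                          userBits, valueLength);
+ //   assert re.getRecoveredKeyId() == -keyId && re.getValue() == null;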
+ }