You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/03/15 19:38:45 UTC
[07/45] incubator-geode git commit: GEODE-982: refactor off-heap
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
index 14bde59..209a4a4 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SimpleMemoryAllocatorImpl.java
@@ -90,17 +90,17 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
public static MemoryAllocator create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, LogWriter lw,
int slabCount, long offHeapMemorySize, long maxSlabSize) {
return create(ooohml, stats, lw, slabCount, offHeapMemorySize, maxSlabSize,
- null, new AddressableMemoryChunkFactory() {
+ null, new SlabFactory() {
@Override
- public AddressableMemoryChunk create(int size) {
- return new UnsafeMemoryChunk(size);
+ public Slab create(int size) {
+ return new SlabImpl(size);
}
});
}
private static SimpleMemoryAllocatorImpl create(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, LogWriter lw,
int slabCount, long offHeapMemorySize, long maxSlabSize,
- AddressableMemoryChunk[] slabs, AddressableMemoryChunkFactory memChunkFactory) {
+ Slab[] slabs, SlabFactory slabFactory) {
SimpleMemoryAllocatorImpl result = singleton;
boolean created = false;
try {
@@ -118,16 +118,16 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
if (lw != null) {
lw.config("Allocating " + offHeapMemorySize + " bytes of off-heap memory. The maximum size of a single off-heap object is " + maxSlabSize + " bytes.");
}
- slabs = new UnsafeMemoryChunk[slabCount];
+ slabs = new SlabImpl[slabCount];
long uncreatedMemory = offHeapMemorySize;
for (int i=0; i < slabCount; i++) {
try {
if (uncreatedMemory >= maxSlabSize) {
- slabs[i] = memChunkFactory.create((int) maxSlabSize);
+ slabs[i] = slabFactory.create((int) maxSlabSize);
uncreatedMemory -= maxSlabSize;
} else {
// the last slab can be smaller then maxSlabSize
- slabs[i] = memChunkFactory.create((int) uncreatedMemory);
+ slabs[i] = slabFactory.create((int) uncreatedMemory);
}
} catch (OutOfMemoryError err) {
if (i > 0) {
@@ -137,7 +137,7 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
}
for (int j=0; j < i; j++) {
if (slabs[j] != null) {
- slabs[j].release();
+ slabs[j].free();
}
}
throw err;
@@ -163,11 +163,11 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
return result;
}
static SimpleMemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener ooohml, OffHeapMemoryStats stats, LogWriter lw,
- int slabCount, long offHeapMemorySize, long maxSlabSize, AddressableMemoryChunkFactory memChunkFactory) {
+ int slabCount, long offHeapMemorySize, long maxSlabSize, SlabFactory memChunkFactory) {
return create(ooohml, stats, lw, slabCount, offHeapMemorySize, maxSlabSize,
null, memChunkFactory);
}
- public static SimpleMemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats stats, AddressableMemoryChunk[] slabs) {
+ public static SimpleMemoryAllocatorImpl createForUnitTest(OutOfOffHeapMemoryListener oooml, OffHeapMemoryStats stats, Slab[] slabs) {
int slabCount = 0;
long offHeapMemorySize = 0;
long maxSlabSize = 0;
@@ -185,7 +185,7 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
}
- private void reuse(OutOfOffHeapMemoryListener oooml, LogWriter lw, OffHeapMemoryStats newStats, long offHeapMemorySize, AddressableMemoryChunk[] slabs) {
+ private void reuse(OutOfOffHeapMemoryListener oooml, LogWriter lw, OffHeapMemoryStats newStats, long offHeapMemorySize, Slab[] slabs) {
if (isClosed()) {
throw new IllegalStateException("Can not reuse a closed off-heap memory manager.");
}
@@ -205,7 +205,7 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
this.stats = newStats;
}
- private SimpleMemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml, final OffHeapMemoryStats stats, final AddressableMemoryChunk[] slabs) {
+ private SimpleMemoryAllocatorImpl(final OutOfOffHeapMemoryListener oooml, final OffHeapMemoryStats stats, final Slab[] slabs) {
if (oooml == null) {
throw new IllegalArgumentException("OutOfOffHeapMemoryListener is null");
}
@@ -224,20 +224,20 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
this.stats.incFreeMemory(this.freeList.getTotalMemory());
}
- public List<ObjectChunk> getLostChunks() {
- List<ObjectChunk> liveChunks = this.freeList.getLiveChunks();
- List<ObjectChunk> regionChunks = getRegionLiveChunks();
- Set<ObjectChunk> liveChunksSet = new HashSet<>(liveChunks);
- Set<ObjectChunk> regionChunksSet = new HashSet<>(regionChunks);
+ public List<OffHeapStoredObject> getLostChunks() {
+ List<OffHeapStoredObject> liveChunks = this.freeList.getLiveChunks();
+ List<OffHeapStoredObject> regionChunks = getRegionLiveChunks();
+ Set<OffHeapStoredObject> liveChunksSet = new HashSet<>(liveChunks);
+ Set<OffHeapStoredObject> regionChunksSet = new HashSet<>(regionChunks);
liveChunksSet.removeAll(regionChunksSet);
- return new ArrayList<ObjectChunk>(liveChunksSet);
+ return new ArrayList<OffHeapStoredObject>(liveChunksSet);
}
/**
* Returns a possibly empty list that contains all the Chunks used by regions.
*/
- private List<ObjectChunk> getRegionLiveChunks() {
- ArrayList<ObjectChunk> result = new ArrayList<ObjectChunk>();
+ private List<OffHeapStoredObject> getRegionLiveChunks() {
+ ArrayList<OffHeapStoredObject> result = new ArrayList<OffHeapStoredObject>();
RegionService gfc = GemFireCacheImpl.getInstance();
if (gfc != null) {
Iterator<Region<?,?>> rootIt = gfc.rootRegions().iterator();
@@ -253,7 +253,7 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
return result;
}
- private void getRegionLiveChunks(Region<?,?> r, List<ObjectChunk> result) {
+ private void getRegionLiveChunks(Region<?,?> r, List<OffHeapStoredObject> result) {
if (r.getAttributes().getOffHeap()) {
if (r instanceof PartitionedRegion) {
@@ -277,7 +277,7 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
}
- private void basicGetRegionLiveChunks(LocalRegion r, List<ObjectChunk> result) {
+ private void basicGetRegionLiveChunks(LocalRegion r, List<OffHeapStoredObject> result) {
for (Object key : r.keySet()) {
RegionEntry re = ((LocalRegion) r).getRegionEntry(key);
if (re != null) {
@@ -286,30 +286,30 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
*/
@Unretained(OffHeapIdentifier.GATEWAY_SENDER_EVENT_IMPL_VALUE)
Object value = re._getValue();
- if (value instanceof ObjectChunk) {
- result.add((ObjectChunk) value);
+ if (value instanceof OffHeapStoredObject) {
+ result.add((OffHeapStoredObject) value);
}
}
}
}
- private ObjectChunk allocateChunk(int size) {
- ObjectChunk result = this.freeList.allocate(size);
+ private OffHeapStoredObject allocateOffHeapStoredObject(int size) {
+ OffHeapStoredObject result = this.freeList.allocate(size);
int resultSize = result.getSize();
stats.incObjects(1);
stats.incUsedMemory(resultSize);
stats.incFreeMemory(-resultSize);
notifyListeners();
if (ReferenceCountHelper.trackReferenceCounts()) {
- ReferenceCountHelper.refCountChanged(result.getMemoryAddress(), false, 1);
+ ReferenceCountHelper.refCountChanged(result.getAddress(), false, 1);
}
return result;
}
@Override
- public MemoryChunk allocate(int size) {
+ public StoredObject allocate(int size) {
//System.out.println("allocating " + size);
- ObjectChunk result = allocateChunk(size);
+ OffHeapStoredObject result = allocateOffHeapStoredObject(size);
//("allocated off heap object of size " + size + " @" + Long.toHexString(result.getMemoryAddress()), true);
return result;
}
@@ -324,16 +324,23 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
@Override
public StoredObject allocateAndInitialize(byte[] v, boolean isSerialized, boolean isCompressed) {
+ return allocateAndInitialize(v, isSerialized, isCompressed, null);
+ }
+ @Override
+ public StoredObject allocateAndInitialize(byte[] v, boolean isSerialized, boolean isCompressed, byte[] originalHeapData) {
long addr = OffHeapRegionEntryHelper.encodeDataAsAddress(v, isSerialized, isCompressed);
if (addr != 0L) {
- return new DataAsAddress(addr);
+ return new TinyStoredObject(addr);
}
- ObjectChunk result = allocateChunk(v.length);
+ OffHeapStoredObject result = allocateOffHeapStoredObject(v.length);
//debugLog("allocated off heap object of size " + v.length + " @" + Long.toHexString(result.getMemoryAddress()), true);
//debugLog("allocated off heap object of size " + v.length + " @" + Long.toHexString(result.getMemoryAddress()) + "chunkSize=" + result.getSize() + " isSerialized=" + isSerialized + " v=" + Arrays.toString(v), true);
result.setSerializedValue(v);
result.setSerialized(isSerialized);
result.setCompressed(isCompressed);
+ if (originalHeapData != null) {
+ result = new OffHeapStoredObjectWithHeapForm(result, originalHeapData);
+ }
return result;
}
@@ -485,18 +492,18 @@ public class SimpleMemoryAllocatorImpl implements MemoryAllocator {
}
public synchronized List<MemoryBlock> getOrphans() {
- List<ObjectChunk> liveChunks = this.freeList.getLiveChunks();
- List<ObjectChunk> regionChunks = getRegionLiveChunks();
+ List<OffHeapStoredObject> liveChunks = this.freeList.getLiveChunks();
+ List<OffHeapStoredObject> regionChunks = getRegionLiveChunks();
liveChunks.removeAll(regionChunks);
List<MemoryBlock> orphans = new ArrayList<MemoryBlock>();
- for (ObjectChunk chunk: liveChunks) {
+ for (OffHeapStoredObject chunk: liveChunks) {
orphans.add(new MemoryBlockNode(this, chunk));
}
Collections.sort(orphans,
new Comparator<MemoryBlock>() {
@Override
public int compare(MemoryBlock o1, MemoryBlock o2) {
- return Long.valueOf(o1.getMemoryAddress()).compareTo(o2.getMemoryAddress());
+ return Long.valueOf(o1.getAddress()).compareTo(o2.getAddress());
}
});
//this.memoryBlocks = new WeakReference<List<MemoryBlock>>(orphans);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/Slab.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/Slab.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/Slab.java
new file mode 100644
index 0000000..000b8cb
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/Slab.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.offheap;
+
+/**
+ * A "slab" of memory.
+ * Slabs can be created by calling {@link AddressableMemoryManager#allocateSlab(int)}.
+ * Slabs have an address, a size, and can be freed.
+ */
+public interface Slab {
+ /**
+ * Return the address of the memory of this slab.
+ */
+ public long getMemoryAddress();
+ /**
+ * Returns the size of this memory chunk in bytes.
+ */
+ public int getSize();
+ /**
+ * Returns any memory allocated for this slab.
+ * Note that after free is called the address of
+ * this slab should no longer be used.
+ */
+ public void free();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SlabFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SlabFactory.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SlabFactory.java
new file mode 100644
index 0000000..a3f457d
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SlabFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.offheap;
+
+/**
+ * Used to create Slab instances.
+ */
+public interface SlabFactory {
+ /** Create and return a Slab
+ * @throws OutOfMemoryError if the create fails
+ */
+ public Slab create(int size);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SlabImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SlabImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SlabImpl.java
new file mode 100644
index 0000000..1c88bde
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SlabImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.offheap;
+
+/**
+ * Implements the Slab interface using AddressableMemoryManager.
+ *
+ * @since 9.0
+ */
+public class SlabImpl implements Slab {
+ private final long address;
+ private final int size;
+
+ public SlabImpl(int size) {
+ this(AddressableMemoryManager.allocate(size), size);
+ }
+
+ public SlabImpl(long addr, int size) {
+ this.address = addr;
+ this.size = size;
+ }
+
+ @Override
+ public int getSize() {
+ return this.size;
+ }
+
+ @Override
+ public long getMemoryAddress() {
+ return this.address;
+ }
+
+ @Override
+ public void free() {
+ AddressableMemoryManager.free(this.address);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(getClass().getSimpleName());
+ sb.append("{");
+ sb.append("MemoryAddress=").append(getMemoryAddress());
+ sb.append(", Size=").append(getSize());
+ sb.append("}");
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/StoredObject.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/StoredObject.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/StoredObject.java
index 4d93a07..26cb81f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/StoredObject.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/StoredObject.java
@@ -18,38 +18,27 @@ package com.gemstone.gemfire.internal.offheap;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import com.gemstone.gemfire.internal.Sendable;
import com.gemstone.gemfire.internal.cache.CachedDeserializable;
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
/**
* Represents an object stored in the cache.
* Currently this interface is only used for values stored in off-heap regions.
+ * This interface provides methods that let you read and write the bytes
+ * of addressable memory used to store the bytes of the object.
+ * A reference count is used to determine if the object is still allocated.
+ * To increment the count call {@link #retain()}.
+ * To decrement the count call {@link #release()}.
* At some point in the future it may also be used for values stored in heap regions.
*
* @author darrel
* @since 9.0
*/
-public interface StoredObject extends Releasable, Sendable, CachedDeserializable {
- /**
- * Call to indicate that this object's memory is in use by the caller.
- * The memory will stay allocated until {@link #release()} is called.
- * It is ok for a thread other than the one that called this method to call release.
- * This method is called implicitly at the time the chunk is allocated.
- * Note: @Retained tells you that "this" is retained by this method.
- *
- * @throws IllegalStateException if the max ref count is exceeded.
- * @return true if we are able to retain this chunk; false if we need to retry
- */
- @Retained
- public boolean retain();
-
- /**
- * Returns true if the value stored in this memory chunk is a serialized object. Returns false if it is a byte array.
- */
- public boolean isSerialized();
-
+public interface StoredObject extends Sendable, CachedDeserializable, Releasable {
/**
* Returns true if the value stored in this memory chunk is compressed. Returns false if it is uncompressed.
*/
@@ -92,4 +81,94 @@ public interface StoredObject extends Releasable, Sendable, CachedDeserializable
* @throws IOException
*/
void sendAsCachedDeserializable(DataOutput out) throws IOException;
+
+ /**
+ * Call to indicate that this object's memory is in use by the caller.
+ * The memory will stay allocated until {@link #release()} is called.
+ * It is ok for a thread other than the one that called this method to call release.
+ * This method is called implicitly at the time the chunk is allocated.
+ * Note: @Retained tells you that "this" is retained by this method.
+ *
+ * @throws IllegalStateException if the max ref count is exceeded.
+ * @return true if we are able to retain this chunk; false if we need to retry
+ */
+ @Retained
+ public boolean retain();
+
+ /**
+ * Returns true if this type of StoredObject uses a references count; false otherwise.
+ */
+ public boolean hasRefCount();
+ /**
+ * Returns the number of users of this memory. If this type of StoredObject does not
+ * have a reference count then -1 is returned.
+ */
+ public int getRefCount();
+
+ /**
+ * Returns the address of the memory used to store this object.
+ * This address may not be to the first byte of stored data since
+ * the implementation may store some internal data in the first bytes of the memory.
+ * This address can be used with AddressableMemoryManager.
+ */
+ public long getAddress();
+
+ /**
+ * Returns the number of bytes of memory used by this object to store an object.
+ * This size includes any bytes used for padding and meta-information.
+ */
+ public int getSize();
+
+ /**
+ * Returns the number of bytes of memory used to store the object.
+ * This size does not include any bytes used for padding.
+ */
+ public int getDataSize();
+ public byte readDataByte(int offset);
+ public void writeDataByte(int offset, byte value);
+ public void readDataBytes(int offset, byte[] bytes);
+ public void writeDataBytes(int offset, byte[] bytes);
+ public void readDataBytes(int offset, byte[] bytes, int bytesOffset, int size);
+ public void writeDataBytes(int offset, byte[] bytes, int bytesOffset, int size);
+ /**
+ * Returns an address that can read data from this StoredObject at the given offset.
+ */
+ public long getAddressForReadingData(int offset, int size);
+
+ /**
+ * Returns a StoredObject that acts as if its data is our data starting
+ * at the given offset and limited to the given number of bytes.
+ */
+ public StoredObject slice(int offset, int limit);
+
+ /**
+ * Returns true if our data is equal to other's data; false otherwise.
+ */
+ public boolean checkDataEquals(StoredObject other);
+ /**
+ * Returns true if the given bytes are equal to our data bytes; false otherwise
+ */
+ public boolean checkDataEquals(byte[] serializedObj);
+
+ /**
+ * Creates and returns a direct ByteBuffer that contains the data of this stored object.
+ * Note that the returned ByteBuffer has a reference to the
+ * address of this stored object so it can only be used while this stored object is retained.
+ * @return the created direct byte buffer or null if it could not be created.
+ */
+ @Unretained
+ public ByteBuffer createDirectByteBuffer();
+ /**
+ * Returns true if the data is serialized with PDX
+ */
+ public boolean isSerializedPdxInstance();
+
+ /**
+ * Returns a StoredObject that does not cache the heap form.
+ * If a StoredObject is going to be kept around for a while then
+ * it is good to call this so that it will not also keep the heap
+ * form in memory.
+ */
+ public StoredObject getStoredObjectWithoutHeapForm();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SyncChunkStack.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SyncChunkStack.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SyncChunkStack.java
deleted file mode 100644
index 99fd96f..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/SyncChunkStack.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.offheap;
-
-import com.gemstone.gemfire.LogWriter;
-
-/**
- * A "stack" of "chunk" instances. The chunks are not kept
- * in java object form but instead each "chunk" is just an
- * off-heap address.
- * This class is used for each "tiny" free-list of the off-heap memory allocator.
- */
-public class SyncChunkStack {
- // Ok to read without sync but must be synced on write
- private volatile long topAddr;
-
- public SyncChunkStack(long addr) {
- if (addr != 0L) SimpleMemoryAllocatorImpl.validateAddress(addr);
- this.topAddr = addr;
- }
- public SyncChunkStack() {
- this.topAddr = 0L;
- }
- public boolean isEmpty() {
- return this.topAddr == 0L;
- }
- public void offer(long e) {
- assert e != 0;
- SimpleMemoryAllocatorImpl.validateAddress(e);
- synchronized (this) {
- ObjectChunk.setNext(e, this.topAddr);
- this.topAddr = e;
- }
- }
- public long poll() {
- long result;
- synchronized (this) {
- result = this.topAddr;
- if (result != 0L) {
- this.topAddr = ObjectChunk.getNext(result);
- }
- }
- return result;
- }
- /**
- * Returns the address of the "top" item in this stack.
- */
- public long getTopAddress() {
- return this.topAddr;
- }
- /**
- * Removes all the Chunks from this stack
- * and returns the address of the first chunk.
- * The caller owns all the Chunks after this call.
- */
- public long clear() {
- long result;
- synchronized (this) {
- result = this.topAddr;
- if (result != 0L) {
- this.topAddr = 0L;
- }
- }
- return result;
- }
- public void logSizes(LogWriter lw, String msg) {
- long headAddr = this.topAddr;
- long addr;
- boolean concurrentModDetected;
- do {
- concurrentModDetected = false;
- addr = headAddr;
- while (addr != 0L) {
- int curSize = ObjectChunk.getSize(addr);
- addr = ObjectChunk.getNext(addr);
- testHookDoConcurrentModification();
- long curHead = this.topAddr;
- if (curHead != headAddr) {
- headAddr = curHead;
- concurrentModDetected = true;
- // Someone added or removed from the stack.
- // So we break out of the inner loop and start
- // again at the new head.
- break;
- }
- // TODO construct a single log msg
- // that gets reset when concurrentModDetected.
- lw.info(msg + curSize);
- }
- } while (concurrentModDetected);
- }
- public long computeTotalSize() {
- long result;
- long headAddr = this.topAddr;
- long addr;
- boolean concurrentModDetected;
- do {
- concurrentModDetected = false;
- result = 0;
- addr = headAddr;
- while (addr != 0L) {
- result += ObjectChunk.getSize(addr);
- addr = ObjectChunk.getNext(addr);
- testHookDoConcurrentModification();
- long curHead = this.topAddr;
- if (curHead != headAddr) {
- headAddr = curHead;
- concurrentModDetected = true;
- // Someone added or removed from the stack.
- // So we break out of the inner loop and start
- // again at the new head.
- break;
- }
- }
- } while (concurrentModDetected);
- return result;
- }
-
- /**
- * This method allows tests to override it
- * and do a concurrent modification to the stack.
- * For production code it will be a noop.
- */
- protected void testHookDoConcurrentModification() {
- // nothing needed in production code
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/TinyStoredObject.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/TinyStoredObject.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/TinyStoredObject.java
new file mode 100644
index 0000000..e8878fa
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/TinyStoredObject.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.offheap;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.internal.cache.BytesAndBitsForCompactor;
+import com.gemstone.gemfire.internal.cache.EntryBits;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.RegionEntry;
+import com.gemstone.gemfire.internal.cache.RegionEntryContext;
+
+/**
+ * Used to represent stored objects that can be stored
+ * in the address field.
+ * The RegionEntry for an off-heap region uses a primitive
+ * long to store the off-heap address of the entry's value.
+ * If the value can be encoded as a long (i.e. its serialized
+ * representation will fit in the 8 bytes of a long without looking
+ * like an actual off-heap address) then these tiny values on an
+ * off-heap regions are actually stored on the heap in the primitive
+ * long field. When these values are "objectified" they will be an
+ * instance of this class.
+ * Instances of this class have a very short lifetime.
+ */
+public class TinyStoredObject extends AbstractStoredObject {
+ private final long address;
+
+ public TinyStoredObject(long addr) {
+ this.address = addr;
+ }
+
+ @Override
+ public long getAddress() {
+ return this.address;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o instanceof TinyStoredObject) {
+ return getAddress() == ((TinyStoredObject) o).getAddress();
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ long value = getAddress();
+ return (int)(value ^ (value >>> 32));
+ }
+
+ @Override
+ public int getSizeInBytes() {
+ return 0;
+ }
+
+ public byte[] getDecompressedBytes(RegionEntryContext r) {
+ byte[] bytes = OffHeapRegionEntryHelper.decodeAddressToBytes(getAddress(), true, true);
+ if (isCompressed()) {
+ long time = r.getCachePerfStats().startDecompression();
+ bytes = r.getCompressor().decompress(bytes);
+ r.getCachePerfStats().endDecompression(time);
+ }
+ return bytes;
+ }
+
+ /**
+ * If we contain a byte[] return it.
+ * Otherwise return the serialize bytes in us in a byte array.
+ */
+ public byte[] getRawBytes() {
+ return OffHeapRegionEntryHelper.decodeAddressToBytes(getAddress(), true, false);
+ }
+
+ @Override
+ public byte[] getSerializedValue() {
+ byte[] value = OffHeapRegionEntryHelper.decodeAddressToBytes(this.address, true, false);
+ if (!isSerialized()) {
+ value = EntryEventImpl.serialize(value);
+ }
+ return value;
+ }
+
+ @Override
+ public Object getDeserializedValue(Region r, RegionEntry re) {
+ return OffHeapRegionEntryHelper.decodeAddressToObject(this.address);
+ }
+
+ @Override
+ public void fillSerializedValue(BytesAndBitsForCompactor wrapper,
+ byte userBits) {
+ byte[] value;
+ if (isSerialized()) {
+ value = getSerializedValue();
+ userBits = EntryBits.setSerialized(userBits, true);
+ } else {
+ value = (byte[]) getDeserializedForReading();
+ }
+ wrapper.setData(value, userBits, value.length, true);
+ }
+
+ @Override
+ public int getValueSizeInBytes() {
+ return 0;
+ }
+
+ @Override
+ public boolean isSerialized() {
+ return OffHeapRegionEntryHelper.isSerialized(this.address);
+ }
+
+ @Override
+ public boolean isCompressed() {
+ return OffHeapRegionEntryHelper.isCompressed(this.address);
+ }
+
+ @Override
+ public void release() {
+ // nothing needed
+ }
+
+ @Override
+ public boolean retain() {
+ return true;
+ }
+
+ @Override
+ public int getRefCount() {
+ return -1;
+ }
+
+ @Override
+ public int getSize() {
+ return Long.BYTES;
+ }
+
+ @Override
+ public int getDataSize() {
+ return OffHeapRegionEntryHelper.decodeAddressToDataSize(this.address);
+ }
+
+ @Override
+ public byte readDataByte(int offset) {
+ // TODO OFFHEAP: what if the data is compressed?
+ return getRawBytes()[offset];
+ }
+
+ @Override
+ public void writeDataByte(int offset, byte value) {
+ throw new UnsupportedOperationException("ObjectStoredAsAddress does not support modifying the data bytes");
+ }
+
+ @Override
+ public void readDataBytes(int offset, byte[] bytes) {
+ readDataBytes(offset, bytes, 0, bytes.length);
+ }
+
+ @Override
+ public void writeDataBytes(int offset, byte[] bytes) {
+ writeDataBytes(offset, bytes, 0, bytes.length);
+ }
+
+ @Override
+ public void readDataBytes(int offset, byte[] bytes, int bytesOffset, int size) {
+ // TODO OFFHEAP: what if the data is compressed?
+ byte[] src = getRawBytes();
+ int dstIdx = bytesOffset;
+ for (int i = offset; i < offset+size; i++) {
+ bytes[dstIdx++] = src[i];
+ }
+ }
+
+ @Override
+ public void writeDataBytes(int offset, byte[] bytes, int bytesOffset, int size) {
+ throw new UnsupportedOperationException("ObjectStoredAsAddress does not support modifying the data bytes");
+ }
+
+ @Override
+ public ByteBuffer createDirectByteBuffer() {
+ return null;
+ }
+
+ @Override
+ public boolean hasRefCount() {
+ return false;
+ }
+
+ @Override
+ public boolean checkDataEquals(StoredObject so) {
+ // TODO OFFHEAP: what if the data is compressed?
+ return equals(so);
+ }
+
+ @Override
+ public boolean checkDataEquals(byte[] serializedObj) {
+ // TODO OFFHEAP: what if the data is compressed?
+ byte[] myBytes = getSerializedValue();
+ return Arrays.equals(myBytes, serializedObj);
+ }
+
+ @Override
+ public long getAddressForReadingData(int offset, int size) {
+ throw new UnsupportedOperationException("ObjectStoredAsAddress does not support reading at an address");
+ }
+
+ @Override
+ public StoredObject slice(int offset, int limit) {
+ throw new UnsupportedOperationException("ObjectStoredAsAddress does not support slice");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/UnsafeMemoryChunk.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/UnsafeMemoryChunk.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/UnsafeMemoryChunk.java
deleted file mode 100644
index aebc459..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/offheap/UnsafeMemoryChunk.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.offheap;
-
-import com.gemstone.gemfire.internal.SharedLibrary;
-import com.gemstone.gemfire.pdx.internal.unsafe.UnsafeWrapper;
-
-/**
- * Represents a single addressable chunk of off-heap memory. The size specifies
- * the number of bytes stored at the address.
- *
- * @since 9.0
- */
-public class UnsafeMemoryChunk implements AddressableMemoryChunk {
- private static final UnsafeWrapper unsafe;
- private static final int ARRAY_BYTE_BASE_OFFSET;
- private static String reason;
- static {
- UnsafeWrapper tmp = null;
- try {
- tmp = new UnsafeWrapper();
- reason = null;
- } catch (RuntimeException ignore) {
- reason = ignore.toString();
- } catch (Error ignore) {
- reason = ignore.toString();
- }
- unsafe = tmp;
- ARRAY_BYTE_BASE_OFFSET = unsafe != null ? unsafe.arrayBaseOffset(byte[].class) : 0;
- }
-
- private final long data;
- private final int size;
-
- public UnsafeMemoryChunk(int size) {
- if (unsafe == null) {
- throw new OutOfMemoryError("Off-heap memory is not available because: " + reason);
- }
- try {
- this.data = unsafe.allocateMemory(size);
- this.size = size;
- } catch (OutOfMemoryError err) {
- String msg = "Failed creating " + size + " bytes of off-heap memory during cache creation.";
- if (err.getMessage() != null && !err.getMessage().isEmpty()) {
- msg += " Cause: " + err.getMessage();
- }
- if (!SharedLibrary.is64Bit() && size >= (1024*1024*1024)) {
- msg += " The JVM looks like a 32-bit one. For large amounts of off-heap memory a 64-bit JVM is needed.";
- }
- throw new OutOfMemoryError(msg);
- }
- }
-
- @Override
- public int getSize() {
- return (int)this.size;
- }
-
- /* (non-Javadoc)
- * @see com.gemstone.gemfire.internal.offheap.AddressableMemoryChunk#getMemoryAddress()
- */
- @Override
- public long getMemoryAddress() {
- return this.data;
- }
-
- public static byte readAbsoluteByte(long addr) {
- return unsafe.getByte(addr);
- }
- public static char readAbsoluteChar(long addr) {
- return unsafe.getChar(null, addr);
- }
- public static short readAbsoluteShort(long addr) {
- return unsafe.getShort(null, addr);
- }
- public static int readAbsoluteInt(long addr) {
- return unsafe.getInt(null, addr);
- }
- public static int readAbsoluteIntVolatile(long addr) {
- return unsafe.getIntVolatile(null, addr);
- }
- public static long readAbsoluteLong(long addr) {
- return unsafe.getLong(null, addr);
- }
- public static long readAbsoluteLongVolatile(long addr) {
- return unsafe.getLongVolatile(null, addr);
- }
-
- @Override
- public byte readByte(int offset) {
- return readAbsoluteByte(this.data+offset);
- }
-
- public static void writeAbsoluteByte(long addr, byte value) {
- unsafe.putByte(addr, value);
- }
-
- public static void writeAbsoluteInt(long addr, int value) {
- unsafe.putInt(null, addr, value);
- }
- public static void writeAbsoluteIntVolatile(long addr, int value) {
- unsafe.putIntVolatile(null, addr, value);
- }
- public static boolean writeAbsoluteIntVolatile(long addr, int expected, int value) {
- return unsafe.compareAndSwapInt(null, addr, expected, value);
- }
- public static void writeAbsoluteLong(long addr, long value) {
- unsafe.putLong(null, addr, value);
- }
- public static void writeAbsoluteLongVolatile(long addr, long value) {
- unsafe.putLongVolatile(null, addr, value);
- }
- public static boolean writeAbsoluteLongVolatile(long addr, long expected, long value) {
- return unsafe.compareAndSwapLong(null, addr, expected, value);
- }
-
- @Override
- public void writeByte(int offset, byte value) {
- writeAbsoluteByte(this.data+offset, value);
- }
-
- @Override
- public void readBytes(int offset, byte[] bytes) {
- readBytes(offset, bytes, 0, bytes.length);
- }
-
- @Override
- public void writeBytes(int offset, byte[] bytes) {
- writeBytes(offset, bytes, 0, bytes.length);
- }
-
- public static void readAbsoluteBytes(long addr, byte[] bytes, int bytesOffset, int size) {
- // Throwing an Error instead of using the "assert" keyword because passing < 0 to
- // copyMemory(...) can lead to a core dump with some JVMs and we don't want to
- // require the -ea JVM flag.
- if (size < 0) {
- throw new AssertionError("Size=" + size + ", but size must be >= 0");
- }
-
- assert bytesOffset >= 0 : "byteOffset=" + bytesOffset;
- assert bytesOffset + size <= bytes.length : "byteOffset=" + bytesOffset + ",size=" + size + ",bytes.length=" + bytes.length;
-
- if (size == 0) {
- return; // No point in wasting time copying 0 bytes
- }
- unsafe.copyMemory(null, addr, bytes, ARRAY_BYTE_BASE_OFFSET+bytesOffset, size);
- }
-
- @Override
- public void readBytes(int offset, byte[] bytes, int bytesOffset, int size) {
- readAbsoluteBytes(this.data+offset, bytes, bytesOffset, size);
- }
-
- public static void copyMemory(long srcAddr, long dstAddr, long size) {
- unsafe.copyMemory(srcAddr, dstAddr, size);
- }
-
- public static void writeAbsoluteBytes(long addr, byte[] bytes, int bytesOffset, int size) {
- // Throwing an Error instead of using the "assert" keyword because passing < 0 to
- // copyMemory(...) can lead to a core dump with some JVMs and we don't want to
- // require the -ea JVM flag.
- if (size < 0) {
- throw new AssertionError("Size=" + size + ", but size must be >= 0");
- }
-
- assert bytesOffset >= 0 : "byteOffset=" + bytesOffset;
- assert bytesOffset + size <= bytes.length : "byteOffset=" + bytesOffset + ",size=" + size + ",bytes.length=" + bytes.length;
-
- if (size == 0) {
- return; // No point in wasting time copying 0 bytes
- }
- unsafe.copyMemory(bytes, ARRAY_BYTE_BASE_OFFSET+bytesOffset, null, addr, size);
- }
-
- public static void fill(long addr, int size, byte fill) {
- unsafe.setMemory(addr, size, fill);
- }
-
- @Override
- public void writeBytes(int offset, byte[] bytes, int bytesOffset, int size) {
- writeAbsoluteBytes(this.data+offset, bytes, bytesOffset, size);
- }
-
- @Override
- public void release() {
- unsafe.freeMemory(this.data);
- }
-
- @Override
- public void copyBytes(int src, int dst, int size) {
- unsafe.copyMemory(this.data+src, this.data+dst, size);
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder(getClass().getSimpleName());
- sb.append("{");
- sb.append("MemoryAddress=").append(getMemoryAddress());
- sb.append(", Size=").append(getSize());
- sb.append("}");
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ByteBufferInputStream.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ByteBufferInputStream.java
index cfc05f2..d7ec947 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ByteBufferInputStream.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ByteBufferInputStream.java
@@ -31,8 +31,8 @@ import java.nio.ByteOrder;
import com.gemstone.gemfire.internal.ByteBufferWriter;
import com.gemstone.gemfire.internal.HeapDataOutputStream;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
-import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk;
+import com.gemstone.gemfire.internal.offheap.AddressableMemoryManager;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
/**
* <p>
@@ -109,15 +109,15 @@ public class ByteBufferInputStream extends InputStream implements DataInput, jav
public static ByteSource create(ByteBuffer bb) {
return new ByteBufferByteSource(bb);
}
- public static ByteSource create(ObjectChunk chunk) {
- // Since I found a way to create a DirectByteBuffer (using reflection) from a Chunk
+ public static ByteSource create(StoredObject so) {
+ // Since I found a way to create a DirectByteBuffer (using reflection) from a StoredObject
// we might not even need the ByteSource abstraction any more.
// But it is possible that createByteBuffer will not work on a different jdk so keep it for now.
- ByteBuffer bb = chunk.createDirectByteBuffer();
+ ByteBuffer bb = so.createDirectByteBuffer();
if (bb != null) {
return create(bb);
} else {
- return new OffHeapByteSource(chunk);
+ return new OffHeapByteSource(so);
}
}
}
@@ -323,10 +323,10 @@ public class ByteBufferInputStream extends InputStream implements DataInput, jav
public static class OffHeapByteSource implements ByteSource {
private int position;
private int limit;
- private final ObjectChunk chunk;
+ private final StoredObject chunk;
- public OffHeapByteSource(ObjectChunk c) {
- this.chunk = c;
+ public OffHeapByteSource(StoredObject so) {
+ this.chunk = so;
this.position = 0;
this.limit = capacity();
}
@@ -474,17 +474,17 @@ public class ByteBufferInputStream extends InputStream implements DataInput, jav
}
int p = this.position;
this.position += length;
- this.chunk.readBytes(p, dst, offset, length);
+ this.chunk.readDataBytes(p, dst, offset, length);
}
@Override
public byte get() {
- return this.chunk.readByte(nextGetIndex());
+ return this.chunk.readDataByte(nextGetIndex());
}
@Override
public byte get(int pos) {
checkIndex(pos);
- return this.chunk.readByte(pos);
+ return this.chunk.readDataByte(pos);
}
/**
@@ -513,16 +513,16 @@ public class ByteBufferInputStream extends InputStream implements DataInput, jav
return basicGetShort(pos);
}
private short basicGetShort(int pos) {
- long addr = this.chunk.getAddressForReading(pos, 2);
+ long addr = this.chunk.getAddressForReadingData(pos, 2);
if (unaligned) {
- short result = UnsafeMemoryChunk.readAbsoluteShort(addr);
+ short result = AddressableMemoryManager.readShort(addr);
if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
result = Short.reverseBytes(result);
}
return result;
} else {
- int ch1 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
- int ch2 = UnsafeMemoryChunk.readAbsoluteByte(addr);
+ int ch1 = AddressableMemoryManager.readByte(addr++);
+ int ch2 = AddressableMemoryManager.readByte(addr);
return (short)((ch1 << 8) + (ch2 << 0));
}
}
@@ -537,16 +537,16 @@ public class ByteBufferInputStream extends InputStream implements DataInput, jav
return basicGetChar(pos);
}
private char basicGetChar(int pos) {
- long addr = this.chunk.getAddressForReading(pos, 2);
+ long addr = this.chunk.getAddressForReadingData(pos, 2);
if (unaligned) {
- char result = UnsafeMemoryChunk.readAbsoluteChar(addr);
+ char result = AddressableMemoryManager.readChar(addr);
if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
result = Character.reverseBytes(result);
}
return result;
} else {
- int ch1 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
- int ch2 = UnsafeMemoryChunk.readAbsoluteByte(addr);
+ int ch1 = AddressableMemoryManager.readByte(addr++);
+ int ch2 = AddressableMemoryManager.readByte(addr);
return (char)((ch1 << 8) + (ch2 << 0));
}
}
@@ -562,18 +562,18 @@ public class ByteBufferInputStream extends InputStream implements DataInput, jav
}
private int basicGetInt(final int pos) {
- long addr = this.chunk.getAddressForReading(pos, 4);
+ long addr = this.chunk.getAddressForReadingData(pos, 4);
if (unaligned) {
- int result = UnsafeMemoryChunk.readAbsoluteInt(addr);
+ int result = AddressableMemoryManager.readInt(addr);
if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
result = Integer.reverseBytes(result);
}
return result;
} else {
- byte b0 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
- byte b1 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
- byte b2 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
- byte b3 = UnsafeMemoryChunk.readAbsoluteByte(addr);
+ byte b0 = AddressableMemoryManager.readByte(addr++);
+ byte b1 = AddressableMemoryManager.readByte(addr++);
+ byte b2 = AddressableMemoryManager.readByte(addr++);
+ byte b3 = AddressableMemoryManager.readByte(addr);
return (b0 << 24) + ((b1 & 255) << 16) + ((b2 & 255) << 8) + ((b3 & 255) << 0);
}
}
@@ -588,22 +588,22 @@ public class ByteBufferInputStream extends InputStream implements DataInput, jav
return basicGetLong(pos);
}
private long basicGetLong(final int pos) {
- long addr = this.chunk.getAddressForReading(pos, 8);
+ long addr = this.chunk.getAddressForReadingData(pos, 8);
if (unaligned) {
- long result = UnsafeMemoryChunk.readAbsoluteLong(addr);
+ long result = AddressableMemoryManager.readLong(addr);
if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
result = Long.reverseBytes(result);
}
return result;
} else {
- byte b0 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
- byte b1 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
- byte b2 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
- byte b3 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
- byte b4 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
- byte b5 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
- byte b6 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
- byte b7 = UnsafeMemoryChunk.readAbsoluteByte(addr);
+ byte b0 = AddressableMemoryManager.readByte(addr++);
+ byte b1 = AddressableMemoryManager.readByte(addr++);
+ byte b2 = AddressableMemoryManager.readByte(addr++);
+ byte b3 = AddressableMemoryManager.readByte(addr++);
+ byte b4 = AddressableMemoryManager.readByte(addr++);
+ byte b5 = AddressableMemoryManager.readByte(addr++);
+ byte b6 = AddressableMemoryManager.readByte(addr++);
+ byte b7 = AddressableMemoryManager.readByte(addr);
return (((long)b0 << 56) +
((long)(b1 & 255) << 48) +
((long)(b2 & 255) << 40) +
@@ -724,7 +724,7 @@ public class ByteBufferInputStream extends InputStream implements DataInput, jav
this.buffer = copy.buffer.duplicate();
}
- public ByteBufferInputStream(ObjectChunk blob) {
+ public ByteBufferInputStream(StoredObject blob) {
this.buffer = ByteSourceFactory.create(blob);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ImmutableByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ImmutableByteBufferInputStream.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ImmutableByteBufferInputStream.java
index d632158..ff0871a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ImmutableByteBufferInputStream.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ImmutableByteBufferInputStream.java
@@ -18,7 +18,7 @@ package com.gemstone.gemfire.internal.tcp;
import java.nio.ByteBuffer;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
/**
* You should only create an instance of this class if the bytes this buffer reads
@@ -67,7 +67,7 @@ public class ImmutableByteBufferInputStream extends ByteBufferInputStream {
// for serialization
}
- public ImmutableByteBufferInputStream(ObjectChunk blob) {
+ public ImmutableByteBufferInputStream(StoredObject blob) {
super(blob);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java
index 40015a4..28252c3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java
@@ -27,7 +27,7 @@ import com.gemstone.gemfire.internal.DSCODE;
import com.gemstone.gemfire.internal.HeapDataOutputStream;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
import com.gemstone.gemfire.pdx.internal.PdxInputStream;
@@ -140,7 +140,7 @@ public class BlobHelper {
* If a PdxInstance is returned then it will refer to Chunk's off-heap memory
* with an unretained reference.
*/
- public static @Unretained Object deserializeOffHeapBlob(ObjectChunk blob) throws IOException, ClassNotFoundException {
+ public static @Unretained Object deserializeOffHeapBlob(StoredObject blob) throws IOException, ClassNotFoundException {
Object result;
final long start = startDeserialization();
// For both top level and nested pdxs we just want a reference to this off-heap blob.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/main/java/com/gemstone/gemfire/pdx/internal/PdxInputStream.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/pdx/internal/PdxInputStream.java b/geode-core/src/main/java/com/gemstone/gemfire/pdx/internal/PdxInputStream.java
index 4a5a9df..85c6cd5 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/pdx/internal/PdxInputStream.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/pdx/internal/PdxInputStream.java
@@ -26,7 +26,7 @@ import java.util.Date;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.InternalGemFireException;
import com.gemstone.gemfire.pdx.PdxSerializationException;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
import com.gemstone.gemfire.internal.tcp.ByteBufferInputStream;
import com.gemstone.gemfire.internal.tcp.ImmutableByteBufferInputStream;
@@ -76,7 +76,7 @@ public class PdxInputStream extends ImmutableByteBufferInputStream {
// for serialization
}
- public PdxInputStream(ObjectChunk blob) {
+ public PdxInputStream(StoredObject blob) {
super(blob);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
index 7a4d09e..dce68cf 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
@@ -110,8 +110,8 @@ import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
import com.gemstone.gemfire.internal.cache.versions.VMRegionVersionVector;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.offheap.MemoryChunkWithRefCount;
import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
import com.gemstone.gemfire.test.dunit.AsyncInvocation;
import com.gemstone.gemfire.test.dunit.DistributedTestCase;
import com.gemstone.gemfire.test.dunit.DistributedTestUtils;
@@ -2004,8 +2004,8 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
LocalRegion reRegion;
reRegion = (LocalRegion) region;
RegionEntry re = reRegion.getRegionEntry(key2);
- MemoryChunkWithRefCount mc = (MemoryChunkWithRefCount) re._getValue();
- assertEquals(1, mc.getRefCount());
+ StoredObject so = (StoredObject) re._getValue();
+ assertEquals(1, so.getRefCount());
assertEquals(1, ma.getStats().getObjects());
}
}
@@ -2091,8 +2091,8 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
assertEquals(2, ma.getStats().getObjects());
LocalRegion reRegion;
reRegion = (LocalRegion) region;
- MemoryChunkWithRefCount mc = (MemoryChunkWithRefCount) reRegion.getRegionEntry(key)._getValue();
- assertEquals(1, mc.getRefCount());
+ StoredObject so = (StoredObject) reRegion.getRegionEntry(key)._getValue();
+ assertEquals(1, so.getRefCount());
}
}
});
@@ -2157,8 +2157,8 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
assertEquals(2, ma.getStats().getObjects());
LocalRegion reRegion;
reRegion = (LocalRegion) region;
- MemoryChunkWithRefCount mc = (MemoryChunkWithRefCount) reRegion.getRegionEntry(key)._getValue();
- assertEquals(1, mc.getRefCount());
+ StoredObject so = (StoredObject) reRegion.getRegionEntry(key)._getValue();
+ assertEquals(1, so.getRefCount());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ChunkValueWrapperJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ChunkValueWrapperJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ChunkValueWrapperJUnitTest.java
deleted file mode 100644
index b69f82e..0000000
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ChunkValueWrapperJUnitTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.ChunkValueWrapper;
-import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.Flushable;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
-import com.gemstone.gemfire.internal.offheap.NullOffHeapMemoryStats;
-import com.gemstone.gemfire.internal.offheap.NullOutOfOffHeapMemoryListener;
-import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
-import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk;
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class ChunkValueWrapperJUnitTest {
-
- private static ChunkValueWrapper createChunkValueWrapper(byte[] bytes, boolean isSerialized) {
- ObjectChunk c = (ObjectChunk)SimpleMemoryAllocatorImpl.getAllocator().allocateAndInitialize(bytes, isSerialized, false);
- return new ChunkValueWrapper(c);
- }
-
- @Before
- public void setUp() throws Exception {
- SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
- }
-
- @After
- public void tearDown() throws Exception {
- SimpleMemoryAllocatorImpl.freeOffHeapMemory();
- }
-
- @Test
- public void testIsSerialized() {
- assertEquals(true, createChunkValueWrapper(new byte[16], true).isSerialized());
- assertEquals(false, createChunkValueWrapper(new byte[16], false).isSerialized());
- }
-
- @Test
- public void testGetUserBits() {
- assertEquals((byte)1, createChunkValueWrapper(new byte[16], true).getUserBits());
- assertEquals((byte)0, createChunkValueWrapper(new byte[16], false).getUserBits());
- }
-
- @Test
- public void testGetLength() {
- assertEquals(32, createChunkValueWrapper(new byte[32], true).getLength());
- assertEquals(17, createChunkValueWrapper(new byte[17], false).getLength());
- }
-
- @Test
- public void testGetBytesAsString() {
- assertEquals("byte[0, 0, 0, 0, 0, 0, 0, 0]", createChunkValueWrapper(new byte[8], false).getBytesAsString());
- }
-
- @Test
- public void testSendTo() throws IOException {
- final ByteBuffer bb = ByteBuffer.allocateDirect(18);
- bb.limit(8);
- ChunkValueWrapper vw = createChunkValueWrapper(new byte[]{1,2,3,4,5,6,7,8}, false);
- vw.sendTo(bb, new Flushable() {
- @Override
- public void flush() throws IOException {
- fail("should not have been called");
- }
-
- @Override
- public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException {
- fail("should not have been called");
- }
- });
- assertEquals(8, bb.position());
- bb.flip();
- assertEquals(1, bb.get());
- assertEquals(2, bb.get());
- assertEquals(3, bb.get());
- assertEquals(4, bb.get());
- assertEquals(5, bb.get());
- assertEquals(6, bb.get());
- assertEquals(7, bb.get());
- assertEquals(8, bb.get());
-
- bb.clear();
- bb.limit(8);
- vw = createChunkValueWrapper(new byte[]{1,2,3,4,5,6,7,8,9}, false);
- final int[] flushCalls = new int[1];
- vw.sendTo(bb, new Flushable() {
- @Override
- public void flush() throws IOException {
- if (flushCalls[0] != 0) {
- fail("expected flush to only be called once");
- }
- flushCalls[0]++;
- assertEquals(8, bb.position());
- for (int i=0; i < 8; i++) {
- assertEquals(i+1, bb.get(i));
- }
- bb.clear();
- bb.limit(8);
- }
-
- @Override
- public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException {
- fail("should not have been called");
- }
- });
- assertEquals(1, bb.position());
- bb.flip();
- assertEquals(9, bb.get());
-
- bb.clear();
- bb.limit(8);
- flushCalls[0] = 0;
- vw = createChunkValueWrapper(new byte[]{1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17}, false);
- vw.sendTo(bb, new Flushable() {
- @Override
- public void flush() throws IOException {
- if (flushCalls[0] > 1) {
- fail("expected flush to only be called twice");
- }
- assertEquals(8, bb.position());
- for (int i=0; i < 8; i++) {
- assertEquals((flushCalls[0]*8)+i+1, bb.get(i));
- }
- flushCalls[0]++;
- bb.clear();
- bb.limit(8);
- }
-
- @Override
- public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException {
- fail("should not have been called");
- }
- });
- assertEquals(1, bb.position());
- bb.flip();
- assertEquals(17, bb.get());
-
- // now test with a chunk that will not fit in bb.
- bb.clear();
- flushCalls[0] = 0;
- bb.put((byte) 0);
- vw = createChunkValueWrapper(new byte[]{1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19}, false);
- vw.sendTo(bb, new Flushable() {
- @Override
- public void flush() throws IOException {
- fail("should not have been called");
- }
-
- @Override
- public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException {
- flushCalls[0]++;
- assertEquals(1, bb.position());
- bb.flip();
- assertEquals(0, bb.get());
- assertEquals(19, chunkbb.remaining());
- for (int i=1; i <= 19; i++) {
- assertEquals(i, chunkbb.get());
- }
- }
- });
- assertEquals(1, flushCalls[0]);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapTestUtil.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapTestUtil.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapTestUtil.java
index cbf3bf6..8fd6895 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapTestUtil.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapTestUtil.java
@@ -52,7 +52,7 @@ public class OffHeapTestUtil {
}
if(orphans != null && ! orphans.isEmpty()) {
- List<RefCountChangeInfo> info = ReferenceCountHelper.getRefCountInfo(orphans.get(0).getMemoryAddress());
+ List<RefCountChangeInfo> info = ReferenceCountHelper.getRefCountInfo(orphans.get(0).getAddress());
System.out.println("FOUND ORPHAN!!");
System.out.println("Sample orphan: " + orphans.get(0));
System.out.println("Orphan info: " + info);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapValueWrapperJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapValueWrapperJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapValueWrapperJUnitTest.java
new file mode 100644
index 0000000..0829009
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OffHeapValueWrapperJUnitTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.OffHeapValueWrapper;
+import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.Flushable;
+import com.gemstone.gemfire.internal.offheap.NullOffHeapMemoryStats;
+import com.gemstone.gemfire.internal.offheap.NullOutOfOffHeapMemoryListener;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
+import com.gemstone.gemfire.internal.offheap.SlabImpl;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class OffHeapValueWrapperJUnitTest {
+
+ private static OffHeapValueWrapper createChunkValueWrapper(byte[] bytes, boolean isSerialized) {
+ StoredObject c = SimpleMemoryAllocatorImpl.getAllocator().allocateAndInitialize(bytes, isSerialized, false);
+ return new OffHeapValueWrapper(c);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new SlabImpl[]{new SlabImpl(1024*1024)});
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ SimpleMemoryAllocatorImpl.freeOffHeapMemory();
+ }
+
+ @Test
+ public void testIsSerialized() {
+ assertEquals(true, createChunkValueWrapper(new byte[16], true).isSerialized());
+ assertEquals(false, createChunkValueWrapper(new byte[16], false).isSerialized());
+ }
+
+ @Test
+ public void testGetUserBits() {
+ assertEquals((byte)1, createChunkValueWrapper(new byte[16], true).getUserBits());
+ assertEquals((byte)0, createChunkValueWrapper(new byte[16], false).getUserBits());
+ }
+
+ @Test
+ public void testGetLength() {
+ assertEquals(32, createChunkValueWrapper(new byte[32], true).getLength());
+ assertEquals(17, createChunkValueWrapper(new byte[17], false).getLength());
+ }
+
+ @Test
+ public void testGetBytesAsString() {
+ assertEquals("byte[0, 0, 0, 0, 0, 0, 0, 0]", createChunkValueWrapper(new byte[8], false).getBytesAsString());
+ }
+
+ @Test
+ public void testSendTo() throws IOException {
+ final ByteBuffer bb = ByteBuffer.allocateDirect(18);
+ bb.limit(8);
+ OffHeapValueWrapper vw = createChunkValueWrapper(new byte[]{1,2,3,4,5,6,7,8}, false);
+ vw.sendTo(bb, new Flushable() {
+ @Override
+ public void flush() throws IOException {
+ fail("should not have been called");
+ }
+
+ @Override
+ public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException {
+ fail("should not have been called");
+ }
+ });
+ assertEquals(8, bb.position());
+ bb.flip();
+ assertEquals(1, bb.get());
+ assertEquals(2, bb.get());
+ assertEquals(3, bb.get());
+ assertEquals(4, bb.get());
+ assertEquals(5, bb.get());
+ assertEquals(6, bb.get());
+ assertEquals(7, bb.get());
+ assertEquals(8, bb.get());
+
+ bb.clear();
+ bb.limit(8);
+ vw = createChunkValueWrapper(new byte[]{1,2,3,4,5,6,7,8,9}, false);
+ final int[] flushCalls = new int[1];
+ vw.sendTo(bb, new Flushable() {
+ @Override
+ public void flush() throws IOException {
+ if (flushCalls[0] != 0) {
+ fail("expected flush to only be called once");
+ }
+ flushCalls[0]++;
+ assertEquals(8, bb.position());
+ for (int i=0; i < 8; i++) {
+ assertEquals(i+1, bb.get(i));
+ }
+ bb.clear();
+ bb.limit(8);
+ }
+
+ @Override
+ public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException {
+ fail("should not have been called");
+ }
+ });
+ assertEquals(1, bb.position());
+ bb.flip();
+ assertEquals(9, bb.get());
+
+ bb.clear();
+ bb.limit(8);
+ flushCalls[0] = 0;
+ vw = createChunkValueWrapper(new byte[]{1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17}, false);
+ vw.sendTo(bb, new Flushable() {
+ @Override
+ public void flush() throws IOException {
+ if (flushCalls[0] > 1) {
+ fail("expected flush to only be called twice");
+ }
+ assertEquals(8, bb.position());
+ for (int i=0; i < 8; i++) {
+ assertEquals((flushCalls[0]*8)+i+1, bb.get(i));
+ }
+ flushCalls[0]++;
+ bb.clear();
+ bb.limit(8);
+ }
+
+ @Override
+ public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException {
+ fail("should not have been called");
+ }
+ });
+ assertEquals(1, bb.position());
+ bb.flip();
+ assertEquals(17, bb.get());
+
+ // now test with a chunk that will not fit in bb.
+ bb.clear();
+ flushCalls[0] = 0;
+ bb.put((byte) 0);
+ vw = createChunkValueWrapper(new byte[]{1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19}, false);
+ vw.sendTo(bb, new Flushable() {
+ @Override
+ public void flush() throws IOException {
+ fail("should not have been called");
+ }
+
+ @Override
+ public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException {
+ flushCalls[0]++;
+ assertEquals(1, bb.position());
+ bb.flip();
+ assertEquals(0, bb.get());
+ assertEquals(19, chunkbb.remaining());
+ for (int i=1; i <= 19; i++) {
+ assertEquals(i, chunkbb.get());
+ }
+ }
+ });
+ assertEquals(1, flushCalls[0]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OldValueImporterTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OldValueImporterTestBase.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OldValueImporterTestBase.java
index 690b55a..84d7fc7 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OldValueImporterTestBase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/OldValueImporterTestBase.java
@@ -26,12 +26,12 @@ import org.junit.Test;
import com.gemstone.gemfire.internal.HeapDataOutputStream;
import com.gemstone.gemfire.internal.cache.EntryEventImpl.OldValueImporter;
-import com.gemstone.gemfire.internal.offheap.ObjectChunk;
-import com.gemstone.gemfire.internal.offheap.DataAsAddress;
+import com.gemstone.gemfire.internal.offheap.OffHeapStoredObject;
+import com.gemstone.gemfire.internal.offheap.TinyStoredObject;
import com.gemstone.gemfire.internal.offheap.NullOffHeapMemoryStats;
import com.gemstone.gemfire.internal.offheap.NullOutOfOffHeapMemoryListener;
import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
-import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk;
+import com.gemstone.gemfire.internal.offheap.SlabImpl;
import com.gemstone.gemfire.internal.util.BlobHelper;
public abstract class OldValueImporterTestBase {
@@ -110,10 +110,10 @@ public abstract class OldValueImporterTestBase {
// off-heap DataAsAddress byte array
{
SimpleMemoryAllocatorImpl sma =
- SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
+ SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new SlabImpl[]{new SlabImpl(1024*1024)});
try {
byte[] baValue = new byte[] {1,2};
- DataAsAddress baValueSO = (DataAsAddress) sma.allocateAndInitialize(baValue, false, false);
+ TinyStoredObject baValueSO = (TinyStoredObject) sma.allocateAndInitialize(baValue, false, false);
OldValueImporter omsg = createImporter();
omsg.importOldObject(baValueSO, false);
hdos = new HeapDataOutputStream(bytes);
@@ -127,10 +127,10 @@ public abstract class OldValueImporterTestBase {
// off-heap Chunk byte array
{
SimpleMemoryAllocatorImpl sma =
- SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
+ SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new SlabImpl[]{new SlabImpl(1024*1024)});
try {
byte[] baValue = new byte[] {1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17};
- ObjectChunk baValueSO = (ObjectChunk) sma.allocateAndInitialize(baValue, false, false);
+ OffHeapStoredObject baValueSO = (OffHeapStoredObject) sma.allocateAndInitialize(baValue, false, false);
OldValueImporter omsg = createImporter();
omsg.importOldObject(baValueSO, false);
hdos = new HeapDataOutputStream(bytes);
@@ -144,11 +144,11 @@ public abstract class OldValueImporterTestBase {
// off-heap DataAsAddress String
{
SimpleMemoryAllocatorImpl sma =
- SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
+ SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new SlabImpl[]{new SlabImpl(1024*1024)});
try {
String baValue = "12";
byte[] baValueBlob = BlobHelper.serializeToBlob(baValue);
- DataAsAddress baValueSO = (DataAsAddress) sma.allocateAndInitialize(baValueBlob, true, false);
+ TinyStoredObject baValueSO = (TinyStoredObject) sma.allocateAndInitialize(baValueBlob, true, false);
OldValueImporter omsg = createImporter();
omsg.importOldObject(baValueSO, true);
hdos = new HeapDataOutputStream(bytes);
@@ -162,11 +162,11 @@ public abstract class OldValueImporterTestBase {
// off-heap Chunk String
{
SimpleMemoryAllocatorImpl sma =
- SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new UnsafeMemoryChunk[]{new UnsafeMemoryChunk(1024*1024)});
+ SimpleMemoryAllocatorImpl.createForUnitTest(new NullOutOfOffHeapMemoryListener(), new NullOffHeapMemoryStats(), new SlabImpl[]{new SlabImpl(1024*1024)});
try {
String baValue = "12345678";
byte[] baValueBlob = BlobHelper.serializeToBlob(baValue);
- ObjectChunk baValueSO = (ObjectChunk) sma.allocateAndInitialize(baValueBlob, true, false);
+ OffHeapStoredObject baValueSO = (OffHeapStoredObject) sma.allocateAndInitialize(baValueBlob, true, false);
OldValueImporter omsg = createImporter();
omsg.importOldObject(baValueSO, true);
hdos = new HeapDataOutputStream(bytes);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java
index b7bd47a..8caf3f6 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageJUnitTest.java
@@ -28,7 +28,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.gemstone.gemfire.internal.Version;
-import com.gemstone.gemfire.internal.offheap.HeapByteBufferMemoryChunkJUnitTest;
import com.gemstone.gemfire.test.junit.categories.UnitTest;
@Category(UnitTest.class)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3087c86f/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/ByteArrayMemoryChunkJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/ByteArrayMemoryChunkJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/ByteArrayMemoryChunkJUnitTest.java
deleted file mode 100644
index e9972a5..0000000
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/offheap/ByteArrayMemoryChunkJUnitTest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.offheap;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class ByteArrayMemoryChunkJUnitTest extends MemoryChunkJUnitTestBase {
- @Override
- protected MemoryChunk createChunk(int size) {
- return new ByteArrayMemoryChunk(size);
- }
-
-}