You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/02/13 16:59:23 UTC
[2/3] cassandra git commit: Introduce SafeMemory for
CompressionMetadata.Writer
Introduce SafeMemory for CompressionMetadata.Writer
patch by benedict; reviewed by marcus for CASSANDRA-8758
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/746c4585
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/746c4585
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/746c4585
Branch: refs/heads/trunk
Commit: 746c45853827eefaf9254b772c3dc9e72662c986
Parents: 2a167ea
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Feb 13 15:54:52 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Feb 13 15:54:52 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../io/compress/CompressionMetadata.java | 30 ++----
.../org/apache/cassandra/io/util/Memory.java | 28 +++++-
.../apache/cassandra/io/util/SafeMemory.java | 98 ++++++++++++++++++++
.../apache/cassandra/utils/concurrent/Ref.java | 10 ++
5 files changed, 143 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/746c4585/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bca3dc8..0d204a5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.4
+ * Introduce SafeMemory for CompressionMetadata.Writer (CASSANDRA-8758)
* 'nodetool info' prints exception against older node (CASSANDRA-8796)
* Ensure SSTableReader.last corresponds exactly with the file end (CASSANDRA-8750)
* Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/746c4585/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index fd8248e..b29e259 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.Memory;
+import org.apache.cassandra.io.util.SafeMemory;
import org.apache.cassandra.utils.Pair;
/**
@@ -135,7 +136,7 @@ public class CompressionMetadata
this.chunkOffsetsSize = chunkOffsets.size();
}
- private CompressionMetadata(String filePath, CompressionParameters parameters, RefCountedMemory offsets, long offsetsSize, long dataLength, long compressedLength, boolean hasPostCompressionAdlerChecksums)
+ private CompressionMetadata(String filePath, CompressionParameters parameters, SafeMemory offsets, long offsetsSize, long dataLength, long compressedLength, boolean hasPostCompressionAdlerChecksums)
{
this.indexFilePath = filePath;
this.parameters = parameters;
@@ -143,7 +144,6 @@ public class CompressionMetadata
this.compressedFileLength = compressedLength;
this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
this.chunkOffsets = offsets;
- offsets.reference();
this.chunkOffsetsSize = offsetsSize;
}
@@ -261,10 +261,7 @@ public class CompressionMetadata
public void close()
{
- if (chunkOffsets instanceof RefCountedMemory)
- ((RefCountedMemory) chunkOffsets).unreference();
- else
- chunkOffsets.free();
+ chunkOffsets.close();
}
public static class Writer
@@ -273,7 +270,7 @@ public class CompressionMetadata
private final CompressionParameters parameters;
private final String filePath;
private int maxCount = 100;
- private RefCountedMemory offsets = new RefCountedMemory(maxCount * 8);
+ private SafeMemory offsets = new SafeMemory(maxCount * 8);
private int count = 0;
private Writer(CompressionParameters parameters, String path)
@@ -291,8 +288,8 @@ public class CompressionMetadata
{
if (count == maxCount)
{
- RefCountedMemory newOffsets = offsets.copy((maxCount *= 2) * 8);
- offsets.unreference();
+ SafeMemory newOffsets = offsets.copy((maxCount *= 2) * 8);
+ offsets.close();
offsets = newOffsets;
}
offsets.setLong(8 * count++, offset);
@@ -336,7 +333,7 @@ public class CompressionMetadata
public CompressionMetadata open(long dataLength, long compressedLength, OpenType type)
{
- RefCountedMemory offsets = this.offsets;
+ SafeMemory offsets = this.offsets;
int count = this.count;
switch (type)
{
@@ -348,17 +345,14 @@ public class CompressionMetadata
// release our reference to the original shared data;
// we don't do this if not resizing since we must pass out existing
// reference onto our caller
- this.offsets.unreference();
+ this.offsets.free();
}
// null out our reference to the original shared data to catch accidental reuse
// note that since noone is writing to this Writer while we open it, null:ing out this.offsets is safe
this.offsets = null;
if (type == OpenType.SHARED_FINAL)
- {
// we will use the data again, so stash our resized data back, and take an extra reference to it
- this.offsets = offsets;
- this.offsets.reference();
- }
+ this.offsets = offsets.sharedCopy();
break;
case SHARED:
@@ -417,11 +411,7 @@ public class CompressionMetadata
public void abort()
{
- if (offsets != null)
- {
- offsets.unreference();
- offsets = null;
- }
+ offsets.close();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/746c4585/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index a02ea15..2e7f28f 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import com.sun.jna.Native;
+import net.nicoulaj.compilecommand.annotations.Inline;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.FastByteOperations;
import org.apache.cassandra.utils.memory.MemoryUtil;
@@ -34,7 +35,7 @@ import sun.nio.ch.DirectBuffer;
public class Memory implements AutoCloseable
{
private static final Unsafe unsafe = NativeAllocator.unsafe;
- private static final IAllocator allocator = DatabaseDescriptor.getoffHeapMemoryAllocator();
+ static final IAllocator allocator = DatabaseDescriptor.getoffHeapMemoryAllocator();
private static final long BYTE_ARRAY_BASE_OFFSET = unsafe.arrayBaseOffset(byte[].class);
private static final boolean bigEndian = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
@@ -49,7 +50,7 @@ public class Memory implements AutoCloseable
protected long peer;
// size of the memory region
- private final long size;
+ protected final long size;
protected Memory(long bytes)
{
@@ -59,6 +60,14 @@ public class Memory implements AutoCloseable
throw new OutOfMemoryError();
}
+ // create a memory object that references the exacy same memory location as the one provided.
+ // this should ONLY be used by SafeMemory
+ protected Memory(Memory copyOf)
+ {
+ size = copyOf.size;
+ peer = copyOf.peer;
+ }
+
public static Memory allocate(long bytes)
{
if (bytes < 0)
@@ -280,7 +289,8 @@ public class Memory implements AutoCloseable
FastByteOperations.UnsafeOperations.copy(null, peer + memoryOffset, buffer, bufferOffset, count);
}
- private void checkPosition(long offset)
+ @Inline
+ protected void checkPosition(long offset)
{
assert peer != 0 : "Memory was freed";
assert offset >= 0 && offset < size : "Illegal offset: " + offset + ", size: " + size;
@@ -305,7 +315,7 @@ public class Memory implements AutoCloseable
peer = 0;
}
- public void close() throws Exception
+ public void close()
{
free();
}
@@ -345,4 +355,14 @@ public class Memory implements AutoCloseable
result[result.length - 1] = MemoryUtil.getByteBuffer(peer + offset, (int) (size() - offset));
return result;
}
+
+ public String toString()
+ {
+ return toString(peer, size);
+ }
+
+ protected static String toString(long peer, long size)
+ {
+ return String.format("Memory@[%x..%x)", peer, peer + size);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/746c4585/src/java/org/apache/cassandra/io/util/SafeMemory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemory.java b/src/java/org/apache/cassandra/io/util/SafeMemory.java
new file mode 100644
index 0000000..d722348
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/SafeMemory.java
@@ -0,0 +1,98 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.util;
+
+import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.RefCounted;
+import org.apache.cassandra.utils.concurrent.SharedCloseable;
+
+public class SafeMemory extends Memory implements SharedCloseable
+{
+ private final Ref<?> ref;
+ public SafeMemory(long size)
+ {
+ super(size);
+ ref = new Ref<>(null, new MemoryTidy(peer, size));
+ }
+
+ private SafeMemory(SafeMemory copyOf)
+ {
+ super(copyOf);
+ ref = copyOf.ref.ref();
+ /** see {@link Memory#Memory(long)} re: null pointers*/
+ if (peer == 0 && size != 0)
+ {
+ ref.ensureReleased();
+ throw new IllegalStateException("Cannot create a sharedCopy of a SafeMemory object that has already been closed");
+ }
+ }
+
+ public SafeMemory sharedCopy()
+ {
+ return new SafeMemory(this);
+ }
+
+ public void free()
+ {
+ ref.release();
+ }
+
+ public void close()
+ {
+ ref.ensureReleased();
+ }
+
+ public SafeMemory copy(long newSize)
+ {
+ SafeMemory copy = new SafeMemory(newSize);
+ copy.put(0, this, 0, Math.min(size(), newSize));
+ return copy;
+ }
+
+ private static final class MemoryTidy implements RefCounted.Tidy
+ {
+ final long peer;
+ final long size;
+ private MemoryTidy(long peer, long size)
+ {
+ this.peer = peer;
+ this.size = size;
+ }
+
+ public void tidy() throws Exception
+ {
+ /** see {@link Memory#Memory(long)} re: null pointers*/
+ if (peer != 0)
+ Memory.allocator.free(peer);
+ }
+
+ public String name()
+ {
+ return Memory.toString(peer, size);
+ }
+ }
+
+ @Inline
+ protected void checkPosition(long offset)
+ {
+ assert peer != 0 || size == 0 : ref.printDebugInfo();
+ super.checkPosition(offset);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/746c4585/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index ad1293b..dd04051 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -107,6 +107,16 @@ public final class Ref<T> implements RefCounted<T>, AutoCloseable
return ref;
}
+ public String printDebugInfo()
+ {
+ if (DEBUG_ENABLED)
+ {
+ state.debug.log(state.toString());
+ return "Memory was freed by " + state.debug.deallocateThread;
+ }
+ return "Memory was freed";
+ }
+
/**
* A convenience method for reporting:
* @return the number of currently extant references globally, including the shared reference