You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/02/13 16:59:23 UTC

[2/3] cassandra git commit: Introduce SafeMemory for CompressionMetadata.Writer

Introduce SafeMemory for CompressionMetadata.Writer

patch by benedict; reviewed by marcus for CASSANDRA-8758


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/746c4585
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/746c4585
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/746c4585

Branch: refs/heads/trunk
Commit: 746c45853827eefaf9254b772c3dc9e72662c986
Parents: 2a167ea
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Fri Feb 13 15:54:52 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Feb 13 15:54:52 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../io/compress/CompressionMetadata.java        | 30 ++----
 .../org/apache/cassandra/io/util/Memory.java    | 28 +++++-
 .../apache/cassandra/io/util/SafeMemory.java    | 98 ++++++++++++++++++++
 .../apache/cassandra/utils/concurrent/Ref.java  | 10 ++
 5 files changed, 143 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/746c4585/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bca3dc8..0d204a5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.4
+ * Introduce SafeMemory for CompressionMetadata.Writer (CASSANDRA-8758)
  * 'nodetool info' prints exception against older node (CASSANDRA-8796)
  * Ensure SSTableReader.last corresponds exactly with the file end (CASSANDRA-8750)
  * Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/746c4585/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index fd8248e..b29e259 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.Memory;
+import org.apache.cassandra.io.util.SafeMemory;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -135,7 +136,7 @@ public class CompressionMetadata
         this.chunkOffsetsSize = chunkOffsets.size();
     }
 
-    private CompressionMetadata(String filePath, CompressionParameters parameters, RefCountedMemory offsets, long offsetsSize, long dataLength, long compressedLength, boolean hasPostCompressionAdlerChecksums)
+    private CompressionMetadata(String filePath, CompressionParameters parameters, SafeMemory offsets, long offsetsSize, long dataLength, long compressedLength, boolean hasPostCompressionAdlerChecksums)
     {
         this.indexFilePath = filePath;
         this.parameters = parameters;
@@ -143,7 +144,6 @@ public class CompressionMetadata
         this.compressedFileLength = compressedLength;
         this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums;
         this.chunkOffsets = offsets;
-        offsets.reference();
         this.chunkOffsetsSize = offsetsSize;
     }
 
@@ -261,10 +261,7 @@ public class CompressionMetadata
 
     public void close()
     {
-        if (chunkOffsets instanceof RefCountedMemory)
-            ((RefCountedMemory) chunkOffsets).unreference();
-        else
-            chunkOffsets.free();
+        chunkOffsets.close();
     }
 
     public static class Writer
@@ -273,7 +270,7 @@ public class CompressionMetadata
         private final CompressionParameters parameters;
         private final String filePath;
         private int maxCount = 100;
-        private RefCountedMemory offsets = new RefCountedMemory(maxCount * 8);
+        private SafeMemory offsets = new SafeMemory(maxCount * 8);
         private int count = 0;
 
         private Writer(CompressionParameters parameters, String path)
@@ -291,8 +288,8 @@ public class CompressionMetadata
         {
             if (count == maxCount)
             {
-                RefCountedMemory newOffsets = offsets.copy((maxCount *= 2) * 8);
-                offsets.unreference();
+                SafeMemory newOffsets = offsets.copy((maxCount *= 2) * 8);
+                offsets.close();
                 offsets = newOffsets;
             }
             offsets.setLong(8 * count++, offset);
@@ -336,7 +333,7 @@ public class CompressionMetadata
 
         public CompressionMetadata open(long dataLength, long compressedLength, OpenType type)
         {
-            RefCountedMemory offsets = this.offsets;
+            SafeMemory offsets = this.offsets;
             int count = this.count;
             switch (type)
             {
@@ -348,17 +345,14 @@ public class CompressionMetadata
                         // release our reference to the original shared data;
                         // we don't do this if not resizing since we must pass out existing
                         // reference onto our caller
-                        this.offsets.unreference();
+                        this.offsets.free();
                     }
                     // null out our reference to the original shared data to catch accidental reuse
                     // note that since noone is writing to this Writer while we open it, null:ing out this.offsets is safe
                     this.offsets = null;
                     if (type == OpenType.SHARED_FINAL)
-                    {
                         // we will use the data again, so stash our resized data back, and take an extra reference to it
-                        this.offsets = offsets;
-                        this.offsets.reference();
-                    }
+                        this.offsets = offsets.sharedCopy();
                     break;
 
                 case SHARED:
@@ -417,11 +411,7 @@ public class CompressionMetadata
 
         public void abort()
         {
-            if (offsets != null)
-            {
-                offsets.unreference();
-                offsets = null;
-            }
+            offsets.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/746c4585/src/java/org/apache/cassandra/io/util/Memory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/Memory.java b/src/java/org/apache/cassandra/io/util/Memory.java
index a02ea15..2e7f28f 100644
--- a/src/java/org/apache/cassandra/io/util/Memory.java
+++ b/src/java/org/apache/cassandra/io/util/Memory.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
 import com.sun.jna.Native;
+import net.nicoulaj.compilecommand.annotations.Inline;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.utils.FastByteOperations;
 import org.apache.cassandra.utils.memory.MemoryUtil;
@@ -34,7 +35,7 @@ import sun.nio.ch.DirectBuffer;
 public class Memory implements AutoCloseable
 {
     private static final Unsafe unsafe = NativeAllocator.unsafe;
-    private static final IAllocator allocator = DatabaseDescriptor.getoffHeapMemoryAllocator();
+    static final IAllocator allocator = DatabaseDescriptor.getoffHeapMemoryAllocator();
     private static final long BYTE_ARRAY_BASE_OFFSET = unsafe.arrayBaseOffset(byte[].class);
 
     private static final boolean bigEndian = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
@@ -49,7 +50,7 @@ public class Memory implements AutoCloseable
 
     protected long peer;
     // size of the memory region
-    private final long size;
+    protected final long size;
 
     protected Memory(long bytes)
     {
@@ -59,6 +60,14 @@ public class Memory implements AutoCloseable
             throw new OutOfMemoryError();
     }
 
+    // create a memory object that references the exacy same memory location as the one provided.
+    // this should ONLY be used by SafeMemory
+    protected Memory(Memory copyOf)
+    {
+        size = copyOf.size;
+        peer = copyOf.peer;
+    }
+
     public static Memory allocate(long bytes)
     {
         if (bytes < 0)
@@ -280,7 +289,8 @@ public class Memory implements AutoCloseable
         FastByteOperations.UnsafeOperations.copy(null, peer + memoryOffset, buffer, bufferOffset, count);
     }
 
-    private void checkPosition(long offset)
+    @Inline
+    protected void checkPosition(long offset)
     {
         assert peer != 0 : "Memory was freed";
         assert offset >= 0 && offset < size : "Illegal offset: " + offset + ", size: " + size;
@@ -305,7 +315,7 @@ public class Memory implements AutoCloseable
         peer = 0;
     }
 
-    public void close() throws Exception
+    public void close()
     {
         free();
     }
@@ -345,4 +355,14 @@ public class Memory implements AutoCloseable
         result[result.length - 1] = MemoryUtil.getByteBuffer(peer + offset, (int) (size() - offset));
         return result;
     }
+
+    public String toString()
+    {
+        return toString(peer, size);
+    }
+
+    protected static String toString(long peer, long size)
+    {
+        return String.format("Memory@[%x..%x)", peer, peer + size);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/746c4585/src/java/org/apache/cassandra/io/util/SafeMemory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemory.java b/src/java/org/apache/cassandra/io/util/SafeMemory.java
new file mode 100644
index 0000000..d722348
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/SafeMemory.java
@@ -0,0 +1,98 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.util;
+
+import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.RefCounted;
+import org.apache.cassandra.utils.concurrent.SharedCloseable;
+
+public class SafeMemory extends Memory implements SharedCloseable
+{
+    private final Ref<?> ref;
+    public SafeMemory(long size)
+    {
+        super(size);
+        ref = new Ref<>(null, new MemoryTidy(peer, size));
+    }
+
+    private SafeMemory(SafeMemory copyOf)
+    {
+        super(copyOf);
+        ref = copyOf.ref.ref();
+        /** see {@link Memory#Memory(long)} re: null pointers*/
+        if (peer == 0 && size != 0)
+        {
+            ref.ensureReleased();
+            throw new IllegalStateException("Cannot create a sharedCopy of a SafeMemory object that has already been closed");
+        }
+    }
+
+    public SafeMemory sharedCopy()
+    {
+        return new SafeMemory(this);
+    }
+
+    public void free()
+    {
+        ref.release();
+    }
+
+    public void close()
+    {
+        ref.ensureReleased();
+    }
+
+    public SafeMemory copy(long newSize)
+    {
+        SafeMemory copy = new SafeMemory(newSize);
+        copy.put(0, this, 0, Math.min(size(), newSize));
+        return copy;
+    }
+
+    private static final class MemoryTidy implements RefCounted.Tidy
+    {
+        final long peer;
+        final long size;
+        private MemoryTidy(long peer, long size)
+        {
+            this.peer = peer;
+            this.size = size;
+        }
+
+        public void tidy() throws Exception
+        {
+            /** see {@link Memory#Memory(long)} re: null pointers*/
+            if (peer != 0)
+                Memory.allocator.free(peer);
+        }
+
+        public String name()
+        {
+            return Memory.toString(peer, size);
+        }
+    }
+
+    @Inline
+    protected void checkPosition(long offset)
+    {
+        assert peer != 0 || size == 0 : ref.printDebugInfo();
+        super.checkPosition(offset);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/746c4585/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index ad1293b..dd04051 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -107,6 +107,16 @@ public final class Ref<T> implements RefCounted<T>, AutoCloseable
         return ref;
     }
 
+    public String printDebugInfo()
+    {
+        if (DEBUG_ENABLED)
+        {
+            state.debug.log(state.toString());
+            return "Memory was freed by " + state.debug.deallocateThread;
+        }
+        return "Memory was freed";
+    }
+
     /**
      * A convenience method for reporting:
      * @return the number of currently extant references globally, including the shared reference