You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/11/30 13:34:50 UTC

[06/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ea69be62
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ea69be62
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ea69be62

Branch: refs/heads/cassandra-3.11
Commit: ea69be62c84e51bbfa465204a8d4373a0d553553
Parents: f00e431 8cb9693
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Nov 30 14:30:45 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Nov 30 14:30:45 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Memtable.java  |  4 -
 .../db/partitions/AtomicBTreePartition.java     | 20 -----
 .../apache/cassandra/utils/memory/HeapPool.java | 84 ++++++--------------
 .../utils/memory/MemtableAllocator.java         | 36 ---------
 .../cassandra/utils/memory/NativeAllocator.java |  6 --
 .../cassandra/utils/memory/SlabAllocator.java   |  5 --
 .../org/apache/cassandra/tools/ToolsTester.java |  2 +-
 8 files changed, 26 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ea69be62/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b238018,58a29e7..ea91a4e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -111,12 -2,23 +111,13 @@@ Merged from 3.0
   * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
   * AnticompactionRequestSerializer serializedSize is incorrect (CASSANDRA-12934)
   * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
+  * Reenable HeapPool (CASSANDRA-12900)
 -Merged from 2.2:
 - * cqlsh: fix DESC TYPES errors (CASSANDRA-12914)
 - * Fix leak on skipped SSTables in sstableupgrade (CASSANDRA-12899)
 - * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
 -
 -
 -3.0.10
 - * Disallow offheap_buffers memtable allocation (CASSANDRA-11039)
 - * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
   * Pass root cause to CorruptBlockException when uncompression failed (CASSANDRA-12889)
 - * Fix partition count log during compaction (CASSANDRA-12184)
   * Batch with multiple conditional updates for the same partition causes AssertionError (CASSANDRA-12867)
   * Make AbstractReplicationStrategy extendable from outside its package (CASSANDRA-12788)
 - * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854)
   * Don't tell users to turn off consistent rangemovements during rebuild. (CASSANDRA-12296)
 - * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689)
 + * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854)
 + * Avoid deadlock due to MV lock contention (CASSANDRA-12689)
   * Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801)
   * Include SSTable filename in compacting large row message (CASSANDRA-12384)
   * Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ea69be62/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ea69be62/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index c7113d4,7f2de82..c9c6006
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@@ -366,24 -303,9 +363,8 @@@ public class AtomicBTreePartition exten
              this.dataSize = 0;
              this.heapSize = 0;
              if (inserted != null)
-             {
-                 for (Row row : inserted)
-                     abort(row);
                  inserted.clear();
-             }
-             reclaimer.cancel();
-         }
- 
-         protected void abort(Row abort)
-         {
-             reclaimer.reclaimImmediately(abort);
          }
- 
-         protected void discard(Row discard)
-         {
-             reclaimer.reclaim(discard);
-         }
--
          public boolean abortEarly()
          {
              return updating.ref != ref;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ea69be62/src/java/org/apache/cassandra/utils/memory/HeapPool.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/memory/HeapPool.java
index 46f4111,57242c4..abcc241
--- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
@@@ -25,68 -29,27 +29,28 @@@ public class HeapPool extends MemtableP
          super(maxOnHeapMemory, 0, cleanupThreshold, cleaner);
      }
  
 -    public boolean needToCopyOnHeap()
 -    {
 -        return false;
 -    }
 -
      public MemtableAllocator newAllocator()
      {
-         // TODO
-         throw new UnsupportedOperationException();
-         //return new Allocator(this);
+         return new Allocator(this);
      }
  
-     // TODO
-     //public static class Allocator extends MemtableBufferAllocator
-     //{
-     //    Allocator(HeapPool pool)
-     //    {
-     //        super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator());
-     //    }
- 
-     //    public ByteBuffer allocate(int size, OpOrder.Group opGroup)
-     //    {
-     //        super.onHeap().allocate(size, opGroup);
-     //        return ByteBuffer.allocate(size);
-     //    }
- 
-     //    public DataReclaimer reclaimer()
-     //    {
-     //        return new Reclaimer();
-     //    }
- 
-     //    private class Reclaimer implements DataReclaimer
-     //    {
-     //        List<Cell> delayed;
- 
-     //        public Reclaimer reclaim(Cell cell)
-     //        {
-     //            if (delayed == null)
-     //                delayed = new ArrayList<>();
-     //            delayed.add(cell);
-     //            return this;
-     //        }
- 
-     //        public Reclaimer reclaimImmediately(Cell cell)
-     //        {
-     //            onHeap().release(cell.name().dataSize() + cell.value().remaining());
-     //            return this;
-     //        }
- 
-     //        public Reclaimer reclaimImmediately(DecoratedKey key)
-     //        {
-     //            onHeap().release(key.getKey().remaining());
-     //            return this;
-     //        }
- 
-     //        public void cancel()
-     //        {
-     //            if (delayed != null)
-     //                delayed.clear();
-     //        }
- 
-     //        public void commit()
-     //        {
-     //            if (delayed != null)
-     //                for (Cell cell : delayed)
-     //                    reclaimImmediately(cell);
-     //        }
-     //    }
-     //}
+     private static class Allocator extends MemtableBufferAllocator
+     {
++        private static final EnsureOnHeap ENSURE_NOOP = new EnsureOnHeap.NoOp();
+         Allocator(HeapPool pool)
+         {
+             super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator());
+         }
+ 
+         public ByteBuffer allocate(int size, OpOrder.Group opGroup)
+         {
+             super.onHeap().allocate(size, opGroup);
+             return ByteBuffer.allocate(size);
+         }
++
++        public EnsureOnHeap ensureOnHeap()
++        {
++            return ENSURE_NOOP;
++        }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ea69be62/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
index b326af7,fa547ce..8635dd6
--- a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
@@@ -60,8 -61,6 +60,7 @@@ public abstract class MemtableAllocato
  
      public abstract Row.Builder rowBuilder(OpOrder.Group opGroup);
      public abstract DecoratedKey clone(DecoratedKey key, OpOrder.Group opGroup);
-     public abstract DataReclaimer reclaimer();
 +    public abstract EnsureOnHeap ensureOnHeap();
  
      public SubAllocator onHeap()
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ea69be62/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
index 5bdaf08,67f2a36..61e8407
--- a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@@ -98,17 -74,6 +98,11 @@@ public class NativeAllocator extends Me
          return new NativeDecoratedKey(key.getToken(), this, writeOp, key.getKey());
      }
  
-     @Override
-     public MemtableAllocator.DataReclaimer reclaimer()
-     {
-         return NO_OP;
-     }
- 
 +    public EnsureOnHeap ensureOnHeap()
 +    {
 +        return cloneToHeap;
 +    }
 +
      public long allocate(int size, OpOrder.Group opGroup)
      {
          assert size >= 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ea69be62/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ea69be62/test/unit/org/apache/cassandra/tools/ToolsTester.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/tools/ToolsTester.java
index 5094a53,0000000..97b19c9
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/tools/ToolsTester.java
+++ b/test/unit/org/apache/cassandra/tools/ToolsTester.java
@@@ -1,296 -1,0 +1,296 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.tools;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.lang.management.ManagementFactory;
 +import java.lang.management.ThreadInfo;
 +import java.lang.management.ThreadMXBean;
 +import java.lang.reflect.InvocationTargetException;
 +import java.lang.reflect.Method;
 +import java.security.Permission;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Set;
 +import java.util.regex.Pattern;
 +import java.util.stream.Collectors;
 +
 +import org.apache.commons.io.FileUtils;
 +import org.junit.BeforeClass;
 +
 +import org.slf4j.LoggerFactory;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertTrue;
 +import static org.junit.Assert.fail;
 +
 +/**
 + * Base unit test class for standalone tools
 + */
 +public abstract class ToolsTester
 +{
 +    private static List<ThreadInfo> initialThreads;
 +
 +    static final String[] EXPECTED_THREADS_WITH_SCHEMA = {
-     "NativePoolCleaner",
++    "(NativePool|SlabPool|HeapPool)Cleaner",
 +    "COMMIT-LOG-ALLOCATOR",
 +    "COMMIT-LOG-WRITER",
 +    "PerDiskMemtableFlushWriter_0:[1-9]",
 +    "MemtablePostFlush:[1-9]",
 +    "MemtableFlushWriter:[1-9]",
 +    "MemtableReclaimMemory:[1-9]",
 +    };
 +    static final String[] OPTIONAL_THREADS_WITH_SCHEMA = {
 +    "ScheduledTasks:[1-9]",
 +    "OptionalTasks:[1-9]",
 +    "Reference-Reaper:[1-9]",
 +    "LocalPool-Cleaner:[1-9]",
 +    "CacheCleanupExecutor:[1-9]",
 +    "CompactionExecutor:[1-9]",
 +    "ValidationExecutor:[1-9]",
 +    "NonPeriodicTasks:[1-9]",
 +    "Sampler:[1-9]",
 +    "SecondaryIndexManagement:[1-9]",
 +    "Strong-Reference-Leak-Detector:[1-9]",
 +    "Background_Reporter:[1-9]",
 +    "EXPIRING-MAP-REAPER:[1-9]",
 +    };
 +
 +    public void assertNoUnexpectedThreadsStarted(String[] expectedThreadNames, String[] optionalThreadNames)
 +    {
 +        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
 +
 +        Set<String> initial = initialThreads
 +                              .stream()
 +                              .map(ThreadInfo::getThreadName)
 +                              .collect(Collectors.toSet());
 +
 +        Set<String> current = Arrays.stream(threads.getThreadInfo(threads.getAllThreadIds()))
 +                                    .map(ThreadInfo::getThreadName)
 +                                    .collect(Collectors.toSet());
 +
 +        List<Pattern> expected = expectedThreadNames != null
 +                                 ? Arrays.stream(expectedThreadNames).map(Pattern::compile).collect(Collectors.toList())
 +                                 : Collections.emptyList();
 +
 +        List<Pattern> optional = optionalThreadNames != null
 +                                 ? Arrays.stream(optionalThreadNames).map(Pattern::compile).collect(Collectors.toList())
 +                                 : Collections.emptyList();
 +
 +        current.removeAll(initial);
 +
 +        List<Pattern> notPresent = expected.stream()
 +                                           .filter(threadNamePattern -> !current.stream().anyMatch(threadName -> threadNamePattern.matcher(threadName).matches()))
 +                                           .collect(Collectors.toList());
 +
 +        Set<String> remain = current.stream()
 +                                    .filter(threadName -> expected.stream().anyMatch(pattern -> pattern.matcher(threadName).matches()))
 +                                    .filter(threadName -> optional.stream().anyMatch(pattern -> pattern.matcher(threadName).matches()))
 +                                    .collect(Collectors.toSet());
 +
 +        if (!current.isEmpty())
 +            System.err.println("Unexpected thread names: " + remain);
 +        if (!notPresent.isEmpty())
 +            System.err.println("Mandatory thread missing: " + notPresent);
 +
 +        assertTrue("Wrong thread status", remain.isEmpty() && notPresent.isEmpty());
 +    }
 +
 +    public void assertSchemaNotLoaded()
 +    {
 +        assertClassNotLoaded("org.apache.cassandra.config.Schema");
 +    }
 +
 +    public void assertSchemaLoaded()
 +    {
 +        assertClassLoaded("org.apache.cassandra.config.Schema");
 +    }
 +
 +    public void assertKeyspaceNotLoaded()
 +    {
 +        assertClassNotLoaded("org.apache.cassandra.db.Keyspace");
 +    }
 +
 +    public void assertKeyspaceLoaded()
 +    {
 +        assertClassLoaded("org.apache.cassandra.db.Keyspace");
 +    }
 +
 +    public void assertServerNotLoaded()
 +    {
 +        assertClassNotLoaded("org.apache.cassandra.transport.Server");
 +    }
 +
 +    public void assertSystemKSNotLoaded()
 +    {
 +        assertClassNotLoaded("org.apache.cassandra.db.SystemKeyspace");
 +    }
 +
 +    public void assertCLSMNotLoaded()
 +    {
 +        assertClassNotLoaded("org.apache.cassandra.db.commitlog.CommitLogSegmentManager");
 +    }
 +
 +    public void assertClassLoaded(String clazz)
 +    {
 +        assertClassLoadedStatus(clazz, true);
 +    }
 +
 +    public void assertClassNotLoaded(String clazz)
 +    {
 +        assertClassLoadedStatus(clazz, false);
 +    }
 +
 +    private void assertClassLoadedStatus(String clazz, boolean expected)
 +    {
 +        for (ClassLoader cl = Thread.currentThread().getContextClassLoader(); cl != null; cl = cl.getParent())
 +        {
 +            try
 +            {
 +                Method mFindLoadedClass = ClassLoader.class.getDeclaredMethod("findLoadedClass", String.class);
 +                mFindLoadedClass.setAccessible(true);
 +                boolean loaded = mFindLoadedClass.invoke(cl, clazz) != null;
 +
 +                if (expected)
 +                {
 +                    if (loaded)
 +                        return;
 +                }
 +                else
 +                    assertFalse(clazz + " has been loaded", loaded);
 +            }
 +            catch (Exception e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +        }
 +
 +        if (expected)
 +            fail(clazz + " has not been loaded");
 +    }
 +
 +    public void runTool(int expectedExitCode, String clazz, String... args)
 +    {
 +        try
 +        {
 +            // install security manager to get informed about the exit-code
 +            System.setSecurityManager(new SecurityManager()
 +            {
 +                public void checkExit(int status)
 +                {
 +                    throw new SystemExitException(status);
 +                }
 +
 +                public void checkPermission(Permission perm)
 +                {
 +                }
 +
 +                public void checkPermission(Permission perm, Object context)
 +                {
 +                }
 +            });
 +
 +            try
 +            {
 +                Class.forName(clazz).getDeclaredMethod("main", String[].class).invoke(null, (Object) args);
 +            }
 +            catch (InvocationTargetException e)
 +            {
 +                Throwable cause = e.getCause();
 +                if (cause instanceof Error)
 +                    throw (Error) cause;
 +                if (cause instanceof RuntimeException)
 +                    throw (RuntimeException) cause;
 +                throw e;
 +            }
 +
 +            assertEquals("Unexpected exit code", expectedExitCode, 0);
 +        }
 +        catch (SystemExitException e)
 +        {
 +            assertEquals("Unexpected exit code", expectedExitCode, e.status);
 +        }
 +        catch (InvocationTargetException e)
 +        {
 +            throw new RuntimeException(e.getTargetException());
 +        }
 +        catch (Exception e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +        finally
 +        {
 +            // uninstall security manager
 +            System.setSecurityManager(null);
 +        }
 +    }
 +
 +    @BeforeClass
 +    public static void setupTester()
 +    {
 +        System.setProperty("cassandra.partitioner", "org.apache.cassandra.dht.Murmur3Partitioner");
 +
 +        // may start an async appender
 +        LoggerFactory.getLogger(ToolsTester.class);
 +
 +        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
 +        initialThreads = Arrays.asList(threads.getThreadInfo(threads.getAllThreadIds()));
 +    }
 +
 +    public static class SystemExitException extends Error
 +    {
 +        public final int status;
 +
 +        public SystemExitException(int status)
 +        {
 +            this.status = status;
 +        }
 +    }
 +
 +    public static String findOneSSTable(String ks, String cf) throws IOException
 +    {
 +        File cfDir = sstableDir(ks, cf);
 +        File[] sstableFiles = cfDir.listFiles((file) -> file.isFile() && file.getName().endsWith("-Data.db"));
 +        return sstableFiles[0].getAbsolutePath();
 +    }
 +
 +    public static String sstableDirName(String ks, String cf) throws IOException
 +    {
 +        return sstableDir(ks, cf).getAbsolutePath();
 +    }
 +
 +    public static File sstableDir(String ks, String cf) throws IOException
 +    {
 +        File dataDir = copySSTables();
 +        File ksDir = new File(dataDir, ks);
 +        File[] cfDirs = ksDir.listFiles((dir, name) -> cf.equals(name) || name.startsWith(cf + '-'));
 +        return cfDirs[0];
 +    }
 +
 +    public static File copySSTables() throws IOException
 +    {
 +        File dataDir = new File("build/test/cassandra/data");
 +        File srcDir = new File("test/data/legacy-sstables/ma");
 +        FileUtils.copyDirectory(new File(srcDir, "legacy_tables"), new File(dataDir, "legacy_sstables"));
 +        return dataDir;
 +    }
 +}