You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/11/30 13:34:50 UTC
[06/10] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ea69be62
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ea69be62
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ea69be62
Branch: refs/heads/cassandra-3.11
Commit: ea69be62c84e51bbfa465204a8d4373a0d553553
Parents: f00e431 8cb9693
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Nov 30 14:30:45 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Nov 30 14:30:45 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Memtable.java | 4 -
.../db/partitions/AtomicBTreePartition.java | 20 -----
.../apache/cassandra/utils/memory/HeapPool.java | 84 ++++++--------------
.../utils/memory/MemtableAllocator.java | 36 ---------
.../cassandra/utils/memory/NativeAllocator.java | 6 --
.../cassandra/utils/memory/SlabAllocator.java | 5 --
.../org/apache/cassandra/tools/ToolsTester.java | 2 +-
8 files changed, 26 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ea69be62/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b238018,58a29e7..ea91a4e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -111,12 -2,23 +111,13 @@@ Merged from 3.0
* LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
* AnticompactionRequestSerializer serializedSize is incorrect (CASSANDRA-12934)
* Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
+ * Reenable HeapPool (CASSANDRA-12900)
-Merged from 2.2:
- * cqlsh: fix DESC TYPES errors (CASSANDRA-12914)
- * Fix leak on skipped SSTables in sstableupgrade (CASSANDRA-12899)
- * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
-
-
-3.0.10
- * Disallow offheap_buffers memtable allocation (CASSANDRA-11039)
- * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
* Pass root cause to CorruptBlockException when uncompression failed (CASSANDRA-12889)
- * Fix partition count log during compaction (CASSANDRA-12184)
* Batch with multiple conditional updates for the same partition causes AssertionError (CASSANDRA-12867)
* Make AbstractReplicationStrategy extendable from outside its package (CASSANDRA-12788)
- * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854)
* Don't tell users to turn off consistent rangemovements during rebuild. (CASSANDRA-12296)
- * Avoid deadlock due to materialized view lock contention (CASSANDRA-12689)
+ * Fix CommitLogTest.testDeleteIfNotDirty (CASSANDRA-12854)
+ * Avoid deadlock due to MV lock contention (CASSANDRA-12689)
* Fix for KeyCacheCqlTest flakiness (CASSANDRA-12801)
* Include SSTable filename in compacting large row message (CASSANDRA-12384)
* Fix potential socket leak (CASSANDRA-12329, CASSANDRA-12330)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ea69be62/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ea69be62/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index c7113d4,7f2de82..c9c6006
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@@ -366,24 -303,9 +363,8 @@@ public class AtomicBTreePartition exten
this.dataSize = 0;
this.heapSize = 0;
if (inserted != null)
- {
- for (Row row : inserted)
- abort(row);
inserted.clear();
- }
- reclaimer.cancel();
- }
-
- protected void abort(Row abort)
- {
- reclaimer.reclaimImmediately(abort);
}
-
- protected void discard(Row discard)
- {
- reclaimer.reclaim(discard);
- }
--
public boolean abortEarly()
{
return updating.ref != ref;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ea69be62/src/java/org/apache/cassandra/utils/memory/HeapPool.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/memory/HeapPool.java
index 46f4111,57242c4..abcc241
--- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
@@@ -25,68 -29,27 +29,28 @@@ public class HeapPool extends MemtableP
super(maxOnHeapMemory, 0, cleanupThreshold, cleaner);
}
- public boolean needToCopyOnHeap()
- {
- return false;
- }
-
public MemtableAllocator newAllocator()
{
- // TODO
- throw new UnsupportedOperationException();
- //return new Allocator(this);
+ return new Allocator(this);
}
- // TODO
- //public static class Allocator extends MemtableBufferAllocator
- //{
- // Allocator(HeapPool pool)
- // {
- // super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator());
- // }
-
- // public ByteBuffer allocate(int size, OpOrder.Group opGroup)
- // {
- // super.onHeap().allocate(size, opGroup);
- // return ByteBuffer.allocate(size);
- // }
-
- // public DataReclaimer reclaimer()
- // {
- // return new Reclaimer();
- // }
-
- // private class Reclaimer implements DataReclaimer
- // {
- // List<Cell> delayed;
-
- // public Reclaimer reclaim(Cell cell)
- // {
- // if (delayed == null)
- // delayed = new ArrayList<>();
- // delayed.add(cell);
- // return this;
- // }
-
- // public Reclaimer reclaimImmediately(Cell cell)
- // {
- // onHeap().release(cell.name().dataSize() + cell.value().remaining());
- // return this;
- // }
-
- // public Reclaimer reclaimImmediately(DecoratedKey key)
- // {
- // onHeap().release(key.getKey().remaining());
- // return this;
- // }
-
- // public void cancel()
- // {
- // if (delayed != null)
- // delayed.clear();
- // }
-
- // public void commit()
- // {
- // if (delayed != null)
- // for (Cell cell : delayed)
- // reclaimImmediately(cell);
- // }
- // }
- //}
+ private static class Allocator extends MemtableBufferAllocator
+ {
++ private static final EnsureOnHeap ENSURE_NOOP = new EnsureOnHeap.NoOp();
+ Allocator(HeapPool pool)
+ {
+ super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator());
+ }
+
+ public ByteBuffer allocate(int size, OpOrder.Group opGroup)
+ {
+ super.onHeap().allocate(size, opGroup);
+ return ByteBuffer.allocate(size);
+ }
++
++ public EnsureOnHeap ensureOnHeap()
++ {
++ return ENSURE_NOOP;
++ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ea69be62/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
index b326af7,fa547ce..8635dd6
--- a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
@@@ -60,8 -61,6 +60,7 @@@ public abstract class MemtableAllocato
public abstract Row.Builder rowBuilder(OpOrder.Group opGroup);
public abstract DecoratedKey clone(DecoratedKey key, OpOrder.Group opGroup);
- public abstract DataReclaimer reclaimer();
+ public abstract EnsureOnHeap ensureOnHeap();
public SubAllocator onHeap()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ea69be62/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
index 5bdaf08,67f2a36..61e8407
--- a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@@ -98,17 -74,6 +98,11 @@@ public class NativeAllocator extends Me
return new NativeDecoratedKey(key.getToken(), this, writeOp, key.getKey());
}
- @Override
- public MemtableAllocator.DataReclaimer reclaimer()
- {
- return NO_OP;
- }
-
+ public EnsureOnHeap ensureOnHeap()
+ {
+ return cloneToHeap;
+ }
+
public long allocate(int size, OpOrder.Group opGroup)
{
assert size >= 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ea69be62/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ea69be62/test/unit/org/apache/cassandra/tools/ToolsTester.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/tools/ToolsTester.java
index 5094a53,0000000..97b19c9
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/tools/ToolsTester.java
+++ b/test/unit/org/apache/cassandra/tools/ToolsTester.java
@@@ -1,296 -1,0 +1,296 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.Permission;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.BeforeClass;
+
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Base unit test class for standalone tools
+ */
+public abstract class ToolsTester
+{
+ private static List<ThreadInfo> initialThreads;
+
+ static final String[] EXPECTED_THREADS_WITH_SCHEMA = {
- "NativePoolCleaner",
++ "(NativePool|SlabPool|HeapPool)Cleaner",
+ "COMMIT-LOG-ALLOCATOR",
+ "COMMIT-LOG-WRITER",
+ "PerDiskMemtableFlushWriter_0:[1-9]",
+ "MemtablePostFlush:[1-9]",
+ "MemtableFlushWriter:[1-9]",
+ "MemtableReclaimMemory:[1-9]",
+ };
+ static final String[] OPTIONAL_THREADS_WITH_SCHEMA = {
+ "ScheduledTasks:[1-9]",
+ "OptionalTasks:[1-9]",
+ "Reference-Reaper:[1-9]",
+ "LocalPool-Cleaner:[1-9]",
+ "CacheCleanupExecutor:[1-9]",
+ "CompactionExecutor:[1-9]",
+ "ValidationExecutor:[1-9]",
+ "NonPeriodicTasks:[1-9]",
+ "Sampler:[1-9]",
+ "SecondaryIndexManagement:[1-9]",
+ "Strong-Reference-Leak-Detector:[1-9]",
+ "Background_Reporter:[1-9]",
+ "EXPIRING-MAP-REAPER:[1-9]",
+ };
+
+     /**
+      * Checks the threads started since {@link #setupTester()} took its snapshot against the given
+      * thread-name patterns.
+      * <p>
+      * Every pattern in {@code expectedThreadNames} must fully match at least one newly started thread,
+      * and every newly started thread must fully match an expected or an optional pattern. Offending
+      * thread names and unmatched mandatory patterns are printed to {@code System.err} before the
+      * assertion fails.
+      *
+      * @param expectedThreadNames regular expressions of threads that must be running; {@code null} means none
+      * @param optionalThreadNames regular expressions of threads that may be running; {@code null} means none
+      */
+     public void assertNoUnexpectedThreadsStarted(String[] expectedThreadNames, String[] optionalThreadNames)
+     {
+         ThreadMXBean threads = ManagementFactory.getThreadMXBean();
+
+         // getThreadInfo(long[]) yields null entries for threads that are no longer alive; skip those.
+         Set<String> initial = initialThreads
+                               .stream()
+                               .filter(info -> info != null)
+                               .map(ThreadInfo::getThreadName)
+                               .collect(Collectors.toSet());
+
+         Set<String> current = Arrays.stream(threads.getThreadInfo(threads.getAllThreadIds()))
+                                     .filter(info -> info != null)
+                                     .map(ThreadInfo::getThreadName)
+                                     .collect(Collectors.toSet());
+
+         List<Pattern> expected = expectedThreadNames != null
+                                  ? Arrays.stream(expectedThreadNames).map(Pattern::compile).collect(Collectors.toList())
+                                  : Collections.emptyList();
+
+         List<Pattern> optional = optionalThreadNames != null
+                                  ? Arrays.stream(optionalThreadNames).map(Pattern::compile).collect(Collectors.toList())
+                                  : Collections.emptyList();
+
+         // Only threads started after the snapshot are of interest.
+         current.removeAll(initial);
+
+         // Mandatory patterns that no newly started thread matches.
+         List<Pattern> notPresent = expected.stream()
+                                            .filter(pattern -> current.stream().noneMatch(threadName -> pattern.matcher(threadName).matches()))
+                                            .collect(Collectors.toList());
+
+         // A thread is unexpected only if it matches neither an expected nor an optional pattern.
+         Set<String> remain = current.stream()
+                                     .filter(threadName -> expected.stream().noneMatch(pattern -> pattern.matcher(threadName).matches()))
+                                     .filter(threadName -> optional.stream().noneMatch(pattern -> pattern.matcher(threadName).matches()))
+                                     .collect(Collectors.toSet());
+
+         if (!remain.isEmpty())
+             System.err.println("Unexpected thread names: " + remain);
+         if (!notPresent.isEmpty())
+             System.err.println("Mandatory thread missing: " + notPresent);
+
+         assertTrue("Wrong thread status", remain.isEmpty() && notPresent.isEmpty());
+     }
+
+ /** Asserts that {@code org.apache.cassandra.config.Schema} has not been loaded (delegates to {@code assertClassNotLoaded}). */
+ public void assertSchemaNotLoaded()
+ {
+ assertClassNotLoaded("org.apache.cassandra.config.Schema");
+ }
+
+ /** Asserts that {@code org.apache.cassandra.config.Schema} has been loaded (delegates to {@code assertClassLoaded}). */
+ public void assertSchemaLoaded()
+ {
+ assertClassLoaded("org.apache.cassandra.config.Schema");
+ }
+
+ /** Asserts that {@code org.apache.cassandra.db.Keyspace} has not been loaded (delegates to {@code assertClassNotLoaded}). */
+ public void assertKeyspaceNotLoaded()
+ {
+ assertClassNotLoaded("org.apache.cassandra.db.Keyspace");
+ }
+
+ /** Asserts that {@code org.apache.cassandra.db.Keyspace} has been loaded (delegates to {@code assertClassLoaded}). */
+ public void assertKeyspaceLoaded()
+ {
+ assertClassLoaded("org.apache.cassandra.db.Keyspace");
+ }
+
+ /** Asserts that {@code org.apache.cassandra.transport.Server} has not been loaded (delegates to {@code assertClassNotLoaded}). */
+ public void assertServerNotLoaded()
+ {
+ assertClassNotLoaded("org.apache.cassandra.transport.Server");
+ }
+
+ /** Asserts that {@code org.apache.cassandra.db.SystemKeyspace} has not been loaded (delegates to {@code assertClassNotLoaded}). */
+ public void assertSystemKSNotLoaded()
+ {
+ assertClassNotLoaded("org.apache.cassandra.db.SystemKeyspace");
+ }
+
+ /** Asserts that {@code org.apache.cassandra.db.commitlog.CommitLogSegmentManager} has not been loaded (delegates to {@code assertClassNotLoaded}). */
+ public void assertCLSMNotLoaded()
+ {
+ assertClassNotLoaded("org.apache.cassandra.db.commitlog.CommitLogSegmentManager");
+ }
+
+ /**
+ * Asserts that the named class has been loaded; forwards to {@code assertClassLoadedStatus(clazz, true)}.
+ *
+ * @param clazz fully qualified class name to look up
+ */
+ public void assertClassLoaded(String clazz)
+ {
+ assertClassLoadedStatus(clazz, true);
+ }
+
+ /**
+ * Asserts that the named class has not been loaded; forwards to {@code assertClassLoadedStatus(clazz, false)}.
+ *
+ * @param clazz fully qualified class name to look up
+ */
+ public void assertClassNotLoaded(String clazz)
+ {
+ assertClassLoadedStatus(clazz, false);
+ }
+
+     /**
+      * Walks the context class loader chain of the current thread and checks, via the reflectively
+      * invoked {@code ClassLoader.findLoadedClass}, whether the named class has been loaded.
+      * <p>
+      * When {@code expected} is {@code true}, the check passes as soon as one loader in the chain
+      * reports the class as loaded and fails if none does. When {@code expected} is {@code false},
+      * the check fails on the first loader that reports the class as loaded.
+      *
+      * @param clazz    fully qualified class name to look up
+      * @param expected whether the class is expected to be loaded
+      * @throws RuntimeException wrapping any exception raised by the reflective lookup or invocation
+      */
+     private void assertClassLoadedStatus(String clazz, boolean expected)
+     {
+         // The reflective handle does not depend on the loader being inspected, so resolve it once.
+         Method mFindLoadedClass;
+         try
+         {
+             mFindLoadedClass = ClassLoader.class.getDeclaredMethod("findLoadedClass", String.class);
+             mFindLoadedClass.setAccessible(true);
+         }
+         catch (Exception e)
+         {
+             throw new RuntimeException(e);
+         }
+
+         for (ClassLoader cl = Thread.currentThread().getContextClassLoader(); cl != null; cl = cl.getParent())
+         {
+             boolean loaded;
+             try
+             {
+                 loaded = mFindLoadedClass.invoke(cl, clazz) != null;
+             }
+             catch (Exception e)
+             {
+                 throw new RuntimeException(e);
+             }
+
+             if (expected)
+             {
+                 if (loaded)
+                     return;
+             }
+             else
+                 assertFalse(clazz + " has been loaded", loaded);
+         }
+
+         if (expected)
+             fail(clazz + " has not been loaded");
+     }
+
+     /**
+      * Invokes the static {@code main(String[])} method of the given class and asserts its exit code.
+      * <p>
+      * A temporary security manager turns any {@code System.exit(status)} call into a
+      * {@link SystemExitException} so the status can be compared with {@code expectedExitCode}.
+      * A {@code main} that returns normally is treated as exit code {@code 0}. The security manager
+      * that was active before the call is restored afterwards.
+      *
+      * @param expectedExitCode exit status the tool is expected to terminate with
+      * @param clazz            fully qualified name of the class declaring {@code main}
+      * @param args             command line arguments passed to {@code main}
+      * @throws RuntimeException wrapping any checked exception raised while loading or running the tool
+      */
+     public void runTool(int expectedExitCode, String clazz, String... args)
+     {
+         // Remember whatever manager was active so it is not silently discarded in the finally block.
+         final SecurityManager previousSecurityManager = System.getSecurityManager();
+         try
+         {
+             // install security manager to get informed about the exit-code
+             System.setSecurityManager(new SecurityManager()
+             {
+                 public void checkExit(int status)
+                 {
+                     throw new SystemExitException(status);
+                 }
+
+                 public void checkPermission(Permission perm)
+                 {
+                 }
+
+                 public void checkPermission(Permission perm, Object context)
+                 {
+                 }
+             });
+
+             try
+             {
+                 Class.forName(clazz).getDeclaredMethod("main", String[].class).invoke(null, (Object) args);
+             }
+             catch (InvocationTargetException e)
+             {
+                 // Unwrap unchecked causes so that SystemExitException (an Error) reaches the outer handler.
+                 Throwable cause = e.getCause();
+                 if (cause instanceof Error)
+                     throw (Error) cause;
+                 if (cause instanceof RuntimeException)
+                     throw (RuntimeException) cause;
+                 throw e;
+             }
+
+             // main() returned without calling System.exit, i.e. exit code 0
+             assertEquals("Unexpected exit code", expectedExitCode, 0);
+         }
+         catch (SystemExitException e)
+         {
+             assertEquals("Unexpected exit code", expectedExitCode, e.status);
+         }
+         catch (InvocationTargetException e)
+         {
+             throw new RuntimeException(e.getTargetException());
+         }
+         catch (Exception e)
+         {
+             throw new RuntimeException(e);
+         }
+         finally
+         {
+             // uninstall the temporary security manager, reinstating the previous one (possibly null)
+             System.setSecurityManager(previousSecurityManager);
+         }
+     }
+
+ @BeforeClass
+ public static void setupTester()
+ {
+ System.setProperty("cassandra.partitioner", "org.apache.cassandra.dht.Murmur3Partitioner");
+
+ // may start an async appender
+ LoggerFactory.getLogger(ToolsTester.class);
+
+ ThreadMXBean threads = ManagementFactory.getThreadMXBean();
+ initialThreads = Arrays.asList(threads.getThreadInfo(threads.getAllThreadIds()));
+ }
+
+ /**
+ * Thrown by the security manager that {@code runTool} installs, in place of letting
+ * {@code System.exit} proceed. Carries the requested exit status.
+ */
+ public static class SystemExitException extends Error
+ {
+ /** Exit status passed to the intercepted {@code checkExit} call. */
+ public final int status;
+
+ public SystemExitException(int status)
+ {
+ this.status = status;
+ }
+ }
+
+     /**
+      * Returns the absolute path of one {@code -Data.db} file from the directory resolved by
+      * {@link #sstableDir(String, String)}.
+      *
+      * @param ks keyspace directory name
+      * @param cf table name, matched as described in {@link #sstableDir(String, String)}
+      * @return absolute path of the first matching regular file returned by the directory listing
+      * @throws IOException if the directory cannot be listed or contains no {@code -Data.db} file
+      */
+     public static String findOneSSTable(String ks, String cf) throws IOException
+     {
+         File cfDir = sstableDir(ks, cf);
+         File[] sstableFiles = cfDir.listFiles((file) -> file.isFile() && file.getName().endsWith("-Data.db"));
+         // listFiles returns null when the path is not a readable directory, and may be empty
+         if (sstableFiles == null || sstableFiles.length == 0)
+             throw new IOException("No -Data.db file found in " + cfDir.getAbsolutePath());
+         return sstableFiles[0].getAbsolutePath();
+     }
+
+ /** Returns the absolute path of the directory resolved by {@code sstableDir(ks, cf)}. */
+ public static String sstableDirName(String ks, String cf) throws IOException
+ {
+ return sstableDir(ks, cf).getAbsolutePath();
+ }
+
+     /**
+      * Copies the legacy sstables via {@link #copySSTables()} and locates the directory of the given
+      * table inside the keyspace directory.
+      *
+      * @param ks keyspace directory name below the copied data directory
+      * @param cf table name; a directory matches if it is named {@code cf} or starts with {@code cf-}
+      * @return the first matching directory entry returned by the listing
+      * @throws IOException if the copy fails, the keyspace directory cannot be listed, or no entry matches
+      */
+     public static File sstableDir(String ks, String cf) throws IOException
+     {
+         File dataDir = copySSTables();
+         File ksDir = new File(dataDir, ks);
+         File[] cfDirs = ksDir.listFiles((dir, name) -> cf.equals(name) || name.startsWith(cf + '-'));
+         // listFiles returns null when the path is not a readable directory, and may be empty
+         if (cfDirs == null || cfDirs.length == 0)
+             throw new IOException("No directory for table " + cf + " found in " + ksDir.getAbsolutePath());
+         return cfDirs[0];
+     }
+
+     /**
+      * Copies {@code test/data/legacy-sstables/ma/legacy_tables} to
+      * {@code build/test/cassandra/data/legacy_sstables}.
+      *
+      * @return the data directory {@code build/test/cassandra/data}
+      * @throws IOException if the directory copy fails
+      */
+     public static File copySSTables() throws IOException
+     {
+         File source = new File(new File("test/data/legacy-sstables/ma"), "legacy_tables");
+         File dataDir = new File("build/test/cassandra/data");
+         File target = new File(dataDir, "legacy_sstables");
+         FileUtils.copyDirectory(source, target);
+         return dataDir;
+     }
+}