You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/12/19 18:38:16 UTC
[1/2] cassandra git commit: Move all hints related tasks to hints internal executor
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 7c6993f82 -> 6f42afa2b
Move all hints related tasks to hints internal executor
patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for
CASSANDRA-8285
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/351e35b6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/351e35b6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/351e35b6
Branch: refs/heads/cassandra-2.1
Commit: 351e35b64ac7f364bd4149da399f86b7f36f5ae9
Parents: 6218993
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Dec 19 20:27:32 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Dec 19 20:27:32 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../DebuggableScheduledThreadPoolExecutor.java | 5 +
.../JMXEnabledScheduledThreadPoolExecutor.java | 137 +++++++++++++++++++
...EnabledScheduledThreadPoolExecutorMBean.java | 26 ++++
.../cassandra/db/HintedHandOffManager.java | 25 ++--
5 files changed, 179 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 516b4a2..1ad2de5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.12:
+ * Move all hints related tasks to hints internal executor (CASSANDRA-8285)
* Fix paging for multi-partition IN queries (CASSANDRA-8408)
* Fix MOVED_NODE topology event never being emitted when a node
moves its token (CASSANDRA-8373)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
index a41df54..1699c0f 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
@@ -33,6 +33,11 @@ public class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolEx
super(corePoolSize, new NamedThreadFactory(threadPoolName, priority));
}
+ public DebuggableScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) // overload that takes a caller-supplied ThreadFactory instead of a pool name and priority
+ {
+ super(corePoolSize, threadFactory); // both arguments are handed unchanged to the superclass constructor
+ }
+
public DebuggableScheduledThreadPoolExecutor(String threadPoolName)
{
this(1, threadPoolName, Thread.NORM_PRIORITY);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
new file mode 100644
index 0000000..64d9267
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutor.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import java.lang.management.ManagementFactory;
+import java.util.List;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.cassandra.metrics.ThreadPoolMetrics;
+
+/**
+ * A JMX enabled subclass of DebuggableScheduledThreadPoolExecutor: it registers itself as an MBean when constructed and unregisters on the first shutdown()/shutdownNow().
+ */
+public class JMXEnabledScheduledThreadPoolExecutor extends DebuggableScheduledThreadPoolExecutor implements JMXEnabledScheduledThreadPoolExecutorMBean
+{
+ private final String mbeanName; // ObjectName string this executor is registered under; reused to unregister
+ private final ThreadPoolMetrics metrics; // created in the constructor, released in unregisterMBean()
+
+ public JMXEnabledScheduledThreadPoolExecutor(int corePoolSize, NamedThreadFactory threadFactory, String jmxPath)
+ {
+ super(corePoolSize, threadFactory);
+
+ metrics = new ThreadPoolMetrics(this, jmxPath, threadFactory.id); // note: `this` is handed out before the constructor has returned
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ mbeanName = "org.apache.cassandra." + jmxPath + ":type=" + threadFactory.id; // i.e. org.apache.cassandra.<jmxPath>:type=<threadFactory.id>
+
+ try
+ {
+ mbs.registerMBean(this, new ObjectName(mbeanName));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e); // any failure while building the ObjectName or registering is rethrown unchecked, cause preserved
+ }
+ }
+
+ private void unregisterMBean() // removes the MBean registered by the constructor, then releases the metrics
+ {
+ try
+ {
+ ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(mbeanName));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e); // NOTE(review): when this throws, metrics.release() below is skipped and the calling shutdown()/shutdownNow() never reaches its super call - confirm this is intended
+ }
+
+ // release metrics
+ metrics.release();
+ }
+
+ @Override
+ public synchronized void shutdown()
+ {
+ // synchronized, because there is no way to access super.mainLock, which would be
+ // the preferred way to make this threadsafe
+ if (!isShutdown()) // unregister only if the executor has not already been shut down, so a repeated call does not unregister twice
+ unregisterMBean();
+
+ super.shutdown();
+ }
+
+ @Override
+ public synchronized List<Runnable> shutdownNow()
+ {
+ // synchronized, because there is no way to access super.mainLock, which would be
+ // the preferred way to make this threadsafe
+ if (!isShutdown()) // same first-call-only guard as in shutdown()
+ unregisterMBean();
+
+ return super.shutdownNow(); // hands back whatever list the superclass returns
+ }
+
+ /**
+ * Get the number of completed tasks, as reported by getCompletedTaskCount()
+ */
+ public long getCompletedTasks()
+ {
+ return getCompletedTaskCount();
+ }
+
+ /**
+ * Get the number of tasks waiting to be executed, computed as getTaskCount() minus getCompletedTaskCount(); the JDK documents both counts as approximations
+ */
+ public long getPendingTasks()
+ {
+ return getTaskCount() - getCompletedTaskCount();
+ }
+
+ public int getTotalBlockedTasks() // value of the metrics.totalBlocked counter
+ {
+ return (int) metrics.totalBlocked.count(); // narrowing cast - presumably count() returns a long; verify against ThreadPoolMetrics
+ }
+
+ public int getCurrentlyBlockedTasks() // value of the metrics.currentBlocked counter
+ {
+ return (int) metrics.currentBlocked.count(); // same narrowing cast as getTotalBlockedTasks()
+ }
+
+ public int getCoreThreads() // JMX-facing name for getCorePoolSize()
+ {
+ return getCorePoolSize();
+ }
+
+ public void setCoreThreads(int number) // JMX-facing name for setCorePoolSize()
+ {
+ setCorePoolSize(number);
+ }
+
+ public int getMaximumThreads() // JMX-facing name for getMaximumPoolSize()
+ {
+ return getMaximumPoolSize();
+ }
+
+ public void setMaximumThreads(int number) // JMX-facing name for setMaximumPoolSize()
+ {
+ setMaximumPoolSize(number); // NOTE(review): if the base class is java.util.concurrent.ScheduledThreadPoolExecutor, the JDK says adjusting maximumPoolSize has no useful effect there - confirm
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java
new file mode 100644
index 0000000..d9c45e3
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledScheduledThreadPoolExecutorMBean.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+/** MBean interface implemented by JMXEnabledScheduledThreadPoolExecutor.
+ * @see org.apache.cassandra.metrics.ThreadPoolMetrics
+ */
+@Deprecated // NOTE(review): deprecated from the moment it is introduced; presumably the @see target is the preferred replacement - confirm
+public interface JMXEnabledScheduledThreadPoolExecutorMBean extends JMXEnabledThreadPoolExecutorMBean
+{ // declares no members of its own; every operation is inherited from JMXEnabledThreadPoolExecutorMBean
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351e35b6/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 87260b2..c8c3845 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -34,20 +34,16 @@ import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -106,12 +102,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
- private final ThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getMaxHintsThread(),
- Integer.MAX_VALUE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY),
- "internal");
+ private final JMXEnabledScheduledThreadPoolExecutor executor =
+ new JMXEnabledScheduledThreadPoolExecutor(
+ DatabaseDescriptor.getMaxHintsThread(),
+ new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY),
+ "internal");
private final ColumnFamilyStore hintStore = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.HINTS_CF);
@@ -174,7 +169,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
metrics.log();
}
};
- StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
+ executor.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
}
private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp)
@@ -225,7 +220,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
}
}
};
- StorageService.optionalTasks.submit(runnable);
+ executor.submit(runnable);
}
//foobar
@@ -246,7 +241,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
}
}
};
- StorageService.optionalTasks.submit(runnable).get();
+ executor.submit(runnable).get();
}
[2/2] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/db/HintedHandOffManager.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f42afa2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f42afa2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f42afa2
Branch: refs/heads/cassandra-2.1
Commit: 6f42afa2bb54bc6a2624d40accab8c6f6db2038f
Parents: 7c6993f 351e35b
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Dec 19 20:38:02 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Dec 19 20:38:02 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../DebuggableScheduledThreadPoolExecutor.java | 5 +
.../JMXEnabledScheduledThreadPoolExecutor.java | 137 +++++++++++++++++++
...EnabledScheduledThreadPoolExecutorMBean.java | 26 ++++
.../JMXEnabledThreadPoolExecutor.java | 2 -
.../cassandra/db/HintedHandOffManager.java | 38 ++---
.../cassandra/metrics/ThreadPoolMetrics.java | 6 +-
7 files changed, 186 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f42afa2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4d88aa9,1ad2de5..ddc25cd
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,46 -1,5 +1,47 @@@
-2.0.12:
+2.1.3
+ * Fix regression in SSTableRewriter causing some rows to become unreadable
+ during compaction (CASSANDRA-8429)
+ * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510)
+ * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression
+ is disabled (CASSANDRA-8288)
+ * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623)
+ * Make sure we set lastCompactedKey correctly (CASSANDRA-8463)
+ * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507)
+ * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370)
+ * Make sstablescrub check leveled manifest again (CASSANDRA-8432)
+ * Check first/last keys in sstable when giving out positions (CASSANDRA-8458)
+ * Disable mmap on Windows (CASSANDRA-6993)
+ * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253)
+ * Add auth support to cassandra-stress (CASSANDRA-7985)
+ * Fix ArrayIndexOutOfBoundsException when generating error message
+ for some CQL syntax errors (CASSANDRA-8455)
+ * Scale memtable slab allocation logarithmically (CASSANDRA-7882)
+ * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
+ * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
+ * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
+ * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
+ * Remove tmplink files for offline compactions (CASSANDRA-8321)
+ * Reduce maxHintsInProgress (CASSANDRA-8415)
+ * BTree updates may call provided update function twice (CASSANDRA-8018)
+ * Release sstable references after anticompaction (CASSANDRA-8386)
+ * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
+ * Fix high size calculations for prepared statements (CASSANDRA-8231)
+ * Centralize shared executors (CASSANDRA-8055)
+ * Fix filtering for CONTAINS (KEY) relations on frozen collection
+ clustering columns when the query is restricted to a single
+ partition (CASSANDRA-8203)
+ * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
+ * Add more log info if readMeter is null (CASSANDRA-8238)
+ * add check of the system wall clock time at startup (CASSANDRA-8305)
+ * Support for frozen collections (CASSANDRA-7859)
+ * Fix overflow on histogram computation (CASSANDRA-8028)
+ * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
+ * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
+ * Improve JBOD disk utilization (CASSANDRA-7386)
+ * Log failed host when preparing incremental repair (CASSANDRA-8228)
+ * Force config client mode in CQLSSTableWriter (CASSANDRA-8281)
+Merged from 2.0:
+ * Move all hints related tasks to hints internal executor (CASSANDRA-8285)
* Fix paging for multi-partition IN queries (CASSANDRA-8408)
* Fix MOVED_NODE topology event never being emitted when a node
moves its token (CASSANDRA-8373)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f42afa2/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f42afa2/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
index 5c96bb6,de448b5..3f60df1
--- a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
@@@ -74,7 -73,7 +73,6 @@@ public class JMXEnabledThreadPoolExecut
{
super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory);
super.prestartAllCoreThreads();
- this.maxPoolSize = maxPoolSize;
-
metrics = new ThreadPoolMetrics(this, jmxPath, threadFactory.id);
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f42afa2/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 869e38e,c8c3845..9821574
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@@ -37,20 -37,13 +37,16 @@@ import com.google.common.util.concurren
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
- import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+ import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
- import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
- import org.apache.cassandra.db.composites.CellName;
- import org.apache.cassandra.db.composites.Composite;
- import org.apache.cassandra.db.composites.Composites;
import org.apache.cassandra.db.compaction.CompactionManager;
++import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@@ -64,10 -57,9 +60,7 @@@ import org.apache.cassandra.metrics.Hin
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.*;
--import org.apache.cassandra.utils.ByteBufferUtil;
--import org.apache.cassandra.utils.FBUtilities;
- import org.apache.cassandra.utils.JVMStabilityInspector;
--import org.apache.cassandra.utils.UUIDGen;
++import org.apache.cassandra.utils.*;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
/**
@@@ -105,16 -97,16 +98,15 @@@ public class HintedHandOffManager imple
private volatile boolean hintedHandOffPaused = false;
- static final CompositeType comparator = CompositeType.getInstance(Arrays.<AbstractType<?>>asList(UUIDType.instance, Int32Type.instance));
static final int maxHintTTL = Integer.parseInt(System.getProperty("cassandra.maxHintTTL", String.valueOf(Integer.MAX_VALUE)));
-- private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<InetAddress>();
++ private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<>();
- private final ThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getMaxHintsThread(),
- Integer.MAX_VALUE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY),
- "internal");
+ private final JMXEnabledScheduledThreadPoolExecutor executor =
+ new JMXEnabledScheduledThreadPoolExecutor(
+ DatabaseDescriptor.getMaxHintsThread(),
+ new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY),
+ "internal");
private final ColumnFamilyStore hintStore = Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.HINTS_CF);
@@@ -177,14 -169,14 +169,14 @@@
metrics.log();
}
};
- ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
+ executor.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
}
- private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp)
+ private static void deleteHint(ByteBuffer tokenBytes, CellName columnName, long timestamp)
{
- RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, tokenBytes);
- rm.delete(SystemKeyspace.HINTS_CF, columnName, timestamp);
- rm.applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
+ Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, tokenBytes);
+ mutation.delete(SystemKeyspace.HINTS_CF, columnName, timestamp);
+ mutation.applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
}
public void deleteHintsForEndpoint(final String ipOrHostname)
@@@ -514,8 -506,8 +506,8 @@@
IPartitioner p = StorageService.getPartitioner();
RowPosition minPos = p.getMinimumToken().minKeyBound();
-- Range<RowPosition> range = new Range<RowPosition>(minPos, minPos, p);
- IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of());
++ Range<RowPosition> range = new Range<>(minPos, minPos, p);
+ IDiskAtomFilter filter = new NamesQueryFilter(ImmutableSortedSet.<CellName>of());
List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE, System.currentTimeMillis());
for (Row row : rows)
{
@@@ -578,7 -570,7 +570,7 @@@
Token.TokenFactory tokenFactory = StorageService.getPartitioner().getTokenFactory();
// Extract the keys as strings to be reported.
-- LinkedList<String> result = new LinkedList<String>();
++ LinkedList<String> result = new LinkedList<>();
for (Row row : getHintsSlice(1))
{
if (row.cf != null) //ignore removed rows
@@@ -595,9 -588,9 +587,9 @@@
columnCount);
// From keys "" to ""...
- IPartitioner<?> partitioner = StorageService.getPartitioner();
+ IPartitioner partitioner = StorageService.getPartitioner();
RowPosition minPos = partitioner.getMinimumToken().minKeyBound();
-- Range<RowPosition> range = new Range<RowPosition>(minPos, minPos);
++ Range<RowPosition> range = new Range<>(minPos, minPos);
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f42afa2/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
index 8600e0c,af54cdb..a5e6daf
--- a/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
@@@ -54,9 -50,9 +52,9 @@@ public class ThreadPoolMetric
* @param path Type of thread pool
* @param poolName Name of thread pool to identify metrics
*/
- public ThreadPoolMetrics(final JMXEnabledThreadPoolExecutor executor, String path, String poolName)
+ public ThreadPoolMetrics(final ThreadPoolExecutor executor, String path, String poolName)
{
- this.factory = new ThreadPoolMetricNameFactory(path, poolName);
+ this.factory = new ThreadPoolMetricNameFactory("ThreadPools", path, poolName);
activeTasks = Metrics.newGauge(factory.createMetricName("ActiveTasks"), new Gauge<Integer>()
{
@@@ -81,13 -77,6 +79,13 @@@
return executor.getTaskCount() - executor.getCompletedTaskCount();
}
});
+ maxPoolSize = Metrics.newGauge(factory.createMetricName("MaxPoolSize"), new Gauge<Integer>()
+ {
+ public Integer value()
+ {
- return executor.maxPoolSize;
++ return executor.getMaximumPoolSize();
+ }
+ });
}
public void release()