You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2012/08/30 14:48:36 UTC
[2/3] New metrics;
patch by yukim reviewed by brandonwilliams for CASSANDRA-4009
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
new file mode 100644
index 0000000..1a93022
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+
+import org.apache.cassandra.net.OutboundTcpConnectionPool;
+
+/**
+ * Metrics for {@link OutboundTcpConnectionPool}.
+ */
+public class ConnectionMetrics
+{
+ public static final String GROUP_NAME = "org.apache.cassandra.metrics";
+ public static final String TYPE_NAME = "Connection";
+
+ /** Total number of timeouts happened on this node */
+ public static final Meter totalTimeouts = Metrics.newMeter(new MetricName(GROUP_NAME, TYPE_NAME, "TotalTimeouts"), "total timeouts", TimeUnit.SECONDS);
+ private static long recentTimeouts;
+
+ public final String address;
+ /** Pending tasks for Command(Mutations, Read etc) TCP Connections */
+ public final Gauge<Integer> commandPendingTasks;
+ /** Completed tasks for Command(Mutations, Read etc) TCP Connections */
+ public final Gauge<Long> commandCompletedTasks;
+ /** Dropped tasks for Command(Mutations, Read etc) TCP Connections */
+ public final Gauge<Long> commandDroppedTasks;
+ /** Pending tasks for Response(GOSSIP & RESPONSE) TCP Connections */
+ public final Gauge<Integer> responsePendingTasks;
+ /** Completed tasks for Response(GOSSIP & RESPONSE) TCP Connections */
+ public final Gauge<Long> responseCompletedTasks;
+ /** Number of timeouts for specific IP */
+ public final Meter timeouts;
+
+ private long recentTimeoutCount;
+
+ /**
+ * Create metrics for given connection pool.
+ *
+ * @param ip IP address to use for metrics label
+ * @param connectionPool Connection pool
+ */
+ public ConnectionMetrics(InetAddress ip, final OutboundTcpConnectionPool connectionPool)
+ {
+ address = ip.getHostAddress();
+ commandPendingTasks = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "CommandPendingTasks", address), new Gauge<Integer>()
+ {
+ public Integer value()
+ {
+ return connectionPool.cmdCon.getPendingMessages();
+ }
+ });
+ commandCompletedTasks = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "CommandCompletedTasks", address), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ return connectionPool.cmdCon.getCompletedMesssages();
+ }
+ });
+ commandDroppedTasks = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "CommandDroppedTasks", address), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ return connectionPool.cmdCon.getDroppedMessages();
+ }
+ });
+ responsePendingTasks = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "ResponsePendingTasks", address), new Gauge<Integer>()
+ {
+ public Integer value()
+ {
+ return connectionPool.ackCon.getPendingMessages();
+ }
+ });
+ responseCompletedTasks = Metrics.newGauge(new MetricName(GROUP_NAME, TYPE_NAME, "ResponseCompletedTasks", address), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ return connectionPool.ackCon.getCompletedMesssages();
+ }
+ });
+ timeouts = Metrics.newMeter(new MetricName(GROUP_NAME, TYPE_NAME, "Timeouts", address), "timeouts", TimeUnit.SECONDS);
+ }
+
+ public void release()
+ {
+ Metrics.defaultRegistry().removeMetric(new MetricName(GROUP_NAME, TYPE_NAME, "CommandPendingTasks", address));
+ Metrics.defaultRegistry().removeMetric(new MetricName(GROUP_NAME, TYPE_NAME, "CommandCompletedTasks", address));
+ Metrics.defaultRegistry().removeMetric(new MetricName(GROUP_NAME, TYPE_NAME, "CommandDroppedTasks", address));
+ Metrics.defaultRegistry().removeMetric(new MetricName(GROUP_NAME, TYPE_NAME, "ResponsePendingTasks", address));
+ Metrics.defaultRegistry().removeMetric(new MetricName(GROUP_NAME, TYPE_NAME, "ResponseCompletedTasks", address));
+ Metrics.defaultRegistry().removeMetric(new MetricName(GROUP_NAME, TYPE_NAME, "Timeouts", address));
+ }
+
+ @Deprecated
+ public static long getRecentTotalTimeout()
+ {
+ long total = totalTimeouts.count();
+ long recent = total - recentTimeouts;
+ recentTimeouts = total;
+ return recent;
+ }
+
+ @Deprecated
+ public long getRecentTimeout()
+ {
+ long timeoutCount = timeouts.count();
+ long recent = timeoutCount - recentTimeoutCount;
+ recentTimeoutCount = timeoutCount;
+ return recent;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
new file mode 100644
index 0000000..e0b12bb
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/DroppedMessageMetrics.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.MetricName;
+
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Metrics for dropped messages by verb.
+ */
+public class DroppedMessageMetrics
+{
+ public static final String GROUP_NAME = "org.apache.cassandra.metrics";
+ public static final String TYPE_NAME = "DroppedMessage";
+
+ /** Number of dropped messages */
+ public final Meter dropped;
+
+ private long lastDropped = 0;
+
+ public DroppedMessageMetrics(MessagingService.Verb verb)
+ {
+ dropped = Metrics.newMeter(new MetricName(GROUP_NAME, TYPE_NAME, "Dropped", verb.toString()), "dropped", TimeUnit.SECONDS);
+ }
+
+ @Deprecated
+ public int getRecentlyDropped()
+ {
+ long currentDropped = dropped.count();
+ long recentlyDropped = currentDropped - lastDropped;
+ lastDropped = currentDropped;
+ return (int)recentlyDropped;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
new file mode 100644
index 0000000..d177613
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.Timer;
+
+import org.apache.cassandra.utils.EstimatedHistogram;
+
+/**
+ * Metrics about latencies
+ */
+public class LatencyMetrics
+{
+ /** Latency */
+ public final Timer latency;
+ /** Total latency in micro sec */
+ public final Counter totalLatency;
+
+ protected final MetricNameFactory factory;
+ protected final String namePrefix;
+
+ @Deprecated public final EstimatedHistogram totalLatencyHistogram = new EstimatedHistogram();
+ @Deprecated public final EstimatedHistogram recentLatencyHistogram = new EstimatedHistogram();
+ protected long lastLatency;
+ protected long lastOpCount;
+
+ /**
+ * Create LatencyMetrics with given group, type, and scope. Name prefix for each metric will be empty.
+ *
+ * @param group Group name
+ * @param type Type name
+ * @param scope Scope
+ */
+ public LatencyMetrics(String group, String type, String scope)
+ {
+ this(group, type, "", scope);
+ }
+
+ /**
+ * Create LatencyMetrics with given group, type, prefix to append to each metric name, and scope.
+ *
+ * @param group Group name
+ * @param type Type name
+ * @param namePrefix Prefix to append to each metric name
+ * @param scope Scope of metrics
+ */
+ public LatencyMetrics(String group, String type, String namePrefix, String scope)
+ {
+ this(new LatencyMetricNameFactory(group, type, scope), namePrefix);
+ }
+
+ /**
+ * Create LatencyMetrics with given group, type, prefix to append to each metric name, and scope.
+ *
+ * @param factory MetricName factory to use
+ * @param namePrefix Prefix to append to each metric name
+ */
+ public LatencyMetrics(MetricNameFactory factory, String namePrefix)
+ {
+ this.factory = factory;
+ this.namePrefix = namePrefix;
+
+ latency = Metrics.newTimer(factory.createMetricName(namePrefix + "Latency"), TimeUnit.MICROSECONDS, TimeUnit.SECONDS);
+ totalLatency = Metrics.newCounter(factory.createMetricName(namePrefix + "TotalLatency"));
+ }
+
+ /** takes nanoseconds **/
+ public void addNano(long nanos)
+ {
+ // convert to microseconds. 1 millionth
+ addMicro(nanos / 1000);
+ }
+
+ public void addMicro(long micros)
+ {
+ latency.update(micros, TimeUnit.MICROSECONDS);
+ totalLatency.inc(micros);
+ totalLatencyHistogram.add(micros);
+ recentLatencyHistogram.add(micros);
+ }
+
+ public void release()
+ {
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName(namePrefix + "Latency"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName(namePrefix + "TotalLatency"));
+ }
+
+ @Deprecated
+ public double getRecentLatency()
+ {
+ long ops = latency.count();
+ long n = totalLatency.count();
+ try
+ {
+ return ((double) n - lastLatency) / (ops - lastOpCount);
+ }
+ finally
+ {
+ lastLatency = n;
+ lastOpCount = ops;
+ }
+ }
+
+ static class LatencyMetricNameFactory implements MetricNameFactory
+ {
+ private final String group;
+ private final String type;
+ private final String scope;
+
+ LatencyMetricNameFactory(String group, String type, String scope)
+ {
+ this.group = group;
+ this.type = type;
+ this.scope = scope;
+ }
+
+ public MetricName createMetricName(String metricName)
+ {
+ return new MetricName(group, type, metricName, scope);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/MetricNameFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/MetricNameFactory.java b/src/java/org/apache/cassandra/metrics/MetricNameFactory.java
new file mode 100644
index 0000000..5c1a5c2
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/MetricNameFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.yammer.metrics.core.MetricName;
+
+public interface MetricNameFactory
+{
+ /**
+ * Create {@link MetricName} from given metric name.
+ *
+ * @param metricName Name part of {@link MetricName}.
+ * @return new MetricName with given metric name.
+ */
+ MetricName createMetricName(String metricName);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/StorageMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/StorageMetrics.java b/src/java/org/apache/cassandra/metrics/StorageMetrics.java
new file mode 100644
index 0000000..3cda71e
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/StorageMetrics.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.MetricName;
+
+/**
+ * Metrics related to Storage.
+ */
+public class StorageMetrics
+{
+ public static final Counter load = Metrics.newCounter(new MetricName("org.apache.cassandra.metrics", "Storage", "Load"));
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/StreamingMetrics.java b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
new file mode 100644
index 0000000..05aad31
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/StreamingMetrics.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.net.InetAddress;
+import java.util.concurrent.ConcurrentMap;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.MetricName;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+/**
+ * Metrics for streaming.
+ */
+public class StreamingMetrics
+{
+ public static final String GROUP_NAME = "org.apache.cassandra.metrics";
+ public static final String TYPE_NAME = "Streaming";
+
+ private static final ConcurrentMap<InetAddress, StreamingMetrics> instances = new NonBlockingHashMap<InetAddress, StreamingMetrics>();
+
+ public static final Counter activeStreamsOutbound = Metrics.newCounter(new MetricName(GROUP_NAME, TYPE_NAME, "ActiveOutboundStreams"));
+ public static final Counter totalIncomingBytes = Metrics.newCounter(new MetricName(GROUP_NAME, TYPE_NAME, "TotalIncomingBytes"));
+ public static final Counter totalOutgoingBytes = Metrics.newCounter(new MetricName(GROUP_NAME, TYPE_NAME, "TotalOutgoingBytes"));
+ public final Counter incomingBytes;
+ public final Counter outgoingBytes;
+
+ public static StreamingMetrics get(InetAddress ip)
+ {
+ StreamingMetrics metrics = instances.get(ip);
+ if (metrics == null)
+ {
+ metrics = new StreamingMetrics(ip);
+ instances.put(ip, metrics);
+ }
+ return metrics;
+ }
+
+ public StreamingMetrics(final InetAddress peer)
+ {
+ incomingBytes = Metrics.newCounter(new MetricName(GROUP_NAME, TYPE_NAME, "IncomingBytes", peer.getHostAddress()));
+ outgoingBytes= Metrics.newCounter(new MetricName(GROUP_NAME, TYPE_NAME, "OutgoingBytes", peer.getHostAddress()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java b/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
new file mode 100644
index 0000000..af54cdb
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ThreadPoolMetrics.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import com.yammer.metrics.Metrics;
+import com.yammer.metrics.core.*;
+
+/**
+ * Metrics for {@link ThreadPoolExecutor}.
+ */
+public class ThreadPoolMetrics
+{
+ /** Number of active tasks. */
+ public final Gauge<Integer> activeTasks;
+ /** Number of tasks that had blocked before being accepted (or rejected). */
+ public final Counter totalBlocked;
+ /**
+ * Number of tasks currently blocked, waiting to be accepted by
+ * the executor (because all threads are busy and the backing queue is full).
+ */
+ public final Counter currentBlocked;
+ /** Number of completed tasks. */
+ public final Gauge<Long> completedTasks;
+ /** Number of tasks waiting to be executed. */
+ public final Gauge<Long> pendingTasks;
+
+ private MetricNameFactory factory;
+
+ /**
+ * Create metrics for given ThreadPoolExecutor.
+ *
+ * @param executor Thread pool
+ * @param path Type of thread pool
+ * @param poolName Name of thread pool to identify metrics
+ */
+ public ThreadPoolMetrics(final ThreadPoolExecutor executor, String path, String poolName)
+ {
+ this.factory = new ThreadPoolMetricNameFactory(path, poolName);
+
+ activeTasks = Metrics.newGauge(factory.createMetricName("ActiveTasks"), new Gauge<Integer>()
+ {
+ public Integer value()
+ {
+ return executor.getActiveCount();
+ }
+ });
+ totalBlocked = Metrics.newCounter(factory.createMetricName("TotalBlockedTasks"));
+ currentBlocked = Metrics.newCounter(factory.createMetricName("CurrentlyBlockedTasks"));
+ completedTasks = Metrics.newGauge(factory.createMetricName("CompletedTasks"), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ return executor.getCompletedTaskCount();
+ }
+ });
+ pendingTasks = Metrics.newGauge(factory.createMetricName("PendingTasks"), new Gauge<Long>()
+ {
+ public Long value()
+ {
+ return executor.getTaskCount() - executor.getCompletedTaskCount();
+ }
+ });
+ }
+
+ public void release()
+ {
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("ActiveTasks"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("PendingTasks"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("CompletedTasks"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("TotalBlockedTasks"));
+ Metrics.defaultRegistry().removeMetric(factory.createMetricName("CurrentlyBlockedTasks"));
+ }
+
+ class ThreadPoolMetricNameFactory implements MetricNameFactory
+ {
+ private final String path;
+ private final String poolName;
+
+ ThreadPoolMetricNameFactory(String path, String poolName)
+ {
+ this.path = path;
+ this.poolName = poolName;
+ }
+
+ public MetricName createMetricName(String metricName)
+ {
+ String groupName = ThreadPoolMetrics.class.getPackage().getName();
+ String type = "ThreadPools";
+ StringBuilder mbeanName = new StringBuilder();
+ mbeanName.append(groupName).append(":");
+ mbeanName.append("type=").append(type);
+ mbeanName.append(",path=").append(path);
+ mbeanName.append(",scope=").append(poolName);
+ mbeanName.append(",name=").append(metricName);
+
+ return new MetricName(groupName, type, metricName, path + "." + poolName, mbeanName.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 9d95fad..c4ceae2 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -31,15 +31,12 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
-
-import org.apache.cassandra.tracing.Tracing;
-
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +54,8 @@ import org.apache.cassandra.gms.GossipDigestSyn;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.ILatencySubscriber;
+import org.apache.cassandra.metrics.ConnectionMetrics;
+import org.apache.cassandra.metrics.DroppedMessageMetrics;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.AntiEntropyService;
@@ -65,9 +64,8 @@ import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.*;
import org.apache.cassandra.streaming.compress.CompressedFileStreamTask;
+import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
public final class MessagingService implements MessagingServiceMBean
{
@@ -260,7 +258,6 @@ public final class MessagingService implements MessagingServiceMBean
* is not going to be a thread per node - but rather an instance per node. That's totally fine.
*/
private final ConcurrentMap<InetAddress, DebuggableThreadPoolExecutor> streamExecutors = new NonBlockingHashMap<InetAddress, DebuggableThreadPoolExecutor>();
- private final AtomicInteger activeStreamsOutbound = new AtomicInteger(0);
private final NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
@@ -283,15 +280,10 @@ public final class MessagingService implements MessagingServiceMBean
Verb.REQUEST_RESPONSE);
// total dropped message counts for server lifetime
- private final Map<Verb, AtomicInteger> droppedMessages = new EnumMap<Verb, AtomicInteger>(Verb.class);
+ private final Map<Verb, DroppedMessageMetrics> droppedMessages = new EnumMap<Verb, DroppedMessageMetrics>(Verb.class);
// dropped count when last requested for the Recent api. high concurrency isn't necessary here.
- private final Map<Verb, Integer> lastDropped = Collections.synchronizedMap(new EnumMap<Verb, Integer>(Verb.class));
private final Map<Verb, Integer> lastDroppedInternal = new EnumMap<Verb, Integer>(Verb.class);
- private long totalTimeouts = 0;
- private long recentTotalTimeouts = 0;
- private final Map<String, AtomicLong> timeoutsPerHost = new HashMap<String, AtomicLong>();
- private final Map<String, AtomicLong> recentTimeoutsPerHost = new HashMap<String, AtomicLong>();
private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
// protocol versions of the other nodes in the cluster
@@ -310,8 +302,7 @@ public final class MessagingService implements MessagingServiceMBean
{
for (Verb verb : DROPPABLE_VERBS)
{
- droppedMessages.put(verb, new AtomicInteger());
- lastDropped.put(verb, 0);
+ droppedMessages.put(verb, new DroppedMessageMetrics(verb));
lastDroppedInternal.put(verb, 0);
}
@@ -332,19 +323,8 @@ public final class MessagingService implements MessagingServiceMBean
{
CallbackInfo expiredCallbackInfo = pair.right.value;
maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout);
- totalTimeouts++;
- String ip = expiredCallbackInfo.target.getHostAddress();
- AtomicLong c = timeoutsPerHost.get(ip);
- if (c == null)
- {
- c = new AtomicLong();
- timeoutsPerHost.put(ip, c);
- }
- c.incrementAndGet();
- // we only create AtomicLong instances here, so that the write
- // access to the hashmap happens single-threadedly.
- if (recentTimeoutsPerHost.get(ip) == null)
- recentTimeoutsPerHost.put(ip, new AtomicLong());
+ ConnectionMetrics.totalTimeouts.mark();
+ getConnectionPool(expiredCallbackInfo.target).incrementTimeout();
if (expiredCallbackInfo.shouldHint())
{
@@ -627,24 +607,6 @@ public final class MessagingService implements MessagingServiceMBean
: new CompressedFileStreamTask(header, to));
}
- public void incrementActiveStreamsOutbound()
- {
- activeStreamsOutbound.incrementAndGet();
- }
-
- public void decrementActiveStreamsOutbound()
- {
- activeStreamsOutbound.decrementAndGet();
- }
-
- /**
- * The count of active outbound stream tasks.
- */
- public int getActiveStreamsOutbound()
- {
- return activeStreamsOutbound.get();
- }
-
public void register(ILatencySubscriber subcriber)
{
subscribers.add(subcriber);
@@ -821,23 +783,23 @@ public final class MessagingService implements MessagingServiceMBean
public void incrementDroppedMessages(Verb verb)
{
assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";
- droppedMessages.get(verb).incrementAndGet();
+ droppedMessages.get(verb).dropped.mark();
}
private void logDroppedMessages()
{
boolean logTpstats = false;
- for (Map.Entry<Verb, AtomicInteger> entry : droppedMessages.entrySet())
+ for (Map.Entry<Verb, DroppedMessageMetrics> entry : droppedMessages.entrySet())
{
- AtomicInteger dropped = entry.getValue();
+ int dropped = (int) entry.getValue().dropped.count();
Verb verb = entry.getKey();
- int recent = dropped.get() - lastDroppedInternal.get(verb);
+ int recent = dropped - lastDroppedInternal.get(verb);
if (recent > 0)
{
logTpstats = true;
logger.info("{} {} messages dropped in last {}ms",
- new Object[]{ recent, verb, LOG_DROPPED_INTERVAL_IN_MS });
- lastDroppedInternal.put(verb, dropped.get());
+ new Object[] {recent, verb, LOG_DROPPED_INTERVAL_IN_MS});
+ lastDroppedInternal.put(verb, dropped);
}
}
@@ -932,43 +894,37 @@ public final class MessagingService implements MessagingServiceMBean
public Map<String, Integer> getDroppedMessages()
{
Map<String, Integer> map = new HashMap<String, Integer>();
- for (Map.Entry<Verb, AtomicInteger> entry : droppedMessages.entrySet())
- map.put(entry.getKey().toString(), entry.getValue().get());
+ for (Map.Entry<Verb, DroppedMessageMetrics> entry : droppedMessages.entrySet())
+ map.put(entry.getKey().toString(), (int) entry.getValue().dropped.count());
return map;
}
public Map<String, Integer> getRecentlyDroppedMessages()
{
Map<String, Integer> map = new HashMap<String, Integer>();
- for (Map.Entry<Verb, AtomicInteger> entry : droppedMessages.entrySet())
- {
- Verb verb = entry.getKey();
- Integer dropped = entry.getValue().get();
- Integer recentlyDropped = dropped - lastDropped.get(verb);
- map.put(verb.toString(), recentlyDropped);
- lastDropped.put(verb, dropped);
- }
+ for (Map.Entry<Verb, DroppedMessageMetrics> entry : droppedMessages.entrySet())
+ map.put(entry.getKey().toString(), entry.getValue().getRecentlyDropped());
return map;
}
public long getTotalTimeouts()
{
- return totalTimeouts;
+ return ConnectionMetrics.totalTimeouts.count();
}
public long getRecentTotalTimouts()
{
- long recent = totalTimeouts - recentTotalTimeouts;
- recentTotalTimeouts = totalTimeouts;
- return recent;
+ return ConnectionMetrics.getRecentTotalTimeout();
}
public Map<String, Long> getTimeoutsPerHost()
{
Map<String, Long> result = new HashMap<String, Long>();
- for (Map.Entry<String, AtomicLong> entry : timeoutsPerHost.entrySet())
+ for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry: connectionManagers.entrySet())
{
- result.put(entry.getKey(), entry.getValue().get());
+ String ip = entry.getKey().getHostAddress();
+ long recent = entry.getValue().getTimeouts();
+ result.put(ip, recent);
}
return result;
}
@@ -976,12 +932,11 @@ public final class MessagingService implements MessagingServiceMBean
public Map<String, Long> getRecentTimeoutsPerHost()
{
Map<String, Long> result = new HashMap<String, Long>();
- for (Map.Entry<String, AtomicLong> entry : recentTimeoutsPerHost.entrySet())
+ for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry: connectionManagers.entrySet())
{
- String ip = entry.getKey();
- AtomicLong recent = entry.getValue();
- Long timeout = timeoutsPerHost.get(ip).get();
- result.put(ip, timeout - recent.getAndSet(timeout));
+ String ip = entry.getKey().getHostAddress();
+ long recent = entry.getValue().getRecentTimeouts();
+ result.put(ip, recent);
}
return result;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index ce51cf0..05a39a1 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.metrics.ConnectionMetrics;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.utils.FBUtilities;
@@ -39,6 +40,7 @@ public class OutboundTcpConnectionPool
public final OutboundTcpConnection ackCon;
// pointer to the reseted Address.
private InetAddress resetedEndpoint;
+ private ConnectionMetrics metrics;
OutboundTcpConnectionPool(InetAddress remoteEp)
{
@@ -47,6 +49,8 @@ public class OutboundTcpConnectionPool
cmdCon.start();
ackCon = new OutboundTcpConnection(this);
ackCon.start();
+
+ metrics = new ConnectionMetrics(id, this);
}
/**
@@ -86,6 +90,25 @@ public class OutboundTcpConnectionPool
resetedEndpoint = remoteEP;
for (OutboundTcpConnection conn : new OutboundTcpConnection[] { cmdCon, ackCon })
conn.softCloseSocket();
+
+ // release previous metrics and create new one with reset address
+ metrics.release();
+ metrics = new ConnectionMetrics(resetedEndpoint, this);
+ }
+
+ public long getTimeouts()
+ {
+ return metrics.timeouts.count();
+ }
+
+ public long getRecentTimeouts()
+ {
+ return metrics.getRecentTimeout();
+ }
+
+ public void incrementTimeout()
+ {
+ metrics.timeouts.mark();
}
public Socket newSocket() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/WeightedQueue.java b/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
index a7d9919..2d2e0bd 100644
--- a/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
+++ b/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.scheduler;
-
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
@@ -25,11 +24,11 @@ import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.cassandra.utils.LatencyTracker;
+import org.apache.cassandra.metrics.LatencyMetrics;
class WeightedQueue implements WeightedQueueMBean
{
- private final LatencyTracker stats = new LatencyTracker();
+ private final LatencyMetrics metric;
public final String key;
public final int weight;
@@ -39,6 +38,7 @@ class WeightedQueue implements WeightedQueueMBean
this.key = key;
this.weight = weight;
this.queue = new SynchronousQueue<Entry>(true);
+ this.metric = new LatencyMetrics("org.apache.cassandra.metrics", "scheduler", "WeightedQueue", key);
}
public void register()
@@ -66,7 +66,7 @@ class WeightedQueue implements WeightedQueueMBean
Entry e = queue.poll();
if (e == null)
return null;
- stats.addNano(System.nanoTime() - e.creationTime);
+ metric.addNano(System.nanoTime() - e.creationTime);
return e.thread;
}
@@ -90,26 +90,26 @@ class WeightedQueue implements WeightedQueueMBean
public long getOperations()
{
- return stats.getOpCount();
+ return metric.latency.count();
}
public long getTotalLatencyMicros()
{
- return stats.getTotalLatencyMicros();
+ return metric.totalLatency.count();
}
public double getRecentLatencyMicros()
{
- return stats.getRecentLatencyMicros();
+ return metric.getRecentLatency();
}
public long[] getTotalLatencyHistogramMicros()
{
- return stats.getTotalLatencyHistogramMicros();
+ return metric.totalLatencyHistogram.getBuckets(false);
}
public long[] getRecentLatencyHistogramMicros()
{
- return stats.getRecentLatencyHistogramMicros();
+ return metric.recentLatencyHistogram.getBuckets(true);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/scheduler/WeightedQueueMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/WeightedQueueMBean.java b/src/java/org/apache/cassandra/scheduler/WeightedQueueMBean.java
index 4a253d1..d16d007 100644
--- a/src/java/org/apache/cassandra/scheduler/WeightedQueueMBean.java
+++ b/src/java/org/apache/cassandra/scheduler/WeightedQueueMBean.java
@@ -19,7 +19,9 @@ package org.apache.cassandra.scheduler;
/**
* Exposes client request scheduling metrics for a particular scheduler queue.
+ * @see org.apache.cassandra.metrics.LatencyMetrics
*/
+@Deprecated
public interface WeightedQueueMBean
{
public long getOperations();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 25a38ef..0ed8002 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -29,11 +29,14 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-
import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.util.concurrent.Futures;
+import com.googlecode.concurrentlinkedhashmap.EntryWeigher;
+import org.github.jamm.MemoryMeter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.*;
import org.apache.cassandra.cache.AutoSavingCache.CacheSerializer;
@@ -52,12 +55,6 @@ import org.apache.cassandra.io.sstable.SSTableReader.Operator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
-import org.github.jamm.MemoryMeter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.googlecode.concurrentlinkedhashmap.EntryWeigher;
public class CacheService implements CacheServiceMBean
{
@@ -125,7 +122,7 @@ public class CacheService implements CacheServiceMBean
else
{
logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); KeyCache size in JVM Heap will not be calculated accurately. " +
- "Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; upgrade to the Sun JRE instead");
+ "Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; upgrade to the Sun JRE instead");
/* We don't know the overhead size because memory meter is not enabled. */
EntryWeigher<KeyCacheKey, RowIndexEntry> weigher = new EntryWeigher<KeyCacheKey, RowIndexEntry>()
{
@@ -177,32 +174,32 @@ public class CacheService implements CacheServiceMBean
public long getKeyCacheHits()
{
- return keyCache.getHits();
+ return keyCache.getMetrics().hits.count();
}
public long getRowCacheHits()
{
- return rowCache.getHits();
+ return rowCache.getMetrics().hits.count();
}
public long getKeyCacheRequests()
{
- return keyCache.getRequests();
+ return keyCache.getMetrics().requests.count();
}
public long getRowCacheRequests()
{
- return rowCache.getRequests();
+ return rowCache.getMetrics().requests.count();
}
public double getKeyCacheRecentHitRate()
{
- return keyCache.getRecentHitRate();
+ return keyCache.getMetrics().getRecentHitRate();
}
public double getRowCacheRecentHitRate()
{
- return rowCache.getRecentHitRate();
+ return rowCache.getMetrics().getRecentHitRate();
}
public int getRowCacheSavePeriodInSeconds()
@@ -245,7 +242,7 @@ public class CacheService implements CacheServiceMBean
public long getRowCacheCapacityInBytes()
{
- return rowCache.getCapacity();
+ return rowCache.getMetrics().capacityInBytes.value();
}
public long getRowCacheCapacityInMB()
@@ -263,7 +260,7 @@ public class CacheService implements CacheServiceMBean
public long getKeyCacheCapacityInBytes()
{
- return keyCache.getCapacity();
+ return keyCache.getMetrics().capacityInBytes.value();
}
public long getKeyCacheCapacityInMB()
@@ -282,12 +279,12 @@ public class CacheService implements CacheServiceMBean
public long getRowCacheSize()
{
- return rowCache.weightedSize();
+ return rowCache.getMetrics().size.value();
}
public long getKeyCacheSize()
{
- return keyCache.weightedSize();
+ return keyCache.getMetrics().size.value();
}
public void reduceCacheSizes()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/service/CacheServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheServiceMBean.java b/src/java/org/apache/cassandra/service/CacheServiceMBean.java
index 747b8f8..a34d5ed 100644
--- a/src/java/org/apache/cassandra/service/CacheServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/CacheServiceMBean.java
@@ -21,13 +21,28 @@ import java.util.concurrent.ExecutionException;
public interface CacheServiceMBean
{
+ /**
+ * @see org.apache.cassandra.metrics.CacheMetrics#hits
+ */
+ @Deprecated
public long getKeyCacheHits();
+ @Deprecated
public long getRowCacheHits();
+ /**
+ * @see org.apache.cassandra.metrics.CacheMetrics#requests
+ */
+ @Deprecated
public long getKeyCacheRequests();
+ @Deprecated
public long getRowCacheRequests();
+ /**
+ * @see org.apache.cassandra.metrics.CacheMetrics#recentHitRate
+ */
+ @Deprecated
public double getKeyCacheRecentHitRate();
+ @Deprecated
public double getRowCacheRecentHitRate();
public int getRowCacheSavePeriodInSeconds();
@@ -47,15 +62,31 @@ public interface CacheServiceMBean
public void invalidateRowCache();
public long getRowCacheCapacityInMB();
+ /**
+ * @see org.apache.cassandra.metrics.CacheMetrics#capacityInBytes
+ */
+ @Deprecated
public long getRowCacheCapacityInBytes();
public void setRowCacheCapacityInMB(long capacity);
public long getKeyCacheCapacityInMB();
+ /**
+ * @see org.apache.cassandra.metrics.CacheMetrics#capacityInBytes
+ */
+ @Deprecated
public long getKeyCacheCapacityInBytes();
public void setKeyCacheCapacityInMB(long capacity);
+ /**
+ * @see org.apache.cassandra.metrics.CacheMetrics#size
+ */
+ @Deprecated
public long getRowCacheSize();
+ /**
+ * @see org.apache.cassandra.metrics.CacheMetrics#size
+ */
+ @Deprecated
public long getKeyCacheSize();
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5b85a84..3863eed 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -58,18 +58,12 @@ import org.apache.cassandra.net.*;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.*;
-
public class StorageProxy implements StorageProxyMBean
{
public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy";
private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
private static final boolean OPTIMIZE_LOCAL_REQUESTS = true; // set to false to test messagingservice path on single node
- // mbean stuff
- private static final LatencyTracker readStats = new LatencyTracker();
- private static final LatencyTracker rangeStats = new LatencyTracker();
- private static final LatencyTracker writeStats = new LatencyTracker();
-
public static final String UNREACHABLE = "UNREACHABLE";
private static final WritePerformer standardWritePerformer;
@@ -88,6 +82,9 @@ public class StorageProxy implements StorageProxyMBean
}
});
private static final AtomicLong totalHints = new AtomicLong();
+ private static final ClientRequestMetrics readMetrics = new ClientRequestMetrics("Read");
+ private static final ClientRequestMetrics rangeMetrics = new ClientRequestMetrics("RangeSlice");
+ private static final ClientRequestMetrics writeMetrics = new ClientRequestMetrics("Write");
private StorageProxy() {}
@@ -200,6 +197,7 @@ public class StorageProxy implements StorageProxyMBean
}
catch (TimedOutException ex)
{
+ writeMetrics.timeouts.mark();
ClientRequestMetrics.writeTimeouts.inc();
if (logger.isDebugEnabled())
{
@@ -212,6 +210,7 @@ public class StorageProxy implements StorageProxyMBean
}
catch (UnavailableException e)
{
+ writeMetrics.unavailables.mark();
ClientRequestMetrics.writeUnavailables.inc();
throw e;
}
@@ -222,7 +221,7 @@ public class StorageProxy implements StorageProxyMBean
}
finally
{
- writeStats.addNano(System.nanoTime() - startTime);
+ writeMetrics.addNano(System.nanoTime() - startTime);
}
}
@@ -602,6 +601,7 @@ public class StorageProxy implements StorageProxyMBean
{
if (StorageService.instance.isBootstrapMode() && !systemTableQuery(commands))
{
+ readMetrics.unavailables.mark();
ClientRequestMetrics.readUnavailables.inc();
throw new UnavailableException();
}
@@ -613,17 +613,19 @@ public class StorageProxy implements StorageProxyMBean
}
catch (UnavailableException e)
{
+ readMetrics.unavailables.mark();
ClientRequestMetrics.readUnavailables.inc();
throw e;
}
catch (TimeoutException e)
{
+ readMetrics.timeouts.mark();
ClientRequestMetrics.readTimeouts.inc();
throw e;
}
finally
{
- readStats.addNano(System.nanoTime() - startTime);
+ readMetrics.addNano(System.nanoTime() - startTime);
}
return rows;
}
@@ -940,7 +942,7 @@ public class StorageProxy implements StorageProxyMBean
}
finally
{
- rangeStats.addNano(System.nanoTime() - startTime);
+ rangeMetrics.addNano(System.nanoTime() - startTime);
}
return trim(command, rows);
}
@@ -1084,77 +1086,77 @@ public class StorageProxy implements StorageProxyMBean
public long getReadOperations()
{
- return readStats.getOpCount();
+ return readMetrics.latency.count();
}
public long getTotalReadLatencyMicros()
{
- return readStats.getTotalLatencyMicros();
+ return readMetrics.totalLatency.count();
}
public double getRecentReadLatencyMicros()
{
- return readStats.getRecentLatencyMicros();
+ return readMetrics.getRecentLatency();
}
public long[] getTotalReadLatencyHistogramMicros()
{
- return readStats.getTotalLatencyHistogramMicros();
+ return readMetrics.totalLatencyHistogram.getBuckets(false);
}
public long[] getRecentReadLatencyHistogramMicros()
{
- return readStats.getRecentLatencyHistogramMicros();
+ return readMetrics.recentLatencyHistogram.getBuckets(true);
}
public long getRangeOperations()
{
- return rangeStats.getOpCount();
+ return rangeMetrics.latency.count();
}
public long getTotalRangeLatencyMicros()
{
- return rangeStats.getTotalLatencyMicros();
+ return rangeMetrics.totalLatency.count();
}
public double getRecentRangeLatencyMicros()
{
- return rangeStats.getRecentLatencyMicros();
+ return rangeMetrics.getRecentLatency();
}
public long[] getTotalRangeLatencyHistogramMicros()
{
- return rangeStats.getTotalLatencyHistogramMicros();
+ return rangeMetrics.totalLatencyHistogram.getBuckets(false);
}
public long[] getRecentRangeLatencyHistogramMicros()
{
- return rangeStats.getRecentLatencyHistogramMicros();
+ return rangeMetrics.recentLatencyHistogram.getBuckets(true);
}
public long getWriteOperations()
{
- return writeStats.getOpCount();
+ return writeMetrics.latency.count();
}
public long getTotalWriteLatencyMicros()
{
- return writeStats.getTotalLatencyMicros();
+ return writeMetrics.totalLatency.count();
}
public double getRecentWriteLatencyMicros()
{
- return writeStats.getRecentLatencyMicros();
+ return writeMetrics.getRecentLatency();
}
public long[] getTotalWriteLatencyHistogramMicros()
{
- return writeStats.getTotalLatencyHistogramMicros();
+ return writeMetrics.totalLatencyHistogram.getBuckets(false);
}
public long[] getRecentWriteLatencyHistogramMicros()
{
- return writeStats.getRecentLatencyHistogramMicros();
+ return writeMetrics.recentLatencyHistogram.getBuckets(true);
}
public boolean getHintedHandoffEnabled()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index dd1541a..c6d4b7c 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -19,22 +19,52 @@ package org.apache.cassandra.service;
public interface StorageProxyMBean
{
+ /**
+ * @see org.apache.cassandra.metrics.LatencyMetrics#opCount
+ */
+ @Deprecated
public long getReadOperations();
+ /**
+ * @see org.apache.cassandra.metrics.LatencyMetrics#totalLatency
+ */
+ @Deprecated
public long getTotalReadLatencyMicros();
+ /**
+ * @see org.apache.cassandra.metrics.LatencyMetrics#recentLatencyMicro
+ */
+ @Deprecated
public double getRecentReadLatencyMicros();
+ /**
+ * @see org.apache.cassandra.metrics.LatencyMetrics#totalLatencyHistogramMicro
+ */
+ @Deprecated
public long[] getTotalReadLatencyHistogramMicros();
+ /**
+ * @see org.apache.cassandra.metrics.LatencyMetrics#recentLatencyHistogramMicro
+ */
+ @Deprecated
public long[] getRecentReadLatencyHistogramMicros();
+ @Deprecated
public long getRangeOperations();
+ @Deprecated
public long getTotalRangeLatencyMicros();
+ @Deprecated
public double getRecentRangeLatencyMicros();
+ @Deprecated
public long[] getTotalRangeLatencyHistogramMicros();
+ @Deprecated
public long[] getRecentRangeLatencyHistogramMicros();
+ @Deprecated
public long getWriteOperations();
+ @Deprecated
public long getTotalWriteLatencyMicros();
+ @Deprecated
public double getRecentWriteLatencyMicros();
+ @Deprecated
public long[] getTotalWriteLatencyHistogramMicros();
+ @Deprecated
public long[] getRecentWriteLatencyHistogramMicros();
public long getTotalHints();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 7f0714a..9d7b481 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.*;
import org.apache.cassandra.metrics.ClientRequestMetrics;
+import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
@@ -110,6 +111,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
public static final StorageService instance = new StorageService();
+ private static final StorageMetrics metrics = new StorageMetrics();
+
public static IPartitioner getPartitioner()
{
return DatabaseDescriptor.getPartitioner();
@@ -395,13 +398,6 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
throw new AssertionError(e);
}
- if (!isClientMode)
- {
- // "Touch" metrics classes to trigger static initialization, such that all metrics become available
- // on start-up even if they have not yet been used.
- new ClientRequestMetrics();
- }
-
if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
{
logger.info("Loading persisted ring state");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 350eff5..8ad047e 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -171,7 +171,9 @@ public interface StorageServiceMBean
/**
* Numeric load value.
+ * @see org.apache.cassandra.metrics.StorageMetrics#load
*/
+ @Deprecated
public double getLoad();
/** Human-readable load value */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
index ba7d9f3..5ecdfbf 100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -29,6 +29,7 @@ import com.ning.compress.lzf.LZFOutputStream;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -57,6 +58,7 @@ public class FileStreamTask extends WrappedRunnable
// outbound global throughput limiter
protected final Throttle throttle;
private final StreamReplyVerbHandler handler = new StreamReplyVerbHandler();
+ protected final StreamingMetrics metrics;
public FileStreamTask(StreamHeader header, InetAddress to)
{
@@ -73,9 +75,10 @@ public class FileStreamTask extends WrappedRunnable
// total throughput
int totalBytesPerMS = DatabaseDescriptor.getStreamThroughputOutboundMegabitsPerSec() * 1024 * 1024 / 8 / 1000;
// per stream throughput (target bytes per MS)
- return totalBytesPerMS / Math.max(1, MessagingService.instance().getActiveStreamsOutbound());
+ return totalBytesPerMS / Math.max(1, (int)StreamingMetrics.activeStreamsOutbound.count());
}
});
+ metrics = StreamingMetrics.get(to);
}
public void runMayThrow() throws IOException
@@ -141,9 +144,10 @@ public class FileStreamTask extends WrappedRunnable
// setting up data compression stream
compressedoutput = new LZFOutputStream(output);
- MessagingService.instance().incrementActiveStreamsOutbound();
+ StreamingMetrics.activeStreamsOutbound.inc();
try
{
+ long totalBytesTransferred = 0;
// stream each of the required sections of the file
for (Pair<Long, Long> section : header.file.sections)
{
@@ -159,6 +163,7 @@ public class FileStreamTask extends WrappedRunnable
{
long lastWrite = write(file, length, bytesTransferred);
bytesTransferred += lastWrite;
+ totalBytesTransferred += lastWrite;
// store streaming progress
header.file.progress += lastWrite;
}
@@ -169,12 +174,14 @@ public class FileStreamTask extends WrappedRunnable
if (logger.isDebugEnabled())
logger.debug("Bytes transferred " + bytesTransferred + "/" + header.file.size);
}
+ StreamingMetrics.totalOutgoingBytes.inc(totalBytesTransferred);
+ metrics.outgoingBytes.inc(totalBytesTransferred);
// receive reply confirmation
receiveReply();
}
finally
{
- MessagingService.instance().decrementActiveStreamsOutbound();
+ StreamingMetrics.activeStreamsOutbound.dec();
// no matter what happens close file
FileUtils.closeQuietly(file);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index f74f566..49cb35b 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.compaction.PrecompactedRow;
import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.compress.CompressedInputStream;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -52,6 +53,7 @@ public class IncomingStreamReader
protected final PendingFile remoteFile;
protected final StreamInSession session;
private final InputStream underliningStream;
+ private final StreamingMetrics metrics;
public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException
{
@@ -80,6 +82,7 @@ public class IncomingStreamReader
{
underliningStream = null;
}
+ metrics = StreamingMetrics.get(socket.getInetAddress());
}
/**
@@ -127,6 +130,7 @@ public class IncomingStreamReader
try
{
BytesReadTracker in = new BytesReadTracker(input);
+ long totalBytesRead = 0;
for (Pair<Long, Long> section : localFile.sections)
{
@@ -166,8 +170,11 @@ public class IncomingStreamReader
remoteFile.progress += remoteFile.compressionInfo != null
? ((CompressedInputStream) underliningStream).uncompressedBytes()
: in.getBytesRead();
+ totalBytesRead += in.getBytesRead();
}
}
+ StreamingMetrics.totalIncomingBytes.inc(totalBytesRead);
+ metrics.incomingBytes.inc(totalBytesRead);
return writer.closeAndOpenReader();
}
catch (Throwable e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69cedbfc/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java b/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
index 1a281fc..398599f 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.FileStreamTask;
import org.apache.cassandra.streaming.StreamHeader;
@@ -67,11 +68,12 @@ public class CompressedFileStreamTask extends FileStreamTask
RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename()), true);
FileChannel fc = file.getChannel();
- MessagingService.instance().incrementActiveStreamsOutbound();
+ StreamingMetrics.activeStreamsOutbound.inc();
// calculate chunks to transfer. we want to send continuous chunks altogether.
List<Pair<Long, Long>> sections = getTransferSections(header.file.compressionInfo.chunks);
try
{
+ long totalBytesTransferred = 0;
// stream each of the required sections of the file
for (Pair<Long, Long> section : sections)
{
@@ -99,18 +101,21 @@ public class CompressedFileStreamTask extends FileStreamTask
throttle.throttleDelta(toTransfer);
lastWrite = toTransfer;
}
+ totalBytesTransferred += lastWrite;
bytesTransferred += lastWrite;
header.file.progress += lastWrite;
}
logger.debug("Bytes transferred " + bytesTransferred + "/" + header.file.size);
}
+ StreamingMetrics.totalOutgoingBytes.inc(totalBytesTransferred);
+ metrics.outgoingBytes.inc(totalBytesTransferred);
// receive reply confirmation
receiveReply();
}
finally
{
- MessagingService.instance().decrementActiveStreamsOutbound();
+ StreamingMetrics.activeStreamsOutbound.dec();
// no matter what happens close file
FileUtils.closeQuietly(file);