You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2015/01/21 18:49:15 UTC
[2/3] cassandra git commit: Add tooling to detect hot partitions
Add tooling to detect hot partitions
Patch by Chris Lohfink, reviewed by brandonwilliams for CASSANDRA-7974
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/faf91818
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/faf91818
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/faf91818
Branch: refs/heads/trunk
Commit: faf91818b46fb51ed576664a1119315e7b7c3383
Parents: 1435b9a
Author: Brandon Williams <br...@apache.org>
Authored: Wed Jan 21 11:45:45 2015 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Jan 21 11:45:45 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 69 ++++++++-
.../cassandra/db/ColumnFamilyStoreMBean.java | 14 ++
.../cassandra/metrics/ColumnFamilyMetrics.java | 19 ++-
.../org/apache/cassandra/tools/NodeProbe.java | 26 +++-
.../org/apache/cassandra/tools/NodeTool.java | 83 ++++++++++-
.../org/apache/cassandra/utils/TopKSampler.java | 139 ++++++++++++++++++
.../apache/cassandra/utils/TopKSamplerTest.java | 147 +++++++++++++++++++
8 files changed, 482 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0c2bab8..f1eaa77 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Add tooling to detect hot partitions (CASSANDRA-7974)
* Fix cassandra-stress user-mode truncation of partition generation (CASSANDRA-8608)
* Only stream from unrepaired sstables during inc repair (CASSANDRA-8267)
* Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index f7a691e..0c95b0e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -26,14 +26,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.management.*;
+import javax.management.openmbean.*;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
+import com.google.common.base.*;
import com.google.common.collect.*;
import com.google.common.util.concurrent.*;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.Uninterruptibles;
import org.json.simple.*;
import org.slf4j.Logger;
@@ -68,14 +66,18 @@ import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.ColumnFamilyMetrics;
+import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.StreamLockfile;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.TopKSampler.SamplerResult;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.memory.MemtableAllocator;
+import com.clearspring.analytics.stream.Counter;
+
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
@@ -102,6 +104,39 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
new NamedThreadFactory("MemtableReclaimMemory"),
"internal");
+ private static final String[] COUNTER_NAMES = new String[]{"raw", "count", "error", "string"};
+ private static final String[] COUNTER_DESCS = new String[]
+ { "partition key in raw hex bytes",
+ "value of this partition for given sampler",
+ "value is within the error bounds plus or minus of this",
+ "the partition key turned into a human readable format" };
+ private static final CompositeType COUNTER_COMPOSITE_TYPE;
+ private static final TabularType COUNTER_TYPE;
+
+ private static final String[] SAMPLER_NAMES = new String[]{"cardinality", "partitions"};
+ private static final String[] SAMPLER_DESCS = new String[]
+ { "cardinality of partitions",
+ "list of counter results" };
+
+ private static final String SAMPLING_RESULTS_NAME = "SAMPLING_RESULTS";
+ private static final CompositeType SAMPLING_RESULT;
+
+ static
+ {
+ try
+ {
+ OpenType<?>[] counterTypes = new OpenType[] { SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING };
+ COUNTER_COMPOSITE_TYPE = new CompositeType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, COUNTER_NAMES, COUNTER_DESCS, counterTypes);
+ COUNTER_TYPE = new TabularType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, COUNTER_COMPOSITE_TYPE, COUNTER_NAMES);
+
+ OpenType<?>[] samplerTypes = new OpenType[] { SimpleType.LONG, COUNTER_TYPE };
+ SAMPLING_RESULT = new CompositeType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, SAMPLER_NAMES, SAMPLER_DESCS, samplerTypes);
+ } catch (OpenDataException e)
+ {
+ throw Throwables.propagate(e);
+ }
+ }
+
public final Keyspace keyspace;
public final String name;
public final CFMetaData metadata;
@@ -1152,6 +1187,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Memtable mt = data.getMemtableFor(opGroup, replayPosition);
final long timeDelta = mt.put(key, columnFamily, indexer, opGroup);
maybeUpdateRowCache(key);
+ metric.samplers.get(Sampler.WRITES).addSample(key.getKey());
metric.writeLatency.addNano(System.nanoTime() - start);
if(timeDelta < Long.MAX_VALUE)
metric.colUpdateTimeDeltaHistogram.update(timeDelta);
@@ -1915,10 +1951,35 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
columns = controller.getTopLevelColumns(Memtable.MEMORY_POOL.needToCopyOnHeap());
}
+ if (columns != null)
+ metric.samplers.get(Sampler.READS).addSample(filter.key.getKey());
metric.updateSSTableIterated(controller.getSstablesIterated());
return columns;
}
+    /**
+     * Starts the sampler identified by {@code sampler} for this column family.
+     *
+     * @param sampler name of a {@link Sampler} constant, resolved with {@code Sampler.valueOf}
+     * @param capacity capacity handed to the sampler's {@code beginSampling}
+     */
+    public void beginLocalSampling(String sampler, int capacity)
+    {
+        Sampler target = Sampler.valueOf(sampler);
+        metric.samplers.get(target).beginSampling(capacity);
+    }
+
+    /**
+     * Stops the sampler identified by {@code sampler} and converts what it collected
+     * into JMX open data.
+     *
+     * @param sampler name of a {@link Sampler} constant, resolved with {@code Sampler.valueOf}
+     * @param count number of top entries requested from the sampler
+     * @return composite holding the items named in SAMPLER_NAMES: the sampled cardinality
+     *         and a table with one row (raw hex key, count, error, readable key) per
+     *         returned partition
+     * @throws OpenDataException if a row or the result does not match its declared open type
+     */
+    public CompositeData finishLocalSampling(String sampler, int count) throws OpenDataException
+    {
+        SamplerResult<ByteBuffer> samplerResults = metric.samplers.get(Sampler.valueOf(sampler))
+                                                                  .finishSampling(count);
+        TabularDataSupport result = new TabularDataSupport(COUNTER_TYPE);
+        for (Counter<ByteBuffer> counter : samplerResults.topK)
+        {
+            // Copy only the readable window of the key. ByteBuffer.array() exposes the whole
+            // backing array (ignoring position, limit and arrayOffset) and throws for buffers
+            // that are not array-backed, so it must not be used on an arbitrary key buffer.
+            // duplicate() leaves the position of the sampled buffer untouched.
+            ByteBuffer window = counter.getItem().duplicate();
+            byte[] key = new byte[window.remaining()];
+            window.get(key);
+            result.put(new CompositeDataSupport(COUNTER_COMPOSITE_TYPE, COUNTER_NAMES, new Object[] {
+                    Hex.bytesToHex(key), // raw
+                    counter.getCount(),  // count
+                    counter.getError(),  // error
+                    metadata.getKeyValidator().getString(ByteBuffer.wrap(key)) })); // string
+        }
+        return new CompositeDataSupport(SAMPLING_RESULT, SAMPLER_NAMES, new Object[]{
+                samplerResults.cardinality, result});
+    }
+
public void cleanupCache()
{
Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index 3418b26..4df593b 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -21,6 +21,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
+
/**
* The MBean interface for ColumnFamilyStore
*/
@@ -402,4 +405,15 @@ public interface ColumnFamilyStoreMBean
* @return the size of SSTables in "snapshots" subdirectory which aren't live anymore
*/
public long trueSnapshotsSize();
+
+ /**
+ * begin sampling for a specific sampler with a given capacity. The cardinality may
+ * be larger than the capacity, but depending on the use case it may affect its accuracy
+ *
+ * @param sampler name of the sampler to start
+ * @param capacity capacity of the summary kept while sampling
+ */
+ public void beginLocalSampling(String sampler, int capacity);
+
+ /**
+ * @param sampler name of the sampler to stop
+ * @param count maximum number of top items to return
+ * @return top <i>count</i> items for the sampler since beginLocalSampling was called
+ * @throws OpenDataException if the result cannot be expressed as JMX open data
+ */
+ public CompositeData finishLocalSampling(String sampler, int count) throws OpenDataException;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
index b906750..c82569d 100644
--- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
@@ -17,9 +17,8 @@
*/
package org.apache.cassandra.metrics;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
+import java.nio.ByteBuffer;
+import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -28,11 +27,13 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.utils.EstimatedHistogram;
+import org.apache.cassandra.utils.TopKSampler;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.*;
+import com.yammer.metrics.core.Timer;
import com.yammer.metrics.util.RatioGauge;
/**
@@ -144,6 +145,7 @@ public class ColumnFamilyMetrics
public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalNameFactory, "Write");
public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalNameFactory, "Range");
+ public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
/**
* stores metrics that will be rolled into a single global metric
*/
@@ -203,6 +205,12 @@ public class ColumnFamilyMetrics
{
factory = new ColumnFamilyMetricNameFactory(cfs);
+ samplers = Maps.newHashMap();
+ for (Sampler sampler : Sampler.values())
+ {
+ samplers.put(sampler, new TopKSampler<ByteBuffer>());
+ }
+
memtableColumnsCount = createColumnFamilyGauge("MemtableColumnsCount", new Gauge<Long>()
{
public Long value()
@@ -766,4 +774,9 @@ public class ColumnFamilyMetrics
return new MetricName(groupName, "ColumnFamily", metricName, "all", mbeanName.toString());
}
}
+
+ public static enum Sampler // keys of the samplers map; one TopKSampler is created per constant
+ {
+ READS, WRITES // JMX callers select a sampler by passing one of these names as a string
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 00f9686..67cc7f1 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -28,8 +28,7 @@ import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import javax.management.*;
import javax.management.openmbean.CompositeData;
@@ -37,11 +36,11 @@ import javax.management.remote.JMXConnectionNotification;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
-import javax.management.openmbean.TabularData;
+import javax.management.openmbean.*;
import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
+import com.google.common.collect.*;
+import com.google.common.util.concurrent.Uninterruptibles;
import com.yammer.metrics.reporting.JmxReporter;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
@@ -53,6 +52,7 @@ import org.apache.cassandra.db.compaction.CompactionManagerMBean;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.FailureDetectorMBean;
import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
+import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MessagingServiceMBean;
import org.apache.cassandra.repair.RepairParallelism;
@@ -312,6 +312,22 @@ public class NodeProbe implements AutoCloseable
}
}
+    /**
+     * Samples partition activity of one column family for a fixed period.
+     * Every requested sampler is started, the calling thread sleeps for {@code duration}
+     * milliseconds, and then each sampler is stopped and its result collected.
+     *
+     * @param ks keyspace name
+     * @param cf column family name
+     * @param capacity capacity passed to each sampler when it is started
+     * @param duration sampling period in milliseconds
+     * @param count number of top entries requested from each sampler
+     * @param samplers samplers to run
+     * @return the result of each requested sampler, keyed by sampler
+     * @throws OpenDataException declared by the MBean call that finishes a sampler
+     */
+    public Map<Sampler, CompositeData> getPartitionSample(String ks, String cf, int capacity, int duration, int count, List<Sampler> samplers) throws OpenDataException
+    {
+        ColumnFamilyStoreMBean cfsProxy = getCfsProxy(ks, cf);
+        // start all samplers before sleeping so that they cover the same period
+        for (Sampler toStart : samplers)
+            cfsProxy.beginLocalSampling(toStart.name(), capacity);
+
+        Uninterruptibles.sleepUninterruptibly(duration, TimeUnit.MILLISECONDS);
+
+        Map<Sampler, CompositeData> collected = Maps.newHashMap();
+        for (Sampler toStop : samplers)
+        {
+            CompositeData sample = cfsProxy.finishLocalSampling(toStop.name(), count);
+            collected.put(toStop, sample);
+        }
+        return collected;
+    }
+
public void invalidateCounterCache()
{
cacheService.invalidateCounterCache();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 2cc0b98..12496fc 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -27,13 +27,11 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
-import javax.management.openmbean.TabularData;
+import javax.management.openmbean.*;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.LinkedHashMultimap;
-import com.google.common.collect.Maps;
+import com.google.common.collect.*;
import com.yammer.metrics.reporting.JmxReporter;
import io.airlift.command.*;
@@ -47,6 +45,7 @@ import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
import org.apache.cassandra.locator.LocalStrategy;
+import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler;
import org.apache.cassandra.net.MessagingServiceMBean;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.service.CacheServiceMBean;
@@ -146,6 +145,7 @@ public class NodeTool
Drain.class,
TruncateHints.class,
TpStats.class,
+ TopPartitions.class,
SetLoggingLevel.class,
GetLoggingLevels.class
);
@@ -925,6 +925,81 @@ public class NodeTool
}
}
+ @Command(name = "toppartitions", description = "Sample and print the most active partitions for a given column family")
+ public static class TopPartitions extends NodeToolCmd
+ {
+ @Arguments(usage = "<keyspace> <cfname> <duration>", description = "The keyspace, column family name, and duration in milliseconds")
+ private List<String> args = new ArrayList<>();
+ @Option(name = "-s", description = "Capacity of stream summary, closer to the actual cardinality of partitions will yield more accurate results (Default: 256)")
+ private int size = 256;
+ @Option(name = "-k", description = "Number of the top partitions to list (Default: 10)")
+ private int topCount = 10;
+ @Option(name = "-a", description = "Comma separated list of samplers to use (Default: all)")
+ private String samplers = join(Sampler.values(), ',');
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(args.size() == 3, "toppartitions requires keyspace, column family name, and duration");
+ checkArgument(topCount < size, "TopK count (-k) option must be smaller then the summary capacity (-s)");
+ String keyspace = args.get(0);
+ String cfname = args.get(1);
+ Integer duration = Integer.parseInt(args.get(2));
+ // generate the list of samplers
+ List<Sampler> targets = Lists.newArrayList();
+ for (String s : samplers.split(","))
+ {
+ try
+ {
+ targets.add(Sampler.valueOf(s.toUpperCase()));
+ } catch (Exception e)
+ {
+ throw new IllegalArgumentException(s + " is not a valid sampler, choose one of: " + join(Sampler.values(), ", "));
+ }
+ }
+
+ Map<Sampler, CompositeData> results;
+ try
+ {
+ results = probe.getPartitionSample(keyspace, cfname, size, duration, topCount, targets);
+ } catch (OpenDataException e)
+ {
+ throw new RuntimeException(e);
+ }
+ boolean first = true;
+ for(Entry<Sampler, CompositeData> result : results.entrySet())
+ {
+ CompositeData sampling = result.getValue();
+ // weird casting for http://bugs.sun.com/view_bug.do?bug_id=6548436
+ List<CompositeData> topk = (List<CompositeData>) (Object) Lists.newArrayList(((TabularDataSupport) sampling.get("partitions")).values());
+ Collections.sort(topk, new Ordering<CompositeData>()
+ {
+ public int compare(CompositeData left, CompositeData right)
+ {
+ return Long.compare((long) right.get("count"), (long) left.get("count"));
+ }
+ });
+ if(!first)
+ System.out.println();
+ System.out.println(result.getKey().toString()+ " Sampler:");
+ System.out.printf(" Cardinality: ~%d (%d capacity)%n", (long) sampling.get("cardinality"), size);
+ System.out.printf(" Top %d partitions:%n", topCount);
+ if (topk.size() == 0)
+ {
+ System.out.println("\tNothing recorded during sampling period...");
+ } else
+ {
+ int offset = 0;
+ for (CompositeData entry : topk)
+ offset = Math.max(offset, entry.get("string").toString().length());
+ System.out.printf("\t%-" + offset + "s%10s%10s%n", "Partition", "Count", "+/-");
+ for (CompositeData entry : topk)
+ System.out.printf("\t%-" + offset + "s%10d%10d%n", entry.get("string").toString(), entry.get("count"), entry.get("error"));
+ }
+ first = false;
+ }
+ }
+ }
+
@Command(name = "cfhistograms", description = "Print statistic histograms for a given column family")
public static class CfHistograms extends NodeToolCmd
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/src/java/org/apache/cassandra/utils/TopKSampler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/TopKSampler.java b/src/java/org/apache/cassandra/utils/TopKSampler.java
new file mode 100644
index 0000000..29d46286
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/TopKSampler.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.apache.cassandra.concurrent.*;
+import org.slf4j.*;
+
+import com.clearspring.analytics.stream.*;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Collects a frequency summary (top-K) and a cardinality estimate for the items offered
+ * between {@link #beginSampling(int)} and {@link #finishSampling(int)}.
+ * <p>
+ * Samples are recorded asynchronously on the shared "Sampler" executor; access to the
+ * summary and the estimator is guarded by this object's monitor.
+ */
+public class TopKSampler<T>
+{
+    private static final Logger logger = LoggerFactory.getLogger(TopKSampler.class);
+    // volatile so that addSample can test it without taking the monitor
+    private volatile boolean enabled = false;
+
+    @VisibleForTesting
+    static final ThreadPoolExecutor samplerExecutor = new JMXEnabledThreadPoolExecutor(1, 1,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(),
+            new NamedThreadFactory("Sampler"),
+            "internal");
+
+    private StreamSummary<T> summary;
+    @VisibleForTesting
+    HyperLogLogPlus hll;
+
+    /**
+     * Start to record samples. Does nothing if sampling is already in progress.
+     *
+     * @param capacity
+     *            Number of sample items to keep in memory, the lower this is
+     *            the less accurate results are. For best results use value
+     *            close to cardinality, but understand the memory trade offs.
+     */
+    public synchronized void beginSampling(int capacity)
+    {
+        if (!enabled)
+        {
+            summary = new StreamSummary<T>(capacity);
+            hll = new HyperLogLogPlus(14);
+            enabled = true;
+        }
+    }
+
+    /**
+     * Call to stop collecting samples, and gather the results
+     *
+     * @param count Number of most frequent items to return
+     * @return the top items and the estimated cardinality; an empty list and a cardinality
+     *         of 0 when sampling was not in progress
+     */
+    public synchronized SamplerResult<T> finishSampling(int count)
+    {
+        // typed empty list instead of the raw Collections.EMPTY_LIST
+        List<Counter<T>> results = Collections.emptyList();
+        long cardinality = 0;
+        if (enabled)
+        {
+            enabled = false;
+            results = summary.topK(count);
+            cardinality = hll.cardinality();
+        }
+        return new SamplerResult<T>(results, cardinality);
+    }
+
+    /**
+     * Adds a single occurrence of {@code item}; see {@link #addSample(Object, int)}.
+     */
+    public void addSample(T item)
+    {
+        addSample(item, 1);
+    }
+
+    /**
+     * Adds a sample to statistics collection. This method is non-blocking and will
+     * use the "Sampler" thread pool to record results if the sampler is enabled. If not
+     * sampling this is a NOOP
+     *
+     * @param item the sampled item
+     * @param value amount to add to the item's count in the summary
+     */
+    public void addSample(final T item, final int value)
+    {
+        if (enabled)
+        {
+            final Object lock = this;
+            samplerExecutor.execute(new Runnable()
+            {
+                public void run()
+                {
+                    // samplerExecutor is single threaded but still need
+                    // synchronization against jmx calls to finishSampling
+                    synchronized (lock)
+                    {
+                        // re-check: sampling may have been finished while this task was queued
+                        if (enabled)
+                        {
+                            try
+                            {
+                                summary.offer(item, value);
+                                // NOTE(review): relies on the estimator's own hashing of arbitrary
+                                // objects; confirm it hashes the content of T - TODO verify
+                                hll.offer(item);
+                            }
+                            catch (Exception e)
+                            {
+                                // best effort: a failed sample must not break the executor thread
+                                logger.debug("Failure to offer sample", e);
+                            }
+                        }
+                    }
+                }
+            });
+        }
+    }
+
+    /**
+     * Represents the cardinality and the topK ranked items collected during a
+     * sample period
+     */
+    public static class SamplerResult<S> implements Serializable
+    {
+        public final List<Counter<S>> topK;
+        public final long cardinality;
+
+        public SamplerResult(List<Counter<S>> topK, long cardinality)
+        {
+            this.topK = topK;
+            this.cardinality = cardinality;
+        }
+    }
+
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/faf91818/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java b/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
new file mode 100644
index 0000000..dc3b91c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
@@ -0,0 +1,147 @@
+package org.apache.cassandra.utils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.utils.TopKSampler.SamplerResult;
+import org.junit.Test;
+
+import com.clearspring.analytics.stream.Counter;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * Unit tests for {@link TopKSampler}: equivalence of single and weighted samples,
+ * repeated finishSampling calls, and concurrent start/stop while samples are offered.
+ */
+public class TopKSamplerTest
+{
+
+    /**
+     * Offering "itemN" N times one by one must give the same counts and cardinality
+     * as offering it once with a value of N.
+     */
+    @Test
+    public void testSamplerSingleInsertionsEqualMulti() throws TimeoutException
+    {
+        TopKSampler<String> sampler = new TopKSampler<String>();
+        sampler.beginSampling(10);
+        insert(sampler);
+        waitForEmpty(1000);
+        SamplerResult<String> single = sampler.finishSampling(10);
+
+        TopKSampler<String> sampler2 = new TopKSampler<String>();
+        sampler2.beginSampling(10);
+        for (int i = 1; i <= 10; i++)
+        {
+            sampler2.addSample("item" + i, i);
+        }
+        waitForEmpty(1000);
+        Assert.assertEquals(countMap(single.topK), countMap(sampler2.finishSampling(10).topK));
+        Assert.assertEquals(sampler.hll.cardinality(), sampler2.hll.cardinality());
+    }
+
+    /**
+     * Calling finishSampling again without a new beginSampling must not fail and
+     * must return an empty result.
+     */
+    @Test
+    public void testSamplerOutOfOrder() throws TimeoutException
+    {
+        TopKSampler<String> sampler = new TopKSampler<String>();
+        sampler.beginSampling(10);
+        insert(sampler);
+        waitForEmpty(1000);
+        sampler.finishSampling(10);
+        SamplerResult<String> second = sampler.finishSampling(10);
+        Assert.assertTrue(second.topK.isEmpty());
+        Assert.assertEquals(0, second.cardinality);
+    }
+
+    /**
+     * checking for exceptions from SS/HLL which are not thread safe
+     */
+    @Test
+    public void testMultithreadedAccess() throws Exception
+    {
+        final AtomicBoolean running = new AtomicBoolean(true);
+        final CountDownLatch latch = new CountDownLatch(1);
+        final TopKSampler<String> sampler = new TopKSampler<String>();
+
+        // background thread that keeps offering samples until told to stop
+        new Thread(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    while (running.get())
+                    {
+                        insert(sampler);
+                    }
+                }
+                finally
+                {
+                    latch.countDown();
+                }
+            }
+
+        }
+        ,"inserter").start();
+        try
+        {
+            // start/stop in fast iterations
+            for (int i = 0; i < 100; i++)
+            {
+                sampler.beginSampling(i);
+                sampler.finishSampling(i);
+            }
+            // start/stop with pause to let it build up past capacity
+            for (int i = 0; i < 3; i++)
+            {
+                sampler.beginSampling(i);
+                Thread.sleep(250);
+                sampler.finishSampling(i);
+            }
+
+            // with empty results
+            running.set(false);
+            latch.await(1, TimeUnit.SECONDS);
+            waitForEmpty(1000);
+            for (int i = 0; i < 10; i++)
+            {
+                sampler.beginSampling(i);
+                Thread.sleep(i);
+                sampler.finishSampling(i);
+            }
+        }
+        finally
+        {
+            // make sure the inserter thread stops even if the test fails
+            running.set(false);
+        }
+    }
+
+    /** Offers "item1" once, "item2" twice, ... "item10" ten times. */
+    private void insert(TopKSampler<String> sampler)
+    {
+        for (int i = 1; i <= 10; i++)
+        {
+            for (int j = 0; j < i; j++)
+            {
+                sampler.addSample("item" + i);
+            }
+        }
+    }
+
+    /**
+     * Polls every 100ms until the sampler executor has no queued and no running task.
+     *
+     * @param timeoutMs approximate time to wait before giving up
+     * @throws TimeoutException if the executor is still busy after the timeout
+     */
+    private void waitForEmpty(int timeoutMs) throws TimeoutException
+    {
+        int timeout = 0;
+        // An empty queue is not enough: the last task may have been dequeued but not run
+        // to completion yet, so also wait until no worker is active.
+        while (!TopKSampler.samplerExecutor.getQueue().isEmpty()
+               || TopKSampler.samplerExecutor.getActiveCount() > 0)
+        {
+            timeout++;
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+            if (timeout * 100 > timeoutMs)
+            {
+                throw new TimeoutException("Sampler executor not cleared within timeout");
+            }
+        }
+    }
+
+    /** Maps each counted item to its count. */
+    private <T> Map<T, Long> countMap(List<Counter<T>> target)
+    {
+        Map<T, Long> counts = Maps.newHashMap();
+        for (Counter<T> counter : target)
+        {
+            counts.put(counter.getItem(), counter.getCount());
+        }
+        return counts;
+    }
+}