You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/01/22 21:23:41 UTC
[2/7] storm git commit: STORM-2887: store metrics into RocksDB
http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java
new file mode 100644
index 0000000..2f2ad76
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore;
+
+import java.util.Map;
+import org.apache.storm.DaemonConfig;
+
+
+public class MetricStoreConfig {
+
+ /**
+ * Configures metrics store to use the class specified in the conf.
+ * @param conf Storm config map
+ * @return MetricStore prepared store
+ * @throws MetricException on misconfiguration
+ */
+ public static MetricStore configure(Map conf) throws MetricException {
+
+ try {
+ String storeClass = (String)conf.get(DaemonConfig.STORM_METRIC_STORE_CLASS);
+ MetricStore store = (MetricStore) (Class.forName(storeClass)).newInstance();
+ store.prepare(conf);
+ return store;
+ } catch (Throwable t) {
+ throw new MetricException("Failed to create metric store", t);
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/KeyType.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/KeyType.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/KeyType.java
new file mode 100644
index 0000000..a351be7
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/KeyType.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Specifies all the valid types of keys and their values.
+ */
+public enum KeyType {
+ TOPOLOGY_BLOB(0), // reserved for saving topology data
+ METADATA_STRING_START(1),
+ TOPOLOGY_STRING(1),
+ METRIC_STRING(2),
+ COMPONENT_STRING(3),
+ EXEC_ID_STRING(4),
+ HOST_STRING(5),
+ STREAM_ID_STRING(6),
+ METADATA_STRING_END(7),
+ METRIC_DATA(0x80);
+
+ private final byte value;
+ private static Map<Byte, KeyType> MAP;
+
+ static {
+ MAP = new HashMap<>();
+ for (KeyType type : EnumSet.allOf(KeyType.class)) {
+ MAP.put(type.getValue(), type);
+ }
+ MAP = Collections.unmodifiableMap(MAP);
+ }
+
+ KeyType(int value) {
+ this.value = (byte)value;
+ }
+
+ byte getValue() {
+ return this.value;
+ }
+
+ static KeyType getKeyType(byte value) {
+ KeyType type = MAP.get(value);
+ if (type == null) {
+ throw new RuntimeException("Invalid key type " + value);
+ } else {
+ return type;
+ }
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java
new file mode 100644
index 0000000..6618f5d
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.metricstore.FilterOptions;
+import org.apache.storm.metricstore.MetricException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for removing expired metrics and unused metadata from the RocksDB store.
+ */
+public class MetricsCleaner implements Runnable, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(MetricsCleaner.class);
+ private static long DEFAULT_SLEEP_MS = 4L * 60L * 60L * 1000L;
+ private RocksDbStore store;
+ private long retentionHours;
+ private volatile boolean shutdown = false;
+ private long sleepMs = DEFAULT_SLEEP_MS;
+ private Meter failureMeter;
+ private long purgeTimestamp = 0L;
+
+ MetricsCleaner(RocksDbStore store, int retentionHours, int hourlyPeriod, Meter failureMeter) {
+ this.store = store;
+ this.retentionHours = retentionHours;
+ if (hourlyPeriod > 0) {
+ this.sleepMs = hourlyPeriod * 60L * 60L * 1000L;
+ }
+ this.failureMeter = failureMeter;
+
+ Gauge<Long> gauge = new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return purgeTimestamp;
+ }
+ };
+ StormMetricsRegistry.registerProvidedGauge("MetricsCleaner:purgeTimestamp", gauge);
+ }
+
+ @Override
+ public void close() {
+ shutdown = true;
+ }
+
+ @Override
+ public void run() {
+ while (!shutdown) {
+ try {
+ Thread.sleep(sleepMs);
+ } catch (InterruptedException e) {
+ LOG.error("Sleep interrupted", e);
+ continue;
+ }
+
+ try {
+ purgeMetrics();
+ } catch (MetricException e) {
+ LOG.error("Failed to purge metrics", e);
+ if (this.failureMeter != null) {
+ this.failureMeter.mark();
+ }
+ }
+ }
+ }
+
+ void purgeMetrics() throws MetricException {
+ purgeTimestamp = System.currentTimeMillis() - this.retentionHours * 60L * 60L * 1000L;
+
+ LOG.info("Purging metrics before {}", purgeTimestamp);
+
+ FilterOptions filter = new FilterOptions();
+ long endTime = purgeTimestamp - 1L;
+ filter.setEndTime(endTime);
+ store.deleteMetrics(filter);
+
+ LOG.info("Purging metadata before " + purgeTimestamp);
+ store.deleteMetadataBefore(purgeTimestamp);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/ReadOnlyStringMetadataCache.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/ReadOnlyStringMetadataCache.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/ReadOnlyStringMetadataCache.java
new file mode 100644
index 0000000..0effbc4
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/ReadOnlyStringMetadataCache.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import org.apache.http.annotation.ThreadSafe;
+
+/**
+ * The read-only interface to a StringMetadataCache allowed to be used by any thread.
+ */
+@ThreadSafe
+public interface ReadOnlyStringMetadataCache {
+
+ /**
+ * Get the string metadata from the cache.
+ *
+ * @param s The string to look for
+ * @return the metadata associated with the string or null if not found
+ */
+ StringMetadata get(String s);
+
+ /**
+ * Returns the string matching the string Id if in the cache.
+ *
+ * @param stringId The string Id to check
+ * @return the associated string if the Id is in the cache, null otherwise
+ */
+ String getMetadataString(Integer stringId);
+
+ /**
+ * Determines if a string Id is contained in the cache.
+ *
+ * @param stringId The string Id to check
+ * @return true if the Id is in the cache, false otherwise
+ */
+ boolean contains(Integer stringId);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbKey.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbKey.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbKey.java
new file mode 100644
index 0000000..7868282
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbKey.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.google.common.primitives.UnsignedBytes;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import javax.xml.bind.DatatypeConverter;
+import org.apache.storm.metricstore.AggLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class representing the data used as a Key in RocksDB. Keys can be used either for metadata or metrics.
+ *
+ * <P>Keys are 38 bytes in size. The fields for a key are:
+ * <pre><
+ * Field Size Offset
+ *
+ * Type 1 0 The type maps to the KeyType enum, specifying a metric or various types of metadata
+ * Aggregation Level 1 1 The aggregation level for a metric (see AggLevel enum). 0 for metadata.
+ * TopologyId 4 2 The metadata string Id representing a topologyId for a metric, or the unique
+ * string Id for a metadata string
+ * Timestamp 8 6 The timestamp for a metric, unused for metadata
+ * MetricId 4 14 The metadata string Id for the metric name
+ * ComponentId 4 18 The metadata string Id for the component Id
+ * ExecutorId 4 22 The metadata string Id for the executor Id
+ * HostId 4 26 The metadata string Id for the host Id
+ * Port 4 30 The port number
+ * StreamId 4 34 The metadata string Id for the stream Id
+ * </pre>
+ */
+public class RocksDbKey implements Comparable<RocksDbKey> {
+ private static final Logger LOG = LoggerFactory.getLogger(RocksDbKey.class);
+ static final int KEY_SIZE = 38;
+ private static Map<Byte, RocksDbKey> PREFIX_MAP = new HashMap<>();
+ private byte[] key;
+
+ static {
+ // pregenerate commonly used keys for scans
+ for (KeyType type : EnumSet.allOf(KeyType.class)) {
+ RocksDbKey key = new RocksDbKey(type, 0);
+ PREFIX_MAP.put(type.getValue(), key);
+ }
+ PREFIX_MAP = Collections.unmodifiableMap(PREFIX_MAP);
+ }
+
+ /**
+ * Constructor for a RocksDB key for a metadata string.
+ *
+ * @param type type of metadata string
+ * @param metadataStringId the string Id for the string (stored in the topologyId portion of the key)
+ */
+ RocksDbKey(KeyType type, int metadataStringId) {
+ byte[] key = new byte[KEY_SIZE];
+ ByteBuffer bb = ByteBuffer.wrap(key);
+ bb.put(type.getValue());
+ bb.put(AggLevel.AGG_LEVEL_NONE.getValue());
+ bb.putInt(metadataStringId);
+ this.key = key;
+ }
+
+ /**
+ * Constructor for a RocksDB key from raw data.
+ *
+ * @param raw the key data
+ */
+ RocksDbKey(byte[] raw) {
+ this.key = raw;
+ }
+
+
+ /**
+ * Get a zeroed key of the specified type.
+ *
+ * @param type the desired type
+ * @return a key of the desired type
+ */
+ static RocksDbKey getPrefix(KeyType type) {
+ return PREFIX_MAP.get(type.getValue());
+ }
+
+ /**
+ * get the metadata string Id portion of the key for metadata keys.
+ *
+ * @return the metadata string Id
+ * @throws RuntimeException if the key is not a metadata type
+ */
+ int getMetadataStringId() {
+ if (this.getType().getValue() < KeyType.METADATA_STRING_END.getValue()) {
+ return ByteBuffer.wrap(key, 2, 4).getInt();
+ } else {
+ throw new RuntimeException("Cannot fetch metadata string for key of type " + this.getType());
+ }
+ }
+
+ /**
+ * get the raw key bytes
+ */
+ byte[] getRaw() {
+ return this.key;
+ }
+
+ /**
+ * get the type of key.
+ *
+ * @return the type of key
+ */
+ KeyType getType() {
+ return KeyType.getKeyType(key[0]);
+ }
+
+ /**
+ * compares to keys on a byte by byte basis.
+ *
+ * @return comparison of key byte values
+ */
+ @Override
+ public int compareTo(RocksDbKey o) {
+ return UnsignedBytes.lexicographicalComparator().compare(this.getRaw(), o.getRaw());
+ }
+
+ /**
+ * gets the first possible key value for the desired key type.
+ *
+ * @return the initial key
+ */
+ static RocksDbKey getInitialKey(KeyType type) {
+ return PREFIX_MAP.get(type.getValue());
+ }
+
+ /**
+ * gets the key just larger than the last possible key value for the desired key type.
+ *
+ * @return the last key
+ */
+ static RocksDbKey getLastKey(KeyType type) {
+ byte value = (byte)(type.getValue() + 1);
+ return PREFIX_MAP.get(value);
+ }
+
+ /**
+ * Creates a metric key with the desired properties.
+ *
+ * @return the generated key
+ */
+ static RocksDbKey createMetricKey(AggLevel aggLevel, int topologyId, long metricTimestamp, int metricId,
+ int componentId, int executorId, int hostId, int port,
+ int streamId) {
+ byte[] raw = new byte[KEY_SIZE];
+ ByteBuffer bb = ByteBuffer.wrap(raw);
+ bb.put(KeyType.METRIC_DATA.getValue());
+ bb.put(aggLevel.getValue());
+ bb.putInt(topologyId); // offset 2
+ bb.putLong(metricTimestamp); // offset 6
+ bb.putInt(metricId); // offset 14
+ bb.putInt(componentId); // offset 18
+ bb.putInt(executorId); // offset 22
+ bb.putInt(hostId); // offset 26
+ bb.putInt(port); // offset 30
+ bb.putInt(streamId); // offset 34
+
+ RocksDbKey key = new RocksDbKey(raw);
+ return key;
+ }
+
+ /**
+ * Get the unique string Id for a metric's topologyId.
+ */
+ int getTopologyId() {
+ int val = ByteBuffer.wrap(key, 2, 4).getInt();
+ return val;
+ }
+
+ long getTimestamp() {
+ return ByteBuffer.wrap(key, 6, 8).getLong();
+ }
+
+ int getMetricId() {
+ return ByteBuffer.wrap(key, 14, 4).getInt();
+ }
+
+ int getComponentId() {
+ return ByteBuffer.wrap(key, 18, 4).getInt();
+ }
+
+ int getExecutorId() {
+ return ByteBuffer.wrap(key, 22, 4).getInt();
+ }
+
+ int getHostnameId() {
+ return ByteBuffer.wrap(key, 26, 4).getInt();
+ }
+
+ int getPort() {
+ return ByteBuffer.wrap(key, 30, 4).getInt();
+ }
+
+ int getStreamId() {
+ return ByteBuffer.wrap(key, 34, 4).getInt();
+ }
+
+ @Override
+ public String toString() {
+ return "[0x" + DatatypeConverter.printHexBinary(key) + "]";
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java
new file mode 100644
index 0000000..a050a76
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.codahale.metrics.Meter;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.http.annotation.NotThreadSafe;
+import org.apache.storm.metricstore.AggLevel;
+import org.apache.storm.metricstore.Metric;
+import org.apache.storm.metricstore.MetricException;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class designed to perform all metrics inserts into RocksDB. Metrics are processed from a blocking queue. Inserts
+ * to RocksDB are done using a single thread to simplify design (such as looking up existing metric data for aggregation,
+ * and fetching/evicting metadata from the cache).
+ * </P>
+ * A writable LRU StringMetadataCache is used to minimize looking up metadata string Ids. As entries are added to the full cache, older
+ * entries are evicted from the cache and need to be written to the database. This happens as the handleEvictedMetadata()
+ * method callback.
+ * </P>
+ * The following issues would need to be addressed to implement a multithreaded metrics writer:
+ * <ul>
+ * <li>Generation of unique unused IDs for new metadata strings needs to be thread safe.</li>
+ * <li>Ensuring newly created metadata strings are seen by all threads.</li>
+ * <li>Maintaining a properly cached state of metadata for multiple writers. The current LRU cache
+ * evicts data as new metadata is added.</li>
+ * <li>Processing the aggregation of a metric requires fetching and updating previous aggregates. A multithreaded
+ * design would need to ensure two metrics were not updating an aggregated metric at the same time.</li>
+ * <li>Investigate performance of multiple threads inserting into RocksDB versus a single ordered insert.</li>
+ * </ul>
+ */
+@NotThreadSafe
+public class RocksDbMetricsWriter implements Runnable, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(RocksDbMetricsWriter.class);
+ private RocksDbStore store;
+ private BlockingQueue queue;
+ private WritableStringMetadataCache stringMetadataCache;
+ private Set<Integer> unusedIds = new HashSet<>();
+ private TreeMap<RocksDbKey, RocksDbValue> insertBatch = new TreeMap<>(); // RocksDB should insert in sorted key order
+ private WriteOptions writeOpts = new WriteOptions();
+ private volatile boolean shutdown = false;
+ private Meter failureMeter;
+ private ArrayList<AggLevel> aggBuckets = new ArrayList<>();
+
+ /**
+ * Constructor for the RocksDbMetricsWriter.
+ *
+ * @param store The RocksDB store
+ * @param queue The queue to receive metrics for insertion
+ */
+ RocksDbMetricsWriter(RocksDbStore store, BlockingQueue queue, Meter failureMeter) {
+ this.store = store;
+ this.queue = queue;
+ this.failureMeter = failureMeter;
+
+ aggBuckets.add(AggLevel.AGG_LEVEL_1_MIN);
+ aggBuckets.add(AggLevel.AGG_LEVEL_10_MIN);
+ aggBuckets.add(AggLevel.AGG_LEVEL_60_MIN);
+ }
+
+ /**
+ * Init routine called once the Metadata cache has been created.
+ *
+ * @throws MetricException on cache error
+ */
+ void init() throws MetricException {
+ this.stringMetadataCache = StringMetadataCache.getWritableStringMetadataCache();
+ }
+
+ /**
+ * Run routine to wait for metrics on a queue and insert into RocksDB.
+ */
+ @Override
+ public void run() {
+ while (!shutdown) {
+ try {
+ Metric m = (Metric) queue.take();
+ processInsert(m);
+ } catch (Exception e) {
+ LOG.error("Failed to insert metric", e);
+ if (this.failureMeter != null) {
+ this.failureMeter.mark();
+ }
+ }
+ }
+ }
+
+ /**
+ * Performs the actual metric insert, and aggregates over all bucket times.
+ *
+ * @param metric Metric to store
+ * @throws MetricException if database write fails
+ */
+ private void processInsert(Metric metric) throws MetricException {
+
+ // convert all strings to numeric Ids for the metric key and add to the metadata cache
+ long metricTimestamp = metric.getTimestamp();
+ Integer topologyId = storeMetadataString(KeyType.TOPOLOGY_STRING, metric.getTopologyId(), metricTimestamp);
+ Integer metricId = storeMetadataString(KeyType.METRIC_STRING, metric.getMetricName(), metricTimestamp);
+ Integer componentId = storeMetadataString(KeyType.COMPONENT_STRING, metric.getComponentId(), metricTimestamp);
+ Integer executorId = storeMetadataString(KeyType.EXEC_ID_STRING, metric.getExecutorId(), metricTimestamp);
+ Integer hostId = storeMetadataString(KeyType.HOST_STRING, metric.getHostname(), metricTimestamp);
+ Integer streamId = storeMetadataString(KeyType.STREAM_ID_STRING, metric.getStreamId(), metricTimestamp);
+
+ RocksDbKey key = RocksDbKey.createMetricKey(AggLevel.AGG_LEVEL_NONE, topologyId, metric.getTimestamp(), metricId,
+ componentId, executorId, hostId, metric.getPort(), streamId);
+
+ // save metric key/value to be batched
+ RocksDbValue value = new RocksDbValue(metric);
+ insertBatch.put(key, value);
+
+ // Aggregate matching metrics over bucket timeframes.
+ // We'll process starting with the longest bucket. If the metric for this does not exist, we don't have to
+ // search for the remaining bucket metrics.
+ ListIterator li = aggBuckets.listIterator(aggBuckets.size());
+ boolean populate = true;
+ while (li.hasPrevious()) {
+ AggLevel bucket = (AggLevel)li.previous();
+ Metric aggMetric = new Metric(metric);
+ aggMetric.setAggLevel(bucket);
+
+ long msToBucket = 1000L * 60L * bucket.getValue();
+ long roundedToBucket = msToBucket * (metric.getTimestamp() / msToBucket);
+ aggMetric.setTimestamp(roundedToBucket);
+
+ RocksDbKey aggKey = RocksDbKey.createMetricKey(bucket, topologyId, aggMetric.getTimestamp(), metricId,
+ componentId, executorId, hostId, aggMetric.getPort(), streamId);
+
+ if (populate) {
+ // retrieve any existing aggregation matching this one and update the values
+ if (store.populateFromKey(aggKey, aggMetric)) {
+ aggMetric.addValue(metric.getValue());
+ } else {
+ // aggregating metric did not exist, don't look for further ones with smaller timestamps
+ populate = false;
+ }
+ }
+
+ // save metric key/value to be batched
+ RocksDbValue aggVal = new RocksDbValue(aggMetric);
+ insertBatch.put(aggKey, aggVal);
+ }
+
+ processBatchInsert(insertBatch);
+
+ insertBatch.clear();
+ }
+
+ // converts a metadata string into a unique integer. Updates the timestamp of the string
+ // so we can track when it was last used for later deletion on database cleanup.
+ private int storeMetadataString(KeyType type, String s, long metricTimestamp) throws MetricException {
+ if (s == null) {
+ throw new MetricException("No string for metric metadata string type " + type);
+ }
+
+ // attempt to find it in the string cache
+ StringMetadata stringMetadata = stringMetadataCache.get(s);
+ if (stringMetadata != null) {
+ // make sure the timestamp on the metadata has the latest time
+ stringMetadata.update(metricTimestamp, type);
+ return stringMetadata.getStringId();
+ }
+
+ // attempt to find the string in the database
+ stringMetadata = store.rocksDbGetStringMetadata(type, s);
+ if (stringMetadata != null) {
+ // update to the latest timestamp and add to the string cache
+ stringMetadata.update(metricTimestamp, type);
+ stringMetadataCache.put(s, stringMetadata, false);
+ return stringMetadata.getStringId();
+ }
+
+ // string does not exist, create using an unique string id and add to cache
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(type + "." + s + " does not exist in cache or database");
+ }
+ int stringId = getUniqueMetadataStringId();
+ stringMetadata = new StringMetadata(type, stringId, metricTimestamp);
+ stringMetadataCache.put(s, stringMetadata, true);
+
+ return stringMetadata.getStringId();
+ }
+
+ // get a currently unused unique string id
+ private int getUniqueMetadataStringId() throws MetricException {
+ generateUniqueStringIds();
+ int id = unusedIds.iterator().next();
+ unusedIds.remove(id);
+ return id;
+ }
+
+ // guarantees a list of unused string Ids exists. Once the list is empty, creates a new list
+ // by generating a list of random numbers and removing the ones that already are in use.
+ private void generateUniqueStringIds() throws MetricException {
+ int attempts = 0;
+ while (unusedIds.isEmpty()) {
+ attempts++;
+ if (attempts > 100) {
+ String message = "Failed to generate unique ids";
+ LOG.error(message);
+ throw new MetricException(message);
+ }
+ for (int i = 0; i < 600; i++) {
+ int n = ThreadLocalRandom.current().nextInt();
+ if (n == RocksDbStore.INVALID_METADATA_STRING_ID) {
+ continue;
+ }
+ // remove any entries in the cache
+ if (stringMetadataCache.contains(n)) {
+ continue;
+ }
+ unusedIds.add(n);
+ }
+ // now scan all metadata and remove any matching string Ids from this list
+ RocksDbKey firstPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_START);
+ RocksDbKey lastPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_END);
+ store.scanRange(firstPrefix, lastPrefix, (key, value) -> {
+ unusedIds.remove(key.getMetadataStringId());
+ return true; // process all metadata
+ });
+ }
+ }
+
+ // writes multiple metric values into the database as a batch operation. The tree map keeps the keys sorted
+ // for faster insertion to RocksDB.
+ private void processBatchInsert(TreeMap<RocksDbKey, RocksDbValue> batchMap) throws MetricException {
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ // take the batched metric data and write to the database
+ for (RocksDbKey k : batchMap.keySet()) {
+ RocksDbValue v = batchMap.get(k);
+ writeBatch.put(k.getRaw(), v.getRaw());
+ }
+ store.db.write(writeOpts, writeBatch);
+ } catch (Exception e) {
+ String message = "Failed to store data to RocksDB";
+ LOG.error(message, e);
+ throw new MetricException(message, e);
+ }
+ }
+
+ // evicted metadata needs to be stored immediately. Metadata lookups count on it being in the cache
+ // or database.
+ void handleEvictedMetadata(RocksDbKey key, RocksDbValue val) {
+ try {
+ store.db.put(key.getRaw(), val.getRaw());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ boolean isShutdown() {
+ return this.shutdown;
+ }
+
+ @Override
+ public void close() {
+ this.shutdown = true;
+
+ // get all metadata from the cache to put into the database
+ TreeMap<RocksDbKey, RocksDbValue> batchMap = new TreeMap<>(); // use a new map to prevent threading issues with writer thread
+ for (Map.Entry entry : stringMetadataCache.entrySet()) {
+ String metadataString = (String)entry.getKey();
+ StringMetadata val = (StringMetadata)entry.getValue();
+ RocksDbValue rval = new RocksDbValue(val.getLastTimestamp(), metadataString);
+
+ for (KeyType type : val.getMetadataTypes()) { // save the metadata for all types of strings it matches
+ RocksDbKey rkey = new RocksDbKey(type, val.getStringId());
+ batchMap.put(rkey, rval);
+ }
+ }
+
+ try {
+ processBatchInsert(batchMap);
+ } catch (MetricException e) {
+ LOG.error("Failed to insert all metadata", e);
+ }
+
+ // flush db to disk
+ try (FlushOptions flushOps = new FlushOptions()) {
+ flushOps.setWaitForFlush(true);
+ store.db.flush(flushOps);
+ } catch (RocksDBException e) {
+ LOG.error("Failed ot flush RocksDB", e);
+ if (this.failureMeter != null) {
+ this.failureMeter.mark();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
new file mode 100644
index 0000000..2f44aff
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
@@ -0,0 +1,639 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.codahale.metrics.Meter;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.metricstore.AggLevel;
+import org.apache.storm.metricstore.FilterOptions;
+import org.apache.storm.metricstore.Metric;
+import org.apache.storm.metricstore.MetricException;
+import org.apache.storm.metricstore.MetricStore;
+import org.apache.storm.utils.ObjectReader;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.IndexType;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RocksDbStore implements MetricStore, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(RocksDbStore.class);
+ private static final int MAX_QUEUE_CAPACITY = 4000;
+ static final int INVALID_METADATA_STRING_ID = 0;
+ RocksDB db;
+ private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null;
+ private BlockingQueue queue = new LinkedBlockingQueue(MAX_QUEUE_CAPACITY);
+ private RocksDbMetricsWriter metricsWriter = null;
+ private MetricsCleaner metricsCleaner = null;
+ private Meter failureMeter = null;
+
+ interface RocksDbScanCallback {
+ boolean cb(RocksDbKey key, RocksDbValue val); // return false to stop scan
+ }
+
+ /**
+ * Create metric store instance using the configurations provided via the config map.
+ *
+ * @param config Storm config map
+ * @throws MetricException on preparation error
+ */
+ public void prepare(Map config) throws MetricException {
+ validateConfig(config);
+
+ this.failureMeter = StormMetricsRegistry.registerMeter("RocksDB:metric-failures");
+
+ RocksDB.loadLibrary();
+ boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
+
+ try (Options options = new Options().setCreateIfMissing(createIfMissing)) {
+ // use the hash index for prefix searches
+ BlockBasedTableConfig tfc = new BlockBasedTableConfig();
+ tfc.setIndexType(IndexType.kHashSearch);
+ options.setTableFormatConfig(tfc);
+ options.useCappedPrefixExtractor(RocksDbKey.KEY_SIZE);
+
+ String path = getRocksDbAbsoluteDir(config);
+ LOG.info("Opening RocksDB from {}", path);
+ db = RocksDB.open(options, path);
+ } catch (RocksDBException e) {
+ String message = "Error opening RockDB database";
+ LOG.error(message, e);
+ throw new MetricException(message, e);
+ }
+
+ // create thread to delete old metrics and metadata
+ Integer retentionHours = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS).toString());
+ Integer deletionPeriod = 0;
+ if (config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS)) {
+ deletionPeriod = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS).toString());
+ }
+ metricsCleaner = new MetricsCleaner(this, retentionHours, deletionPeriod, failureMeter);
+
+ // create thread to process insertion of all metrics
+ metricsWriter = new RocksDbMetricsWriter(this, this.queue, this.failureMeter);
+
+ int cacheCapacity = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY).toString());
+ StringMetadataCache.init(metricsWriter, cacheCapacity);
+ readOnlyStringMetadataCache = StringMetadataCache.getReadOnlyStringMetadataCache();
+ metricsWriter.init(); // init the writer once the cache is setup
+
+ // start threads after metadata cache created
+ Thread thread = new Thread(metricsCleaner, "RocksDbMetricsCleaner");
+ thread.setDaemon(true);
+ thread.start();
+
+ thread = new Thread(metricsWriter, "RocksDbMetricsWriter");
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ /**
+ * Implements configuration validation of Metrics Store, validates storm configuration for Metrics Store.
+ *
+ * @param config Storm config to specify which store type, location of store and creation policy
+ * @throws MetricException if there is a missing required configuration or if the store does not exist but
+ * the config specifies not to create the store
+ */
+ private void validateConfig(Map config) throws MetricException {
+ if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_LOCATION))) {
+ throw new MetricException("Not a vaild RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
+ }
+
+ if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING))) {
+ throw new MetricException("Not a vaild RocksDB configuration - Does not specify creation policy "
+ + DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING);
+ }
+
+ // validate path defined
+ String storePath = getRocksDbAbsoluteDir(config);
+
+ boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
+ if (!createIfMissing) {
+ if (!(new File(storePath).exists())) {
+ throw new MetricException("Configuration specifies not to create a store but no store currently exists at " + storePath);
+ }
+ }
+
+ if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY))) {
+ throw new MetricException("Not a valid RocksDB configuration - Missing metadata string cache size "
+ + DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY);
+ }
+
+ if (!config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS)) {
+ throw new MetricException("Not a valid RocksDB configuration - Missing metric retention "
+ + DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS);
+ }
+ }
+
+ private String getRocksDbAbsoluteDir(Map conf) throws MetricException {
+ String storePath = (String)conf.get(DaemonConfig.STORM_ROCKSDB_LOCATION);
+ if (storePath == null) {
+ throw new MetricException("Not a vaild RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
+ } else {
+ if (new File(storePath).isAbsolute()) {
+ return storePath;
+ } else {
+ String stormHome = System.getProperty("storm.home");
+ if (stormHome == null) {
+ throw new MetricException("storm.home not set");
+ }
+ return (stormHome + File.separator + storePath);
+ }
+ }
+ }
+
+ /**
+ * Stores metrics in the store.
+ *
+ * @param metric Metric to store
+ * @throws MetricException if database write fails
+ */
+ public void insert(Metric metric) throws MetricException {
+ try {
+ // don't bother blocking on a full queue, just drop metrics in case we can't keep up
+ if (queue.remainingCapacity() <= 0) {
+ LOG.info("Metrics q full, dropping metric");
+ return;
+ }
+ queue.put(metric);
+ } catch (Exception e) {
+ String message = "Failed to insert metric";
+ LOG.error(message, e);
+ if (this.failureMeter != null) {
+ this.failureMeter.mark();
+ }
+ throw new MetricException(message, e);
+ }
+ }
+
+ /**
+ * Fill out the numeric values for a metric.
+ *
+ * @param metric Metric to populate
+ * @return true if the metric was populated, false otherwise
+ * @throws MetricException if read from database fails
+ */
+ @Override
+ public boolean populateValue(Metric metric) throws MetricException {
+ Map<String, Integer> localLookupCache = new HashMap<>(6);
+
+ int topologyId = lookupMetadataString(KeyType.TOPOLOGY_STRING, metric.getTopologyId(), localLookupCache);
+ if (INVALID_METADATA_STRING_ID == topologyId) {
+ return false;
+ }
+ int metricId = lookupMetadataString(KeyType.METRIC_STRING, metric.getMetricName(), localLookupCache);
+ if (INVALID_METADATA_STRING_ID == metricId) {
+ return false;
+ }
+ int componentId = lookupMetadataString(KeyType.COMPONENT_STRING, metric.getComponentId(), localLookupCache);
+ if (INVALID_METADATA_STRING_ID == componentId) {
+ return false;
+ }
+ int executorId = lookupMetadataString(KeyType.EXEC_ID_STRING, metric.getExecutorId(), localLookupCache);
+ if (INVALID_METADATA_STRING_ID == executorId) {
+ return false;
+ }
+ int hostId = lookupMetadataString(KeyType.HOST_STRING, metric.getHostname(), localLookupCache);
+ if (INVALID_METADATA_STRING_ID == hostId) {
+ return false;
+ }
+ int streamId = lookupMetadataString(KeyType.STREAM_ID_STRING, metric.getStreamId(), localLookupCache);
+ if (INVALID_METADATA_STRING_ID == streamId) {
+ return false;
+ }
+
+ RocksDbKey key = RocksDbKey.createMetricKey(metric.getAggLevel(), topologyId, metric.getTimestamp(), metricId,
+ componentId, executorId, hostId, metric.getPort(), streamId);
+
+ return populateFromKey(key, metric);
+ }
+
+ // populate metric values using the provided key
+ boolean populateFromKey(RocksDbKey key, Metric metric) throws MetricException {
+ try {
+ byte[] value = db.get(key.getRaw());
+ if (value == null) {
+ return false;
+ }
+ RocksDbValue rdbValue = new RocksDbValue(value);
+ rdbValue.populateMetric(metric);
+ } catch (Exception e) {
+ String message = "Failed to populate metric";
+ LOG.error(message, e);
+ if (this.failureMeter != null) {
+ this.failureMeter.mark();
+ }
+ throw new MetricException(message, e);
+ }
+ return true;
+ }
+
+ // attempts to lookup the unique Id for a string that may not exist yet. Returns INVALID_METADATA_STRING_ID
+ // if it does not exist.
+ private int lookupMetadataString(KeyType type, String s, Map<String, Integer> lookupCache) throws MetricException {
+ if (s == null) {
+ if (this.failureMeter != null) {
+ this.failureMeter.mark();
+ }
+ throw new MetricException("No string for metric metadata string type " + type);
+ }
+
+ // attempt to find it in the string cache, this will update the LRU
+ StringMetadata stringMetadata = readOnlyStringMetadataCache.get(s);
+ if (stringMetadata != null) {
+ return stringMetadata.getStringId();
+ }
+
+ // attempt to find it in callers cache
+ Integer id = lookupCache.get(s);
+ if (id != null) {
+ return id;
+ }
+
+ // attempt to find the string in the database
+ stringMetadata = rocksDbGetStringMetadata(type, s);
+ if (stringMetadata != null) {
+ id = stringMetadata.getStringId();
+
+ // add to the callers cache. We can't add it to the stringMetadataCache, since that could cause an eviction
+ // database write, which we want to only occur from the inserting DB thread.
+ lookupCache.put(s, id);
+
+ return id;
+ }
+
+ // string does not exist
+ return INVALID_METADATA_STRING_ID;
+ }
+
+ // scans the database to look for a metadata string and returns the metadata info
+ StringMetadata rocksDbGetStringMetadata(KeyType type, String s) {
+ RocksDbKey firstKey = RocksDbKey.getInitialKey(type);
+ RocksDbKey lastKey = RocksDbKey.getLastKey(type);
+ final AtomicReference<StringMetadata> reference = new AtomicReference<>();
+ scanRange(firstKey, lastKey, (key, value) -> {
+ if (s.equals(value.getMetdataString())) {
+ reference.set(value.getStringMetadata(key));
+ return false;
+ } else {
+ return true; // haven't found string, keep searching
+ }
+ });
+ return reference.get();
+ }
+
+ // scans from key start to the key before end, calling back until callback indicates not to process further
+ void scanRange(RocksDbKey start, RocksDbKey end, RocksDbScanCallback fn) {
+ try (ReadOptions ro = new ReadOptions()) {
+ ro.setTotalOrderSeek(true);
+ RocksIterator iterator = db.newIterator(ro);
+ for (iterator.seek(start.getRaw()); iterator.isValid(); iterator.next()) {
+ RocksDbKey key = new RocksDbKey(iterator.key());
+ if (key.compareTo(end) >= 0) { // past limit, quit
+ return;
+ }
+
+ RocksDbValue val = new RocksDbValue(iterator.value());
+ if (!fn.cb(key, val)) {
+ // if cb returns false, we are done with this section of rows
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * Shutdown the store.
+ */
+ @Override
+ public void close() {
+ metricsWriter.close();
+ metricsCleaner.close();
+ }
+
+ /**
+ * Scans all metrics in the store and returns the ones matching the specified filtering options.
+ * Callback returns Metric class results.
+ *
+ * @param filter options to filter by
+ * @param scanCallback callback for each Metric found
+ * @throws MetricException on error
+ */
+ public void scan(FilterOptions filter, ScanCallback scanCallback) throws MetricException {
+ scanInternal(filter, scanCallback, null);
+ }
+
+ /**
+ * Scans all metrics in the store and returns the ones matching the specified filtering options.
+ * Callback returns raw key/value data.
+ *
+ * @param filter options to filter by
+ * @param rawCallback callback for each Metric found
+ * @throws MetricException on error
+ */
+ private void scanRaw(FilterOptions filter, RocksDbScanCallback rawCallback) throws MetricException {
+ scanInternal(filter, null, rawCallback);
+ }
+
+ // perform a scan given filter options, and return results in either Metric or raw data.
+ private void scanInternal(FilterOptions filter, ScanCallback scanCallback, RocksDbScanCallback rawCallback) throws MetricException {
+
+ Map<String, Integer> stringToIdCache = new HashMap<>();
+ Map<Integer, String> idToStringCache = new HashMap<>();
+
+ int startTopologyId = 0;
+ int endTopologyId = 0xFFFFFFFF;
+ String filterTopologyId = filter.getTopologyId();
+ if (filterTopologyId != null) {
+ int topologyId = lookupMetadataString(KeyType.TOPOLOGY_STRING, filterTopologyId, stringToIdCache);
+ if (INVALID_METADATA_STRING_ID == topologyId) {
+ return; // string does not exist in database
+ }
+ startTopologyId = topologyId;
+ endTopologyId = topologyId;
+ }
+
+ long startTime = filter.getStartTime();
+ long endTime = filter.getEndTime();
+
+ int startMetricId = 0;
+ int endMetricId = 0xFFFFFFFF;
+ String filterMetricName = filter.getMetricName();
+ if (filterMetricName != null) {
+ int metricId = lookupMetadataString(KeyType.METRIC_STRING, filterMetricName, stringToIdCache);
+ if (INVALID_METADATA_STRING_ID == metricId) {
+ return; // string does not exist in database
+ }
+ startMetricId = metricId;
+ endMetricId = metricId;
+ }
+
+ int startComponentId = 0;
+ int endComponentId = 0xFFFFFFFF;
+ String filterComponentId = filter.getComponentId();
+ if (filterComponentId != null) {
+ int componentId = lookupMetadataString(KeyType.COMPONENT_STRING, filterComponentId, stringToIdCache);
+ if (INVALID_METADATA_STRING_ID == componentId) {
+ return; // string does not exist in database
+ }
+ startComponentId = componentId;
+ endComponentId = componentId;
+ }
+
+ int startExecutorId = 0;
+ int endExecutorId = 0xFFFFFFFF;
+ String filterExecutorName = filter.getExecutorId();
+ if (filterExecutorName != null) {
+ int executorId = lookupMetadataString(KeyType.EXEC_ID_STRING, filterExecutorName, stringToIdCache);
+ if (INVALID_METADATA_STRING_ID == executorId) {
+ return; // string does not exist in database
+ }
+ startExecutorId = executorId;
+ endExecutorId = executorId;
+ }
+
+ int startHostId = 0;
+ int endHostId = 0xFFFFFFFF;
+ String filterHostId = filter.getHostId();
+ if (filterHostId != null) {
+ int hostId = lookupMetadataString(KeyType.HOST_STRING, filterHostId, stringToIdCache);
+ if (INVALID_METADATA_STRING_ID == hostId) {
+ return; // string does not exist in database
+ }
+ startHostId = hostId;
+ endHostId = hostId;
+ }
+
+ int startPort = 0;
+ int endPort = 0xFFFFFFFF;
+ Integer filterPort = filter.getPort();
+ if (filterPort != null) {
+ startPort = filterPort;
+ endPort = filterPort;
+ }
+
+ int startStreamId = 0;
+ int endStreamId = 0xFFFFFFFF;
+ String filterStreamId = filter.getStreamId();
+ if (filterStreamId != null) {
+ int streamId = lookupMetadataString(KeyType.HOST_STRING, filterStreamId, stringToIdCache);
+ if (INVALID_METADATA_STRING_ID == streamId) {
+ return; // string does not exist in database
+ }
+ startStreamId = streamId;
+ endStreamId = streamId;
+ }
+
+ ReadOptions ro = new ReadOptions();
+ ro.setTotalOrderSeek(true);
+
+ for (AggLevel aggLevel : filter.getAggLevels()) {
+
+ RocksDbKey startKey = RocksDbKey.createMetricKey(aggLevel, startTopologyId, startTime, startMetricId,
+ startComponentId, startExecutorId, startHostId, startPort, startStreamId);
+ RocksDbKey endKey = RocksDbKey.createMetricKey(aggLevel, endTopologyId, endTime, endMetricId,
+ endComponentId, endExecutorId, endHostId, endPort, endStreamId);
+
+ RocksIterator iterator = db.newIterator(ro);
+ for (iterator.seek(startKey.getRaw()); iterator.isValid(); iterator.next()) {
+ RocksDbKey key = new RocksDbKey(iterator.key());
+
+ if (key.compareTo(endKey) > 0) { // past limit, quit
+ break;
+ }
+
+ if (startTopologyId != 0 && key.getTopologyId() != startTopologyId) {
+ continue;
+ }
+
+ long timestamp = key.getTimestamp();
+ if (timestamp < startTime || timestamp > endTime) {
+ continue;
+ }
+
+ if (startMetricId != 0 && key.getMetricId() != startMetricId) {
+ continue;
+ }
+
+ if (startComponentId != 0 && key.getComponentId() != startComponentId) {
+ continue;
+ }
+
+ if (startExecutorId != 0 && key.getExecutorId() != startExecutorId) {
+ continue;
+ }
+
+ if (startHostId != 0 && key.getHostnameId() != startHostId) {
+ continue;
+ }
+
+ if (startPort != 0 && key.getPort() != startPort) {
+ continue;
+ }
+
+ if (startStreamId != 0 && key.getStreamId() != startStreamId) {
+ continue;
+ }
+
+ RocksDbValue val = new RocksDbValue(iterator.value());
+
+ if (scanCallback != null) {
+ try {
+ // populate a metric
+ String metricName = metadataIdToString(KeyType.METRIC_STRING, key.getMetricId(), idToStringCache);
+ String topologyId = metadataIdToString(KeyType.TOPOLOGY_STRING, key.getTopologyId(), idToStringCache);
+ String componentId = metadataIdToString(KeyType.COMPONENT_STRING, key.getComponentId(), idToStringCache);
+ String executorId = metadataIdToString(KeyType.EXEC_ID_STRING, key.getExecutorId(), idToStringCache);
+ String hostname = metadataIdToString(KeyType.HOST_STRING, key.getHostnameId(), idToStringCache);
+ String streamId = metadataIdToString(KeyType.STREAM_ID_STRING, key.getStreamId(), idToStringCache);
+
+ Metric metric = new Metric(metricName, timestamp, topologyId, 0.0, componentId, executorId, hostname,
+ streamId, key.getPort(), aggLevel);
+
+ val.populateMetric(metric);
+
+ // callback to caller
+ scanCallback.cb(metric);
+ } catch (MetricException e) {
+ LOG.warn("Failed to report found metric: {}", e.getMessage());
+ }
+ } else {
+ if (!rawCallback.cb(key, val)) {
+ return;
+ }
+ }
+ }
+ iterator.close();
+ }
+ ro.close();
+ }
+
+ // Finds the metadata string that matches the string Id and type provided. The string should exist, as it is
+ // referenced from a metric.
+ private String metadataIdToString(KeyType type, int id, Map<Integer, String> lookupCache) throws MetricException {
+ String s = readOnlyStringMetadataCache.getMetadataString(id);
+ if (s != null) {
+ return s;
+ }
+ s = lookupCache.get(id);
+ if (s != null) {
+ return s;
+ }
+ // get from DB and add to lookup cache
+ RocksDbKey key = new RocksDbKey(type, id);
+ try {
+ byte[] value = db.get(key.getRaw());
+ if (value == null) {
+ throw new MetricException("Failed to find metadata string for id " + id + " of type " + type);
+ }
+ RocksDbValue rdbValue = new RocksDbValue(value);
+ s = rdbValue.getMetdataString();
+ lookupCache.put(id, s);
+ return s;
+ } catch (RocksDBException e) {
+ if (this.failureMeter != null) {
+ this.failureMeter.mark();
+ }
+ throw new MetricException("Failed to get from RocksDb", e);
+ }
+ }
+
+ // deletes metrics matching the filter options
+ void deleteMetrics(FilterOptions filter) throws MetricException {
+ try (WriteBatch writeBatch = new WriteBatch();
+ WriteOptions writeOps = new WriteOptions()) {
+
+ scanRaw(filter, (RocksDbKey key, RocksDbValue value) -> {
+ writeBatch.remove(key.getRaw());
+ return true;
+ });
+
+ if (writeBatch.count() > 0) {
+ LOG.info("Deleting {} metrics", writeBatch.count());
+ try {
+ db.write(writeOps, writeBatch);
+ } catch (Exception e) {
+ String message = "Failed delete metrics";
+ LOG.error(message, e);
+ if (this.failureMeter != null) {
+ this.failureMeter.mark();
+ }
+ throw new MetricException(message, e);
+ }
+ }
+ }
+ }
+
+ // deletes metadata strings before the provided timestamp
+ void deleteMetadataBefore(long firstValidTimestamp) throws MetricException {
+ if (firstValidTimestamp < 1L) {
+ if (this.failureMeter != null) {
+ this.failureMeter.mark();
+ }
+ throw new MetricException("Invalid timestamp for deleting metadata: " + firstValidTimestamp);
+ }
+
+ try (WriteBatch writeBatch = new WriteBatch();
+ WriteOptions writeOps = new WriteOptions()) {
+
+ // search all metadata strings
+ RocksDbKey topologyMetadataPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_START);
+ RocksDbKey lastPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_END);
+ scanRange(topologyMetadataPrefix, lastPrefix, (key, value) -> {
+ // we'll assume the metadata was recently used if still in the cache.
+ if (!readOnlyStringMetadataCache.contains(key.getMetadataStringId())) {
+ if (value.getLastTimestamp() < firstValidTimestamp) {
+ writeBatch.remove(key.getRaw());
+ }
+ }
+ return true;
+ });
+
+ if (writeBatch.count() > 0) {
+ LOG.info("Deleting {} metadata strings", writeBatch.count());
+ try {
+ db.write(writeOps, writeBatch);
+ } catch (Exception e) {
+ String message = "Failed delete metadata strings";
+ LOG.error(message, e);
+ if (this.failureMeter != null) {
+ this.failureMeter.mark();
+ }
+ throw new MetricException(message, e);
+ }
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java
new file mode 100644
index 0000000..58b2c76
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import java.nio.ByteBuffer;
+import org.apache.storm.metricstore.Metric;
+
+
+/**
+ * Class representing the data used as a Value in RocksDB. Values can be used either for metadata or metrics.
+ *
+ * <p>Formats for Metadata String values are:
+ *
+ * <pre>
+ * Field Size Offset
+ *
+ * Version 1 0 The current metadata version - allows migrating if the format changes in the future
+ * Timestamp 8 1 The time when the metadata was last used by a metric. Allows deleting of old metadata.
+ * Metadata String any 9 The metadata string
+ *</pre>
+ *
+ * <p>Formats for Metric values are:
+ *
+ * <pre>
+ * Field Size Offset
+ *
+ * Version 1 0 The current metric version - allows migrating if the format changes in the future
+ * Value 8 1 The metric value
+ * Count 8 9 The metric count
+ * Min 8 17 The minimum metric value
+ * Max 8 25 The maximum metric value
+ * Sum 8 33 The sum of the metric values
+ * </pre>
+ */
+
+class RocksDbValue {
+ private static int METRIC_VALUE_SIZE = 41;
+ private byte[] value;
+ private static final byte CURRENT_METADATA_VERSION = 0;
+ private static final byte CURRENT_METRIC_VERSION = 0;
+ private static int MIN_METADATA_VALUE_SIZE = 9;
+
+ /**
+ * Constructor from raw data.
+ *
+ * @param value the raw bytes representing the key
+ */
+ RocksDbValue(byte[] value) {
+ this.value = value;
+ }
+
+ /**
+ * Constructor for a metadata string.
+ *
+ * @param lastTimestamp the last timestamp when the string was used
+ * @param metadataString the metadata string
+ */
+ RocksDbValue(long lastTimestamp, String metadataString) {
+ this.value = new byte[MIN_METADATA_VALUE_SIZE + metadataString.length()];
+ ByteBuffer bb = ByteBuffer.wrap(value);
+ bb.put(CURRENT_METADATA_VERSION);
+ bb.putLong(lastTimestamp);
+ bb.put(metadataString.getBytes());
+ }
+
+ /**
+ * Constructor for a metric.
+ *
+ * @param m the metric to create a value from
+ */
+ RocksDbValue(Metric m) {
+ this.value = new byte[METRIC_VALUE_SIZE];
+ ByteBuffer bb = ByteBuffer.wrap(value);
+ bb.put(CURRENT_METRIC_VERSION);
+ bb.putDouble(m.getValue());
+ bb.putLong(m.getCount());
+ bb.putDouble(m.getMin());
+ bb.putDouble(m.getMax());
+ bb.putDouble(m.getSum());
+ }
+
+ /**
+ * Get the metadata string portion of the value. Assumes the value is metadata.
+ *
+ * @return the metadata string
+ */
+ String getMetdataString() {
+ if (this.value.length < MIN_METADATA_VALUE_SIZE) {
+ throw new RuntimeException("RocksDB value is too small to be a metadata string!");
+ }
+ return new String(this.value, 9, this.value.length - 9);
+ }
+
+ /**
+ * Gets StringMetadata associated with the key/value pair.
+ */
+ StringMetadata getStringMetadata(RocksDbKey key) {
+ return new StringMetadata(key.getType(), key.getMetadataStringId(), this.getLastTimestamp());
+ }
+
+ /**
+ * Gets the last time a metadata string was used.
+ */
+ long getLastTimestamp() {
+ return ByteBuffer.wrap(value, 1, 8).getLong();
+ }
+
+ /**
+ * get the raw value bytes
+ */
+ byte[] getRaw() {
+ return this.value;
+ }
+
+ /**
+ * populate metric values from the raw data.
+ */
+ void populateMetric(Metric metric) {
+ ByteBuffer bb = ByteBuffer.wrap(this.value, 0, METRIC_VALUE_SIZE);
+ bb.get(); // version
+ metric.setValue(bb.getDouble());
+ metric.setCount(bb.getLong());
+ metric.setMin(bb.getDouble());
+ metric.setMax(bb.getDouble());
+ metric.setSum(bb.getDouble());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java
new file mode 100644
index 0000000..6f54a58
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Class that contains the information associated with a metadata string that remains cached in memory.
+ */
+class StringMetadata {
+ private List<KeyType> types = new ArrayList<>(1); // its possible a string is used by multiple types of metadata strings
+ private int stringId;
+ private long lastTimestamp;
+
+ /**
+ * Constructor for StringMetadata.
+ *
+ * @param metadataType the type of metadata string
+ * @param stringId the unique id for the metadata string
+ * @param lastTimestamp the timestamp when the metric used the metadata string
+ */
+ StringMetadata(KeyType metadataType, Integer stringId, Long lastTimestamp) {
+ this.types.add(metadataType);
+ this.stringId = stringId;
+ this.lastTimestamp = lastTimestamp;
+ }
+
+ int getStringId() {
+ return this.stringId;
+ }
+
+ long getLastTimestamp() {
+ return this.lastTimestamp;
+ }
+
+ List<KeyType> getMetadataTypes() {
+ return this.types;
+ }
+
+ private void addKeyType(KeyType type) {
+ if (!this.types.contains(type)) {
+ this.types.add(type);
+ }
+ }
+
+ /**
+ * Updates the timestamp of when a metadata string was last used. Adds the type of the string if it is a new
+ * type.
+ *
+ * @param metricTimestamp the timestamp of the metric using the metadata string
+ * @param type the type of metadata string for the metric
+ */
+ void update(Long metricTimestamp, KeyType type) {
+ if (metricTimestamp > this.lastTimestamp) {
+ this.lastTimestamp = metricTimestamp;
+ }
+ addKeyType(type);
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java
new file mode 100644
index 0000000..7ce8435
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.storm.metricstore.MetricException;
+import org.apache.storm.utils.LruMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to create a use a cache that stores Metadata string information in memory. It allows searching for a
+ * Metadata string's unique id, or looking up the string by the unique id. The StringMetadata is stored in an
+ * LRU map. When an entry is added to the cache, an older entry may be evicted, which then needs to be
+ * immediately stored to the database to provide a consistent view of all the metadata strings.
+ *
+ * <p>All write operations adding metadata to RocksDB are done by a single thread (a RocksDbMetricsWriter),
+ * but multiple threads can read values from the cache. To clarify which permissions are accessible by various
+ * threads, the ReadOnlyStringMetadataCache and WritableStringMetadataCache are provided to be used.
+ */
+
+public class StringMetadataCache implements LruMap.CacheEvictionCallback<String, StringMetadata>,
+ WritableStringMetadataCache, ReadOnlyStringMetadataCache {
+ private static final Logger LOG = LoggerFactory.getLogger(StringMetadataCache.class);
+ private Map<String, StringMetadata> lruStringCache;
+ private Map<Integer, String> hashToString = new ConcurrentHashMap<>();
+ private RocksDbMetricsWriter dbWriter;
+ private static StringMetadataCache instance = null;
+
+ /**
+ * Initializes the cache instance.
+ *
+ * @param dbWriter the RocksDB writer instance to handle writing evicted cache data
+ * @param capacity the number of StringMetadata instances to hold in memory
+ * @throws MetricException if creating multiple cache instances
+ */
+ static void init(RocksDbMetricsWriter dbWriter, int capacity) throws MetricException {
+ if (instance == null) {
+ instance = new StringMetadataCache(dbWriter, capacity);
+ } else {
+ throw new MetricException("StringMetadataCache already created");
+ }
+ }
+
+ /**
+ * Provides the WritableStringMetadataCache interface to the cache instance.
+ *
+ * @throws MetricException if the cache instance was not created
+ */
+ static WritableStringMetadataCache getWritableStringMetadataCache() throws MetricException {
+ if (instance != null) {
+ return instance;
+ } else {
+ throw new MetricException("StringMetadataCache was not initialized");
+ }
+ }
+
+ /**
+ * Provides the ReadOnlyStringMetadataCache interface to the cache instance.
+ *
+ * @throws MetricException if the cache instance was not created
+ */
+ static ReadOnlyStringMetadataCache getReadOnlyStringMetadataCache() throws MetricException {
+ if (instance != null) {
+ return instance;
+ } else {
+ throw new MetricException("StringMetadataCache was not initialized");
+ }
+ }
+
+ /**
+ * Constructor to create a cache.
+ *
+ * @param dbWriter The rocks db writer instance the cache should use when evicting data
+ * @param capacity The cache size
+ */
+ private StringMetadataCache(RocksDbMetricsWriter dbWriter, int capacity) {
+ lruStringCache = Collections.synchronizedMap(new LruMap<>(capacity, this));
+ this.dbWriter = dbWriter;
+ }
+
+ /**
+ * Get the string metadata from the cache.
+ *
+ * @param s The string to look for
+ * @return the metadata associated with the string or null if not found
+ */
+ public StringMetadata get(String s) {
+ return lruStringCache.get(s);
+ }
+
+ /**
+ * Add the string metadata to the cache.
+ *
+ * NOTE: this can cause data to be evicted from the cache when full. When this occurs, the evictionCallback() method
+ * is called to store the metadata back into the RocksDB database.
+ *
+ * This method is only exposed to the WritableStringMetadataCache interface.
+ *
+ * @param s The string to add
+ * @param stringMetadata The string's metadata
+ * @param newEntry Indicates the metadata is being used for the first time and should be written to RocksDB immediately
+ * @throws MetricException when evicted data fails to save to the database or when the database is shutdown
+ */
+ public void put(String s, StringMetadata stringMetadata, boolean newEntry) throws MetricException {
+ if (dbWriter.isShutdown()) {
+ // another thread could be writing out the metadata cache to the database.
+ throw new MetricException("Shutting down");
+ }
+ try {
+ if (newEntry) {
+ writeMetadataToDisk(s, stringMetadata);
+ }
+ lruStringCache.put(s, stringMetadata);
+ hashToString.put(stringMetadata.getStringId(), s);
+ } catch (Exception e) { // catch any runtime exceptions caused by eviction
+ throw new MetricException("Failed to save string in metadata cache", e);
+ }
+ }
+
+ /**
+ * Callback when data is about to be removed from the cache. This method then
+ * immediately writes the metadata to RocksDB.
+ *
+ * @param key The evicted string
+ * @param val The evicted string's metadata
+ * @throws RuntimeException when evicted data fails to save to the database
+ */
+ public void evictionCallback(String key, StringMetadata val) {
+ writeMetadataToDisk(key, val);
+ }
+
+ private void writeMetadataToDisk(String key, StringMetadata val) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing {} to RocksDB", key);
+ }
+ // remove reverse lookup from map
+ hashToString.remove(val.getStringId());
+
+ // save the evicted key/value to the database immediately
+ RocksDbValue rval = new RocksDbValue(val.getLastTimestamp(), key);
+
+ for (KeyType type : val.getMetadataTypes()) { // save the metadata for all types of strings it matches
+ RocksDbKey rkey = new RocksDbKey(type, val.getStringId());
+ dbWriter.handleEvictedMetadata(rkey, rval);
+ }
+ }
+
+ /**
+ * Determines if a string Id is contained in the cache.
+ *
+ * @param stringId The string Id to check
+ * @return true if the Id is in the cache, false otherwise
+ */
+ public boolean contains(Integer stringId) {
+ return hashToString.containsKey(stringId);
+ }
+
+ /**
+ * Returns the string matching the string Id if in the cache.
+ *
+ * @param stringId The string Id to check
+ * @return the associated string if the Id is in the cache, null otherwise
+ */
+ public String getMetadataString(Integer stringId) {
+ return hashToString.get(stringId);
+ }
+
+ /**
+ * Get the map of the cache contents. Provided to allow writing the data to RocksDB on shutdown.
+ *
+ * @return the string metadata map entrySet
+ */
+ public Set<Map.Entry<String, StringMetadata>> entrySet() {
+ return lruStringCache.entrySet();
+ }
+
+ static void cleanUp() {
+ instance = null;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java
new file mode 100644
index 0000000..2d4165f
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.http.annotation.NotThreadSafe;
+import org.apache.storm.metricstore.MetricException;
+
+/**
+ * The writable interface to a StringMetadataCache intended to be used by a single RocksDBMetricwWriter instance.
+ */
+@NotThreadSafe
+public interface WritableStringMetadataCache extends ReadOnlyStringMetadataCache {
+
+ /**
+ * Add the string metadata to the cache.
+ *
+ * * NOTE: this can cause data to be evicted from the cache when full. When this occurs, the evictionCallback() method
+ * is called to store the metadata back into the RocksDB database.
+ *
+ * This method is only exposed to the WritableStringMetadataCache interface.
+ *
+ * @param s The string to add
+ * @param stringMetadata The string's metadata
+ * @param newEntry Indicates the metadata is being used for the first time and should be written to RocksDB immediately
+ * @throws MetricException when evicted data fails to save to the database or when the database is shutdown
+ */
+ void put(String s, StringMetadata stringMetadata, boolean newEntry) throws MetricException;
+
+ /**
+ * Get the map of the cache contents. Provided to allow writing the data to RocksDB on shutdown.
+ *
+ * @return the string metadata map entrySet
+ */
+ Set<Map.Entry<String, StringMetadata>> entrySet();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/utils/LruMap.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/LruMap.java b/storm-server/src/main/java/org/apache/storm/utils/LruMap.java
new file mode 100644
index 0000000..3ed5d06
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/utils/LruMap.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class LruMap<A, B> extends LinkedHashMap<A, B> {
+ private int maxSize;
+ private CacheEvictionCallback evCb = null;
+
+ public LruMap(int maxSize) {
+ super(maxSize + 1, 1.0f, true);
+ this.maxSize = maxSize;
+ }
+
+ /**
+ * Creates an LRU map that will call back before data is removed from the map.
+ *
+ * @param maxSize max capacity for the map
+ * @param evictionCallback callback to be called before removing data
+ */
+ public LruMap(int maxSize, CacheEvictionCallback evictionCallback) {
+ this(maxSize);
+ this.evCb = evictionCallback;
+ }
+
+ @Override
+ protected boolean removeEldestEntry(final Map.Entry<A, B> eldest) {
+ boolean evict = size() > this.maxSize;
+ if (evict && this.evCb != null) {
+ this.evCb.evictionCallback(eldest.getKey(), eldest.getValue());
+ }
+ return evict;
+ }
+
+ public interface CacheEvictionCallback<K, V> {
+ void evictionCallback(K key, V val);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbKeyTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbKeyTest.java b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbKeyTest.java
new file mode 100644
index 0000000..21d2377
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbKeyTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import org.apache.storm.metricstore.AggLevel;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RocksDbKeyTest {
+
+ @Test
+ public void testConstructors() {
+ byte[] raw = new byte[RocksDbKey.KEY_SIZE];
+ raw[0] = KeyType.COMPONENT_STRING.getValue();
+ raw[2] = 0x01;
+ raw[3] = 0x02;
+ raw[4] = 0x03;
+ raw[5] = 0x04;
+ RocksDbKey rawKey = new RocksDbKey(raw);
+
+ RocksDbKey metadataKey = new RocksDbKey(KeyType.COMPONENT_STRING, 0x01020304);
+ Assert.assertEquals(0, metadataKey.compareTo(rawKey));
+ Assert.assertEquals(KeyType.COMPONENT_STRING, metadataKey.getType());
+
+ metadataKey = new RocksDbKey(KeyType.TOPOLOGY_STRING, 0x01020304);
+ Assert.assertTrue(metadataKey.compareTo(rawKey) < 0);
+ Assert.assertEquals(KeyType.TOPOLOGY_STRING, metadataKey.getType());
+
+ metadataKey = new RocksDbKey(KeyType.COMPONENT_STRING, 0x01020305);
+ Assert.assertTrue(metadataKey.compareTo(rawKey) > 0);
+
+ Assert.assertEquals(0x01020304, rawKey.getTopologyId());
+ Assert.assertEquals(KeyType.COMPONENT_STRING, rawKey.getType());
+ }
+
+ @Test
+ public void testMetricKey() {
+ AggLevel aggLevel = AggLevel.AGG_LEVEL_10_MIN;
+ int topologyId = 0x45665;
+ long timestamp = System.currentTimeMillis();
+ int metricId = 0xF3916034;
+ int componentId = 0x82915031;
+ int executorId = 0x434738;
+ int hostId = 0x4348394;
+ int port = 3456;
+ int streamId = 0x84221956;
+ RocksDbKey key = RocksDbKey.createMetricKey(aggLevel, topologyId, timestamp, metricId,
+ componentId, executorId, hostId, port, streamId);
+ Assert.assertEquals(topologyId, key.getTopologyId());
+ Assert.assertEquals(timestamp, key.getTimestamp());
+ Assert.assertEquals(metricId, key.getMetricId());
+ Assert.assertEquals(componentId, key.getComponentId());
+ Assert.assertEquals(executorId, key.getExecutorId());
+ Assert.assertEquals(hostId, key.getHostnameId());
+ Assert.assertEquals(port, key.getPort());
+ Assert.assertEquals(streamId, key.getStreamId());
+ }
+}