Posted to commits@storm.apache.org by ka...@apache.org on 2018/01/22 21:23:41 UTC

[2/7] storm git commit: STORM-2887: store metrics into RocksDB

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java
new file mode 100644
index 0000000..2f2ad76
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/MetricStoreConfig.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore;
+
+import java.util.Map;
+import org.apache.storm.DaemonConfig;
+
+
+public class MetricStoreConfig {
+
+    /**
+     * Configures metrics store to use the class specified in the conf.
+     * @param conf Storm config map
+     * @return MetricStore prepared store
+     * @throws MetricException  on misconfiguration
+     */
+    public static MetricStore configure(Map conf) throws MetricException {
+
+        try {
+            String storeClass = (String)conf.get(DaemonConfig.STORM_METRIC_STORE_CLASS);
+            MetricStore store = (MetricStore) (Class.forName(storeClass)).newInstance();
+            store.prepare(conf);
+            return store;
+        } catch (Throwable t) {
+            throw new MetricException("Failed to create metric store", t);
+        }
+    }
+}
+
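For context, a minimal sketch of how a caller might obtain a store through this factory (the RocksDbStore class name comes from this commit; building the conf map by hand is illustrative only, and the remaining RocksDB settings validated later in this patch would also be required):

    Map<String, Object> conf = new HashMap<>();
    conf.put(DaemonConfig.STORM_METRIC_STORE_CLASS, "org.apache.storm.metricstore.rocksdb.RocksDbStore");
    // ...plus the RocksDB location/creation/cache/retention settings shown in RocksDbStore below...
    MetricStore store = MetricStoreConfig.configure(conf);  // instantiates the class and calls prepare()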

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/KeyType.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/KeyType.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/KeyType.java
new file mode 100644
index 0000000..a351be7
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/KeyType.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Specifies all the valid types of keys and their values.
+ */
+public enum KeyType {
+    TOPOLOGY_BLOB(0), // reserved for saving topology data
+    METADATA_STRING_START(1),
+    TOPOLOGY_STRING(1),
+    METRIC_STRING(2),
+    COMPONENT_STRING(3),
+    EXEC_ID_STRING(4),
+    HOST_STRING(5),
+    STREAM_ID_STRING(6),
+    METADATA_STRING_END(7),
+    METRIC_DATA(0x80);
+
+    private final byte value;
+    private static final Map<Byte, KeyType> MAP;
+
+    static {
+        Map<Byte, KeyType> map = new HashMap<>();
+        for (KeyType type : EnumSet.allOf(KeyType.class)) {
+            map.put(type.getValue(), type);
+        }
+        MAP = Collections.unmodifiableMap(map);
+    }
+
+    KeyType(int value) {
+        this.value = (byte)value;
+    }
+
+    byte getValue() {
+        return this.value;
+    }
+
+    static KeyType getKeyType(byte value) {
+        KeyType type = MAP.get(value);
+        if (type == null) {
+            throw new RuntimeException("Invalid key type " + value);
+        } else {
+            return type;
+        }
+    }
+}
+
+
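Since the first byte of every RocksDB key holds one of these values, a reader of raw keys can recover the type from that byte alone; a small sketch (the iterator variable is assumed):

    byte[] rawKey = iterator.key();               // 38-byte key read from a RocksDB iterator
    KeyType type = KeyType.getKeyType(rawKey[0]); // e.g. METRIC_STRING for a first byte of 2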

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java
new file mode 100644
index 0000000..6618f5d
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.metricstore.FilterOptions;
+import org.apache.storm.metricstore.MetricException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for removing expired metrics and unused metadata from the RocksDB store.
+ */
+public class MetricsCleaner implements Runnable, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(MetricsCleaner.class);
+    private static final long DEFAULT_SLEEP_MS = 4L * 60L * 60L * 1000L;
+    private RocksDbStore store;
+    private long retentionHours;
+    private volatile boolean shutdown = false;
+    private long sleepMs = DEFAULT_SLEEP_MS;
+    private Meter failureMeter;
+    private long purgeTimestamp = 0L;
+
+    MetricsCleaner(RocksDbStore store, int retentionHours, int hourlyPeriod, Meter failureMeter) {
+        this.store = store;
+        this.retentionHours = retentionHours;
+        if (hourlyPeriod > 0) {
+            this.sleepMs = hourlyPeriod * 60L * 60L * 1000L;
+        }
+        this.failureMeter = failureMeter;
+
+        Gauge<Long> gauge = new Gauge<Long>() {
+            @Override
+            public Long getValue() {
+                return purgeTimestamp;
+            }
+        };
+        StormMetricsRegistry.registerProvidedGauge("MetricsCleaner:purgeTimestamp", gauge);
+    }
+
+    @Override
+    public void close() {
+        shutdown = true;
+    }
+
+    @Override
+    public void run() {
+        while (!shutdown) {
+            try {
+                Thread.sleep(sleepMs);
+            } catch (InterruptedException e) {
+                LOG.error("Sleep interrupted", e);
+                continue;
+            }
+
+            try {
+                purgeMetrics();
+            } catch (MetricException e) {
+                LOG.error("Failed to purge metrics", e);
+                if (this.failureMeter != null) {
+                    this.failureMeter.mark();
+                }
+            }
+        }
+    }
+
+    void purgeMetrics() throws MetricException {
+        purgeTimestamp = System.currentTimeMillis() - this.retentionHours * 60L * 60L * 1000L;
+
+        LOG.info("Purging metrics before {}", purgeTimestamp);
+
+        FilterOptions filter = new FilterOptions();
+        long endTime = purgeTimestamp - 1L;
+        filter.setEndTime(endTime);
+        store.deleteMetrics(filter);
+
+        LOG.info("Purging metadata before " + purgeTimestamp);
+        store.deleteMetadataBefore(purgeTimestamp);
+    }
+}
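The purge cutoff is plain wall-clock arithmetic; a worked sketch assuming a 240-hour retention:

    long retentionHours = 240L;
    long purgeTimestamp = System.currentTimeMillis() - retentionHours * 60L * 60L * 1000L;
    // metrics older than purgeTimestamp, and metadata last used before it, go on the next pass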

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/ReadOnlyStringMetadataCache.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/ReadOnlyStringMetadataCache.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/ReadOnlyStringMetadataCache.java
new file mode 100644
index 0000000..0effbc4
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/ReadOnlyStringMetadataCache.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import org.apache.http.annotation.ThreadSafe;
+
+/**
+ * The read-only interface to a StringMetadataCache allowed to be used by any thread.
+ */
+@ThreadSafe
+public interface ReadOnlyStringMetadataCache {
+
+    /**
+     * Get the string metadata from the cache.
+     *
+     * @param s   The string to look for
+     * @return   the metadata associated with the string or null if not found
+     */
+    StringMetadata get(String s);
+
+    /**
+     * Returns the string matching the string Id if in the cache.
+     *
+     * @param stringId   The string Id to check
+     * @return   the associated string if the Id is in the cache, null otherwise
+     */
+    String getMetadataString(Integer stringId);
+
+    /**
+     * Determines if a string Id is contained in the cache.
+     *
+     * @param stringId   The string Id to check
+     * @return   true if the Id is in the cache, false otherwise
+     */
+    boolean contains(Integer stringId);
+}
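Read-side callers resolve strings through this interface without triggering evictions, which may write to RocksDB and are reserved for the single writer thread. A usage sketch, assuming the cache has already been initialized (the topology id is made up):

    ReadOnlyStringMetadataCache cache = StringMetadataCache.getReadOnlyStringMetadataCache();
    StringMetadata md = cache.get("wordcount-1-1516655823");
    int stringId = (md != null) ? md.getStringId() : 0;  // 0 is the invalid-Id sentinel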

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbKey.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbKey.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbKey.java
new file mode 100644
index 0000000..7868282
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbKey.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.google.common.primitives.UnsignedBytes;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import javax.xml.bind.DatatypeConverter;
+import org.apache.storm.metricstore.AggLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class representing the data used as a Key in RocksDB.  Keys can be used either for metadata or metrics.
+ *
+ * <P>Keys are 38 bytes in size.  The fields for a key are:
+ * <pre>
+ * Field             Size         Offset
+ *
+ * Type                 1              0      The type maps to the KeyType enum, specifying a metric or various types of metadata
+ * Aggregation Level    1              1      The aggregation level for a metric (see AggLevel enum).  0 for metadata.
+ * TopologyId           4              2      The metadata string Id representing a topologyId for a metric, or the unique
+ *                                                   string Id for a metadata string
+ * Timestamp            8              6      The timestamp for a metric, unused for metadata
+ * MetricId             4             14      The metadata string Id for the metric name
+ * ComponentId          4             18      The metadata string Id for the component Id
+ * ExecutorId           4             22      The metadata string Id for the executor Id
+ * HostId               4             26      The metadata string Id for the host Id
+ * Port                 4             30      The port number
+ * StreamId             4             34      The metadata string Id for the stream Id
+ * </pre>
+ */
+public class RocksDbKey implements Comparable<RocksDbKey> {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDbKey.class);
+    static final int KEY_SIZE = 38;
+    private static final Map<Byte, RocksDbKey> PREFIX_MAP;
+    private byte[] key;
+
+    static {
+        // pregenerate commonly used keys for scans
+        Map<Byte, RocksDbKey> map = new HashMap<>();
+        for (KeyType type : EnumSet.allOf(KeyType.class)) {
+            map.put(type.getValue(), new RocksDbKey(type, 0));
+        }
+        PREFIX_MAP = Collections.unmodifiableMap(map);
+    }
+
+    /**
+     * Constructor for a RocksDB key for a metadata string.
+     *
+     * @param type  type of metadata string
+     * @param metadataStringId  the string Id for the string (stored in the topologyId portion of the key)
+     */
+    RocksDbKey(KeyType type, int metadataStringId) {
+        byte[] key = new byte[KEY_SIZE];
+        ByteBuffer bb = ByteBuffer.wrap(key);
+        bb.put(type.getValue());
+        bb.put(AggLevel.AGG_LEVEL_NONE.getValue());
+        bb.putInt(metadataStringId);
+        this.key = key;
+    }
+
+    /**
+     * Constructor for a RocksDB key from raw data.
+     *
+     * @param raw  the key data
+     */
+    RocksDbKey(byte[] raw) {
+        this.key = raw;
+    }
+
+
+    /**
+     * Get a zeroed key of the specified type.
+     *
+     * @param type  the desired type
+     * @return  a key of the desired type
+     */
+    static RocksDbKey getPrefix(KeyType type) {
+        return PREFIX_MAP.get(type.getValue());
+    }
+
+    /**
+     * Get the metadata string Id portion of the key for metadata keys.
+     *
+     * @return  the metadata string Id
+     * @throws  RuntimeException  if the key is not a metadata type
+     */
+    int getMetadataStringId() {
+        if (this.getType().getValue() < KeyType.METADATA_STRING_END.getValue()) {
+            return ByteBuffer.wrap(key, 2, 4).getInt();
+        } else {
+            throw new RuntimeException("Cannot fetch metadata string for key of type " + this.getType());
+        }
+    }
+
+    /**
+     * Get the raw key bytes.
+     */
+    byte[] getRaw() {
+        return this.key;
+    }
+
+    /**
+     * Get the type of key.
+     *
+     * @return  the type of key
+     */
+    KeyType getType() {
+        return KeyType.getKeyType(key[0]);
+    }
+
+    /**
+     * Compares two keys on a byte-by-byte basis.
+     *
+     * @return  comparison of key byte values
+     */
+    @Override
+    public int compareTo(RocksDbKey o) {
+        return UnsignedBytes.lexicographicalComparator().compare(this.getRaw(), o.getRaw());
+    }
+
+    /**
+     * Gets the first possible key value for the desired key type.
+     *
+     * @return  the initial key
+     */
+    static RocksDbKey getInitialKey(KeyType type) {
+        return PREFIX_MAP.get(type.getValue());
+    }
+
+    /**
+     * Gets the key just larger than the last possible key value for the desired key type.
+     *
+     * @return  the key immediately after all keys of the given type
+     */
+    static RocksDbKey getLastKey(KeyType type) {
+        byte value = (byte)(type.getValue() + 1);
+        return PREFIX_MAP.get(value);
+    }
+
+    /**
+     * Creates a metric key with the desired properties.
+     *
+     * @return  the generated key
+     */
+    static RocksDbKey createMetricKey(AggLevel aggLevel, int topologyId, long metricTimestamp, int metricId,
+                                      int componentId, int executorId, int hostId, int port,
+                                      int streamId) {
+        byte[] raw = new byte[KEY_SIZE];
+        ByteBuffer bb = ByteBuffer.wrap(raw);
+        bb.put(KeyType.METRIC_DATA.getValue());
+        bb.put(aggLevel.getValue());
+        bb.putInt(topologyId);       // offset 2
+        bb.putLong(metricTimestamp); // offset 6
+        bb.putInt(metricId);         // offset 14
+        bb.putInt(componentId);      // offset 18
+        bb.putInt(executorId);       // offset 22
+        bb.putInt(hostId);           // offset 26
+        bb.putInt(port);             // offset 30
+        bb.putInt(streamId);         // offset 34
+
+        RocksDbKey key = new RocksDbKey(raw);
+        return key;
+    }
+
+    /**
+     * Get the unique string Id for a metric's topologyId.
+     */
+    int getTopologyId() {
+        int val = ByteBuffer.wrap(key, 2, 4).getInt();
+        return val;
+    }
+
+    long getTimestamp() {
+        return ByteBuffer.wrap(key, 6, 8).getLong();
+    }
+
+    int getMetricId() {
+        return ByteBuffer.wrap(key, 14, 4).getInt();
+    }
+
+    int getComponentId() {
+        return ByteBuffer.wrap(key, 18, 4).getInt();
+    }
+
+    int getExecutorId() {
+        return ByteBuffer.wrap(key, 22, 4).getInt();
+    }
+
+    int getHostnameId() {
+        return ByteBuffer.wrap(key, 26, 4).getInt();
+    }
+
+    int getPort() {
+        return ByteBuffer.wrap(key, 30, 4).getInt();
+    }
+
+    int getStreamId() {
+        return ByteBuffer.wrap(key, 34, 4).getInt();
+    }
+
+    @Override
+    public String toString() {
+        return "[0x" + DatatypeConverter.printHexBinary(key) + "]";
+    }
+}
+
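Because the layout is fixed and the most significant fields come first, all metrics for a given aggregation level and topology sort contiguously, which keeps the range scans in RocksDbStore cheap. A sketch of building and decoding a key (all Ids are made-up numbers):

    RocksDbKey key = RocksDbKey.createMetricKey(AggLevel.AGG_LEVEL_NONE,
            17, 1516655823000L, 42, 5, 9, 3, 6700, 11);
    long ts = key.getTimestamp();  // 1516655823000L, read back from offset 6
    int port = key.getPort();      // 6700, read back from offset 30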

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java
new file mode 100644
index 0000000..a050a76
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.codahale.metrics.Meter;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.http.annotation.NotThreadSafe;
+import org.apache.storm.metricstore.AggLevel;
+import org.apache.storm.metricstore.Metric;
+import org.apache.storm.metricstore.MetricException;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class designed to perform all metrics inserts into RocksDB.  Metrics are processed from a blocking queue.  Inserts
+ * to RocksDB are done using a single thread to simplify design (such as looking up existing metric data for aggregation,
+ * and fetching/evicting metadata from the cache).
+ * <p>
+ * A writable LRU StringMetadataCache is used to minimize looking up metadata string Ids.  As entries are added to the full cache, older
+ * entries are evicted from the cache and need to be written to the database.  This happens via the handleEvictedMetadata()
+ * callback.
+ * <p>
+ * The following issues would need to be addressed to implement a multithreaded metrics writer:
+ * <ul>
+ *     <li>Generation of unique unused IDs for new metadata strings needs to be thread safe.</li>
+ *     <li>Ensuring newly created metadata strings are seen by all threads.</li>
+ *     <li>Maintaining a properly cached state of metadata for multiple writers.  The current LRU cache
+ *     evicts data as new metadata is added.</li>
+ *     <li>Processing the aggregation of a metric requires fetching and updating previous aggregates.  A multithreaded
+ *     design would need to ensure two metrics were not updating an aggregated metric at the same time.</li>
+ *     <li>Investigate performance of multiple threads inserting into RocksDB versus a single ordered insert.</li>
+ * </ul>
+ */
+@NotThreadSafe
+public class RocksDbMetricsWriter implements Runnable, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDbMetricsWriter.class);
+    private RocksDbStore store;
+    private BlockingQueue queue;
+    private WritableStringMetadataCache stringMetadataCache;
+    private Set<Integer> unusedIds = new HashSet<>();
+    private TreeMap<RocksDbKey, RocksDbValue> insertBatch = new TreeMap<>(); // RocksDB should insert in sorted key order
+    private WriteOptions writeOpts = new WriteOptions();
+    private volatile boolean shutdown = false;
+    private Meter failureMeter;
+    private ArrayList<AggLevel> aggBuckets = new ArrayList<>();
+
+    /**
+     * Constructor for the RocksDbMetricsWriter.
+     *
+     * @param store   The RocksDB store
+     * @param queue   The queue to receive metrics for insertion
+     */
+    RocksDbMetricsWriter(RocksDbStore store, BlockingQueue queue, Meter failureMeter)  {
+        this.store = store;
+        this.queue = queue;
+        this.failureMeter = failureMeter;
+
+        aggBuckets.add(AggLevel.AGG_LEVEL_1_MIN);
+        aggBuckets.add(AggLevel.AGG_LEVEL_10_MIN);
+        aggBuckets.add(AggLevel.AGG_LEVEL_60_MIN);
+    }
+
+    /**
+     * Init routine called once the Metadata cache has been created.
+     *
+     * @throws MetricException  on cache error
+     */
+    void init() throws MetricException {
+        this.stringMetadataCache = StringMetadataCache.getWritableStringMetadataCache();
+    }
+
+    /**
+     * Run routine to wait for metrics on a queue and insert into RocksDB.
+     */
+    @Override
+    public void run() {
+        while (!shutdown) {
+            try {
+                Metric m = (Metric) queue.take();
+                processInsert(m);
+            } catch (Exception e) {
+                LOG.error("Failed to insert metric", e);
+                if (this.failureMeter != null) {
+                    this.failureMeter.mark();
+                }
+            }
+        }
+    }
+
+    /**
+     * Performs the actual metric insert, and aggregates over all bucket times.
+     *
+     * @param metric  Metric to store
+     * @throws MetricException  if database write fails
+     */
+    private void processInsert(Metric metric) throws MetricException {
+
+        // convert all strings to numeric Ids for the metric key and add to the metadata cache
+        long metricTimestamp = metric.getTimestamp();
+        Integer topologyId = storeMetadataString(KeyType.TOPOLOGY_STRING, metric.getTopologyId(), metricTimestamp);
+        Integer metricId = storeMetadataString(KeyType.METRIC_STRING, metric.getMetricName(), metricTimestamp);
+        Integer componentId = storeMetadataString(KeyType.COMPONENT_STRING, metric.getComponentId(), metricTimestamp);
+        Integer executorId = storeMetadataString(KeyType.EXEC_ID_STRING, metric.getExecutorId(), metricTimestamp);
+        Integer hostId = storeMetadataString(KeyType.HOST_STRING, metric.getHostname(), metricTimestamp);
+        Integer streamId = storeMetadataString(KeyType.STREAM_ID_STRING, metric.getStreamId(), metricTimestamp);
+
+        RocksDbKey key = RocksDbKey.createMetricKey(AggLevel.AGG_LEVEL_NONE, topologyId, metric.getTimestamp(), metricId,
+                componentId, executorId, hostId, metric.getPort(), streamId);
+
+        // save metric key/value to be batched
+        RocksDbValue value = new RocksDbValue(metric);
+        insertBatch.put(key, value);
+
+        // Aggregate matching metrics over bucket timeframes.
+        // We'll process starting with the longest bucket.  If the metric for this does not exist, we don't have to
+        // search for the remaining bucket metrics.
+        ListIterator li = aggBuckets.listIterator(aggBuckets.size());
+        boolean populate = true;
+        while (li.hasPrevious()) {
+            AggLevel bucket = (AggLevel)li.previous();
+            Metric aggMetric = new Metric(metric);
+            aggMetric.setAggLevel(bucket);
+
+            long msToBucket = 1000L * 60L * bucket.getValue();
+            long roundedToBucket = msToBucket * (metric.getTimestamp() / msToBucket);
+            aggMetric.setTimestamp(roundedToBucket);
+
+            RocksDbKey aggKey = RocksDbKey.createMetricKey(bucket, topologyId, aggMetric.getTimestamp(), metricId,
+                    componentId, executorId, hostId, aggMetric.getPort(), streamId);
+
+            if (populate) {
+                // retrieve any existing aggregation matching this one and update the values
+                if (store.populateFromKey(aggKey, aggMetric)) {
+                    aggMetric.addValue(metric.getValue());
+                } else {
+                    // aggregating metric did not exist, don't look for further ones with smaller timestamps
+                    populate = false;
+                }
+            }
+
+            // save metric key/value to be batched
+            RocksDbValue aggVal = new RocksDbValue(aggMetric);
+            insertBatch.put(aggKey, aggVal);
+        }
+
+        processBatchInsert(insertBatch);
+
+        insertBatch.clear();
+    }
+
+    // converts a metadata string into a unique integer.  Updates the timestamp of the string
+    // so we can track when it was last used for later deletion on database cleanup.
+    private int storeMetadataString(KeyType type, String s, long metricTimestamp) throws MetricException {
+        if (s == null) {
+            throw new MetricException("No string for metric metadata string type " + type);
+        }
+
+        // attempt to find it in the string cache
+        StringMetadata stringMetadata = stringMetadataCache.get(s);
+        if (stringMetadata != null) {
+            // make sure the timestamp on the metadata has the latest time
+            stringMetadata.update(metricTimestamp, type);
+            return stringMetadata.getStringId();
+        }
+
+        // attempt to find the string in the database
+        stringMetadata = store.rocksDbGetStringMetadata(type, s);
+        if (stringMetadata != null) {
+            // update to the latest timestamp and add to the string cache
+            stringMetadata.update(metricTimestamp, type);
+            stringMetadataCache.put(s, stringMetadata, false);
+            return stringMetadata.getStringId();
+        }
+
+        // string does not exist, create using a unique string id and add to cache
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(type + "." + s + " does not exist in cache or database");
+        }
+        int stringId = getUniqueMetadataStringId();
+        stringMetadata = new StringMetadata(type, stringId, metricTimestamp);
+        stringMetadataCache.put(s, stringMetadata, true);
+
+        return stringMetadata.getStringId();
+    }
+
+    // get a currently unused unique string id
+    private int getUniqueMetadataStringId() throws MetricException {
+        generateUniqueStringIds();
+        int id = unusedIds.iterator().next();
+        unusedIds.remove(id);
+        return id;
+    }
+
+    // guarantees a list of unused string Ids exists.  Once the list is empty, creates a new list
+    // by generating a list of random numbers and removing the ones that already are in use.
+    private void generateUniqueStringIds() throws MetricException {
+        int attempts = 0;
+        while (unusedIds.isEmpty()) {
+            attempts++;
+            if (attempts > 100) {
+                String message = "Failed to generate unique ids";
+                LOG.error(message);
+                throw new MetricException(message);
+            }
+            for (int i = 0; i < 600; i++) {
+                int n = ThreadLocalRandom.current().nextInt();
+                if (n == RocksDbStore.INVALID_METADATA_STRING_ID) {
+                    continue;
+                }
+                // remove any entries in the cache
+                if (stringMetadataCache.contains(n)) {
+                    continue;
+                }
+                unusedIds.add(n);
+            }
+            // now scan all metadata and remove any matching string Ids from this list
+            RocksDbKey firstPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_START);
+            RocksDbKey lastPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_END);
+            store.scanRange(firstPrefix, lastPrefix, (key, value) -> {
+                unusedIds.remove(key.getMetadataStringId());
+                return true; // process all metadata
+            });
+        }
+    }
+
+    // writes multiple metric values into the database as a batch operation.  The tree map keeps the keys sorted
+    // for faster insertion to RocksDB.
+    private void processBatchInsert(TreeMap<RocksDbKey, RocksDbValue> batchMap) throws MetricException {
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            // take the batched metric data and write to the database
+            for (RocksDbKey k : batchMap.keySet()) {
+                RocksDbValue v = batchMap.get(k);
+                writeBatch.put(k.getRaw(), v.getRaw());
+            }
+            store.db.write(writeOpts, writeBatch);
+        } catch (Exception e) {
+            String message = "Failed to store data to RocksDB";
+            LOG.error(message, e);
+            throw new MetricException(message, e);
+        }
+    }
+
+    // evicted metadata needs to be stored immediately.  Metadata lookups count on it being in the cache
+    // or database.
+    void handleEvictedMetadata(RocksDbKey key, RocksDbValue val) {
+        try {
+            store.db.put(key.getRaw(), val.getRaw());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    boolean isShutdown() {
+        return this.shutdown;
+    }
+
+    @Override
+    public void close() {
+        this.shutdown = true;
+
+        // get all metadata from the cache to put into the database
+        TreeMap<RocksDbKey, RocksDbValue> batchMap = new TreeMap<>();  // use a new map to prevent threading issues with writer thread
+        for (Map.Entry entry : stringMetadataCache.entrySet()) {
+            String metadataString = (String)entry.getKey();
+            StringMetadata val = (StringMetadata)entry.getValue();
+            RocksDbValue rval = new RocksDbValue(val.getLastTimestamp(), metadataString);
+
+            for (KeyType type : val.getMetadataTypes()) {   // save the metadata for all types of strings it matches
+                RocksDbKey rkey = new RocksDbKey(type, val.getStringId());
+                batchMap.put(rkey, rval);
+            }
+        }
+
+        try {
+            processBatchInsert(batchMap);
+        } catch (MetricException e) {
+            LOG.error("Failed to insert all metadata", e);
+        }
+
+        // flush db to disk
+        try (FlushOptions flushOps = new FlushOptions()) {
+            flushOps.setWaitForFlush(true);
+            store.db.flush(flushOps);
+        } catch (RocksDBException e) {
+            LOG.error("Failed ot flush RocksDB", e);
+            if (this.failureMeter != null) {
+                this.failureMeter.mark();
+            }
+        }
+    }
+}
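The bucket rounding in processInsert() truncates a metric timestamp to the start of its aggregation window; a worked example for the 10-minute bucket:

    long timestamp = 1516655823456L;
    long msToBucket = 1000L * 60L * 10L;                           // 600000 ms per bucket
    long roundedToBucket = msToBucket * (timestamp / msToBucket);  // 1516655400000L, the window start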

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
new file mode 100644
index 0000000..2f44aff
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java
@@ -0,0 +1,639 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import com.codahale.metrics.Meter;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.metricstore.AggLevel;
+import org.apache.storm.metricstore.FilterOptions;
+import org.apache.storm.metricstore.Metric;
+import org.apache.storm.metricstore.MetricException;
+import org.apache.storm.metricstore.MetricStore;
+import org.apache.storm.utils.ObjectReader;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.IndexType;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RocksDbStore implements MetricStore, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDbStore.class);
+    private static final int MAX_QUEUE_CAPACITY = 4000;
+    static final int INVALID_METADATA_STRING_ID = 0;
+    RocksDB db;
+    private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null;
+    private BlockingQueue queue = new LinkedBlockingQueue(MAX_QUEUE_CAPACITY);
+    private RocksDbMetricsWriter metricsWriter = null;
+    private MetricsCleaner metricsCleaner = null;
+    private Meter failureMeter = null;
+
+    interface RocksDbScanCallback {
+        boolean cb(RocksDbKey key, RocksDbValue val);  // return false to stop scan
+    }
+
+    /**
+     * Create metric store instance using the configurations provided via the config map.
+     *
+     * @param config Storm config map
+     * @throws MetricException on preparation error
+     */
+    public void prepare(Map config) throws MetricException {
+        validateConfig(config);
+
+        this.failureMeter = StormMetricsRegistry.registerMeter("RocksDB:metric-failures");
+
+        RocksDB.loadLibrary();
+        boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
+
+        try (Options options = new Options().setCreateIfMissing(createIfMissing)) {
+            // use the hash index for prefix searches
+            BlockBasedTableConfig tfc = new BlockBasedTableConfig();
+            tfc.setIndexType(IndexType.kHashSearch);
+            options.setTableFormatConfig(tfc);
+            options.useCappedPrefixExtractor(RocksDbKey.KEY_SIZE);
+
+            String path = getRocksDbAbsoluteDir(config);
+            LOG.info("Opening RocksDB from {}", path);
+            db = RocksDB.open(options, path);
+        } catch (RocksDBException e) {
+            String message = "Error opening RockDB database";
+            LOG.error(message, e);
+            throw new MetricException(message, e);
+        }
+
+        // create thread to delete old metrics and metadata
+        Integer retentionHours = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS).toString());
+        Integer deletionPeriod = 0;
+        if (config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS)) {
+            deletionPeriod = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS).toString());
+        }
+        metricsCleaner = new MetricsCleaner(this, retentionHours, deletionPeriod, failureMeter);
+
+        // create thread to process insertion of all metrics
+        metricsWriter = new RocksDbMetricsWriter(this, this.queue, this.failureMeter);
+
+        int cacheCapacity = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY).toString());
+        StringMetadataCache.init(metricsWriter, cacheCapacity);
+        readOnlyStringMetadataCache = StringMetadataCache.getReadOnlyStringMetadataCache();
+        metricsWriter.init(); // init the writer once the cache is setup
+
+        // start threads after metadata cache created
+        Thread thread = new Thread(metricsCleaner, "RocksDbMetricsCleaner");
+        thread.setDaemon(true);
+        thread.start();
+
+        thread = new Thread(metricsWriter, "RocksDbMetricsWriter");
+        thread.setDaemon(true);
+        thread.start();
+    }
+
+    /**
+     * Validates the Storm configuration for the metrics store.
+     *
+     * @param config Storm config to specify which store type, location of store and creation policy
+     * @throws MetricException if there is a missing required configuration or if the store does not exist but
+     *                         the config specifies not to create the store
+     */
+    private void validateConfig(Map config) throws MetricException {
+        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_LOCATION))) {
+            throw new MetricException("Not a vaild RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
+        }
+
+        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING))) {
+            throw new MetricException("Not a vaild RocksDB configuration - Does not specify creation policy "
+                    + DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING);
+        }
+
+        // validate path defined
+        String storePath = getRocksDbAbsoluteDir(config);
+
+        boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
+        if (!createIfMissing) {
+            if (!(new File(storePath).exists())) {
+                throw new MetricException("Configuration specifies not to create a store but no store currently exists at " + storePath);
+            }
+        }
+
+        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY))) {
+            throw new MetricException("Not a valid RocksDB configuration - Missing metadata string cache size "
+                    + DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY);
+        }
+
+        if (!config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS)) {
+            throw new MetricException("Not a valid RocksDB configuration - Missing metric retention "
+                    + DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS);
+        }
+    }
+
+    private String getRocksDbAbsoluteDir(Map conf) throws MetricException {
+        String storePath = (String)conf.get(DaemonConfig.STORM_ROCKSDB_LOCATION);
+        if (storePath == null) {
+            throw new MetricException("Not a vaild RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
+        } else {
+            if (new File(storePath).isAbsolute()) {
+                return storePath;
+            } else {
+                String stormHome = System.getProperty("storm.home");
+                if (stormHome == null) {
+                    throw new MetricException("storm.home not set");
+                }
+                return (stormHome + File.separator + storePath);
+            }
+        }
+    }
+
+    /**
+     * Stores metrics in the store.
+     *
+     * @param metric  Metric to store
+     * @throws MetricException  if database write fails
+     */
+    public void insert(Metric metric) throws MetricException {
+        try {
+            // don't bother blocking on a full queue, just drop metrics in case we can't keep up
+            if (queue.remainingCapacity() <= 0) {
+                LOG.info("Metrics q full, dropping metric");
+                return;
+            }
+            queue.put(metric);
+        } catch (Exception e) {
+            String message = "Failed to insert metric";
+            LOG.error(message, e);
+            if (this.failureMeter != null) {
+                this.failureMeter.mark();
+            }
+            throw new MetricException(message, e);
+        }
+    }
+
+    /**
+     * Fill out the numeric values for a metric.
+     *
+     * @param metric  Metric to populate
+     * @return   true if the metric was populated, false otherwise
+     * @throws MetricException  if read from database fails
+     */
+    @Override
+    public boolean populateValue(Metric metric) throws MetricException {
+        Map<String, Integer> localLookupCache = new HashMap<>(6);
+
+        int topologyId = lookupMetadataString(KeyType.TOPOLOGY_STRING, metric.getTopologyId(), localLookupCache);
+        if (INVALID_METADATA_STRING_ID == topologyId) {
+            return false;
+        }
+        int metricId = lookupMetadataString(KeyType.METRIC_STRING, metric.getMetricName(), localLookupCache);
+        if (INVALID_METADATA_STRING_ID == metricId) {
+            return false;
+        }
+        int componentId = lookupMetadataString(KeyType.COMPONENT_STRING, metric.getComponentId(), localLookupCache);
+        if (INVALID_METADATA_STRING_ID == componentId) {
+            return false;
+        }
+        int executorId = lookupMetadataString(KeyType.EXEC_ID_STRING, metric.getExecutorId(), localLookupCache);
+        if (INVALID_METADATA_STRING_ID == executorId) {
+            return false;
+        }
+        int hostId = lookupMetadataString(KeyType.HOST_STRING, metric.getHostname(), localLookupCache);
+        if (INVALID_METADATA_STRING_ID == hostId) {
+            return false;
+        }
+        int streamId = lookupMetadataString(KeyType.STREAM_ID_STRING, metric.getStreamId(), localLookupCache);
+        if (INVALID_METADATA_STRING_ID == streamId) {
+            return false;
+        }
+
+        RocksDbKey key = RocksDbKey.createMetricKey(metric.getAggLevel(), topologyId, metric.getTimestamp(), metricId,
+                componentId, executorId, hostId, metric.getPort(), streamId);
+
+        return populateFromKey(key, metric);
+    }
+
+    // populate metric values using the provided key
+    boolean populateFromKey(RocksDbKey key, Metric metric) throws MetricException {
+        try {
+            byte[] value = db.get(key.getRaw());
+            if (value == null) {
+                return false;
+            }
+            RocksDbValue rdbValue = new RocksDbValue(value);
+            rdbValue.populateMetric(metric);
+        } catch (Exception e) {
+            String message = "Failed to populate metric";
+            LOG.error(message, e);
+            if (this.failureMeter != null) {
+                this.failureMeter.mark();
+            }
+            throw new MetricException(message, e);
+        }
+        return true;
+    }
+
+    // attempts to lookup the unique Id for a string that may not exist yet.  Returns INVALID_METADATA_STRING_ID
+    // if it does not exist.
+    private int lookupMetadataString(KeyType type, String s, Map<String, Integer> lookupCache) throws MetricException {
+        if (s == null) {
+            if (this.failureMeter != null) {
+                this.failureMeter.mark();
+            }
+            throw new MetricException("No string for metric metadata string type " + type);
+        }
+
+        // attempt to find it in the string cache, this will update the LRU
+        StringMetadata stringMetadata = readOnlyStringMetadataCache.get(s);
+        if (stringMetadata != null) {
+            return stringMetadata.getStringId();
+        }
+
+        // attempt to find it in callers cache
+        Integer id = lookupCache.get(s);
+        if (id != null) {
+            return id;
+        }
+
+        // attempt to find the string in the database
+        stringMetadata = rocksDbGetStringMetadata(type, s);
+        if (stringMetadata != null) {
+            id = stringMetadata.getStringId();
+
+            // add to the callers cache.  We can't add it to the stringMetadataCache, since that could cause an eviction
+            // database write, which we want to only occur from the inserting DB thread.
+            lookupCache.put(s, id);
+
+            return id;
+        }
+
+        // string does not exist
+        return INVALID_METADATA_STRING_ID;
+    }
+
+    // scans the database to look for a metadata string and returns the metadata info
+    StringMetadata rocksDbGetStringMetadata(KeyType type, String s) {
+        RocksDbKey firstKey = RocksDbKey.getInitialKey(type);
+        RocksDbKey lastKey = RocksDbKey.getLastKey(type);
+        final AtomicReference<StringMetadata> reference = new AtomicReference<>();
+        scanRange(firstKey, lastKey, (key, value) -> {
+            if (s.equals(value.getMetdataString())) {
+                reference.set(value.getStringMetadata(key));
+                return false;
+            } else {
+                return true;  // haven't found string, keep searching
+            }
+        });
+        return reference.get();
+    }
+
+    // scans from key start to the key before end, calling back until the callback indicates not to process further
+    void scanRange(RocksDbKey start, RocksDbKey end, RocksDbScanCallback fn) {
+        try (ReadOptions ro = new ReadOptions()) {
+            ro.setTotalOrderSeek(true);
+            try (RocksIterator iterator = db.newIterator(ro)) {
+                for (iterator.seek(start.getRaw()); iterator.isValid(); iterator.next()) {
+                    RocksDbKey key = new RocksDbKey(iterator.key());
+                    if (key.compareTo(end) >= 0) { // past limit, quit
+                        return;
+                    }
+
+                    RocksDbValue val = new RocksDbValue(iterator.value());
+                    if (!fn.cb(key, val)) {
+                        // if cb returns false, we are done with this section of rows
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Shutdown the store.
+     */
+    @Override
+    public void close() {
+        metricsWriter.close();
+        metricsCleaner.close();
+    }
+
+    /**
+     *  Scans all metrics in the store and returns the ones matching the specified filtering options.
+     *  Callback returns Metric class results.
+     *
+     * @param filter   options to filter by
+     * @param scanCallback  callback for each Metric found
+     * @throws MetricException  on error
+     */
+    public void scan(FilterOptions filter, ScanCallback scanCallback) throws MetricException {
+        scanInternal(filter, scanCallback, null);
+    }
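+
+    // A usage sketch (the FilterOptions setters shown are assumed to mirror the getters used in
+    // scanInternal(), and ScanCallback is assumed to be usable as a lambda):
+    //   FilterOptions filter = new FilterOptions();
+    //   filter.setTopologyId("wordcount-1-1516655823");
+    //   filter.setMetricName("__emit-count");
+    //   store.scan(filter, metric -> LOG.info("found {}", metric));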
+
+    /**
+     *  Scans all metrics in the store and returns the ones matching the specified filtering options.
+     *  Callback returns raw key/value data.
+     *
+     * @param filter   options to filter by
+     * @param rawCallback  callback for each Metric found
+     * @throws MetricException  on error
+     */
+    private void scanRaw(FilterOptions filter, RocksDbScanCallback rawCallback) throws MetricException {
+        scanInternal(filter, null, rawCallback);
+    }
+
+    // perform a scan given filter options, and return results in either Metric or raw data.
+    private void scanInternal(FilterOptions filter, ScanCallback scanCallback, RocksDbScanCallback rawCallback) throws MetricException {
+
+        Map<String, Integer> stringToIdCache = new HashMap<>();
+        Map<Integer, String> idToStringCache = new HashMap<>();
+
+        int startTopologyId = 0;
+        int endTopologyId = 0xFFFFFFFF;
+        String filterTopologyId = filter.getTopologyId();
+        if (filterTopologyId != null) {
+            int topologyId = lookupMetadataString(KeyType.TOPOLOGY_STRING, filterTopologyId, stringToIdCache);
+            if (INVALID_METADATA_STRING_ID == topologyId) {
+                return;  // string does not exist in database
+            }
+            startTopologyId = topologyId;
+            endTopologyId = topologyId;
+        }
+
+        long startTime = filter.getStartTime();
+        long endTime = filter.getEndTime();
+
+        int startMetricId = 0;
+        int endMetricId = 0xFFFFFFFF;
+        String filterMetricName = filter.getMetricName();
+        if (filterMetricName != null) {
+            int metricId = lookupMetadataString(KeyType.METRIC_STRING, filterMetricName, stringToIdCache);
+            if (INVALID_METADATA_STRING_ID == metricId) {
+                return;  // string does not exist in database
+            }
+            startMetricId = metricId;
+            endMetricId = metricId;
+        }
+
+        int startComponentId = 0;
+        int endComponentId = 0xFFFFFFFF;
+        String filterComponentId = filter.getComponentId();
+        if (filterComponentId != null) {
+            int componentId = lookupMetadataString(KeyType.COMPONENT_STRING, filterComponentId, stringToIdCache);
+            if (INVALID_METADATA_STRING_ID == componentId) {
+                return;  // string does not exist in database
+            }
+            startComponentId = componentId;
+            endComponentId = componentId;
+        }
+
+        int startExecutorId = 0;
+        int endExecutorId = 0xFFFFFFFF;
+        String filterExecutorName = filter.getExecutorId();
+        if (filterExecutorName != null) {
+            int executorId = lookupMetadataString(KeyType.EXEC_ID_STRING, filterExecutorName, stringToIdCache);
+            if (INVALID_METADATA_STRING_ID == executorId) {
+                return;  // string does not exist in database
+            }
+            startExecutorId = executorId;
+            endExecutorId = executorId;
+        }
+
+        int startHostId = 0;
+        int endHostId = 0xFFFFFFFF;
+        String filterHostId = filter.getHostId();
+        if (filterHostId != null) {
+            int hostId = lookupMetadataString(KeyType.HOST_STRING, filterHostId, stringToIdCache);
+            if (INVALID_METADATA_STRING_ID == hostId) {
+                return;  // string does not exist in database
+            }
+            startHostId = hostId;
+            endHostId = hostId;
+        }
+
+        int startPort = 0;
+        int endPort = 0xFFFFFFFF;
+        Integer filterPort = filter.getPort();
+        if (filterPort != null) {
+            startPort = filterPort;
+            endPort = filterPort;
+        }
+
+        int startStreamId = 0;
+        int endStreamId = 0xFFFFFFFF;
+        String filterStreamId = filter.getStreamId();
+        if (filterStreamId != null) {
+            int streamId = lookupMetadataString(KeyType.STREAM_ID_STRING, filterStreamId, stringToIdCache);
+            if (INVALID_METADATA_STRING_ID == streamId) {
+                return;  // string does not exist in database
+            }
+            startStreamId = streamId;
+            endStreamId = streamId;
+        }
+
+        ReadOptions ro = new ReadOptions();
+        ro.setTotalOrderSeek(true);
+
+        for (AggLevel aggLevel : filter.getAggLevels()) {
+
+            RocksDbKey startKey = RocksDbKey.createMetricKey(aggLevel, startTopologyId, startTime, startMetricId,
+                    startComponentId, startExecutorId, startHostId, startPort, startStreamId);
+            RocksDbKey endKey = RocksDbKey.createMetricKey(aggLevel, endTopologyId, endTime, endMetricId,
+                    endComponentId, endExecutorId, endHostId, endPort, endStreamId);
+
+            RocksIterator iterator = db.newIterator(ro);
+            for (iterator.seek(startKey.getRaw()); iterator.isValid(); iterator.next()) {
+                RocksDbKey key = new RocksDbKey(iterator.key());
+
+                if (key.compareTo(endKey) > 0) { // past limit, quit
+                    break;
+                }
+
+                if (startTopologyId != 0 && key.getTopologyId() != startTopologyId) {
+                    continue;
+                }
+
+                long timestamp = key.getTimestamp();
+                if (timestamp < startTime || timestamp > endTime) {
+                    continue;
+                }
+
+                if (startMetricId != 0 && key.getMetricId() != startMetricId) {
+                    continue;
+                }
+
+                if (startComponentId != 0 && key.getComponentId() != startComponentId) {
+                    continue;
+                }
+
+                if (startExecutorId != 0 && key.getExecutorId() != startExecutorId) {
+                    continue;
+                }
+
+                if (startHostId != 0 && key.getHostnameId() != startHostId) {
+                    continue;
+                }
+
+                if (startPort != 0 && key.getPort() != startPort) {
+                    continue;
+                }
+
+                if (startStreamId != 0 && key.getStreamId() != startStreamId) {
+                    continue;
+                }
+
+                RocksDbValue val = new RocksDbValue(iterator.value());
+
+                if (scanCallback != null) {
+                    try {
+                        // populate a metric
+                        String metricName = metadataIdToString(KeyType.METRIC_STRING, key.getMetricId(), idToStringCache);
+                        String topologyId = metadataIdToString(KeyType.TOPOLOGY_STRING, key.getTopologyId(), idToStringCache);
+                        String componentId = metadataIdToString(KeyType.COMPONENT_STRING, key.getComponentId(), idToStringCache);
+                        String executorId = metadataIdToString(KeyType.EXEC_ID_STRING, key.getExecutorId(), idToStringCache);
+                        String hostname = metadataIdToString(KeyType.HOST_STRING, key.getHostnameId(), idToStringCache);
+                        String streamId = metadataIdToString(KeyType.STREAM_ID_STRING, key.getStreamId(), idToStringCache);
+
+                        Metric metric = new Metric(metricName, timestamp, topologyId, 0.0, componentId, executorId, hostname,
+                                streamId, key.getPort(), aggLevel);
+
+                        val.populateMetric(metric);
+
+                        // callback to caller
+                        scanCallback.cb(metric);
+                    } catch (MetricException e) {
+                        LOG.warn("Failed to report found metric: {}", e.getMessage());
+                    }
+                } else {
+                    if (!rawCallback.cb(key, val)) {
+                        // close the native handles before the early return to avoid leaking them
+                        iterator.close();
+                        ro.close();
+                        return;
+                    }
+                }
+            }
+            iterator.close();
+        }
+        ro.close();
+    }
+
+    // Finds the metadata string that matches the string Id and type provided.  The string should exist, as it is
+    // referenced from a metric.
+    private String metadataIdToString(KeyType type, int id, Map<Integer, String> lookupCache) throws MetricException {
+        String s = readOnlyStringMetadataCache.getMetadataString(id);
+        if (s != null) {
+            return s;
+        }
+        s = lookupCache.get(id);
+        if (s != null) {
+            return s;
+        }
+        // get from DB and add to lookup cache
+        RocksDbKey key = new RocksDbKey(type, id);
+        try {
+            byte[] value = db.get(key.getRaw());
+            if (value == null) {
+                throw new MetricException("Failed to find metadata string for id " + id + " of type " + type);
+            }
+            RocksDbValue rdbValue = new RocksDbValue(value);
+            s = rdbValue.getMetdataString();
+            lookupCache.put(id, s);
+            return s;
+        }  catch (RocksDBException e) {
+            if (this.failureMeter != null) {
+                this.failureMeter.mark();
+            }
+            throw new MetricException("Failed to get from RocksDb", e);
+        }
+    }
+
+    // deletes metrics matching the filter options
+    void deleteMetrics(FilterOptions filter) throws MetricException {
+        try (WriteBatch writeBatch = new WriteBatch();
+             WriteOptions writeOps = new WriteOptions()) {
+
+            scanRaw(filter, (RocksDbKey key, RocksDbValue value) -> {
+                writeBatch.remove(key.getRaw());
+                return true;
+            });
+
+            if (writeBatch.count() > 0) {
+                LOG.info("Deleting {} metrics", writeBatch.count());
+                try {
+                    db.write(writeOps, writeBatch);
+                } catch (Exception e) {
+                    String message = "Failed to delete metrics";
+                    LOG.error(message, e);
+                    if (this.failureMeter != null) {
+                        this.failureMeter.mark();
+                    }
+                    throw new MetricException(message, e);
+                }
+            }
+        }
+    }
+
+    // deletes metadata strings before the provided timestamp
+    void deleteMetadataBefore(long firstValidTimestamp) throws MetricException {
+        if (firstValidTimestamp < 1L) {
+            if (this.failureMeter != null) {
+                this.failureMeter.mark();
+            }
+            throw new MetricException("Invalid timestamp for deleting metadata: " + firstValidTimestamp);
+        }
+
+        try (WriteBatch writeBatch = new WriteBatch();
+             WriteOptions writeOps = new WriteOptions()) {
+
+            // search all metadata strings
+            RocksDbKey topologyMetadataPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_START);
+            RocksDbKey lastPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_END);
+            scanRange(topologyMetadataPrefix, lastPrefix, (key, value) -> {
+                // we'll assume the metadata was recently used if still in the cache.
+                if (!readOnlyStringMetadataCache.contains(key.getMetadataStringId())) {
+                    if (value.getLastTimestamp() < firstValidTimestamp) {
+                        writeBatch.remove(key.getRaw());
+                    }
+                }
+                return true;
+            });
+
+            if (writeBatch.count() > 0) {
+                LOG.info("Deleting {} metadata strings", writeBatch.count());
+                try {
+                    db.write(writeOps, writeBatch);
+                } catch (Exception e) {
+                    String message = "Failed to delete metadata strings";
+                    LOG.error(message, e);
+                    if (this.failureMeter != null) {
+                        this.failureMeter.mark();
+                    }
+                    throw new MetricException(message, e);
+                }
+            }
+        }
+    }
+}
+
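
For reference, a caller-side sketch of the scan path above: a FilterOptions narrows each
key field to a single value (or leaves it open), and every key surviving the range and
equality checks is decoded into a Metric and handed to the callback once. The setters and
the scan(FilterOptions, ScanCallback) entry point are assumptions inferred from the filter
getters used above; this hunk does not show their declarations.

    package org.apache.storm.metricstore;

    // Sketch only: setTopologyId()/addAggLevel() and MetricStore.scan() are
    // assumed from the filter getters used in the hunk above, not shown here.
    public class ScanSketch {
        public static void dump(MetricStore store, String topologyId) throws MetricException {
            FilterOptions filter = new FilterOptions();
            filter.setTopologyId(topologyId);               // assumed setter
            filter.addAggLevel(AggLevel.AGG_LEVEL_10_MIN);  // assumed setter
            // the callback fires once per metric matching the filter
            store.scan(filter, metric -> System.out.println(metric));
        }
    }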

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java
new file mode 100644
index 0000000..58b2c76
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbValue.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import org.apache.storm.metricstore.Metric;
+
+
+/**
+ * Class representing the data used as a Value in RocksDB.  Values can be used either for metadata or metrics.
+ *
+ * <p>Formats for Metadata String values are:
+ *
+ * <pre>
+ * Field             Size         Offset
+ *
+ * Version              1              0      The current metadata version - allows migrating if the format changes in the future
+ * Timestamp            8              1      The time when the metadata was last used by a metric.  Allows deleting of old metadata.
+ * Metadata String    any              9      The metadata string
+ *</pre>
+ *
+ * <p>Formats for Metric values are:
+ *
+ * <pre>
+ * Field             Size         Offset
+ *
+ * Version              1              0      The current metric version - allows migrating if the format changes in the future
+ * Value                8              1      The metric value
+ * Count                8              9      The metric count
+ * Min                  8             17      The minimum metric value
+ * Max                  8             25      The maximum metric value
+ * Sum                  8             33      The sum of the metric values
+ * </pre>
+ */
+
+class RocksDbValue {
+    private static final int METRIC_VALUE_SIZE = 41;
+    private byte[] value;
+    private static final byte CURRENT_METADATA_VERSION = 0;
+    private static final byte CURRENT_METRIC_VERSION = 0;
+    private static final int MIN_METADATA_VALUE_SIZE = 9;
+
+    /**
+     * Constructor from raw data.
+     *
+     * @param value  the raw bytes representing the value
+     */
+    RocksDbValue(byte[] value) {
+        this.value = value;
+    }
+
+    /**
+     * Constructor for a metadata string.
+     *
+     * @param lastTimestamp  the last timestamp when the string was used
+     * @param metadataString   the metadata string
+     */
+    RocksDbValue(long lastTimestamp, String metadataString) {
+        // use an explicit charset and size the array from the encoded bytes,
+        // since String.length() can differ from the UTF-8 byte length
+        byte[] stringBytes = metadataString.getBytes(StandardCharsets.UTF_8);
+        this.value = new byte[MIN_METADATA_VALUE_SIZE + stringBytes.length];
+        ByteBuffer bb = ByteBuffer.wrap(value);
+        bb.put(CURRENT_METADATA_VERSION);
+        bb.putLong(lastTimestamp);
+        bb.put(stringBytes);
+    }
+
+    /**
+     * Constructor for a metric.
+     *
+     * @param m  the metric to create a value from
+     */
+    RocksDbValue(Metric m) {
+        this.value = new byte[METRIC_VALUE_SIZE];
+        ByteBuffer bb = ByteBuffer.wrap(value);
+        bb.put(CURRENT_METRIC_VERSION);
+        bb.putDouble(m.getValue());
+        bb.putLong(m.getCount());
+        bb.putDouble(m.getMin());
+        bb.putDouble(m.getMax());
+        bb.putDouble(m.getSum());
+    }
+
+    /**
+     * Get the metadata string portion of the value.  Assumes the value is metadata.
+     *
+     * @return   the metadata string
+     */
+    String getMetdataString() {
+        if (this.value.length < MIN_METADATA_VALUE_SIZE) {
+            throw new RuntimeException("RocksDB value is too small to be a metadata string!");
+        }
+        return new String(this.value, MIN_METADATA_VALUE_SIZE,
+                this.value.length - MIN_METADATA_VALUE_SIZE, StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Gets StringMetadata associated with the key/value pair.
+     */
+    StringMetadata getStringMetadata(RocksDbKey key) {
+        return new StringMetadata(key.getType(), key.getMetadataStringId(), this.getLastTimestamp());
+    }
+
+    /**
+     * Gets the last time a metadata string was used.
+     */
+    long getLastTimestamp() {
+        return ByteBuffer.wrap(value, 1, 8).getLong();
+    }
+
+    /**
+     * get the raw value bytes
+     */
+    byte[] getRaw() {
+        return this.value;
+    }
+
+    /**
+     * populate metric values from the raw data.
+     */
+    void populateMetric(Metric metric) {
+        ByteBuffer bb = ByteBuffer.wrap(this.value, 0, METRIC_VALUE_SIZE);
+        bb.get();  // version
+        metric.setValue(bb.getDouble());
+        metric.setCount(bb.getLong());
+        metric.setMin(bb.getDouble());
+        metric.setMax(bb.getDouble());
+        metric.setSum(bb.getDouble());
+    }
+
+}
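
To make the metric value layout above concrete, here is a round-trip sketch using only the
constructors and accessors in this file plus the ten-argument Metric constructor used in the
scan code earlier in this patch. RocksDbValue is package-private, so the sketch would live in
the same package; the field values are arbitrary.

    package org.apache.storm.metricstore.rocksdb;

    import org.apache.storm.metricstore.AggLevel;
    import org.apache.storm.metricstore.Metric;

    class ValueRoundTripSketch {
        static void demo() {
            // a metric with made-up values; argument order matches the scan code above
            Metric m = new Metric("__emit-count", 1516600000000L, "topo-1", 123.0,
                    "spout", "[1,1]", "host1", "default", 6700, AggLevel.AGG_LEVEL_NONE);

            byte[] raw = new RocksDbValue(m).getRaw();  // the 41-byte metric value

            // decoding restores value, count, min, max and sum onto a fresh Metric
            Metric copy = new Metric("__emit-count", 1516600000000L, "topo-1", 0.0,
                    "spout", "[1,1]", "host1", "default", 6700, AggLevel.AGG_LEVEL_NONE);
            new RocksDbValue(raw).populateMetric(copy);
        }
    }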

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java
new file mode 100644
index 0000000..6f54a58
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadata.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Class that contains the information associated with a metadata string that remains cached in memory.
+ */
+class StringMetadata {
+    private List<KeyType> types = new ArrayList<>(1);  // it's possible the same string is used by multiple metadata types
+    private int stringId;
+    private long lastTimestamp;
+
+    /**
+     * Constructor for StringMetadata.
+     *
+     * @param metadataType   the type of metadata string
+     * @param stringId       the unique id for the metadata string
+     * @param lastTimestamp   the timestamp when the metric used the metadata string
+     */
+    StringMetadata(KeyType metadataType, Integer stringId, Long lastTimestamp) {
+        this.types.add(metadataType);
+        this.stringId = stringId;
+        this.lastTimestamp = lastTimestamp;
+    }
+
+    int getStringId() {
+        return this.stringId;
+    }
+
+    long getLastTimestamp() {
+        return this.lastTimestamp;
+    }
+
+    List<KeyType> getMetadataTypes() {
+        return this.types;
+    }
+
+    private void addKeyType(KeyType type) {
+        if (!this.types.contains(type)) {
+            this.types.add(type);
+        }
+    }
+
+    /**
+     * Updates the timestamp of when a metadata string was last used.  Adds the type of the string if it is a new
+     * type.
+     *
+     * @param metricTimestamp   the timestamp of the metric using the metadata string
+     * @param type    the type of metadata string for the metric
+     */
+    void update(Long metricTimestamp, KeyType type) {
+        if (metricTimestamp > this.lastTimestamp) {
+            this.lastTimestamp = metricTimestamp;
+        }
+        addKeyType(type);
+    }
+
+}
+
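
A small illustration of the update semantics above, as a fragment in the same package
(StringMetadata is package-private); the string id and timestamps are made up. The same
string (say "default") might be interned under more than one metadata type:

    package org.apache.storm.metricstore.rocksdb;

    class StringMetadataSketch {
        static void demo() {
            StringMetadata meta = new StringMetadata(KeyType.COMPONENT_STRING, 17, 1516600000000L);
            meta.update(1516600060000L, KeyType.STREAM_ID_STRING);  // later use, new type

            // the timestamp advanced and both key types are now recorded
            assert meta.getLastTimestamp() == 1516600060000L;
            assert meta.getMetadataTypes().size() == 2;
        }
    }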

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java
new file mode 100644
index 0000000..7ce8435
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/StringMetadataCache.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.storm.metricstore.MetricException;
+import org.apache.storm.utils.LruMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class to create and use a cache that stores metadata string information in memory.  It allows searching for a
+ * metadata string's unique id, or looking up the string by the unique id.  The StringMetadata is stored in an
+ * LRU map.  When an entry is added to a full cache, an older entry is evicted and must be
+ * immediately stored to the database to preserve a consistent view of all the metadata strings.
+ *
+ * <p>All write operations adding metadata to RocksDB are done by a single thread (a RocksDbMetricsWriter),
+ * but multiple threads can read values from the cache. To make the permitted operations explicit for each kind
+ * of thread, the ReadOnlyStringMetadataCache and WritableStringMetadataCache interfaces are provided.
+ */
+
+public class StringMetadataCache implements LruMap.CacheEvictionCallback<String, StringMetadata>,
+        WritableStringMetadataCache, ReadOnlyStringMetadataCache {
+    private static final Logger LOG = LoggerFactory.getLogger(StringMetadataCache.class);
+    private Map<String, StringMetadata> lruStringCache;
+    private Map<Integer, String> hashToString = new ConcurrentHashMap<>();
+    private RocksDbMetricsWriter dbWriter;
+    private static StringMetadataCache instance = null;
+
+    /**
+     * Initializes the cache instance.
+     *
+     * @param dbWriter   the RocksDB writer instance to handle writing evicted cache data
+     * @param capacity   the number of StringMetadata instances to hold in memory
+     * @throws MetricException   if creating multiple cache instances
+     */
+    static void init(RocksDbMetricsWriter dbWriter, int capacity) throws MetricException {
+        if (instance == null) {
+            instance = new StringMetadataCache(dbWriter, capacity);
+        } else {
+            throw new MetricException("StringMetadataCache already created");
+        }
+    }
+
+    /**
+     * Provides the WritableStringMetadataCache interface to the cache instance.
+     *
+     * @throws MetricException  if the cache instance was not created
+     */
+    static WritableStringMetadataCache getWritableStringMetadataCache() throws MetricException {
+        if (instance != null) {
+            return instance;
+        } else {
+            throw new MetricException("StringMetadataCache was not initialized");
+        }
+    }
+
+    /**
+     * Provides the ReadOnlyStringMetadataCache interface to the cache instance.
+     *
+     * @throws MetricException  if the cache instance was not created
+     */
+    static ReadOnlyStringMetadataCache getReadOnlyStringMetadataCache() throws MetricException {
+        if (instance != null) {
+            return instance;
+        } else {
+            throw new MetricException("StringMetadataCache was not initialized");
+        }
+    }
+
+    /**
+     * Constructor to create a cache.
+     *
+     * @param dbWriter   The rocks db writer instance the cache should use when evicting data
+     * @param capacity  The cache size
+     */
+    private StringMetadataCache(RocksDbMetricsWriter dbWriter, int capacity) {
+        lruStringCache = Collections.synchronizedMap(new LruMap<>(capacity, this));
+        this.dbWriter = dbWriter;
+    }
+
+    /**
+     * Get the string metadata from the cache.
+     *
+     * @param s   The string to look for
+     * @return   the metadata associated with the string or null if not found
+     */
+    public StringMetadata get(String s) {
+        return lruStringCache.get(s);
+    }
+
+    /**
+     * Add the string metadata to the cache.
+     *
+     * NOTE: this can cause data to be evicted from the cache when full.  When this occurs, the evictionCallback() method
+     * is called to store the metadata back into the RocksDB database.
+     *
+     * This method is only exposed to the WritableStringMetadataCache interface.
+     *
+     * @param s   The string to add
+     * @param stringMetadata  The string's metadata
+     * @param newEntry   Indicates the metadata is being used for the first time and should be written to RocksDB immediately
+     * @throws MetricException   when evicted data fails to save to the database or when the database is shutdown
+     */
+    public void put(String s, StringMetadata stringMetadata, boolean newEntry) throws MetricException {
+        if (dbWriter.isShutdown()) {
+            // another thread could be writing out the metadata cache to the database.
+            throw new MetricException("Shutting down");
+        }
+        try {
+            if (newEntry) {
+                writeMetadataToDisk(s, stringMetadata);
+            }
+            lruStringCache.put(s, stringMetadata);
+            hashToString.put(stringMetadata.getStringId(), s);
+        } catch (Exception e) { // catch any runtime exceptions caused by eviction
+            throw new MetricException("Failed to save string in metadata cache", e);
+        }
+    }
+
+    /**
+     * Callback when data is about to be removed from the cache.  This method then
+     * immediately writes the metadata to RocksDB.
+     *
+     * @param key   The evicted string
+     * @param val  The evicted string's metadata
+     * @throws RuntimeException   when evicted data fails to save to the database
+     */
+    public void evictionCallback(String key, StringMetadata val) {
+        writeMetadataToDisk(key, val);
+    }
+
+    private void writeMetadataToDisk(String key, StringMetadata val) {
+        LOG.debug("Writing {} to RocksDB", key);
+        // remove reverse lookup from map
+        hashToString.remove(val.getStringId());
+
+        // save the evicted key/value to the database immediately
+        RocksDbValue rval = new RocksDbValue(val.getLastTimestamp(), key);
+
+        for (KeyType type : val.getMetadataTypes()) { // save the metadata for all types of strings it matches
+            RocksDbKey rkey = new RocksDbKey(type, val.getStringId());
+            dbWriter.handleEvictedMetadata(rkey, rval);
+        }
+    }
+
+    /**
+     * Determines if a string Id is contained in the cache.
+     *
+     * @param stringId   The string Id to check
+     * @return   true if the Id is in the cache, false otherwise
+     */
+    public boolean contains(Integer stringId) {
+        return hashToString.containsKey(stringId);
+    }
+
+    /**
+     * Returns the string matching the string Id if in the cache.
+     *
+     * @param stringId   The string Id to check
+     * @return   the associated string if the Id is in the cache, null otherwise
+     */
+    public String getMetadataString(Integer stringId) {
+        return hashToString.get(stringId);
+    }
+
+    /**
+     * Get the map of the cache contents.  Provided to allow writing the data to RocksDB on shutdown.
+     *
+     * @return   the string metadata map entrySet
+     */
+    public Set<Map.Entry<String, StringMetadata>> entrySet() {
+        return lruStringCache.entrySet();
+    }
+
+    static void cleanUp() {
+        instance = null;
+    }
+
+}
+
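
A sketch of the intended wiring for the reader/writer split described above, assuming a
RocksDbMetricsWriter has already been constructed and that getMetadataString() is declared on
ReadOnlyStringMetadataCache as its implementation here suggests. The accessors are
package-private, so this lives in the same package; the capacity is an arbitrary choice.

    package org.apache.storm.metricstore.rocksdb;

    import org.apache.storm.metricstore.MetricException;

    class CacheWiringSketch {
        static void wire(RocksDbMetricsWriter writer) throws MetricException {
            StringMetadataCache.init(writer, 4000);  // one-time; a second init() throws

            // the single writer thread uses the mutating view...
            WritableStringMetadataCache writable =
                    StringMetadataCache.getWritableStringMetadataCache();

            // ...while reader threads only get lookups
            ReadOnlyStringMetadataCache readOnly =
                    StringMetadataCache.getReadOnlyStringMetadataCache();
            String s = readOnly.getMetadataString(42);  // null unless id 42 is cached
        }
    }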

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java
new file mode 100644
index 0000000..2d4165f
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/WritableStringMetadataCache.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.http.annotation.NotThreadSafe;
+import org.apache.storm.metricstore.MetricException;
+
+/**
+ * The writable interface to a StringMetadataCache intended to be used by a single RocksDbMetricsWriter instance.
+ */
+@NotThreadSafe
+public interface WritableStringMetadataCache extends ReadOnlyStringMetadataCache {
+
+    /**
+     * Add the string metadata to the cache.
+     *
+     * NOTE: this can cause data to be evicted from the cache when full.  When this occurs, the evictionCallback() method
+     * is called to store the metadata back into the RocksDB database.
+     *
+     * This method is only exposed to the WritableStringMetadataCache interface.
+     *
+     * @param s   The string to add
+     * @param stringMetadata  The string's metadata
+     * @param newEntry   Indicates the metadata is being used for the first time and should be written to RocksDB immediately
+     * @throws MetricException   when evicted data fails to save to the database or when the database is shutdown
+     */
+    void put(String s, StringMetadata stringMetadata, boolean newEntry) throws MetricException;
+
+    /**
+     * Get the map of the cache contents.  Provided to allow writing the data to RocksDB on shutdown.
+     *
+     * @return   the string metadata map entrySet
+     */
+    Set<Map.Entry<String, StringMetadata>> entrySet();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/main/java/org/apache/storm/utils/LruMap.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/utils/LruMap.java b/storm-server/src/main/java/org/apache/storm/utils/LruMap.java
new file mode 100644
index 0000000..3ed5d06
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/utils/LruMap.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class LruMap<A, B> extends LinkedHashMap<A, B> {
+    private final int maxSize;
+    private CacheEvictionCallback<A, B> evCb = null;
+
+    public LruMap(int maxSize) {
+        super(maxSize + 1, 1.0f, true);
+        this.maxSize = maxSize;
+    }
+
+    /**
+     * Creates an LRU map that will call back before data is removed from the map.
+     *
+     * @param maxSize   max capacity for the map
+     * @param evictionCallback   callback to be called before removing data
+     */
+    public LruMap(int maxSize, CacheEvictionCallback<A, B> evictionCallback) {
+        this(maxSize);
+        this.evCb = evictionCallback;
+    }
+
+    @Override
+    protected boolean removeEldestEntry(final Map.Entry<A, B> eldest) {
+        boolean evict = size() > this.maxSize;
+        if (evict && this.evCb != null) {
+            this.evCb.evictionCallback(eldest.getKey(), eldest.getValue());
+        }
+        return evict;
+    }
+
+    public interface CacheEvictionCallback<K, V> {
+        void evictionCallback(K key, V val);
+    }
+}
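
Since LruMap is self-contained, its eviction behavior can be shown directly. The superclass
constructor requests access order, so a get() refreshes an entry's recency, and the callback
fires just before the eldest entry is dropped:

    import org.apache.storm.utils.LruMap;

    public class LruMapDemo {
        public static void main(String[] args) {
            // capacity of two, with a callback invoked before each eviction
            LruMap<String, Integer> cache =
                    new LruMap<>(2, (key, val) -> System.out.println("evicted " + key + "=" + val));
            cache.put("a", 1);
            cache.put("b", 2);
            cache.get("a");     // touching "a" makes "b" the eldest entry
            cache.put("c", 3);  // prints "evicted b=2"
        }
    }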

http://git-wip-us.apache.org/repos/asf/storm/blob/48e23a99/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbKeyTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbKeyTest.java b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbKeyTest.java
new file mode 100644
index 0000000..21d2377
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/metricstore/rocksdb/RocksDbKeyTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.storm.metricstore.rocksdb;
+
+import org.apache.storm.metricstore.AggLevel;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RocksDbKeyTest {
+
+    @Test
+    public void testConstructors() {
+        byte[] raw = new byte[RocksDbKey.KEY_SIZE];
+        raw[0] = KeyType.COMPONENT_STRING.getValue();
+        raw[2] = 0x01;
+        raw[3] = 0x02;
+        raw[4] = 0x03;
+        raw[5] = 0x04;
+        RocksDbKey rawKey = new RocksDbKey(raw);
+
+        RocksDbKey metadataKey = new RocksDbKey(KeyType.COMPONENT_STRING, 0x01020304);
+        Assert.assertEquals(0, metadataKey.compareTo(rawKey));
+        Assert.assertEquals(KeyType.COMPONENT_STRING, metadataKey.getType());
+
+        metadataKey = new RocksDbKey(KeyType.TOPOLOGY_STRING, 0x01020304);
+        Assert.assertTrue(metadataKey.compareTo(rawKey) < 0);
+        Assert.assertEquals(KeyType.TOPOLOGY_STRING, metadataKey.getType());
+
+        metadataKey = new RocksDbKey(KeyType.COMPONENT_STRING, 0x01020305);
+        Assert.assertTrue(metadataKey.compareTo(rawKey) > 0);
+
+        Assert.assertEquals(0x01020304, rawKey.getTopologyId());
+        Assert.assertEquals(KeyType.COMPONENT_STRING, rawKey.getType());
+    }
+
+    @Test
+    public void testMetricKey() {
+        AggLevel aggLevel = AggLevel.AGG_LEVEL_10_MIN;
+        int topologyId = 0x45665;
+        long timestamp = System.currentTimeMillis();
+        int metricId = 0xF3916034;
+        int componentId = 0x82915031;
+        int executorId = 0x434738;
+        int hostId = 0x4348394;
+        int port = 3456;
+        int streamId = 0x84221956;
+        RocksDbKey key = RocksDbKey.createMetricKey(aggLevel, topologyId, timestamp, metricId,
+                componentId, executorId, hostId, port, streamId);
+        Assert.assertEquals(topologyId, key.getTopologyId());
+        Assert.assertEquals(timestamp, key.getTimestamp());
+        Assert.assertEquals(metricId, key.getMetricId());
+        Assert.assertEquals(componentId, key.getComponentId());
+        Assert.assertEquals(executorId, key.getExecutorId());
+        Assert.assertEquals(hostId, key.getHostnameId());
+        Assert.assertEquals(port, key.getPort());
+        Assert.assertEquals(streamId, key.getStreamId());
+    }
+}