Posted to dev@storm.apache.org by agresch <gi...@git.apache.org> on 2018/01/05 19:15:15 UTC

[GitHub] storm pull request #2504: STORM-2156: store metrics into RocksDB

GitHub user agresch opened a pull request:

    https://github.com/apache/storm/pull/2504

    STORM-2156: store metrics into RocksDB

    
    This PR adds a RocksDB key/value database to Nimbus for storing metrics from the Supervisors.  The implementation can be replaced by implementing the MetricStore interface and updating the storm.metricstore.class configuration option.
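    
    As a rough sketch of how a deployment might select and open the store (a hedged example: MetricStoreConfig.configure() and the DaemonConfig keys appear in this PR's diffs, but the concrete values and the wrapper class below are made up for illustration):
    
    ```
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.storm.DaemonConfig;
    import org.apache.storm.metricstore.MetricStore;
    import org.apache.storm.metricstore.MetricStoreConfig;
    
    public class MetricStoreExample {
        public static MetricStore openStore() throws Exception {
            Map<String, Object> conf = new HashMap<>();
            conf.put(DaemonConfig.STORM_METRIC_STORE_CLASS,
                    "org.apache.storm.metricstore.rocksdb.RocksDbStore");
            conf.put(DaemonConfig.STORM_ROCKSDB_LOCATION, "/tmp/storm_rocksdb");          // example path
            conf.put(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING, true);
            conf.put(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY, 4000);    // example size
            conf.put(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS, 240);             // example retention
            return MetricStoreConfig.configure(conf); // instantiates the configured class
        }
    }
    ```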
    
    As metrics are stored in RocksDB, their string values (topology ID, executor ID, etc.) are converted to unique integer IDs, and these strings are also stored in the database as metadata indexed by the integer ID.  When a metric is stored, it is also aggregated with any existing metric within the same 1, 10, and 60 minute timeframes.
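    
    The bucket rounding is plain integer arithmetic; a minimal sketch mirroring the rounding done in RocksDbMetricsWriter (quoted later in this thread), where bucketMinutes is 1, 10, or 60:
    
    ```
    // Round a metric timestamp (in ms) down to the start of its aggregation bucket.
    static long roundToBucket(long timestampMs, int bucketMinutes) {
        long msToBucket = 1000L * 60L * bucketMinutes;
        return msToBucket * (timestampMs / msToBucket);
    }
    ```
    
    So a metric arriving at 12:34:56 is folded into the 12:34 (1 min), 12:30 (10 min), and 12:00 (60 min) buckets.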
    
    Metric.java contains all the relevant metric properties stored in the database.
    
    FilterOptions.java provides all the options available when scanning the database for various metrics.
    
    The Supervisor sends metrics (currently only for memory usage) to Nimbus via Thrift in processMetrics() in Container.java.
    
    Nimbus receives the Supervisor metrics and inserts them into the database in processWorkerMetrics().
    
    RocksDbStore is the RocksDB metric store implementation.
    
    RocksDbKey provides documentation on how metric data and metadata strings are stored as a key in RocksDB.  RocksDbValue provides similar documentation for the value format.
    
    StringMetadataCache provides an in-memory cache of the most recently used string metadata for fast lookup.  The cache size is configurable.
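    
    For intuition, an access-ordered LinkedHashMap gives the basic LRU-with-eviction-callback shape; this is only a simplified sketch, not the PR's StringMetadataCache:
    
    ```
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.function.BiConsumer;
    
    // Simplified LRU sketch: evicted entries are handed to a callback so they
    // can be flushed to the database, analogous to handleEvictedMetadata().
    class LruSketch<K, V> extends LinkedHashMap<K, V> {
        private final int capacity;
        private final BiConsumer<K, V> onEvict;
    
        LruSketch(int capacity, BiConsumer<K, V> onEvict) {
            super(capacity, 0.75f, true); // accessOrder=true gives LRU iteration
            this.capacity = capacity;
            this.onEvict = onEvict;
        }
    
        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            if (size() > capacity) {
                onEvict.accept(eldest.getKey(), eldest.getValue());
                return true; // evict the least recently used entry
            }
            return false;
        }
    }
    ```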
    
    RocksDbMetricsWriter performs the metric writes to RocksDB, taking metrics off the queue, aggregating them with existing data, and storing metadata as required.  A single thread performs the inserts to prevent threading issues.  In my tests, database inserts for an individual metric typically take 80 to 150 microseconds (including the aggregation).
    
    MetricsCleaner exists to periodically delete old metrics and remove unused metadata strings.  Metric retention time is configurable.
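    
    The purge cutoff would presumably be computed along these lines (an assumption; the purge internals are not fully shown in the diffs quoted below):
    
    ```
    // Anything with a timestamp older than the retention window gets deleted.
    long retentionMs = retentionHours * 60L * 60L * 1000L;
    long purgeTimestamp = System.currentTimeMillis() - retentionMs;
    ```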
    
    RocksDbStoreTest provides sample code for inserting and searching for metrics.
    
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/agresch/storm agresch_rocksdb

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2504.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2504
    
----
commit 517310586e69a139002140fec12b2e12527a0fa2
Author: Aaron Gresch <ag...@...>
Date:   2017-12-07T17:36:03Z

    STORM-2156: store metrics into RocksDB

----


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162630427
  
    --- Diff: pom.xml ---
    @@ -324,6 +324,7 @@
             <jool.version>0.9.12</jool.version>
             <caffeine.version>2.3.5</caffeine.version>
             <jaxb-version>2.3.0</jaxb-version>
    +        <rocksdb-version>5.8.6</rocksdb-version>
    --- End diff --
    
    No.  It was the latest version at the time.  Open to advice/suggestions.  5.8.7 and 5.9.2 now also exist.


---

[GitHub] storm issue #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    @agresch 
    Never mind. I can see the document file. For me, removing the first blank line works. Could you try it out and see if it works? If it does, could you squash the commits into one and let me know?


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162543795
  
    --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/MetricsCleaner.java ---
    @@ -0,0 +1,101 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore.rocksdb;
    +
    +import com.codahale.metrics.Gauge;
    +import com.codahale.metrics.Meter;
    +import org.apache.storm.metric.StormMetricsRegistry;
    +import org.apache.storm.metricstore.FilterOptions;
    +import org.apache.storm.metricstore.MetricException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Class for removing expired metrics and unused metadata from the RocksDB store.
    + */
    +public class MetricsCleaner implements Runnable, AutoCloseable {
    +    private static final Logger LOG = LoggerFactory.getLogger(MetricsCleaner.class);
    +    private static long DEFAULT_SLEEP_MS = 4L * 60L * 60L * 1000L;
    +    private RocksDbStore store;
    +    private long retentionHours;
    +    private volatile boolean shutdown = false;
    +    private long sleepMs = DEFAULT_SLEEP_MS;
    +    private Meter failureMeter;
    +    private long purgeTimestamp = 0L;
    +
    +    MetricsCleaner(RocksDbStore store, int retentionHours, int hourlyPeriod, Meter failureMeter) {
    +        this.store = store;
    +        this.retentionHours = retentionHours;
    +        if (hourlyPeriod > 0) {
    +            this.sleepMs = hourlyPeriod * 60L * 60L * 1000L;
    +        }
    +        this.failureMeter = failureMeter;
    +
    +        Gauge<Long> gauge = new Gauge<Long>() {
    +            @Override
    +            public Long getValue() {
    +                return purgeTimestamp;
    +            }
    +        };
    +        StormMetricsRegistry.registerProvidedGauge("MetricsCleaner:purgeTimestamp", gauge);
    +    }
    +
    +    @Override
    +    public void close() {
    +        shutdown = true;
    +    }
    +
    +    @Override
    +    public void run() {
    +        while (true) {
    +            if (shutdown) {
    --- End diff --
    
    Is there a reason to have a separate if statement instead of `while (!shutdown)`?
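    
    That is, the loop could collapse to:
    
    ```
    while (!shutdown) {
        // ... body of one cleanup iteration ...
    }
    ```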


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162544306
  
    --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java ---
    @@ -0,0 +1,323 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore.rocksdb;
    +
    +import com.codahale.metrics.Meter;
    +
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +import org.apache.http.annotation.NotThreadSafe;
    +import org.apache.storm.metricstore.AggLevel;
    +import org.apache.storm.metricstore.Metric;
    +import org.apache.storm.metricstore.MetricException;
    +import org.rocksdb.FlushOptions;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.WriteBatch;
    +import org.rocksdb.WriteOptions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Class designed to perform all metrics inserts into RocksDB.  Metrics are processed from a blocking queue.  Inserts
    + * to RocksDB are done using a single thread to simplify design (such as looking up existing metric data for aggregation,
    + * and fetching/evicting metadata from the cache).
    + * <p>
    + * A writable LRU StringMetadataCache is used to minimize looking up metadata string Ids.  As entries are added to the full cache, older
    + * entries are evicted from the cache and need to be written to the database.  This happens via the handleEvictedMetadata()
    + * method callback.
    + * <p>
    + * The following issues would need to be addressed to implement a multithreaded metrics writer:
    + * <ul>
    + *     <li>Generation of unique unused IDs for new metadata strings needs to be thread safe.</li>
    + *     <li>Ensuring newly created metadata strings are seen by all threads.</li>
    + *     <li>Maintaining a properly cached state of metadata for multiple writers.  The current LRU cache
    + *     evicts data as new metadata is added.</li>
    + *     <li>Processing the aggregation of a metric requires fetching and updating previous aggregates.  A multithreaded
    + *     design would need to ensure two metrics were not updating an aggregated metric at the same time.</li>
    + *     <li>Investigate performance of multiple threads inserting into RocksDB versus a single ordered insert.</li>
    + * </ul>
    + */
    +@NotThreadSafe
    +public class RocksDbMetricsWriter implements Runnable, AutoCloseable {
    +    private static final Logger LOG = LoggerFactory.getLogger(RocksDbMetricsWriter.class);
    +    private RocksDbStore store;
    +    private BlockingQueue queue;
    +    private WritableStringMetadataCache stringMetadataCache;
    +    private Set<Integer> unusedIds = new HashSet<>();
    +    private TreeMap<RocksDbKey, RocksDbValue> insertBatch = new TreeMap<>(); // RocksDB should insert in sorted key order
    +    private WriteOptions writeOpts = new WriteOptions();
    +    private volatile boolean shutdown = false;
    +    private Meter failureMeter;
    +    private ArrayList<AggLevel> aggBuckets = new ArrayList<>();
    +
    +    /**
    +     * Constructor for the RocksDbMetricsWriter.
    +     *
    +     * @param store   The RocksDB store
    +     * @param queue   The queue to receive metrics for insertion
    +     */
    +    RocksDbMetricsWriter(RocksDbStore store, BlockingQueue queue, Meter failureMeter)  {
    +        this.store = store;
    +        this.queue = queue;
    +        this.failureMeter = failureMeter;
    +
    +        aggBuckets.add(AggLevel.AGG_LEVEL_1_MIN);
    +        aggBuckets.add(AggLevel.AGG_LEVEL_10_MIN);
    +        aggBuckets.add(AggLevel.AGG_LEVEL_60_MIN);
    +    }
    +
    +    /**
    +     * Init routine called once the Metadata cache has been created.
    +     *
    +     * @throws MetricException  on cache error
    +     */
    +    void init() throws MetricException {
    +        this.stringMetadataCache = StringMetadataCache.getWritableStringMetadataCache();
    +    }
    +
    +    /**
    +     * Run routine to wait for metrics on a queue and insert into RocksDB.
    +     */
    +    @Override
    +    public void run() {
    +        while (true) {
    +            if (shutdown) {
    --- End diff --
    
    Same here.


---

[GitHub] storm pull request #2504: STORM-2156: store metrics into RocksDB

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160250660
  
    --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java ---
    @@ -0,0 +1,306 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore.rocksdb;
    +
    +import com.codahale.metrics.Meter;
    +
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ThreadLocalRandom;
    +import org.apache.storm.metricstore.AggLevel;
    +import org.apache.storm.metricstore.Metric;
    +import org.apache.storm.metricstore.MetricException;
    +import org.rocksdb.FlushOptions;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.WriteBatch;
    +import org.rocksdb.WriteOptions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Class designed to perform all metrics inserts into RocksDB.  Metrics are processed from a blocking queue.
    + * <p>
    + * A writable LRU StringMetadataCache is used to minimize looking up metadata string Ids.  As entries are added to the full cache, older
    + * entries are evicted from the cache and need to be written to the database.  This happens via the handleEvictedMetadata()
    + * method callback.
    + */
    +public class RocksDbMetricsWriter implements Runnable, AutoCloseable {
    +    private static final Logger LOG = LoggerFactory.getLogger(RocksDbMetricsWriter.class);
    +    private RocksDbStore store;
    +    private BlockingQueue queue;
    +    private WritableStringMetadataCache stringMetadataCache;
    +    private Set<Integer> unusedIds = new HashSet<>();
    +    private TreeMap<RocksDbKey, RocksDbValue> insertBatch = new TreeMap<>(); // RocksDB should insert in sorted key order
    --- End diff --
    
    The MetricsWriter is a single-threaded implementation.  The RocksDB documentation recommended a single writer for speed, and this simplified the caching of metadata.  If we needed to switch to multiple threads, I completely agree the design would not be thread safe.


---

[GitHub] storm issue #2504: STORM-2887: store metrics into RocksDB

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    Yes, removing the blank line fixed the issue for me.  Thanks.


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162768150
  
    --- Diff: pom.xml ---
    @@ -324,6 +324,7 @@
             <jool.version>0.9.12</jool.version>
             <caffeine.version>2.3.5</caffeine.version>
             <jaxb-version>2.3.0</jaxb-version>
    +        <rocksdb-version>5.8.6</rocksdb-version>
    --- End diff --
    
    Actually I don't have experience with RocksDB. If RocksDB 5.8.6 is running properly for you, let's just use that version.


---

[GitHub] storm issue #2504: STORM-2887: store metrics into RocksDB

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    Other than creating a storm.home constant, I added the changes you requested.
    
    I am having trouble getting the html generated for the new md file in order to validate the formatting.  I ran "jekyll serve -w" and was able to see an updated link for my page in index.md on the html side, but it did not generate any html for storm-metricstore.md.  I could use some advice on how this is supposed to work.  Thanks.


---

[GitHub] storm issue #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    @agresch Thanks for handling the issue. Merging.


---

[GitHub] storm pull request #2504: STORM-2156: store metrics into RocksDB

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160241831
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -1081,6 +1088,14 @@ public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stor
                 BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper)
             throws Exception {
             this.conf = conf;
    +
    +        this.metricsStore = null;
    +        try {
    +            this.metricsStore = MetricStoreConfig.configure(conf);
    +        } catch (Exception e) {
    +            LOG.error("Failed to initialize metric store", e);
    --- End diff --
    
    Could we add a comment that the metrics store is not critical to the operation of the cluster, so if it does not come up we will not stop the cluster from coming up?
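    
    For example, the catch block with the requested comment:
    
    ```
    try {
        this.metricsStore = MetricStoreConfig.configure(conf);
    } catch (Exception e) {
        // Failing to initialize the metric store is not critical to the
        // operation of the cluster: log the error but let Nimbus come up.
        LOG.error("Failed to initialize metric store", e);
    }
    ```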


---

[GitHub] storm issue #2504: STORM-2887: store metrics into RocksDB

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    Addressed all the code review comments from @revans2 other than the inline comments. 


---

[GitHub] storm pull request #2504: STORM-2156: store metrics into RocksDB

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160246323
  
    --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java ---
    @@ -0,0 +1,636 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore.rocksdb;
    +
    +import com.codahale.metrics.Meter;
    +import java.io.File;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.storm.DaemonConfig;
    +import org.apache.storm.metric.StormMetricsRegistry;
    +import org.apache.storm.metricstore.AggLevel;
    +import org.apache.storm.metricstore.FilterOptions;
    +import org.apache.storm.metricstore.Metric;
    +import org.apache.storm.metricstore.MetricException;
    +import org.apache.storm.metricstore.MetricStore;
    +import org.apache.storm.utils.ObjectReader;
    +import org.rocksdb.BlockBasedTableConfig;
    +import org.rocksdb.IndexType;
    +import org.rocksdb.Options;
    +import org.rocksdb.ReadOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.WriteBatch;
    +import org.rocksdb.WriteOptions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +
    +public class RocksDbStore implements MetricStore, AutoCloseable {
    +    private static final Logger LOG = LoggerFactory.getLogger(RocksDbStore.class);
    +    private static final int MAX_QUEUE_CAPACITY = 4000;
    +    static final int INVALID_METADATA_STRING_ID = 0;
    +    RocksDB db;
    +    private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null;
    +    private BlockingQueue queue = new LinkedBlockingQueue(MAX_QUEUE_CAPACITY);
    +    private RocksDbMetricsWriter metricsWriter = null;
    +    private MetricsCleaner metricsCleaner = null;
    +    private Meter failureMeter = null;
    +
    +    interface RocksDbScanCallback {
    +        boolean cb(RocksDbKey key, RocksDbValue val);  // return false to stop scan
    +    }
    +
    +    /**
    +     * Create metric store instance using the configurations provided via the config map.
    +     *
    +     * @param config Storm config map
    +     * @throws MetricException on preparation error
    +     */
    +    public void prepare(Map config) throws MetricException {
    +        validateConfig(config);
    +
    +        this.failureMeter = StormMetricsRegistry.registerMeter("RocksDB:metric-failures");
    +
    +        RocksDB.loadLibrary();
    +        boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
    +
    +        try (Options options = new Options().setCreateIfMissing(createIfMissing)) {
    +            // use the hash index for prefix searches
    +            BlockBasedTableConfig tfc = new BlockBasedTableConfig();
    +            tfc.setIndexType(IndexType.kHashSearch);
    +            options.setTableFormatConfig(tfc);
    +            options.useCappedPrefixExtractor(RocksDbKey.KEY_SIZE);
    +
    +            String path = getRocksDbAbsoluteDir(config);
    +            LOG.info("Opening RocksDB from " + path);
    +            db = RocksDB.open(options, path);
    +        } catch (RocksDBException e) {
    +            String message = "Error opening RocksDB database";
    +            LOG.error(message, e);
    +            throw new MetricException(message, e);
    +        }
    +
    +        // create thread to delete old metrics and metadata
    +        Integer retentionHours = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS).toString());
    +        Integer retentionPeriod = 0;
    +        if (config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_PERIOD_HOURS)) {
    +            retentionPeriod = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_PERIOD_HOURS).toString());
    +        }
    +        metricsCleaner = new MetricsCleaner(this, retentionHours, retentionPeriod, failureMeter);
    +
    +        // create thread to process insertion of all metrics
    +        metricsWriter = new RocksDbMetricsWriter(this, this.queue, this.failureMeter);
    +
    +        int cacheCapacity = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY).toString());
    +        StringMetadataCache.init(metricsWriter, cacheCapacity);
    +        readOnlyStringMetadataCache = StringMetadataCache.getReadOnlyStringMetadataCache();
    +        metricsWriter.init(); // init the writer once the cache is setup
    +
    +        // start threads after metadata cache created
    +        Thread thread = new Thread(metricsCleaner, "RocksDbMetricsCleaner");
    +        thread.start();
    +        thread = new Thread(metricsWriter, "RocksDbMetricsWriter");
    +        thread.start();
    +    }
    +
    +    /**
    +     * Implements configuration validation of Metrics Store, validates storm configuration for Metrics Store.
    +     *
    +     * @param config Storm config to specify which store type, location of store and creation policy
    +     * @throws MetricException if there is a missing required configuration or if the store does not exist but
    +     *                         the config specifies not to create the store
    +     */
    +    private void validateConfig(Map config) throws MetricException {
    +        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_LOCATION))) {
    +            throw new MetricException("Not a vaild RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
    +        }
    +
    +        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING))) {
    +            throw new MetricException("Not a vaild RocksDB configuration - Does not specify creation policy "
    +                    + DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING);
    +        }
    +
    +        // validate path defined
    +        String storePath = getRocksDbAbsoluteDir(config);
    +
    +        boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
    +        if (!createIfMissing) {
    +            if (!(new File(storePath).exists())) {
    +                throw new MetricException("Configuration specifies not to create a store but no store currently exists at " + storePath);
    +            }
    +        }
    +
    +        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY))) {
    +            throw new MetricException("Not a valid RocksDB configuration - Missing metadata string cache size "
    +                    + DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY);
    +        }
    +
    +        if (!config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS)) {
    +            throw new MetricException("Not a valid RocksDB configuration - Missing metric retention "
    +                    + DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS);
    +        }
    +    }
    +
    +    private String getRocksDbAbsoluteDir(Map conf) throws MetricException {
    +        String storePath = conf.get(DaemonConfig.STORM_ROCKSDB_LOCATION).toString();
    +        if (storePath == null) {
    --- End diff --
    
    storePath should never be null because we called toString on the original object just one line above.  We probably want to cast it to a String instead of calling toString on it.
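    
    That is, something like (the exception message here is illustrative):
    
    ```
    // Current: toString() throws NPE before the null check below can run.
    String storePath = conf.get(DaemonConfig.STORM_ROCKSDB_LOCATION).toString();
    
    // Suggested: a cast preserves null, making the check meaningful.
    String storePath = (String) conf.get(DaemonConfig.STORM_ROCKSDB_LOCATION);
    if (storePath == null) {
        throw new MetricException("Missing " + DaemonConfig.STORM_ROCKSDB_LOCATION);
    }
    ```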


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162322576
  
    --- Diff: storm-client/src/storm.thrift ---
    @@ -836,3 +837,24 @@ exception HBAuthorizationException {
     exception HBExecutionException {
       1: required string msg;
     }
    +
    +struct WorkerMetricFields {
    --- End diff --
    
    You could check that importing ttypes.py works fine. If the current patch doesn't break ttypes.py, please ignore my comment.


---

[GitHub] storm pull request #2504: STORM-2156: store metrics into RocksDB

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160250982
  
    --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java ---
    @@ -0,0 +1,306 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore.rocksdb;
    +
    +import com.codahale.metrics.Meter;
    +
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ThreadLocalRandom;
    +import org.apache.storm.metricstore.AggLevel;
    +import org.apache.storm.metricstore.Metric;
    +import org.apache.storm.metricstore.MetricException;
    +import org.rocksdb.FlushOptions;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.WriteBatch;
    +import org.rocksdb.WriteOptions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Class designed to perform all metrics inserts into RocksDB.  Metrics are processed from a blocking queue.
    + * <p>
    + * A writable LRU StringMetadataCache is used to minimize looking up metadata string Ids.  As entries are added to the full cache, older
    + * entries are evicted from the cache and need to be written to the database.  This happens via the handleEvictedMetadata()
    + * method callback.
    + */
    +public class RocksDbMetricsWriter implements Runnable, AutoCloseable {
    +    private static final Logger LOG = LoggerFactory.getLogger(RocksDbMetricsWriter.class);
    +    private RocksDbStore store;
    +    private BlockingQueue queue;
    +    private WritableStringMetadataCache stringMetadataCache;
    +    private Set<Integer> unusedIds = new HashSet<>();
    +    private TreeMap<RocksDbKey, RocksDbValue> insertBatch = new TreeMap<>(); // RocksDB should insert in sorted key order
    +    private WriteOptions writeOpts = new WriteOptions();
    +    private volatile boolean shutdown = false;
    +    private Meter failureMeter;
    +    private ArrayList<AggLevel> aggBuckets = new ArrayList<>();
    +
    +    /**
    +     * Constructor for the RocksDbMetricsWriter.
    +     *
    +     * @param store   The RocksDB store
    +     * @param queue   The queue to receive metrics for insertion
    +     */
    +    RocksDbMetricsWriter(RocksDbStore store, BlockingQueue queue, Meter failureMeter)  {
    +        this.store = store;
    +        this.queue = queue;
    +        this.failureMeter = failureMeter;
    +
    +        aggBuckets.add(AggLevel.AGG_LEVEL_1_MIN);
    +        aggBuckets.add(AggLevel.AGG_LEVEL_10_MIN);
    +        aggBuckets.add(AggLevel.AGG_LEVEL_60_MIN);
    +    }
    +
    +    /**
    +     * Init routine called once the Metadata cache has been created.
    +     *
    +     * @throws MetricException  on cache error
    +     */
    +    void init() throws MetricException {
    +        this.stringMetadataCache = StringMetadataCache.getWritableStringMetadataCache();
    +    }
    +
    +    /**
    +     * Run routine to wait for metrics on a queue and insert into RocksDB.
    +     */
    +    @Override
    +    public void run() {
    +        while (true) {
    +            if (shutdown) {
    +                return;
    +            }
    +            try {
    +                Metric m = (Metric) queue.take();
    +                processInsert(m);
    +            } catch (Exception e) {
    +                LOG.error("Failed to insert metric", e);
    +                if (this.failureMeter != null) {
    +                    this.failureMeter.mark();
    +                }
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Performs the actual metric insert, and aggregates over all bucket times.
    +     *
    +     * @param metric  Metric to store
    +     * @throws MetricException  if database write fails
    +     */
    +    private void processInsert(Metric metric) throws MetricException {
    +
    +        // convert all strings to numeric Ids for the metric key and add to the metadata cache
    +        long metricTimestamp = metric.getTimestamp();
    +        Integer topologyId = storeMetadataString(KeyType.TOPOLOGY_STRING, metric.getTopologyId(), metricTimestamp);
    +        Integer metricId = storeMetadataString(KeyType.METRIC_STRING, metric.getMetricName(), metricTimestamp);
    +        Integer componentId = storeMetadataString(KeyType.COMPONENT_STRING, metric.getComponentId(), metricTimestamp);
    +        Integer executorId = storeMetadataString(KeyType.EXEC_ID_STRING, metric.getExecutorId(), metricTimestamp);
    +        Integer hostId = storeMetadataString(KeyType.HOST_STRING, metric.getHostname(), metricTimestamp);
    +        Integer streamId = storeMetadataString(KeyType.STREAM_ID_STRING, metric.getStreamId(), metricTimestamp);
    +
    +        RocksDbKey key = RocksDbKey.createMetricKey(AggLevel.AGG_LEVEL_NONE, topologyId, metric.getTimestamp(), metricId,
    +                componentId, executorId, hostId, metric.getPort(), streamId);
    +
    +        // save metric key/value to be batched
    +        RocksDbValue value = new RocksDbValue(metric);
    +        insertBatch.put(key, value);
    +
    +        // Aggregate matching metrics over bucket timeframes.
    +        // We'll process starting with the longest bucket.  If the metric for this does not exist, we don't have to
    +        // search for the remaining bucket metrics.
    +        ListIterator li = aggBuckets.listIterator(aggBuckets.size());
    +        boolean populate = true;
    +        while (li.hasPrevious()) {
    +            AggLevel bucket = (AggLevel)li.previous();
    +            Metric aggMetric = new Metric(metric);
    +            aggMetric.setAggLevel(bucket);
    +
    +            long msToBucket = 1000L * 60L * bucket.getValue();
    +            long roundedToBucket = msToBucket * (metric.getTimestamp() / msToBucket);
    +            aggMetric.setTimestamp(roundedToBucket);
    +
    +            RocksDbKey aggKey = RocksDbKey.createMetricKey(bucket, topologyId, aggMetric.getTimestamp(), metricId,
    +                    componentId, executorId, hostId, aggMetric.getPort(), streamId);
    +
    +            if (populate) {
    +                // retrieve any existing aggregation matching this one and update the values
    +                if (store.populateFromKey(aggKey, aggMetric)) {
    +                    aggMetric.addValue(metric.getValue());
    +                } else {
    +                    // aggregating metric did not exist, don't look for further ones with smaller timestamps
    +                    populate = false;
    +                }
    +            }
    +
    +            // save metric key/value to be batched
    +            RocksDbValue aggVal = new RocksDbValue(aggMetric);
    +            insertBatch.put(aggKey, aggVal);
    +        }
    +
    +        processBatchInsert(insertBatch);
    +
    +        insertBatch.clear();
    +    }
    +
    +    // converts a metadata string into a unique integer.  Updates the timestamp of the string
    +    // so we can track when it was last used for later deletion on database cleanup.
    +    private int storeMetadataString(KeyType type, String s, long metricTimestamp) throws MetricException {
    +        if (s == null) {
    +            throw new MetricException("No string for metric metadata string type " + type);
    +        }
    +
    +        // attempt to find it in the string cache
    +        StringMetadata stringMetadata = stringMetadataCache.get(s);
    +        if (stringMetadata != null) {
    +            // make sure the timestamp on the metadata has the latest time
    +            stringMetadata.update(metricTimestamp, type);
    +            return stringMetadata.getStringId();
    +        }
    +
    +        // attempt to find the string in the database
    +        stringMetadata = store.rocksDbGetStringMetadata(type, s);
    +        if (stringMetadata != null) {
    +            // update to the latest timestamp and add to the string cache
    +            stringMetadata.update(metricTimestamp, type);
    +            stringMetadataCache.put(s, stringMetadata, false);
    +            return stringMetadata.getStringId();
    +        }
    +
    +        // string does not exist, create using a unique string id and add to cache
    +        if (LOG.isDebugEnabled()) {
    +            LOG.debug(type + "." + s + " does not exist in cache or database");
    +        }
    +        int stringId = getUniqueMetadataStringId();
    +        stringMetadata = new StringMetadata(type, stringId, metricTimestamp);
    +        stringMetadataCache.put(s, stringMetadata, true);
    +
    +        return stringMetadata.getStringId();
    +    }
    +
    +    // get a currently unused unique string id
    +    private int getUniqueMetadataStringId() throws MetricException {
    +        generateUniqueStringIds();
    +        int id = unusedIds.iterator().next();
    +        unusedIds.remove(id);
    +        return  id;
    +    }
    +
    +    // guarantees a list of unused string Ids exists.  Once the list is empty, creates a new list
    +    // by generating a list of random numbers and removing the ones that already are in use.
    +    private void generateUniqueStringIds() throws MetricException {
    +        int attempts = 0;
    +        while (unusedIds.isEmpty()) {
    +            attempts++;
    +            if (attempts > 100) {
    +                String message = "Failed to generate unique ids";
    +                LOG.error(message);
    +                throw new MetricException(message);
    +            }
    +            for (int i = 0; i < 600; i++) {
    +                int n = ThreadLocalRandom.current().nextInt();
    +                if (n == RocksDbStore.INVALID_METADATA_STRING_ID) {
    +                    continue;
    +                }
    +                // remove any entries in the cache
    +                if (stringMetadataCache.contains(n)) {
    +                    continue;
    +                }
    +                unusedIds.add(n);
    +            }
    +            // now scan all metadata and remove any matching string Ids from this list
    +            RocksDbKey firstPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_START);
    +            RocksDbKey lastPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_END);
    +            store.scanRange(firstPrefix, lastPrefix, (key, value) -> {
    +                unusedIds.remove(key.getMetadataStringId());
    +                return true; // process all metadata
    +            });
    +        }
    +    }
    +
    +    // writes multiple metric values into the database as a batch operation
    +    private void processBatchInsert(Map<RocksDbKey, RocksDbValue> batchMap) throws MetricException {
    +        try (WriteBatch writeBatch = new WriteBatch()) {
    --- End diff --
    
    The reason the insertBatch map is being used is to force the keys to be sorted.  The RocksDB documentation indicated sorted inserts would be faster.  The comment on line 54 was trying to convey this.  Let me know if I can be clearer.
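    
    A self-contained sketch of the pattern (not the PR's exact code; in the PR the TreeMap is keyed by RocksDbKey, whose Comparable implementation provides the ordering, while a plain unsigned-byte comparator stands in here):
    
    ```
    import java.util.Comparator;
    import java.util.Map;
    import java.util.TreeMap;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;
    
    public class SortedBatchWriter {
        // Unsigned lexicographic order, matching RocksDB's default byte comparator.
        private static final Comparator<byte[]> UNSIGNED_LEX = (a, b) -> {
            int n = Math.min(a.length, b.length);
            for (int i = 0; i < n; i++) {
                int cmp = (a[i] & 0xff) - (b[i] & 0xff);
                if (cmp != 0) {
                    return cmp;
                }
            }
            return a.length - b.length;
        };
    
        /** Buffers entries in a TreeMap so the WriteBatch sees keys in sorted order. */
        public static void writeSorted(RocksDB db, Map<byte[], byte[]> entries) throws RocksDBException {
            TreeMap<byte[], byte[]> sorted = new TreeMap<>(UNSIGNED_LEX);
            sorted.putAll(entries);
            try (WriteBatch batch = new WriteBatch(); WriteOptions opts = new WriteOptions()) {
                for (Map.Entry<byte[], byte[]> e : sorted.entrySet()) {
                    batch.put(e.getKey(), e.getValue());
                }
                db.write(opts, batch);
            }
        }
    }
    ```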


---

[GitHub] storm pull request #2504: STORM-2156: store metrics into RocksDB

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160240315
  
    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -291,12 +291,22 @@ public static long bitXor(Long a, Long b) {
          * runtime to avoid any zombie process in case cleanup function hangs.
          */
         public static void addShutdownHookWithForceKillIn1Sec (Runnable func) {
    +        addShutdownHookWithDelayedForceKill(func, 1);
    +    }
    +
    +    /**
    +     * Adds the user supplied function as a shutdown hook for cleanup.
    +     * Also adds a function that sleeps for numSecs and then halts the
    +     * runtime to avoid any zombie process in case cleanup function hangs.
    +     */
    +    public static void addShutdownHookWithDelayedForceKill (Runnable func, int numSecs) {
             Runnable sleepKill = new Runnable() {
                 @Override
                 public void run() {
                     try {
    -                    Time.sleepSecs(1);
    -                    LOG.warn("Forceing Halt...");
    +                    LOG.info("Halting after " + numSecs + " seconds");
    --- End diff --
    
    Nit could we use the slf4j logging format like
    
    ```
    LOG.info("Halting after {} second", numSeconds);
    ```


---

[GitHub] storm pull request #2504: STORM-2156: store metrics into RocksDB

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160241598
  
    --- Diff: storm-server/src/main/java/org/apache/storm/DaemonConfig.java ---
    @@ -1023,6 +1023,43 @@
         public static String STORM_SUPERVISOR_MEDIUM_MEMORY_GRACE_PERIOD_MS =
             "storm.supervisor.medium.memory.grace.period.ms";
     
    +    /**
    +     * Class implementing MetricStore.
    +     */
    +    @NotNull
    +    @isString
    +    public static final String STORM_METRIC_STORE_CLASS = "storm.metricstore.class";
    +
    +    /**
    +     * RocksDB file location.
    +     */
    +    @isString
    +    public static final String STORM_ROCKSDB_LOCATION = "storm.metricstore.rocksdb.location";
    +
    +    /**
    +     * RocksDB create if missing flag.
    +     */
    +    @isBoolean
    +    public static final String STORM_ROCKSDB_CREATE_IF_MISSING = "storm.metricstore.rocksdb.create_if_missing";
    +
    +    /**
    +     * RocksDB metadata cache capacity.
    +     */
    +    @isInteger
    +    public static final String STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY = "storm.metricstore.rocksdb.metadata_string_cache_capacity";
    +
    +    /**
    +     * RocksDB setting for length of metric retention.
    +     */
    +    @isInteger
    +    public static final String STORM_ROCKSDB_METRIC_RETENTION_HOURS = "storm.metricstore.rocksdb.retention_hours";
    +
    +    /**
    +     * RocksDB setting for period of metric deletion thread.
    +     */
    +    @isInteger
    +    public static final String STORM_ROCKSDB_METRIC_RETENTION_PERIOD_HOURS = "storm.metricstore.rocksdb.retention_period_hours";
    --- End diff --
    
    Could we add "deletion" to the name of the config somewhere?  I am not sure it is clear what the config does from the name alone.


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162768100
  
    --- Diff: storm-server/pom.xml ---
    @@ -64,6 +64,10 @@
                 <artifactId>auto-service</artifactId>
                 <optional>true</optional>
             </dependency>
    +        <dependency>
    +            <groupId>org.rocksdb</groupId>
    +            <artifactId>rocksdbjni</artifactId>
    --- End diff --
    
    Good. We are good to go then.


---

[GitHub] storm pull request #2504: STORM-2156: store metrics into RocksDB

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160243540
  
    --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/MetricStore.java ---
    @@ -0,0 +1,76 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore;
    +
    +import com.codahale.metrics.Meter;
    +
    +import java.util.Map;
    +
    +public interface MetricStore {
    --- End diff --
    
    Can we make this extend AutoCloseable?
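    
    i.e. (a sketch; only prepare() from the quoted diff is shown):
    
    ```
    public interface MetricStore extends AutoCloseable {
        void prepare(Map config) throws MetricException;
        // ... remaining store methods ...
    }
    ```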


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162323436
  
    --- Diff: storm-server/pom.xml ---
    @@ -64,6 +64,10 @@
                 <artifactId>auto-service</artifactId>
                 <optional>true</optional>
             </dependency>
    +        <dependency>
    +            <groupId>org.rocksdb</groupId>
    +            <artifactId>rocksdbjni</artifactId>
    --- End diff --
    
    Just curious: I don't have experience with RocksDB, and since it leverages JNI, I would like to be clear about how it works across multiple OSes.


---

[GitHub] storm issue #2504: STORM-2156: store metrics into RocksDB

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    @HeartSaVioR not a problem.  I often forget that others are not able to read my thoughts and I sometimes forget to mention the context.
    
    The code was not really a prototype.  We intend it to be production ready, but before pushing in anything large like this, we want to harden it with some stress tests.  That was something that we had not done yet.


---

[GitHub] storm pull request #2504: STORM-2156: store metrics into RocksDB

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160245890
  
    --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java ---
    @@ -0,0 +1,636 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore.rocksdb;
    +
    +import com.codahale.metrics.Meter;
    +import java.io.File;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.storm.DaemonConfig;
    +import org.apache.storm.metric.StormMetricsRegistry;
    +import org.apache.storm.metricstore.AggLevel;
    +import org.apache.storm.metricstore.FilterOptions;
    +import org.apache.storm.metricstore.Metric;
    +import org.apache.storm.metricstore.MetricException;
    +import org.apache.storm.metricstore.MetricStore;
    +import org.apache.storm.utils.ObjectReader;
    +import org.rocksdb.BlockBasedTableConfig;
    +import org.rocksdb.IndexType;
    +import org.rocksdb.Options;
    +import org.rocksdb.ReadOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.WriteBatch;
    +import org.rocksdb.WriteOptions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +
    +public class RocksDbStore implements MetricStore, AutoCloseable {
    +    private static final Logger LOG = LoggerFactory.getLogger(RocksDbStore.class);
    +    private static final int MAX_QUEUE_CAPACITY = 4000;
    +    static final int INVALID_METADATA_STRING_ID = 0;
    +    RocksDB db;
    +    private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null;
    +    private BlockingQueue queue = new LinkedBlockingQueue(MAX_QUEUE_CAPACITY);
    +    private RocksDbMetricsWriter metricsWriter = null;
    +    private MetricsCleaner metricsCleaner = null;
    +    private Meter failureMeter = null;
    +
    +    interface RocksDbScanCallback {
    +        boolean cb(RocksDbKey key, RocksDbValue val);  // return false to stop scan
    +    }
    +
    +    /**
    +     * Create metric store instance using the configurations provided via the config map.
    +     *
    +     * @param config Storm config map
    +     * @throws MetricException on preparation error
    +     */
    +    public void prepare(Map config) throws MetricException {
    +        validateConfig(config);
    +
    +        this.failureMeter = StormMetricsRegistry.registerMeter("RocksDB:metric-failures");
    +
    +        RocksDB.loadLibrary();
    +        boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
    +
    +        try (Options options = new Options().setCreateIfMissing(createIfMissing)) {
    +            // use the hash index for prefix searches
    +            BlockBasedTableConfig tfc = new BlockBasedTableConfig();
    +            tfc.setIndexType(IndexType.kHashSearch);
    +            options.setTableFormatConfig(tfc);
    +            options.useCappedPrefixExtractor(RocksDbKey.KEY_SIZE);
    +
    +            String path = getRocksDbAbsoluteDir(config);
    +            LOG.info("Opening RocksDB from " + path);
    +            db = RocksDB.open(options, path);
    +        } catch (RocksDBException e) {
    +            String message = "Error opening RocksDB database";
    +            LOG.error(message, e);
    +            throw new MetricException(message, e);
    +        }
    +
    +        // create thread to delete old metrics and metadata
    +        Integer retentionHours = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS).toString());
    +        Integer retentionPeriod = 0;
    +        if (config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_PERIOD_HOURS)) {
    +            retentionPeriod = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_PERIOD_HOURS).toString());
    +        }
    +        metricsCleaner = new MetricsCleaner(this, retentionHours, retentionPeriod, failureMeter);
    +
    +        // create thread to process insertion of all metrics
    +        metricsWriter = new RocksDbMetricsWriter(this, this.queue, this.failureMeter);
    +
    +        int cacheCapacity = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY).toString());
    +        StringMetadataCache.init(metricsWriter, cacheCapacity);
    +        readOnlyStringMetadataCache = StringMetadataCache.getReadOnlyStringMetadataCache();
    +        metricsWriter.init(); // init the writer once the cache is setup
    +
    +        // start threads after metadata cache created
    +        Thread thread = new Thread(metricsCleaner, "RocksDbMetricsCleaner");
    +        thread.start();
    +        thread = new Thread(metricsWriter, "RocksDbMetricsWriter");
    +        thread.start();
    --- End diff --
    
    Should we mark these threads as daemons?  If not, we should make sure that they shut down when we close the Store.
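    
    For the former it would just be:
    
    ```
    Thread thread = new Thread(metricsCleaner, "RocksDbMetricsCleaner");
    thread.setDaemon(true); // don't let background cleanup keep the JVM alive
    thread.start();
    ```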


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160776702
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -1081,6 +1088,14 @@ public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stor
                 BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper)
             throws Exception {
             this.conf = conf;
    +
    +        this.metricsStore = null;
    --- End diff --
    
    Re-verified that a null MetricStore should be safe.


---

[GitHub] storm issue #2504: STORM-2887: store metrics into RocksDB

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    Updated the PR JIRA to reflect the discussion and new sub-task JIRA. 


---

[GitHub] storm issue #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    @agresch 
    Could you push your documentation to this branch? Then I could see it and try it out on my side.


---

[GitHub] storm issue #2504: STORM-2156: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    I think this patch (or a follow-up patch, at least) should address workers' metrics as well. Is it just waiting for Metrics V2 (#2203)? Or is it to avoid backward incompatibility with old workers?


---

[GitHub] storm issue #2504: STORM-2156: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    I left the questions because this PR appears to address only part of STORM-2156.
    
    The summary of issue STORM-2156 is:
    
    ```
    Add RocksDB instance in Nimbus and write heartbeat based metrics to it
    ```
    
    and description is:
    
    ```
    There should be a RocksDB instance in Nimbus where we write metrics from the heartbeats. This should allow us to replace storage for the statistics we see in the UI and expand the abilities of UIs to allow for time series charting.
    Eventually this data will likely come via thrift to Nimbus as the overall metric system is overhauled.
    ```
    
    (Even STORM-2156 should have follow up issue for representing RocksDB metrics to UI.)
    
    In order to replace the metrics data in the ZK heartbeat, it should be mandatory to get worker metrics (via the supervisor) to Nimbus. I don't mind whether that is based on STORM-2693, which transfers worker metrics into Nimbus (addressing Metrics V1), or on a follow-up patch for STORM-2153 (Metrics V2) implementing a metrics collector that communicates with Nimbus. (For the latter we should migrate most of the built-in metrics to Metrics V2 so that they become available.)
    
    Actually this only addresses the metrics transfer from the supervisor to Nimbus, which doesn't deliver the original intention and benefits of the issue. I guess you have follow-up issues or even patches, but they're opaque to me right now, so I don't see much benefit in this yet. Please also file the follow-up issues and group them together (a design doc explaining the overall plan would be great) so that we can see what will change and how the changes improve Storm.


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162322132
  
    --- Diff: storm-client/src/storm.thrift ---
    @@ -836,3 +837,24 @@ exception HBAuthorizationException {
     exception HBExecutionException {
       1: required string msg;
     }
    +
    +struct WorkerMetricFields {
    --- End diff --
    
    Looks like WorkerMetricFields, WorkerMetricList, and WorkerMetrics are used before they are defined, i.e. a forward reference.
    
    We ran into an issue because of a forward reference in a thrift definition before: https://issues.apache.org/jira/browse/STORM-1842
    
    Could you rearrange the order so that all the structs are defined before they are used?


---

[GitHub] storm pull request #2504: STORM-2156: store metrics into RocksDB

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160242680
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -4166,4 +4184,23 @@ public void shutdown() {
         public boolean isWaiting() {
             return timer.isTimerWaiting();
         }
    +
    +    @Override
    +    public void processWorkerMetrics(WorkerMetrics metrics) throws org.apache.thrift.TException {
    +        if (this.metricsStore == null) {
    --- End diff --
    
    It would also be nice to have a metric for the number of times that this method was called, like with the other RPC metrics we have.
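    
    For illustration, a hedged sketch using the same `StormMetricsRegistry.registerMeter()` helper this PR already uses for `RocksDB:metric-failures`; the meter name below is an assumption, not an existing metric:
    
    ```
    import com.codahale.metrics.Meter;
    import org.apache.storm.metric.StormMetricsRegistry;
    
    class ProcessWorkerMetricsMeter {
        // Hypothetical meter name, following the style of the other Nimbus RPC meters.
        private static final Meter CALLS =
                StormMetricsRegistry.registerMeter("nimbus:num-processWorkerMetrics-calls");
    
        // Invoke at the top of processWorkerMetrics() to count each RPC call.
        static void mark() {
            CALLS.mark();
        }
    }
    ```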


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162319081
  
    --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java ---
    @@ -0,0 +1,306 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore.rocksdb;
    +
    +import com.codahale.metrics.Meter;
    +
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ThreadLocalRandom;
    +import org.apache.storm.metricstore.AggLevel;
    +import org.apache.storm.metricstore.Metric;
    +import org.apache.storm.metricstore.MetricException;
    +import org.rocksdb.FlushOptions;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.WriteBatch;
    +import org.rocksdb.WriteOptions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Class designed to perform all metrics inserts into RocksDB.  Metrics are processed from a blocking queue.
    + * <p>
    + * A writable LRU StringMetadataCache is used to minimize looking up metadata string Ids.  As entries are added to the full cache, older
    + * entries are evicted from the cache and need to be written to the database.  This happens via the handleEvictedMetadata()
    + * method callback.
    + */
    +public class RocksDbMetricsWriter implements Runnable, AutoCloseable {
    +    private static final Logger LOG = LoggerFactory.getLogger(RocksDbMetricsWriter.class);
    +    private RocksDbStore store;
    +    private BlockingQueue queue;
    +    private WritableStringMetadataCache stringMetadataCache;
    +    private Set<Integer> unusedIds = new HashSet<>();
    +    private TreeMap<RocksDbKey, RocksDbValue> insertBatch = new TreeMap<>(); // RocksDB should insert in sorted key order
    --- End diff --
    
    Maybe better to describe the intention, especially regarding thread-safety.


---

[GitHub] storm pull request #2504: STORM-2156: store metrics into RocksDB

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160245178
  
    --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java ---
    @@ -0,0 +1,306 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore.rocksdb;
    +
    +import com.codahale.metrics.Meter;
    +
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ThreadLocalRandom;
    +import org.apache.storm.metricstore.AggLevel;
    +import org.apache.storm.metricstore.Metric;
    +import org.apache.storm.metricstore.MetricException;
    +import org.rocksdb.FlushOptions;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.WriteBatch;
    +import org.rocksdb.WriteOptions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Class designed to perform all metrics inserts into RocksDB.  Metrics are processed from a blocking queue.
    + * <p>
    + * A writable LRU StringMetadataCache is used to minimize looking up metadata string Ids.  As entries are added to the full cache, older
    + * entries are evicted from the cache and need to be written to the database.  This happens via the handleEvictedMetadata()
    + * method callback.
    + */
    +public class RocksDbMetricsWriter implements Runnable, AutoCloseable {
    +    private static final Logger LOG = LoggerFactory.getLogger(RocksDbMetricsWriter.class);
    +    private RocksDbStore store;
    +    private BlockingQueue queue;
    +    private WritableStringMetadataCache stringMetadataCache;
    +    private Set<Integer> unusedIds = new HashSet<>();
    +    private TreeMap<RocksDbKey, RocksDbValue> insertBatch = new TreeMap<>(); // RocksDB should insert in sorted key order
    +    private WriteOptions writeOpts = new WriteOptions();
    +    private volatile boolean shutdown = false;
    +    private Meter failureMeter;
    +    private ArrayList<AggLevel> aggBuckets = new ArrayList<>();
    +
    +    /**
    +     * Constructor for the RocksDbMetricsWriter.
    +     *
    +     * @param store   The RocksDB store
    +     * @param queue   The queue to receive metrics for insertion
    +     */
    +    RocksDbMetricsWriter(RocksDbStore store, BlockingQueue queue, Meter failureMeter)  {
    +        this.store = store;
    +        this.queue = queue;
    +        this.failureMeter = failureMeter;
    +
    +        aggBuckets.add(AggLevel.AGG_LEVEL_1_MIN);
    +        aggBuckets.add(AggLevel.AGG_LEVEL_10_MIN);
    +        aggBuckets.add(AggLevel.AGG_LEVEL_60_MIN);
    +    }
    +
    +    /**
    +     * Init routine called once the Metadata cache has been created.
    +     *
    +     * @throws MetricException  on cache error
    +     */
    +    void init() throws MetricException {
    +        this.stringMetadataCache = StringMetadataCache.getWritableStringMetadataCache();
    +    }
    +
    +    /**
    +     * Run routine to wait for metrics on a queue and insert into RocksDB.
    +     */
    +    @Override
    +    public void run() {
    +        while (true) {
    +            if (shutdown) {
    +                return;
    +            }
    +            try {
    +                Metric m = (Metric) queue.take();
    +                processInsert(m);
    +            } catch (Exception e) {
    +                LOG.error("Failed to insert metric", e);
    +                if (this.failureMeter != null) {
    +                    this.failureMeter.mark();
    +                }
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Performs the actual metric insert, and aggregates over all bucket times.
    +     *
    +     * @param metric  Metric to store
    +     * @throws MetricException  if database write fails
    +     */
    +    private void processInsert(Metric metric) throws MetricException {
    +
    +        // convert all strings to numeric Ids for the metric key and add to the metadata cache
    +        long metricTimestamp = metric.getTimestamp();
    +        Integer topologyId = storeMetadataString(KeyType.TOPOLOGY_STRING, metric.getTopologyId(), metricTimestamp);
    +        Integer metricId = storeMetadataString(KeyType.METRIC_STRING, metric.getMetricName(), metricTimestamp);
    +        Integer componentId = storeMetadataString(KeyType.COMPONENT_STRING, metric.getComponentId(), metricTimestamp);
    +        Integer executorId = storeMetadataString(KeyType.EXEC_ID_STRING, metric.getExecutorId(), metricTimestamp);
    +        Integer hostId = storeMetadataString(KeyType.HOST_STRING, metric.getHostname(), metricTimestamp);
    +        Integer streamId = storeMetadataString(KeyType.STREAM_ID_STRING, metric.getStreamId(), metricTimestamp);
    +
    +        RocksDbKey key = RocksDbKey.createMetricKey(AggLevel.AGG_LEVEL_NONE, topologyId, metric.getTimestamp(), metricId,
    +                componentId, executorId, hostId, metric.getPort(), streamId);
    +
    +        // save metric key/value to be batched
    +        RocksDbValue value = new RocksDbValue(metric);
    +        insertBatch.put(key, value);
    +
    +        // Aggregate matching metrics over bucket timeframes.
    +        // We'll process starting with the longest bucket.  If the metric for this does not exist, we don't have to
    +        // search for the remaining bucket metrics.
    +        ListIterator li = aggBuckets.listIterator(aggBuckets.size());
    +        boolean populate = true;
    +        while (li.hasPrevious()) {
    +            AggLevel bucket = (AggLevel)li.previous();
    +            Metric aggMetric = new Metric(metric);
    +            aggMetric.setAggLevel(bucket);
    +
    +            long msToBucket = 1000L * 60L * bucket.getValue();
    +            long roundedToBucket = msToBucket * (metric.getTimestamp() / msToBucket);
    +            aggMetric.setTimestamp(roundedToBucket);
    +
    +            RocksDbKey aggKey = RocksDbKey.createMetricKey(bucket, topologyId, aggMetric.getTimestamp(), metricId,
    +                    componentId, executorId, hostId, aggMetric.getPort(), streamId);
    +
    +            if (populate) {
    +                // retrieve any existing aggregation matching this one and update the values
    +                if (store.populateFromKey(aggKey, aggMetric)) {
    +                    aggMetric.addValue(metric.getValue());
    +                } else {
    +                    // aggregating metric did not exist, don't look for further ones with smaller timestamps
    +                    populate = false;
    +                }
    +            }
    +
    +            // save metric key/value to be batched
    +            RocksDbValue aggVal = new RocksDbValue(aggMetric);
    +            insertBatch.put(aggKey, aggVal);
    +        }
    +
    +        processBatchInsert(insertBatch);
    +
    +        insertBatch.clear();
    +    }
    +
    +    // converts a metadata string into a unique integer.  Updates the timestamp of the string
    +    // so we can track when it was last used for later deletion on database cleanup.
    +    private int storeMetadataString(KeyType type, String s, long metricTimestamp) throws MetricException {
    +        if (s == null) {
    +            throw new MetricException("No string for metric metadata string type " + type);
    +        }
    +
    +        // attempt to find it in the string cache
    +        StringMetadata stringMetadata = stringMetadataCache.get(s);
    +        if (stringMetadata != null) {
    +            // make sure the timestamp on the metadata has the latest time
    +            stringMetadata.update(metricTimestamp, type);
    +            return stringMetadata.getStringId();
    +        }
    +
    +        // attempt to find the string in the database
    +        stringMetadata = store.rocksDbGetStringMetadata(type, s);
    +        if (stringMetadata != null) {
    +            // update to the latest timestamp and add to the string cache
    +            stringMetadata.update(metricTimestamp, type);
    +            stringMetadataCache.put(s, stringMetadata, false);
    +            return stringMetadata.getStringId();
    +        }
    +
    +        // string does not exist, create using an unique string id and add to cache
    +        LOG.debug("{}.{} does not exist in cache or database", type, s);
    +        int stringId = getUniqueMetadataStringId();
    +        stringMetadata = new StringMetadata(type, stringId, metricTimestamp);
    +        stringMetadataCache.put(s, stringMetadata, true);
    +
    +        return stringMetadata.getStringId();
    +    }
    +
    +    // get a currently unused unique string id
    +    private int getUniqueMetadataStringId() throws MetricException {
    +        generateUniqueStringIds();
    +        int id = unusedIds.iterator().next();
    +        unusedIds.remove(id);
    +        return  id;
    +    }
    +
    +    // guarantees a list of unused string Ids exists.  Once the list is empty, creates a new list
    +    // by generating a list of random numbers and removing the ones that already are in use.
    +    private void generateUniqueStringIds() throws MetricException {
    +        int attempts = 0;
    +        while (unusedIds.isEmpty()) {
    +            attempts++;
    +            if (attempts > 100) {
    +                String message = "Failed to generate unique ids";
    +                LOG.error(message);
    +                throw new MetricException(message);
    +            }
    +            for (int i = 0; i < 600; i++) {
    +                int n = ThreadLocalRandom.current().nextInt();
    +                if (n == RocksDbStore.INVALID_METADATA_STRING_ID) {
    +                    continue;
    +                }
    +                // remove any entries in the cache
    +                if (stringMetadataCache.contains(n)) {
    +                    continue;
    +                }
    +                unusedIds.add(n);
    +            }
    +            // now scan all metadata and remove any matching string Ids from this list
    +            RocksDbKey firstPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_START);
    +            RocksDbKey lastPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_END);
    +            store.scanRange(firstPrefix, lastPrefix, (key, value) -> {
    +                unusedIds.remove(key.getMetadataStringId());
    +                return true; // process all metadata
    +            });
    +        }
    +    }
    +
    +    // writes multiple metric values into the database as a batch operation
    +    private void processBatchInsert(Map<RocksDbKey, RocksDbValue> batchMap) throws MetricException {
    +        try (WriteBatch writeBatch = new WriteBatch()) {
    --- End diff --
    
    Because of the thread safety problems with the map, it might be simpler to use the WriteBatch object directly instead of trying to use the map first.
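    
    A sketch of that direct-WriteBatch approach, assuming keys and values are already serialized to `byte[]` (as `RocksDbKey`/`RocksDbValue` provide); note it gives up the sorted insertion order the `TreeMap` was providing:
    
    ```
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;
    
    // Accumulate puts straight into the batch and commit once; the batch is
    // applied atomically, so no intermediate map is needed.
    void insertDirectly(RocksDB db, WriteOptions writeOpts,
                        byte[][] keys, byte[][] values) throws RocksDBException {
        try (WriteBatch writeBatch = new WriteBatch()) {
            for (int i = 0; i < keys.length; i++) {
                writeBatch.put(keys[i], values[i]);
            }
            db.write(writeOpts, writeBatch);
        }
    }
    ```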


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160797290
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---
    @@ -936,6 +936,9 @@ static DynamicState handleRunning(DynamicState dynamicState, StaticState staticS
                 }
                 dynamicState = dynamicState.withProfileActions(mod, modPending);
             }
    +
    +        dynamicState.container.processMetrics();
    --- End diff --
    
    This is something I just noticed.  `handleRunning` should be called about once a second.  I don't think we want to send metrics that frequently.  Most of the metrics are on a 30 second to 1 min frequency.  I think standardizing on 1 min for us is going to make storing and querying them a lot simpler.  Could we have a small amount of state that we can use to throttle this to once a min?
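    
    As a rough sketch of that small amount of state (names here are illustrative, not the PR's final code):
    
    ```
    // Called about once a second from handleRunning(); only acts once a minute.
    private long lastMetricProcessTime = 0L;
    
    void processMetricsThrottled() {
        long now = System.currentTimeMillis();
        if (now - lastMetricProcessTime < 60L * 1000L) {
            return; // too soon since the last send, skip this round
        }
        lastMetricProcessTime = now;
        // ... build and send the worker metrics to Nimbus here ...
    }
    ```
    
    A later revision of `Container.processMetrics()` quoted further down in this thread takes essentially this approach with a `lastMetricProcessTime` field.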


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162668430
  
    --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java ---
    @@ -0,0 +1,639 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore.rocksdb;
    +
    +import com.codahale.metrics.Meter;
    +import java.io.File;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.storm.DaemonConfig;
    +import org.apache.storm.metric.StormMetricsRegistry;
    +import org.apache.storm.metricstore.AggLevel;
    +import org.apache.storm.metricstore.FilterOptions;
    +import org.apache.storm.metricstore.Metric;
    +import org.apache.storm.metricstore.MetricException;
    +import org.apache.storm.metricstore.MetricStore;
    +import org.apache.storm.utils.ObjectReader;
    +import org.rocksdb.BlockBasedTableConfig;
    +import org.rocksdb.IndexType;
    +import org.rocksdb.Options;
    +import org.rocksdb.ReadOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.WriteBatch;
    +import org.rocksdb.WriteOptions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +
    +public class RocksDbStore implements MetricStore, AutoCloseable {
    +    private static final Logger LOG = LoggerFactory.getLogger(RocksDbStore.class);
    +    private static final int MAX_QUEUE_CAPACITY = 4000;
    +    static final int INVALID_METADATA_STRING_ID = 0;
    +    RocksDB db;
    +    private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null;
    +    private BlockingQueue queue = new LinkedBlockingQueue(MAX_QUEUE_CAPACITY);
    +    private RocksDbMetricsWriter metricsWriter = null;
    +    private MetricsCleaner metricsCleaner = null;
    +    private Meter failureMeter = null;
    +
    +    interface RocksDbScanCallback {
    +        boolean cb(RocksDbKey key, RocksDbValue val);  // return false to stop scan
    +    }
    +
    +    /**
    +     * Create metric store instance using the configurations provided via the config map.
    +     *
    +     * @param config Storm config map
    +     * @throws MetricException on preparation error
    +     */
    +    public void prepare(Map config) throws MetricException {
    +        validateConfig(config);
    +
    +        this.failureMeter = StormMetricsRegistry.registerMeter("RocksDB:metric-failures");
    +
    +        RocksDB.loadLibrary();
    +        boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
    +
    +        try (Options options = new Options().setCreateIfMissing(createIfMissing)) {
    +            // use the hash index for prefix searches
    +            BlockBasedTableConfig tfc = new BlockBasedTableConfig();
    +            tfc.setIndexType(IndexType.kHashSearch);
    +            options.setTableFormatConfig(tfc);
    +            options.useCappedPrefixExtractor(RocksDbKey.KEY_SIZE);
    +
    +            String path = getRocksDbAbsoluteDir(config);
    +            LOG.info("Opening RocksDB from {}", path);
    +            db = RocksDB.open(options, path);
    +        } catch (RocksDBException e) {
    +            String message = "Error opening RockDB database";
    +            LOG.error(message, e);
    +            throw new MetricException(message, e);
    +        }
    +
    +        // create thread to delete old metrics and metadata
    +        Integer retentionHours = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS).toString());
    +        Integer deletionPeriod = 0;
    +        if (config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS)) {
    +            deletionPeriod = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS).toString());
    +        }
    +        metricsCleaner = new MetricsCleaner(this, retentionHours, deletionPeriod, failureMeter);
    +
    +        // create thread to process insertion of all metrics
    +        metricsWriter = new RocksDbMetricsWriter(this, this.queue, this.failureMeter);
    +
    +        int cacheCapacity = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY).toString());
    +        StringMetadataCache.init(metricsWriter, cacheCapacity);
    +        readOnlyStringMetadataCache = StringMetadataCache.getReadOnlyStringMetadataCache();
    +        metricsWriter.init(); // init the writer once the cache is set up
    +
    +        // start threads after metadata cache created
    +        Thread thread = new Thread(metricsCleaner, "RocksDbMetricsCleaner");
    +        thread.setDaemon(true);
    +        thread.start();
    +
    +        thread = new Thread(metricsWriter, "RocksDbMetricsWriter");
    +        thread.setDaemon(true);
    +        thread.start();
    +    }
    +
    +    /**
    +     * Validates the Storm configuration for the Metrics Store.
    +     *
    +     * @param config Storm config to specify which store type, location of store and creation policy
    +     * @throws MetricException if there is a missing required configuration or if the store does not exist but
    +     *                         the config specifies not to create the store
    +     */
    +    private void validateConfig(Map config) throws MetricException {
    +        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_LOCATION))) {
    +            throw new MetricException("Not a vaild RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
    +        }
    +
    +        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING))) {
    +            throw new MetricException("Not a vaild RocksDB configuration - Does not specify creation policy "
    +                    + DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING);
    +        }
    +
    +        // validate path defined
    +        String storePath = getRocksDbAbsoluteDir(config);
    +
    +        boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
    +        if (!createIfMissing) {
    +            if (!(new File(storePath).exists())) {
    +                throw new MetricException("Configuration specifies not to create a store but no store currently exists at " + storePath);
    +            }
    +        }
    +
    +        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY))) {
    +            throw new MetricException("Not a valid RocksDB configuration - Missing metadata string cache size "
    +                    + DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY);
    +        }
    +
    +        if (!config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS)) {
    +            throw new MetricException("Not a valid RocksDB configuration - Missing metric retention "
    +                    + DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS);
    +        }
    +    }
    +
    +    private String getRocksDbAbsoluteDir(Map conf) throws MetricException {
    +        String storePath = (String)conf.get(DaemonConfig.STORM_ROCKSDB_LOCATION);
    +        if (storePath == null) {
    +            throw new MetricException("Not a vaild RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
    +        } else {
    +            if (new File(storePath).isAbsolute()) {
    +                return storePath;
    +            } else {
    +                String stormHome = System.getProperty("storm.home");
    --- End diff --
    
    I did not find one.  Can you point out which constant I should use (or where to create one)?


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162634637
  
    --- Diff: storm-server/pom.xml ---
    @@ -64,6 +64,10 @@
                 <artifactId>auto-service</artifactId>
                 <optional>true</optional>
             </dependency>
    +        <dependency>
    +            <groupId>org.rocksdb</groupId>
    +            <artifactId>rocksdbjni</artifactId>
    --- End diff --
    
    I tested on a Mac and a RHEL VM.  The RocksDB jar also contains a win64 JNI DLL.  If an error is thrown while creating the metrics store, everything should be treated as a no-op.
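    
    A hedged sketch of that no-op behavior: if configuring the store throws, keep the reference null and guard every use (assuming an slf4j `LOG` in scope; the factory call and `insert` signature below follow this PR's interfaces but should be treated as assumptions):
    
    ```
    MetricStore store = null;
    try {
        // Factory selects the implementation via storm.metricstore.class.
        store = MetricStoreConfig.configure(conf);
    } catch (Exception e) {
        // Creation failed (e.g. the JNI library could not load on this
        // platform); leave the store null and treat metrics as a no-op.
        LOG.error("Failed to initialize metric store", e);
    }
    
    if (store != null) {
        store.insert(metric); // only touch the store when it exists
    }
    ```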


---

[GitHub] storm pull request #2504: STORM-2156: store metrics into RocksDB

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160241080
  
    --- Diff: storm-server/src/main/java/org/apache/storm/DaemonConfig.java ---
    @@ -1023,6 +1023,43 @@
         public static String STORM_SUPERVISOR_MEDIUM_MEMORY_GRACE_PERIOD_MS =
             "storm.supervisor.medium.memory.grace.period.ms";
     
    +    /**
    +     * Class implementing MetricStore.
    +     */
    +    @NotNull
    +    @isString
    +    public static final String STORM_METRIC_STORE_CLASS = "storm.metricstore.class";
    +
    +    /**
    +     * RocksDB file location.
    --- End diff --
    
    Could we expand the javadocs to explain that this is only for the `org.apache.storm.metricstore.rocksdb.RocksDbStore`? The same would be good for all of the newly added javadocs that are specific to the RocksDB metric store.
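    
    For example, the expanded javadoc could read something like the following (the exact config key string is an assumption here, since it is not shown in this hunk):
    
    ```
    /**
     * RocksDB file location. Only used when storm.metricstore.class is set to
     * org.apache.storm.metricstore.rocksdb.RocksDbStore.
     */
    @NotNull
    @isString
    public static final String STORM_ROCKSDB_LOCATION = "storm.metricstore.rocksdb.location";
    ```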
      


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162323041
  
    --- Diff: storm-client/src/storm.thrift ---
    @@ -836,3 +837,24 @@ exception HBAuthorizationException {
     exception HBExecutionException {
       1: required string msg;
     }
    +
    +struct WorkerMetricFields {
    --- End diff --
    
    minor: `WorkerMetricPoint` might be clearer than `WorkerMetricFields`.


---

[GitHub] storm pull request #2504: STORM-2156: store metrics into RocksDB

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160240938
  
    --- Diff: storm-server/src/main/java/org/apache/storm/DaemonConfig.java ---
    @@ -1023,6 +1023,43 @@
         public static String STORM_SUPERVISOR_MEDIUM_MEMORY_GRACE_PERIOD_MS =
             "storm.supervisor.medium.memory.grace.period.ms";
     
    +    /**
    +     * Class implementing MetricStore.
    +     */
    +    @NotNull
    +    @isString
    --- End diff --
    
    We have an `@isImplementationOfClass` annotation that I think would be more appropriate for this config.
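    
    A sketch of the suggested swap, assuming the annotation takes the interface the configured class must implement:
    
    ```
    /**
     * Class implementing MetricStore.
     */
    @NotNull
    @isImplementationOfClass(implOfClass = MetricStore.class)
    public static final String STORM_METRIC_STORE_CLASS = "storm.metricstore.class";
    ```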


---

[GitHub] storm issue #2504: STORM-2156: store metrics into RocksDB

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    Further metrics can be added to the database as desired.  The intent of this patch was to get the database functional.  Given the patch size and ongoing metrics work, I would suggest adding more metrics in a follow-on JIRA.


---

[GitHub] storm issue #2504: STORM-2887: store metrics into RocksDB

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    I'll put up a commit for the remaining issues (or comment if I have further questions).


---

[GitHub] storm pull request #2504: STORM-2156: store metrics into RocksDB

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160242577
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -4166,4 +4184,23 @@ public void shutdown() {
         public boolean isWaiting() {
             return timer.isTimerWaiting();
         }
    +
    +    @Override
    +    public void processWorkerMetrics(WorkerMetrics metrics) throws org.apache.thrift.TException {
    +        if (this.metricsStore == null) {
    --- End diff --
    
    Can we add some authorization calls before we do anything to execute the command?
    
    ```
    checkAuthorization(null, null, "processWorkerMetrics");
    ```
    
    Then we would need to update 
    
    https://github.com/apache/storm/blob/7ecb3d73e8e909c01d39e03a7a7ed45a2fb81859/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/SimpleACLAuthorizer.java#L52
    
    to have processWorkerMetrics in the list.



---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162320163
  
    --- Diff: storm-client/src/jvm/org/apache/storm/security/auth/authorizer/SimpleACLAuthorizer.java ---
    @@ -35,6 +35,8 @@
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    +
    --- End diff --
    
    nit: two unnecessary empty lines


---

[GitHub] storm issue #2504: STORM-2156: store metrics into RocksDB

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    Will look into these issues.  


---

[GitHub] storm pull request #2504: STORM-2156: store metrics into RocksDB

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160241948
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -1081,6 +1088,14 @@ public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stor
                 BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper)
             throws Exception {
             this.conf = conf;
    +
    +        this.metricsStore = null;
    --- End diff --
    
    Can we verify that if the metricsStore is null we don't get any NullPointerExceptions in really bad places?


---

[GitHub] storm issue #2504: STORM-2156: store metrics into RocksDB

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    @HeartSaVioR sorry about that, I think we probably were not clear enough about what is and is not currently covered by this pull request.
    
    Reading through the description of STORM-2156 you are 100% right that this does not cover everything there. I will file a new subtask for STORM-2156 and we will update this pull request to be under it.  I am sorry about the confusion.  Thanks for calling us out on this.
    
    For those who care here is some more history about this patch:
    
    Originally this work was based off of the metrics v2 patch and was done by @abellina and @lavindev as an intern here.  But it ended up being a very large patch, and it looked like it would take a very long time to go in.  We would have to wait for metrics v2 to go in, then get this reviewed and merged along with the glue code that is not here, and then have both of them ported to master.
    
    To try to speed things up I asked @agresch to take out just the rocksdb metrics storage piece and a small number of metrics that don't require the v2 patch, put it on master, make sure it is solid, and see if we can get that in by itself.  I thought this would be great because it would at least provide the minimum metrics needed to start looking at elasticity.
    
    The prototype that @lavindev and @abellina built includes all of the integration with metrics v2, the UI updated to show both sets of metrics side by side so we could judge how close they were to each other, a real-time metrics graphing proof-of-concept on the UI, and a plugin to store the metrics in HBase instead of rocksdb.  So we have code to cover all of STORM-2156 and more.  We are just trying to optimize how quickly we can get it in so we can hopefully do a 2.x release sometime this quarter.


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162349715
  
    --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java ---
    @@ -0,0 +1,306 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore.rocksdb;
    +
    +import com.codahale.metrics.Meter;
    +
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ThreadLocalRandom;
    +import org.apache.storm.metricstore.AggLevel;
    +import org.apache.storm.metricstore.Metric;
    +import org.apache.storm.metricstore.MetricException;
    +import org.rocksdb.FlushOptions;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.WriteBatch;
    +import org.rocksdb.WriteOptions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Class designed to perform all metrics inserts into RocksDB.  Metrics are processed from a blocking queue.
    + * <p>
    + * A writable LRU StringMetadataCache is used to minimize looking up metadata string Ids.  As entries are added to the full cache, older
    + * entries are evicted from the cache and need to be written to the database.  This happens via the handleEvictedMetadata()
    + * method callback.
    + */
    +public class RocksDbMetricsWriter implements Runnable, AutoCloseable {
    +    private static final Logger LOG = LoggerFactory.getLogger(RocksDbMetricsWriter.class);
    +    private RocksDbStore store;
    +    private BlockingQueue queue;
    +    private WritableStringMetadataCache stringMetadataCache;
    +    private Set<Integer> unusedIds = new HashSet<>();
    +    private TreeMap<RocksDbKey, RocksDbValue> insertBatch = new TreeMap<>(); // RocksDB should insert in sorted key order
    --- End diff --
    
    I tried to do this on line 43.  I will expand that description and can add thoughts on future multithreading design.
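    
    For instance, the expanded comment could spell out the single-writer invariant along these lines (a sketch based on the design described in this PR, not final wording):
    
    ```
    // RocksDB inserts are cheapest in sorted key order, so each metric and its
    // aggregations are staged in this TreeMap before the batch write.  The map
    // is only ever touched by the single RocksDbMetricsWriter thread, so it
    // needs no synchronization; a future multithreaded writer would have to
    // guard or replace it.
    private TreeMap<RocksDbKey, RocksDbValue> insertBatch = new TreeMap<>();
    ```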


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162319898
  
    --- Diff: pom.xml ---
    @@ -324,6 +324,7 @@
             <jool.version>0.9.12</jool.version>
             <caffeine.version>2.3.5</caffeine.version>
             <jaxb-version>2.3.0</jaxb-version>
    +        <rocksdb-version>5.8.6</rocksdb-version>
    --- End diff --
    
    Just curious: is there some reason to pick 5.8.6?


---

[GitHub] storm pull request #2504: STORM-2156: store metrics into RocksDB

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160244314
  
    --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java ---
    @@ -0,0 +1,306 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore.rocksdb;
    +
    +import com.codahale.metrics.Meter;
    +
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.ListIterator;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.ThreadLocalRandom;
    +import org.apache.storm.metricstore.AggLevel;
    +import org.apache.storm.metricstore.Metric;
    +import org.apache.storm.metricstore.MetricException;
    +import org.rocksdb.FlushOptions;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.WriteBatch;
    +import org.rocksdb.WriteOptions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Class designed to perform all metrics inserts into RocksDB.  Metrics are processed from a blocking queue.
    + * <p>
    + * A writable LRU StringMetadataCache is used to minimize looking up metadata string Ids.  As entries are added to the full cache, older
    + * entries are evicted from the cache and need to be written to the database.  This happens via the handleEvictedMetadata()
    + * method callback.
    + */
    +public class RocksDbMetricsWriter implements Runnable, AutoCloseable {
    +    private static final Logger LOG = LoggerFactory.getLogger(RocksDbMetricsWriter.class);
    +    private RocksDbStore store;
    +    private BlockingQueue queue;
    +    private WritableStringMetadataCache stringMetadataCache;
    +    private Set<Integer> unusedIds = new HashSet<>();
    +    private TreeMap<RocksDbKey, RocksDbValue> insertBatch = new TreeMap<>(); // RocksDB should insert in sorted key order
    --- End diff --
    
    I don't think this is being used in a thread-safe way.
      


---

[GitHub] storm issue #2504: STORM-2156: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    @revans2 Great news! Looking forward to hearing more about that.


---

[GitHub] storm issue #2504: STORM-2156: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    @revans2 Thanks a lot for sharing the current status. Actually I have been aware of the patch going through @abellina's forked repo, so I expected the HBase plugin and UI patch would be adopted internally, and I also expected most of the metrics would be integrated with the new system. I thought only Metrics V2 was a blocker for your team to bring such features to the Apache side, and that's why I really wanted to integrate Metrics V2 sooner. I was not aware that they were prototypes rather than production ready, and hence thought they could be included in Storm 2.x sooner (2.0.0 ideally).
    If this patch itself is a baseline for other features like elasticity (that plan is also not transparent, and I am not aware of it), I think the patch is welcome to be reviewed independently and merged in. I just wasn't clear on what benefits the patch would provide.


---

[GitHub] storm issue #2504: STORM-2156: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    Before looking into the patch in detail, I'd like to understand which functionality the patch intends to bring.
    
    1. As far as I read from MetricStore, it looks like it provides an aggregated point from a time-range query. Do I understand correctly? What I expected was a time-series one, which would replace metrics consumers and eventually provide a time-series representation. I expect it can still replace metrics consumers on the store side, btw.
    
    2. I guess I know the answer (at least partially), and an eventual follow-up patch would store metrics into external storage (like HBase), but just to double check: how will this behave when the leader Nimbus goes down and one of the standby Nimbus instances is promoted to leader? Will metrics stored previously be unavailable? If that Nimbus gets leadership again, how will it show the gap during which it didn't receive metrics (especially aggregated values)? Do we want to apply interpolation, or just treat it as no metrics (hence 0 for sum and None for avg)?


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162768064
  
    --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java ---
    @@ -0,0 +1,639 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore.rocksdb;
    +
    +import com.codahale.metrics.Meter;
    +import java.io.File;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.storm.DaemonConfig;
    +import org.apache.storm.metric.StormMetricsRegistry;
    +import org.apache.storm.metricstore.AggLevel;
    +import org.apache.storm.metricstore.FilterOptions;
    +import org.apache.storm.metricstore.Metric;
    +import org.apache.storm.metricstore.MetricException;
    +import org.apache.storm.metricstore.MetricStore;
    +import org.apache.storm.utils.ObjectReader;
    +import org.rocksdb.BlockBasedTableConfig;
    +import org.rocksdb.IndexType;
    +import org.rocksdb.Options;
    +import org.rocksdb.ReadOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.WriteBatch;
    +import org.rocksdb.WriteOptions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +
    +public class RocksDbStore implements MetricStore, AutoCloseable {
    +    private static final Logger LOG = LoggerFactory.getLogger(RocksDbStore.class);
    +    private static final int MAX_QUEUE_CAPACITY = 4000;
    +    static final int INVALID_METADATA_STRING_ID = 0;
    +    RocksDB db;
    +    private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null;
    +    private BlockingQueue queue = new LinkedBlockingQueue(MAX_QUEUE_CAPACITY);
    +    private RocksDbMetricsWriter metricsWriter = null;
    +    private MetricsCleaner metricsCleaner = null;
    +    private Meter failureMeter = null;
    +
    +    interface RocksDbScanCallback {
    +        boolean cb(RocksDbKey key, RocksDbValue val);  // return false to stop scan
    +    }
    +
    +    /**
    +     * Create metric store instance using the configurations provided via the config map.
    +     *
    +     * @param config Storm config map
    +     * @throws MetricException on preparation error
    +     */
    +    public void prepare(Map config) throws MetricException {
    +        validateConfig(config);
    +
    +        this.failureMeter = StormMetricsRegistry.registerMeter("RocksDB:metric-failures");
    +
    +        RocksDB.loadLibrary();
    +        boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
    +
    +        try (Options options = new Options().setCreateIfMissing(createIfMissing)) {
    +            // use the hash index for prefix searches
    +            BlockBasedTableConfig tfc = new BlockBasedTableConfig();
    +            tfc.setIndexType(IndexType.kHashSearch);
    +            options.setTableFormatConfig(tfc);
    +            options.useCappedPrefixExtractor(RocksDbKey.KEY_SIZE);
    +
    +            String path = getRocksDbAbsoluteDir(config);
    +            LOG.info("Opening RocksDB from {}", path);
    +            db = RocksDB.open(options, path);
    +        } catch (RocksDBException e) {
    +            String message = "Error opening RockDB database";
    +            LOG.error(message, e);
    +            throw new MetricException(message, e);
    +        }
    +
    +        // create thread to delete old metrics and metadata
    +        Integer retentionHours = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS).toString());
    +        Integer deletionPeriod = 0;
    +        if (config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS)) {
    +            deletionPeriod = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS).toString());
    +        }
    +        metricsCleaner = new MetricsCleaner(this, retentionHours, deletionPeriod, failureMeter);
    +
    +        // create thread to process insertion of all metrics
    +        metricsWriter = new RocksDbMetricsWriter(this, this.queue, this.failureMeter);
    +
    +        int cacheCapacity = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY).toString());
    +        StringMetadataCache.init(metricsWriter, cacheCapacity);
    +        readOnlyStringMetadataCache = StringMetadataCache.getReadOnlyStringMetadataCache();
    +        metricsWriter.init(); // init the writer once the cache is set up
    +
    +        // start threads after metadata cache created
    +        Thread thread = new Thread(metricsCleaner, "RocksDbMetricsCleaner");
    +        thread.setDaemon(true);
    +        thread.start();
    +
    +        thread = new Thread(metricsWriter, "RocksDbMetricsWriter");
    +        thread.setDaemon(true);
    +        thread.start();
    +    }
    +
    +    /**
    +     * Validates the Storm configuration for the Metrics Store.
    +     *
    +     * @param config Storm config to specify which store type, location of store and creation policy
    +     * @throws MetricException if there is a missing required configuration or if the store does not exist but
    +     *                         the config specifies not to create the store
    +     */
    +    private void validateConfig(Map config) throws MetricException {
    +        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_LOCATION))) {
    +            throw new MetricException("Not a vaild RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
    +        }
    +
    +        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING))) {
    +            throw new MetricException("Not a vaild RocksDB configuration - Does not specify creation policy "
    +                    + DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING);
    +        }
    +
    +        // validate path defined
    +        String storePath = getRocksDbAbsoluteDir(config);
    +
    +        boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
    +        if (!createIfMissing) {
    +            if (!(new File(storePath).exists())) {
    +                throw new MetricException("Configuration specifies not to create a store but no store currently exists at " + storePath);
    +            }
    +        }
    +
    +        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY))) {
    +            throw new MetricException("Not a valid RocksDB configuration - Missing metadata string cache size "
    +                    + DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY);
    +        }
    +
    +        if (!config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS)) {
    +            throw new MetricException("Not a valid RocksDB configuration - Missing metric retention "
    +                    + DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS);
    +        }
    +    }
    +
    +    private String getRocksDbAbsoluteDir(Map conf) throws MetricException {
    +        String storePath = (String)conf.get(DaemonConfig.STORM_ROCKSDB_LOCATION);
    +        if (storePath == null) {
    +            throw new MetricException("Not a vaild RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
    +        } else {
    +            if (new File(storePath).isAbsolute()) {
    +                return storePath;
    +            } else {
    +                String stormHome = System.getProperty("storm.home");
    --- End diff --
    
    Yeah my bad. We didn't have one. Let's leave it as it is since it's a nit.


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162505874
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java ---
    @@ -693,4 +701,44 @@ public long getMemoryReservationMb() {
         public String getWorkerId() {
             return _workerId;
         }
    +
    +    /**
    +     * Send worker metrics to Nimbus.
    +     */
    +    void processMetrics() {
    +        try {
    +            if (_usedMemory.get(_port) != null) {
    +                // Make sure we don't process too frequently.
    +                long nextMetricProcessTime = this.lastMetricProcessTime + 60L * 1000L;
    +                long currentTimeMsec = System.currentTimeMillis();
    +                if (currentTimeMsec < nextMetricProcessTime) {
    +                    return;
    +                }
    +
    +                String hostname = Utils.hostname();
    +
    +                // create metric for memory
    +                String metricName = "UsedMemory";
    --- End diff --
    
    minor: maybe better to have constants for these strings here, since they're likely to be reused when retrieving the information.
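    
    A small sketch of that suggestion; the class and constant names are illustrative:
    
    ```
    // Shared between the supervisor (when reporting) and any query/UI code
    // (when retrieving), so the exact metric name string is defined once.
    final class SupervisorMetricNames {
        static final String USED_MEMORY = "UsedMemory";
    
        private SupervisorMetricNames() {
            // constants holder, not instantiable
        }
    }
    ```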


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162506645
  
    --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/FilterOptions.java ---
    @@ -0,0 +1,159 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore;
    +
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * FilterOptions provides a method to select various filtering options for doing a scan of the metrics database.
    + */
    +public class FilterOptions {
    +    private static final String componentId = "componentId";
    +    private static final String topologyId = "topologyId";
    +    private static final String startTime = "startTime";
    +    private static final String endTime = "endTime";
    +    private static final String metricName = "metricName";
    +    private static final String executorId = "executorId";
    +    private static final String hostId = "hostId";
    +    private static final String port = "port";
    +    private static final String streamId = "streamId";
    +    private Map<String, Object> options = new HashMap<>();
    --- End diff --
    
    Any reason to store the options in a map, with the unnecessary explicit type conversions, instead of defining them as fields?
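    
    For comparison, a minimal sketch of the fields-based alternative (method names here are assumptions, not from the PR):
    
        // Plain typed fields avoid the per-access map lookup and cast.
        public class FilterOptions {
            private String topologyId = null;
            private long startTime = 0L;
            private long endTime = -1L;
    
            public void setTopologyId(String topologyId) {
                this.topologyId = topologyId;
            }
    
            public String getTopologyId() {
                return topologyId;
            }
    
            // ... the remaining options would follow the same pattern.
        }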


---

[GitHub] storm issue #2504: STORM-2887: store metrics into RocksDB

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on the issue:

    https://github.com/apache/storm/pull/2504
  
    This JIRA currently only addresses metrics on the Nimbus leader.  If Nimbus goes down and fails over to the standby, the options would be to sync the RocksDB database from one server to another, send metrics from the supervisors to both Nimbi (is that the correct plural?), or allow metrics to be dropped.  
    
    We plan further work to add HBase as a store option as well, which would limit the metric loss here.  
    
    I'd recommend dealing with Nimbus standby options in a future JIRA, but welcome discussion.
    
    



---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2504


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r162545064
  
    --- Diff: storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java ---
    @@ -0,0 +1,639 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.storm.metricstore.rocksdb;
    +
    +import com.codahale.metrics.Meter;
    +import java.io.File;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import org.apache.storm.DaemonConfig;
    +import org.apache.storm.metric.StormMetricsRegistry;
    +import org.apache.storm.metricstore.AggLevel;
    +import org.apache.storm.metricstore.FilterOptions;
    +import org.apache.storm.metricstore.Metric;
    +import org.apache.storm.metricstore.MetricException;
    +import org.apache.storm.metricstore.MetricStore;
    +import org.apache.storm.utils.ObjectReader;
    +import org.rocksdb.BlockBasedTableConfig;
    +import org.rocksdb.IndexType;
    +import org.rocksdb.Options;
    +import org.rocksdb.ReadOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +import org.rocksdb.RocksIterator;
    +import org.rocksdb.WriteBatch;
    +import org.rocksdb.WriteOptions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +
    +public class RocksDbStore implements MetricStore, AutoCloseable {
    +    private static final Logger LOG = LoggerFactory.getLogger(RocksDbStore.class);
    +    private static final int MAX_QUEUE_CAPACITY = 4000;
    +    static final int INVALID_METADATA_STRING_ID = 0;
    +    RocksDB db;
    +    private ReadOnlyStringMetadataCache readOnlyStringMetadataCache = null;
    +    private BlockingQueue queue = new LinkedBlockingQueue(MAX_QUEUE_CAPACITY);
    +    private RocksDbMetricsWriter metricsWriter = null;
    +    private MetricsCleaner metricsCleaner = null;
    +    private Meter failureMeter = null;
    +
    +    interface RocksDbScanCallback {
    +        boolean cb(RocksDbKey key, RocksDbValue val);  // return false to stop scan
    +    }
    +
    +    /**
    +     * Create metric store instance using the configurations provided via the config map.
    +     *
    +     * @param config Storm config map
    +     * @throws MetricException on preparation error
    +     */
    +    public void prepare(Map config) throws MetricException {
    +        validateConfig(config);
    +
    +        this.failureMeter = StormMetricsRegistry.registerMeter("RocksDB:metric-failures");
    +
    +        RocksDB.loadLibrary();
    +        boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
    +
    +        try (Options options = new Options().setCreateIfMissing(createIfMissing)) {
    +            // use the hash index for prefix searches
    +            BlockBasedTableConfig tfc = new BlockBasedTableConfig();
    +            tfc.setIndexType(IndexType.kHashSearch);
    +            options.setTableFormatConfig(tfc);
    +            options.useCappedPrefixExtractor(RocksDbKey.KEY_SIZE);
    +
    +            String path = getRocksDbAbsoluteDir(config);
    +            LOG.info("Opening RocksDB from {}", path);
    +            db = RocksDB.open(options, path);
    +        } catch (RocksDBException e) {
    +            String message = "Error opening RockDB database";
    +            LOG.error(message, e);
    +            throw new MetricException(message, e);
    +        }
    +
    +        // create thread to delete old metrics and metadata
    +        Integer retentionHours = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS).toString());
    +        Integer deletionPeriod = 0;
    +        if (config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS)) {
    +            deletionPeriod = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METRIC_DELETION_PERIOD_HOURS).toString());
    +        }
    +        metricsCleaner = new MetricsCleaner(this, retentionHours, deletionPeriod, failureMeter);
    +
    +        // create thread to process insertion of all metrics
    +        metricsWriter = new RocksDbMetricsWriter(this, this.queue, this.failureMeter);
    +
    +        int cacheCapacity = Integer.parseInt(config.get(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY).toString());
    +        StringMetadataCache.init(metricsWriter, cacheCapacity);
    +        readOnlyStringMetadataCache = StringMetadataCache.getReadOnlyStringMetadataCache();
    +        metricsWriter.init(); // init the writer once the cache is setup
    +
    +        // start threads after metadata cache created
    +        Thread thread = new Thread(metricsCleaner, "RocksDbMetricsCleaner");
    +        thread.setDaemon(true);
    +        thread.start();
    +
    +        thread = new Thread(metricsWriter, "RocksDbMetricsWriter");
    +        thread.setDaemon(true);
    +        thread.start();
    +    }
    +
    +    /**
    +     * Validates the Storm configuration for the metrics store.
    +     *
    +     * @param config Storm config to specify which store type, location of store and creation policy
    +     * @throws MetricException if there is a missing required configuration or if the store does not exist but
    +     *                         the config specifies not to create the store
    +     */
    +    private void validateConfig(Map config) throws MetricException {
    +        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_LOCATION))) {
    +            throw new MetricException("Not a vaild RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
    +        }
    +
    +        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING))) {
    +            throw new MetricException("Not a vaild RocksDB configuration - Does not specify creation policy "
    +                    + DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING);
    +        }
    +
    +        // validate path defined
    +        String storePath = getRocksDbAbsoluteDir(config);
    +
    +        boolean createIfMissing = ObjectReader.getBoolean(config.get(DaemonConfig.STORM_ROCKSDB_CREATE_IF_MISSING), false);
    +        if (!createIfMissing) {
    +            if (!(new File(storePath).exists())) {
    +                throw new MetricException("Configuration specifies not to create a store but no store currently exists at " + storePath);
    +            }
    +        }
    +
    +        if (!(config.containsKey(DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY))) {
    +            throw new MetricException("Not a valid RocksDB configuration - Missing metadata string cache size "
    +                    + DaemonConfig.STORM_ROCKSDB_METADATA_STRING_CACHE_CAPACITY);
    +        }
    +
    +        if (!config.containsKey(DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS)) {
    +            throw new MetricException("Not a valid RocksDB configuration - Missing metric retention "
    +                    + DaemonConfig.STORM_ROCKSDB_METRIC_RETENTION_HOURS);
    +        }
    +    }
    +
    +    private String getRocksDbAbsoluteDir(Map conf) throws MetricException {
    +        String storePath = (String)conf.get(DaemonConfig.STORM_ROCKSDB_LOCATION);
    +        if (storePath == null) {
    +            throw new MetricException("Not a vaild RocksDB configuration - Missing store location " + DaemonConfig.STORM_ROCKSDB_LOCATION);
    +        } else {
    +            if (new File(storePath).isAbsolute()) {
    +                return storePath;
    +            } else {
    +                String stormHome = System.getProperty("storm.home");
    --- End diff --
    
    I guess we already have a constant for "storm.home".
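    
    A sketch of what the suggestion implies, assuming such a shared constant were available (the constant name below is hypothetical; a reply elsewhere in this thread notes one did not actually exist):
    
        // Hypothetical shared constant (e.g. on a config utility class):
        public static final String STORM_HOME_PROPERTY = "storm.home";
    
        // ...which getRocksDbAbsoluteDir() would then reference:
        String stormHome = System.getProperty(STORM_HOME_PROPERTY);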


---

[GitHub] storm pull request #2504: STORM-2887: store metrics into RocksDB

Posted by agresch <gi...@git.apache.org>.
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2504#discussion_r160804460
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---
    @@ -936,6 +936,9 @@ static DynamicState handleRunning(DynamicState dynamicState, StaticState staticS
                 }
                 dynamicState = dynamicState.withProfileActions(mod, modPending);
             }
    +
    +        dynamicState.container.processMetrics();
    --- End diff --
    
    Yes, will fix.


---