Posted to dev@storm.apache.org by satishd <gi...@git.apache.org> on 2016/06/13 05:26:04 UTC

[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

GitHub user satishd opened a pull request:

    https://github.com/apache/storm/pull/1484

    STORM-1893 OpenTSDB bolt and trident state for storing any timeseries data.

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/satishd/storm tsdb-bolt

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1484.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1484
    
----
commit c5972b69bc5ef1d948b5d63946d48fd026376736
Author: Satish Duggana <sd...@hortonworks.com>
Date:   2016-06-08T06:05:42Z

    STORM-1893 OpenTSDB bolt and trident state for storing any timeseries data.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66950623
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/bolt/ITupleOpenTsdbDatapointMapper.java ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb.bolt;
    --- End diff --
    
    The classes need to be moved to package org.apache.**storm**.opentsdb.bolt.


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66925973
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/bolt/TupleOpenTsdbDatapointMapper.java ---
    @@ -0,0 +1,120 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb.bolt;
    +
    +import org.apache.stom.opentsdb.OpenTsdbMetricDatapoint;
    +import org.apache.storm.tuple.ITuple;
    +
    +import java.util.Map;
    +
    +/**
    + * Converts {@link org.apache.storm.tuple.Tuple} to {@link OpenTsdbMetricDatapoint}
    --- End diff --
    
    Tuple -> ITuple


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66950068
  
    --- Diff: external/storm-opentsdb/README.md ---
    @@ -0,0 +1,73 @@
    +# Storm OpenTSDB Bolt and TridentState
    +  
    +OpenTSDB offers a scalable and highly available storage for time series data. It consists of a
    +Time Series Daemon (TSD) servers along with command line utilities. Each TSD connects to the 
    +configured HBase cluster to push/query the data.
    +
    +Time series data point consists of:
    + - a metric name.
    + - a UNIX timestamp (seconds or milliseconds since Epoch).
    + - a value (64 bit integer or single-precision floating point value).
    + - a set of tags (key-value pairs) that describe the time series the point belongs to.
    +
    +Storm bolt and trident state creates the above time series data from a tuple based on the given `TupleMetricPointMapper`
    +  
    +This module provides core Storm and Trident bolt implementations for writing data to OpenTSDB.
    +
    +## Examples
    +
    +### Core Bolt
    +Below example describes the usage of core bolt which is `org.apache.stom.opentsdb.bolt.OpenTsdbBolt`
    --- End diff --
    
    The package name should be org.apache.*storm*...
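For reference, the four parts of a data point listed in the README above (metric name, timestamp, value, tags) map onto the JSON object that OpenTSDB's HTTP `/api/put` endpoint accepts. The following is a minimal sketch, not the PR's implementation. `DatapointSketch` is a hypothetical class and is not the `OpenTsdbMetricDatapoint` from this PR. The sketch also assumes that metric and tag names use only OpenTSDB's allowed characters, so it does no JSON escaping.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, simplified data point used only for illustration; it is not the
 * OpenTsdbMetricDatapoint class from this pull request.
 */
final class DatapointSketch {
    final String metric;
    final long timestamp;          // seconds or milliseconds since the Epoch
    final Number value;            // integer or floating-point value
    final Map<String, String> tags;

    DatapointSketch(String metric, long timestamp, Number value, Map<String, String> tags) {
        // OpenTSDB's /api/put expects at least one tag pair per data point.
        if (tags == null || tags.isEmpty()) {
            throw new IllegalArgumentException("at least one tag is required");
        }
        this.metric = metric;
        this.timestamp = timestamp;
        this.value = value;
        this.tags = new TreeMap<>(tags); // sorted only to make the output deterministic
    }

    /** Renders the point as a /api/put JSON object; assumes names need no JSON escaping. */
    String toJson() {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"metric\":\"").append(metric).append('"')
          .append(",\"timestamp\":").append(timestamp)
          .append(",\"value\":").append(value)
          .append(",\"tags\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : tags.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append('"').append(e.getKey()).append("\":\"").append(e.getValue()).append('"');
            first = false;
        }
        return sb.append("}}").toString();
    }
}
```

A request body for several points would be a JSON array of such objects.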


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66925841
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/bolt/OpenTsdbBolt.java ---
    @@ -0,0 +1,172 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb.bolt;
    +
    +import org.apache.stom.opentsdb.OpenTsdbMetricDatapoint;
    +import org.apache.stom.opentsdb.client.ClientResponse;
    +import org.apache.stom.opentsdb.client.OpenTsdbClient;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.utils.BatchHelper;
    +import org.apache.storm.utils.TupleUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * Basic bolt implementation for storing timeseries datapoints to OpenTSDB.
    + *
    + * List of {@link OpenTsdbMetricDatapoint} is generated for each tuple with the configured {@code tupleOpenTsdbDatapointMappers}.
    + * All these datapoints are batched till the given {@code batchSize} or {@code flushIntervalInSeconds} is reached.
    + *
    + * Example topology:
    + *
    + */
    +public class OpenTsdbBolt extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(OpenTsdbBolt.class);
    +
    +    private final OpenTsdbClient.Builder openTsdbClientBuilder;
    +    private final List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers;
    +    private int batchSize;
    +    private int flushIntervalInSeconds;
    +    private boolean failTupleForFailedMetrics;
    +
    +    private BatchHelper batchHelper;
    +    private OpenTsdbClient openTsdbClient;
    +    private Map<OpenTsdbMetricDatapoint, Tuple> metricPointsWithTuple = new HashMap<>();
    +    private OutputCollector collector;
    +
    +    public OpenTsdbBolt(OpenTsdbClient.Builder openTsdbClientBuilder, List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers) {
    +        this.openTsdbClientBuilder = openTsdbClientBuilder;
    +        this.tupleOpenTsdbDatapointMappers = tupleOpenTsdbDatapointMappers;
    +    }
    +
    +    public OpenTsdbBolt withFlushInterval(int flushIntervalInSeconds) {
    +        this.flushIntervalInSeconds = flushIntervalInSeconds;
    +        return this;
    +    }
    +
    +    public OpenTsdbBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        return this;
    +    }
    +
    +    /**
    +     * When it is invoked, this bolt acks only the tuples which have successful metrics stored into OpenTSDB and fails
    +     * the respective tuples of the failed metrics.
    +     *
    +     * @return same insatcne by setting {@code failTupleForFailedMetrics} to true
    +     */
    +    public OpenTsdbBolt failTupleForFailedMetrics() {
    +        this.failTupleForFailedMetrics = true;
    +        return this;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        batchHelper = new BatchHelper(batchSize, collector);
    +        openTsdbClient = openTsdbClientBuilder.build();
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            if (batchHelper.shouldHandle(tuple)) {
    +                final List<OpenTsdbMetricDatapoint> metricDataPoints = getMetricPoints(tuple);
    +                for (OpenTsdbMetricDatapoint metricDataPoint : metricDataPoints) {
    +                    metricPointsWithTuple.put(metricDataPoint, tuple);
    +                }
    +                batchHelper.addBatch(tuple);
    +            }
    +
    +            if (batchHelper.shouldFlush()) {
    +                LOG.debug("Sending metrics of size [{}]", metricPointsWithTuple.size());
    +
    +                ClientResponse.Details clientResponse = openTsdbClient.writeMetricPoints(metricPointsWithTuple.keySet());
    +
    +                if(failTupleForFailedMetrics && clientResponse.getFailed() > 0) {
    +                    final List<ClientResponse.Details.Error> errors = clientResponse.getErrors();
    +                    LOG.error("Some of the metric points failed with errors: [{}]", clientResponse);
    +                    if(errors != null && !errors.isEmpty()) {
    +
    +                        Set<Tuple> failedTuples = new HashSet<>();
    +                        for (ClientResponse.Details.Error error : errors) {
    +                            final Tuple failedTuple = metricPointsWithTuple.get(error.getDatapoint());
    +                            if(failedTuple != null) {
    +                                failedTuples.add(failedTuple);
    +                            }
    +                        }
    +
    +                        for (Tuple batchedTuple : batchHelper.getBatchTuples()) {
    +                            if(failedTuples.contains(batchedTuple)) {
    +                                collector.fail(batchedTuple);
    +                            } else {
    +                                collector.ack(batchedTuple);
    +                            }
    +                        }
    +
    +                    } else {
    +                        throw new RuntimeException("Some of the metric points failed with details: " + errors);
    +                    }
    +                } else {
    +                    LOG.debug("Acknowledging batched tuples");
    +                    batchHelper.ack();
    +                }
    +                metricPointsWithTuple.clear();
    +            }
    +        } catch (Exception e) {
    +            batchHelper.fail(e);
    +            metricPointsWithTuple.clear();
    +        }
    +    }
    +
    +    private List<OpenTsdbMetricDatapoint> getMetricPoints(Tuple tuple) {
    +        List<OpenTsdbMetricDatapoint> metricDataPoints = new ArrayList<>();
    +        for (TupleOpenTsdbDatapointMapper tupleOpenTsdbDatapointMapper : tupleOpenTsdbDatapointMappers) {
    +            metricDataPoints.add(tupleOpenTsdbDatapointMapper.getMetricPoint(tuple));
    +        }
    +
    +        return metricDataPoints;
    +    }
    +
    +    @Override
    +    public void cleanup() {
    +        openTsdbClient.cleanup();
    +    }
    +
    +    @Override
    +    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +        // this is a sink and no result to emit.
    +    }
    +
    +    @Override
    +    public Map<String, Object> getComponentConfiguration() {
    +        return TupleUtils.putTickFrequencyIntoComponentConfig(null, flushIntervalInSeconds);
    --- End diff --
    
    We know that super.getComponentConfiguration() returns null, but it would be clearer to pass it explicitly.
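In the bolt, the suggestion amounts to this line: `return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), flushIntervalInSeconds);`. The standalone sketch below mimics that flow without Storm on the classpath. `TickConfigSketch` and its methods are hypothetical stand-ins. The helper is assumed to behave like Storm's `TupleUtils`: it creates a map when given `null` and sets the tick frequency only when it is positive. The key string is assumed to match `Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical standalone sketch of the suggested getComponentConfiguration() flow. */
class TickConfigSketch {
    // Assumed to match Storm's Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS.
    static final String TICK_FREQ_KEY = "topology.tick.tuple.freq.secs";

    /** Stand-in for the base class, whose getComponentConfiguration() returns null. */
    static Map<String, Object> superComponentConfiguration() {
        return null;
    }

    /** Stand-in for TupleUtils.putTickFrequencyIntoComponentConfig(conf, tickFreqSecs). */
    static Map<String, Object> putTickFrequency(Map<String, Object> conf, int tickFreqSecs) {
        if (conf == null) {
            conf = new HashMap<>(); // nothing inherited, so start from an empty config
        }
        if (tickFreqSecs > 0) {
            conf.put(TICK_FREQ_KEY, tickFreqSecs); // only enable tick tuples when requested
        }
        return conf;
    }

    /** Passes the parent's configuration explicitly instead of a literal null. */
    static Map<String, Object> componentConfiguration(int flushIntervalInSeconds) {
        return putTickFrequency(superComponentConfiguration(), flushIntervalInSeconds);
    }
}
```

Passing the parent's configuration also keeps the bolt correct if a future base class starts returning a non-null map.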


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66952076
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/client/ClientResponse.java ---
    @@ -0,0 +1,169 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb.client;
    +
    +import org.apache.stom.opentsdb.OpenTsdbMetricDatapoint;
    +
    +import java.io.Serializable;
    +import java.util.List;
    +
    +/**
    + * This class represents the response from OpenTsdb for a request sent.
    + */
    +public interface ClientResponse extends Serializable {
    +
    +
    +    public class Summary implements ClientResponse {
    +        private int failed;
    +        private int success;
    +
    +        public Summary() {
    +        }
    +
    +        public Summary(int failed, int success) {
    +            this.failed = failed;
    +            this.success = success;
    +        }
    +
    +        public int getFailed() {
    +            return failed;
    +        }
    +
    +        public int getSuccess() {
    +            return success;
    +        }
    +
    +        @Override
    +        public boolean equals(Object o) {
    +            if (this == o) return true;
    +            if (!(o instanceof Summary)) return false;
    +
    +            Summary summary = (Summary) o;
    +
    +            if (failed != summary.failed) return false;
    +            return success == summary.success;
    +
    +        }
    +
    +        @Override
    +        public int hashCode() {
    +            int result = failed;
    +            result = 31 * result + success;
    +            return result;
    +        }
    +
    +        @Override
    +        public String toString() {
    +            return "Summary{" +
    +                    "failed=" + failed +
    +                    ", success=" + success +
    +                    '}';
    +        }
    +    }
    +
    +    public class Details extends Summary {
    +        private List<Error> errors;
    +
    +        public Details() {
    +        }
    +
    +        public Details(int failed, int success, List<Error> errors) {
    +            super(failed, success);
    +            this.errors = errors;
    +        }
    +
    +        @Override
    +        public boolean equals(Object o) {
    +            if (this == o) return true;
    +            if (!(o instanceof Details)) return false;
    +            if (!super.equals(o)) return false;
    +
    +            Details details = (Details) o;
    +
    +            return errors.equals(details.errors);
    +
    +        }
    +
    +        public List<Error> getErrors() {
    +            return errors;
    +        }
    +
    +        @Override
    +        public int hashCode() {
    +            int result = super.hashCode();
    +            result = 31 * result + errors.hashCode();
    +            return result;
    +        }
    +
    +        @Override
    +        public String toString() {
    +            return "Details{" +
    +                    "errors=" + errors +
    +                    '}'+super.toString();
    --- End diff --
    
    Minor: the closing '}' can come after super.toString().
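One way to apply that suggestion is sketched below, so that the parent's description is nested inside the braces. These are hypothetical, trimmed-down copies of `ClientResponse.Summary`/`Details` that keep only what `toString()` needs. In particular, `List<String>` replaces the PR's `List<Error>` to keep the sketch self-contained.

```java
import java.util.List;

// Trimmed-down stand-in for ClientResponse.Summary; only toString() is kept.
class SummarySketch {
    private final int failed;
    private final int success;

    SummarySketch(int failed, int success) {
        this.failed = failed;
        this.success = success;
    }

    @Override
    public String toString() {
        return "Summary{" + "failed=" + failed + ", success=" + success + '}';
    }
}

// Trimmed-down stand-in for ClientResponse.Details; List<String> replaces List<Error>.
class DetailsSketch extends SummarySketch {
    private final List<String> errors;

    DetailsSketch(int failed, int success, List<String> errors) {
        super(failed, success);
        this.errors = errors;
    }

    @Override
    public String toString() {
        // The parent's description goes inside the braces, so the closing '}' comes last.
        return "Details{" + "errors=" + errors + ", " + super.toString() + '}';
    }
}
```

For example, `new DetailsSketch(1, 2, Arrays.asList("e1"))` prints as `Details{errors=[e1], Summary{failed=1, success=2}}`.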


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66920336
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/bolt/OpenTsdbBolt.java ---
    @@ -0,0 +1,172 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb.bolt;
    +
    +import org.apache.stom.opentsdb.OpenTsdbMetricDatapoint;
    +import org.apache.stom.opentsdb.client.ClientResponse;
    +import org.apache.stom.opentsdb.client.OpenTsdbClient;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.utils.BatchHelper;
    +import org.apache.storm.utils.TupleUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * Basic bolt implementation for storing timeseries datapoints to OpenTSDB.
    + *
    + * List of {@link OpenTsdbMetricDatapoint} is generated for each tuple with the configured {@code tupleOpenTsdbDatapointMappers}.
    + * All these datapoints are batched till the given {@code batchSize} or {@code flushIntervalInSeconds} is reached.
    + *
    + * Example topology:
    + *
    + */
    +public class OpenTsdbBolt extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(OpenTsdbBolt.class);
    +
    +    private final OpenTsdbClient.Builder openTsdbClientBuilder;
    +    private final List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers;
    +    private int batchSize;
    +    private int flushIntervalInSeconds;
    +    private boolean failTupleForFailedMetrics;
    +
    +    private BatchHelper batchHelper;
    +    private OpenTsdbClient openTsdbClient;
    +    private Map<OpenTsdbMetricDatapoint, Tuple> metricPointsWithTuple = new HashMap<>();
    +    private OutputCollector collector;
    +
    +    public OpenTsdbBolt(OpenTsdbClient.Builder openTsdbClientBuilder, List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers) {
    +        this.openTsdbClientBuilder = openTsdbClientBuilder;
    +        this.tupleOpenTsdbDatapointMappers = tupleOpenTsdbDatapointMappers;
    +    }
    +
    +    public OpenTsdbBolt withFlushInterval(int flushIntervalInSeconds) {
    +        this.flushIntervalInSeconds = flushIntervalInSeconds;
    +        return this;
    +    }
    +
    +    public OpenTsdbBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        return this;
    +    }
    +
    +    /**
    +     * When it is invoked, this bolt acks only the tuples which have successful metrics stored into OpenTSDB and fails
    +     * the respective tuples of the failed metrics.
    +     *
    +     * @return same insatcne by setting {@code failTupleForFailedMetrics} to true
    --- End diff --
    
    insatcne -> instance
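The Javadoc above describes what `failTupleForFailedMetrics()` enables. The earlier quote of `execute()` in `OpenTsdbBolt.java` implements it: a batched tuple is failed if any data point generated from it is reported as failed, and acked otherwise. That decision can be summarized as a standalone sketch. `AckFailPartition` is a hypothetical helper, not part of the PR, and its type parameters stand in for `Tuple` and `OpenTsdbMetricDatapoint`. Matching the failed data points in the response back to tuples assumes the data point type implements `equals`/`hashCode` consistently.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper that splits a flushed batch into tuples to ack and tuples to fail. */
final class AckFailPartition<T> {
    final List<T> toAck = new ArrayList<>();
    final List<T> toFail = new ArrayList<>();

    static <X, D> AckFailPartition<X> of(Collection<X> batchedTuples,
                                         Map<D, X> datapointToTuple,
                                         Collection<D> failedDatapoints) {
        // Map every failed data point back to the tuple that produced it.
        Set<X> failedTuples = new HashSet<>();
        for (D datapoint : failedDatapoints) {
            X tuple = datapointToTuple.get(datapoint);
            if (tuple != null) {
                failedTuples.add(tuple);
            }
        }
        // One failed data point is enough to fail the whole tuple; all others are acked.
        AckFailPartition<X> result = new AckFailPartition<>();
        for (X tuple : batchedTuples) {
            if (failedTuples.contains(tuple)) {
                result.toFail.add(tuple);
            } else {
                result.toAck.add(tuple);
            }
        }
        return result;
    }
}
```

Because the bolt keys its map by data point, two tuples that produce equal data points share one map entry, and only the later tuple would be failed.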


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66953536
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/client/OpenTsdbClient.java ---
    @@ -0,0 +1,151 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb.client;
    +
    +import com.google.common.base.Preconditions;
    +import org.apache.stom.opentsdb.OpenTsdbMetricDatapoint;
    +import org.glassfish.jersey.apache.connector.ApacheConnectorProvider;
    +import org.glassfish.jersey.client.ClientConfig;
    +import org.glassfish.jersey.client.ClientProperties;
    +import org.glassfish.jersey.client.RequestEntityProcessing;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.ws.rs.client.Client;
    +import javax.ws.rs.client.ClientBuilder;
    +import javax.ws.rs.client.Entity;
    +import javax.ws.rs.client.WebTarget;
    +import java.io.Serializable;
    +import java.util.Collection;
    +
    +/**
    + * Client to connect to OpenTsdb TSD for storing timeseries datapoints.
    + */
    +public class OpenTsdbClient {
    +    private static final String PUT_PATH = "/api/put";
    +    private static Logger LOG = LoggerFactory.getLogger(OpenTsdbClient.class);
    +
    +    private final String urlString;
    +    private final boolean sync;
    +    private final long syncTimeout;
    +    private final ResponseType responseType;
    +    private final boolean enableChunkedEncoding;
    +
    +    private WebTarget target;
    +    private Client client;
    +
    +    public enum ResponseType {
    +        None(""),
    +        Summary("summary"),
    +        Details("details");
    +
    +        private final String value;
    +
    +        ResponseType(String value) {
    +            this.value = value;
    +        }
    +    }
    +
    +    protected OpenTsdbClient(String urlString, boolean sync, long syncTimeOut, ResponseType responseType, boolean enableChunkedEncoding) {
    +        this.urlString = urlString;
    +        this.sync = sync;
    +        this.syncTimeout = syncTimeOut;
    +        this.responseType = responseType;
    +        this.enableChunkedEncoding = enableChunkedEncoding;
    +
    +        init();
    +    }
    +
    +    private void init() {
    +
    +        final ApacheConnectorProvider apacheConnectorProvider = new ApacheConnectorProvider();
    +        final ClientConfig clientConfig = new ClientConfig().connectorProvider(apacheConnectorProvider);
    +
    +        // transfer encoding should be set as jersey sets it on by default.
    +        clientConfig.property(ClientProperties.REQUEST_ENTITY_PROCESSING,
    +                enableChunkedEncoding ? RequestEntityProcessing.CHUNKED : RequestEntityProcessing.BUFFERED);
    +
    +        client = ClientBuilder.newClient(clientConfig);
    +
    +        target = client.target(urlString).path(PUT_PATH);
    +        if(sync) {
    +            target = target.queryParam("sync").queryParam("sync_timeout", syncTimeout);
    +        }
    +        if(!(responseType == ResponseType.None)) {
    +            target.queryParam(responseType.value);
    +        }
    +
    +        LOG.debug("target uri [{}]", target.getUri());
    --- End diff --
    
    This can be logged at info level.
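Logging the target URI at info level makes it easy to confirm which `/api/put` options are active. In the quoted `init()`, the result of `target.queryParam(responseType.value)` is discarded. JAX-RS `WebTarget.queryParam` returns a new `WebTarget` rather than modifying the current one, so the `summary`/`details` option probably never reaches the request. Writing `target = target.queryParam(responseType.value);` would keep it. The sketch below is a hypothetical, stdlib-only illustration of the intended URI. It does not use Jersey, and `PutUriSketch` is not part of the PR. Whether Jersey renders a value-less `queryParam("sync")` as a bare flag is an assumption worth verifying.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the /api/put URI that init() appears to intend. */
final class PutUriSketch {
    enum ResponseType {
        NONE(""), SUMMARY("summary"), DETAILS("details");

        final String value;

        ResponseType(String value) {
            this.value = value;
        }
    }

    static String putUri(String baseUrl, boolean sync, long syncTimeout, ResponseType responseType) {
        // Every option is collected and kept, unlike a WebTarget call whose result is ignored.
        List<String> params = new ArrayList<>();
        if (sync) {
            params.add("sync");
            params.add("sync_timeout=" + syncTimeout);
        }
        if (responseType != ResponseType.NONE) {
            params.add(responseType.value);
        }
        String uri = baseUrl + "/api/put";
        return params.isEmpty() ? uri : uri + "?" + String.join("&", params);
    }
}
```

With `sync` enabled and `DETAILS`, the sketch yields `http://localhost:4242/api/put?sync&sync_timeout=1000&details` for a base URL of `http://localhost:4242` and a timeout of 1000.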


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66956898
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/bolt/OpenTsdbBolt.java ---
    @@ -0,0 +1,172 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb.bolt;
    +
    +import org.apache.stom.opentsdb.OpenTsdbMetricDatapoint;
    +import org.apache.stom.opentsdb.client.ClientResponse;
    +import org.apache.stom.opentsdb.client.OpenTsdbClient;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.utils.BatchHelper;
    +import org.apache.storm.utils.TupleUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * Basic bolt implementation for storing timeseries datapoints to OpenTSDB.
    + *
    + * List of {@link OpenTsdbMetricDatapoint} is generated for each tuple with the configured {@code tupleOpenTsdbDatapointMappers}.
    + * All these datapoints are batched till the given {@code batchSize} or {@code flushIntervalInSeconds} is reached.
    + *
    + * Example topology:
    + *
    + */
    +public class OpenTsdbBolt extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(OpenTsdbBolt.class);
    +
    +    private final OpenTsdbClient.Builder openTsdbClientBuilder;
    +    private final List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers;
    +    private int batchSize;
    +    private int flushIntervalInSeconds;
    +    private boolean failTupleForFailedMetrics;
    +
    +    private BatchHelper batchHelper;
    +    private OpenTsdbClient openTsdbClient;
    +    private Map<OpenTsdbMetricDatapoint, Tuple> metricPointsWithTuple = new HashMap<>();
    +    private OutputCollector collector;
    +
    +    public OpenTsdbBolt(OpenTsdbClient.Builder openTsdbClientBuilder, List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers) {
    +        this.openTsdbClientBuilder = openTsdbClientBuilder;
    +        this.tupleOpenTsdbDatapointMappers = tupleOpenTsdbDatapointMappers;
    +    }
    +
    +    public OpenTsdbBolt withFlushInterval(int flushIntervalInSeconds) {
    +        this.flushIntervalInSeconds = flushIntervalInSeconds;
    +        return this;
    +    }
    +
    +    public OpenTsdbBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        return this;
    +    }
    +
    +    /**
    +     * When it is invoked, this bolt acks only the tuples which have successful metrics stored into OpenTSDB and fails
    +     * the respective tuples of the failed metrics.
    +     *
    +     * @return same instance by setting {@code failTupleForFailedMetrics} to true
    +     */
    +    public OpenTsdbBolt failTupleForFailedMetrics() {
    +        this.failTupleForFailedMetrics = true;
    +        return this;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        batchHelper = new BatchHelper(batchSize, collector);
    +        openTsdbClient = openTsdbClientBuilder.build();
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            if (batchHelper.shouldHandle(tuple)) {
    +                final List<OpenTsdbMetricDatapoint> metricDataPoints = getMetricPoints(tuple);
    +                for (OpenTsdbMetricDatapoint metricDataPoint : metricDataPoints) {
    +                    metricPointsWithTuple.put(metricDataPoint, tuple);
    +                }
    +                batchHelper.addBatch(tuple);
    +            }
    +
    +            if (batchHelper.shouldFlush()) {
    +                LOG.debug("Sending metrics of size [{}]", metricPointsWithTuple.size());
    +
    +                ClientResponse.Details clientResponse = openTsdbClient.writeMetricPoints(metricPointsWithTuple.keySet());
    +
    +                if(failTupleForFailedMetrics && clientResponse.getFailed() > 0) {
    +                    final List<ClientResponse.Details.Error> errors = clientResponse.getErrors();
    +                    LOG.error("Some of the metric points failed with errors: [{}]", clientResponse);
    +                    if(errors != null && !errors.isEmpty()) {
    +
    +                        Set<Tuple> failedTuples = new HashSet<>();
    +                        for (ClientResponse.Details.Error error : errors) {
    +                            final Tuple failedTuple = metricPointsWithTuple.get(error.getDatapoint());
    +                            if(failedTuple != null) {
    +                                failedTuples.add(failedTuple);
    +                            }
    +                        }
    +
    +                        for (Tuple batchedTuple : batchHelper.getBatchTuples()) {
    +                            if(failedTuples.contains(batchedTuple)) {
    +                                collector.fail(batchedTuple);
    +                            } else {
    +                                collector.ack(batchedTuple);
    +                            }
    +                        }
    +
    +                    } else {
    +                        throw new RuntimeException("Some of the metric points failed with details: " + errors);
    +                    }
    +                } else {
    +                    LOG.debug("Acknowledging batched tuples");
    +                    batchHelper.ack();
    +                }
    +                metricPointsWithTuple.clear();
    +            }
    +        } catch (Exception e) {
    --- End diff --
    
    I was thinking: if an exception is thrown in the part of the code that deals with just one tuple, e.g. `getMetricPoints(tuple)`, should the whole batch be failed? The code inside the `batchHelper.shouldFlush()` block operates on the whole batch.
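One way to act on this suggestion is to scope the try/catch to the per-tuple mapping step. A tuple whose mapping throws is then failed on its own, and the rest of the batch keeps accumulating. The sketch below is a hypothetical, Storm-free model of that idea. Tuples are plain Strings, the `mapper` function stands in for `getMetricPoints(tuple)`, and ack/fail are recorded in lists instead of calling `OutputCollector`. `PerTupleIsolatingBatcher` and its members are illustrative names, not code from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical model of the bolt's execute() loop (not from the PR): only the
 * per-tuple mapping step is guarded, so one bad tuple does not fail the batch.
 */
class PerTupleIsolatingBatcher {
    private final Function<String, List<String>> mapper; // stands in for getMetricPoints(tuple)
    private final int batchSize;
    private final Map<String, String> pointToTuple = new LinkedHashMap<>(); // datapoint -> owning tuple
    private final List<String> batch = new ArrayList<>();
    final List<String> acked = new ArrayList<>();
    final List<String> failed = new ArrayList<>();

    PerTupleIsolatingBatcher(Function<String, List<String>> mapper, int batchSize) {
        this.mapper = mapper;
        this.batchSize = batchSize;
    }

    void execute(String tuple) {
        List<String> points;
        try {
            points = mapper.apply(tuple);
        } catch (RuntimeException e) {
            // Only this tuple is bad: fail it and leave the batched tuples untouched.
            failed.add(tuple);
            return;
        }
        for (String point : points) {
            pointToTuple.put(point, tuple);
        }
        batch.add(tuple);
        if (batch.size() >= batchSize) {
            flush();
        }
    }

    private void flush() {
        // A real bolt would write pointToTuple.keySet() to OpenTSDB here. A failure at
        // this stage concerns the whole batch, so all batched tuples share its outcome.
        acked.addAll(batch);
        batch.clear();
        pointToTuple.clear();
    }
}
```

Errors raised while writing the batch to OpenTSDB would still affect every batched tuple. This matches the observation that the `shouldFlush` branch operates on the whole batch.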


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66933230
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/bolt/OpenTsdbBolt.java ---
    @@ -0,0 +1,172 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb.bolt;
    +
    +import org.apache.stom.opentsdb.OpenTsdbMetricDatapoint;
    +import org.apache.stom.opentsdb.client.ClientResponse;
    +import org.apache.stom.opentsdb.client.OpenTsdbClient;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.utils.BatchHelper;
    +import org.apache.storm.utils.TupleUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * Basic bolt implementation for storing timeseries datapoints to OpenTSDB.
    + *
    + * List of {@link OpenTsdbMetricDatapoint} is generated for each tuple with the configured {@code tupleOpenTsdbDatapointMappers}.
    + * All these datapoints are batched till the given {@code batchSize} or {@code flushIntervalInSeconds} is reached.
    + *
    + * Example topology:
    + *
    + */
    +public class OpenTsdbBolt extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(OpenTsdbBolt.class);
    +
    +    private final OpenTsdbClient.Builder openTsdbClientBuilder;
    +    private final List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers;
    +    private int batchSize;
    +    private int flushIntervalInSeconds;
    +    private boolean failTupleForFailedMetrics;
    +
    +    private BatchHelper batchHelper;
    +    private OpenTsdbClient openTsdbClient;
    +    private Map<OpenTsdbMetricDatapoint, Tuple> metricPointsWithTuple = new HashMap<>();
    +    private OutputCollector collector;
    +
    +    public OpenTsdbBolt(OpenTsdbClient.Builder openTsdbClientBuilder, List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers) {
    +        this.openTsdbClientBuilder = openTsdbClientBuilder;
    +        this.tupleOpenTsdbDatapointMappers = tupleOpenTsdbDatapointMappers;
    +    }
    +
    +    public OpenTsdbBolt withFlushInterval(int flushIntervalInSeconds) {
    +        this.flushIntervalInSeconds = flushIntervalInSeconds;
    +        return this;
    +    }
    +
    +    public OpenTsdbBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        return this;
    +    }
    +
    +    /**
    +     * When it is invoked, this bolt acks only the tuples which have successful metrics stored into OpenTSDB and fails
    +     * the respective tuples of the failed metrics.
    +     *
    +     * @return same instance by setting {@code failTupleForFailedMetrics} to true
    +     */
    +    public OpenTsdbBolt failTupleForFailedMetrics() {
    +        this.failTupleForFailedMetrics = true;
    +        return this;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        batchHelper = new BatchHelper(batchSize, collector);
    +        openTsdbClient = openTsdbClientBuilder.build();
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            if (batchHelper.shouldHandle(tuple)) {
    +                final List<OpenTsdbMetricDatapoint> metricDataPoints = getMetricPoints(tuple);
    +                for (OpenTsdbMetricDatapoint metricDataPoint : metricDataPoints) {
    +                    metricPointsWithTuple.put(metricDataPoint, tuple);
    +                }
    +                batchHelper.addBatch(tuple);
    --- End diff --
    
    Yeah right. I forgot this. Then I'm OK to have similar behavior.


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66957240
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/bolt/OpenTsdbBolt.java ---
    @@ -0,0 +1,172 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb.bolt;
    +
    +import org.apache.stom.opentsdb.OpenTsdbMetricDatapoint;
    +import org.apache.stom.opentsdb.client.ClientResponse;
    +import org.apache.stom.opentsdb.client.OpenTsdbClient;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.utils.BatchHelper;
    +import org.apache.storm.utils.TupleUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * Basic bolt implementation for storing timeseries datapoints to OpenTSDB.
    + *
    + * List of {@link OpenTsdbMetricDatapoint} is generated for each tuple with the configured {@code tupleOpenTsdbDatapointMappers}.
    + * All these datapoints are batched till the given {@code batchSize} or {@code flushIntervalInSeconds} is reached.
    + *
    + * Example topology:
    + *
    + */
    +public class OpenTsdbBolt extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(OpenTsdbBolt.class);
    +
    +    private final OpenTsdbClient.Builder openTsdbClientBuilder;
    +    private final List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers;
    +    private int batchSize;
    +    private int flushIntervalInSeconds;
    +    private boolean failTupleForFailedMetrics;
    +
    +    private BatchHelper batchHelper;
    +    private OpenTsdbClient openTsdbClient;
    +    private Map<OpenTsdbMetricDatapoint, Tuple> metricPointsWithTuple = new HashMap<>();
    +    private OutputCollector collector;
    +
    +    public OpenTsdbBolt(OpenTsdbClient.Builder openTsdbClientBuilder, List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers) {
    +        this.openTsdbClientBuilder = openTsdbClientBuilder;
    +        this.tupleOpenTsdbDatapointMappers = tupleOpenTsdbDatapointMappers;
    +    }
    +
    +    public OpenTsdbBolt withFlushInterval(int flushIntervalInSeconds) {
    +        this.flushIntervalInSeconds = flushIntervalInSeconds;
    +        return this;
    +    }
    +
    +    public OpenTsdbBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        return this;
    +    }
    +
    +    /**
    +     * When it is invoked, this bolt acks only the tuples which have successful metrics stored into OpenTSDB and fails
    +     * the respective tuples of the failed metrics.
    +     *
    +     * @return same instance by setting {@code failTupleForFailedMetrics} to true
    +     */
    +    public OpenTsdbBolt failTupleForFailedMetrics() {
    +        this.failTupleForFailedMetrics = true;
    +        return this;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        batchHelper = new BatchHelper(batchSize, collector);
    +        openTsdbClient = openTsdbClientBuilder.build();
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            if (batchHelper.shouldHandle(tuple)) {
    +                final List<OpenTsdbMetricDatapoint> metricDataPoints = getMetricPoints(tuple);
    +                for (OpenTsdbMetricDatapoint metricDataPoint : metricDataPoints) {
    +                    metricPointsWithTuple.put(metricDataPoint, tuple);
    +                }
    +                batchHelper.addBatch(tuple);
    +            }
    +
    +            if (batchHelper.shouldFlush()) {
    +                LOG.debug("Sending metrics of size [{}]", metricPointsWithTuple.size());
    +
    +                ClientResponse.Details clientResponse = openTsdbClient.writeMetricPoints(metricPointsWithTuple.keySet());
    +
    +                if(failTupleForFailedMetrics && clientResponse.getFailed() > 0) {
    +                    final List<ClientResponse.Details.Error> errors = clientResponse.getErrors();
    +                    LOG.error("Some of the metric points failed with errors: [{}]", clientResponse);
    +                    if(errors != null && !errors.isEmpty()) {
    +
    +                        Set<Tuple> failedTuples = new HashSet<>();
    +                        for (ClientResponse.Details.Error error : errors) {
    +                            final Tuple failedTuple = metricPointsWithTuple.get(error.getDatapoint());
    +                            if(failedTuple != null) {
    +                                failedTuples.add(failedTuple);
    +                            }
    +                        }
    +
    +                        for (Tuple batchedTuple : batchHelper.getBatchTuples()) {
    +                            if(failedTuples.contains(batchedTuple)) {
    +                                collector.fail(batchedTuple);
    +                            } else {
    +                                collector.ack(batchedTuple);
    +                            }
    +                        }
    +
    +                    } else {
    +                        throw new RuntimeException("Some of the metric points failed with details: " + errors);
    +                    }
    +                } else {
    +                    LOG.debug("Acknowledging batched tuples");
    +                    batchHelper.ack();
    +                }
    +                metricPointsWithTuple.clear();
    +            }
    +        } catch (Exception e) {
    --- End diff --
    
    should be fine then


[GitHub] storm issue #1484: STORM-1893 OpenTSDB bolt and trident state for storing an...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1484
  
    +1.


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66950531
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/OpenTsdbMetricDatapoint.java ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb;
    +
    +import java.io.Serializable;
    +import java.util.Collections;
    +import java.util.Map;
    +
    +/**
    + * This class represents a metric data point in OpenTSDB's format.
    + */
    +public class OpenTsdbMetricDatapoint implements Serializable {
    +
    +    // metric name
    +    private final String metric;
    +
    +    // map of tag value pairs
    +    private final Map<String, String> tags;
    +
    +    // timestamp either in milliseconds or seconds at which this metric is occurred.
    +    private final long timestamp;
    +
    +    // value of the metric
    +    private final Object value;
    +
    +    private OpenTsdbMetricDatapoint() {
    +        this(null, null, 0L, null);
    --- End diff --
    
    Should null values be allowed? They can result in an NPE in the equals/hashCode methods.
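The private no-arg constructor may need to stay. For example, a serializer might require it, but that is an assumption: the diff does not say why it exists. In that case, one option is to make `equals`/`hashCode` null-safe with `java.util.Objects`. The sketch below uses a hypothetical `DatapointSketch` class with the same four fields as `OpenTsdbMetricDatapoint`. It is not the class from the PR.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for OpenTsdbMetricDatapoint with null-safe equals/hashCode. */
final class DatapointSketch implements Serializable {
    private final String metric;
    private final Map<String, String> tags;
    private final long timestamp;
    private final Object value;

    // Mirrors the private no-arg constructor in the diff; all reference fields end up null.
    private DatapointSketch() {
        this(null, null, 0L, null);
    }

    DatapointSketch(String metric, Map<String, String> tags, long timestamp, Object value) {
        this.metric = metric;
        this.tags = tags == null ? null : Collections.unmodifiableMap(new HashMap<>(tags));
        this.timestamp = timestamp;
        this.value = value;
    }

    /** Test helper exposing the all-null instance. */
    static DatapointSketch empty() {
        return new DatapointSketch();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DatapointSketch)) return false;
        DatapointSketch that = (DatapointSketch) o;
        // Objects.equals tolerates null on either side, so no NPE here.
        return timestamp == that.timestamp
                && Objects.equals(metric, that.metric)
                && Objects.equals(tags, that.tags)
                && Objects.equals(value, that.value);
    }

    @Override
    public int hashCode() {
        // Objects.hash hashes null elements as 0.
        return Objects.hash(metric, tags, timestamp, value);
    }
}
```

The alternative is to reject nulls in the public constructor with `Objects.requireNonNull`. That keeps `equals`/`hashCode` simple, but it only works if no code path needs an all-null instance.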


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66951360
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/bolt/OpenTsdbBolt.java ---
    @@ -0,0 +1,172 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb.bolt;
    +
    +import org.apache.stom.opentsdb.OpenTsdbMetricDatapoint;
    +import org.apache.stom.opentsdb.client.ClientResponse;
    +import org.apache.stom.opentsdb.client.OpenTsdbClient;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.utils.BatchHelper;
    +import org.apache.storm.utils.TupleUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * Basic bolt implementation for storing timeseries datapoints to OpenTSDB.
    + *
    + * List of {@link OpenTsdbMetricDatapoint} is generated for each tuple with the configured {@code tupleOpenTsdbDatapointMappers}.
    + * All these datapoints are batched till the given {@code batchSize} or {@code flushIntervalInSeconds} is reached.
    + *
    + * Example topology:
    + *
    + */
    +public class OpenTsdbBolt extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(OpenTsdbBolt.class);
    +
    +    private final OpenTsdbClient.Builder openTsdbClientBuilder;
    +    private final List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers;
    +    private int batchSize;
    +    private int flushIntervalInSeconds;
    +    private boolean failTupleForFailedMetrics;
    +
    +    private BatchHelper batchHelper;
    +    private OpenTsdbClient openTsdbClient;
    +    private Map<OpenTsdbMetricDatapoint, Tuple> metricPointsWithTuple = new HashMap<>();
    +    private OutputCollector collector;
    +
    +    public OpenTsdbBolt(OpenTsdbClient.Builder openTsdbClientBuilder, List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers) {
    +        this.openTsdbClientBuilder = openTsdbClientBuilder;
    +        this.tupleOpenTsdbDatapointMappers = tupleOpenTsdbDatapointMappers;
    +    }
    +
    +    public OpenTsdbBolt withFlushInterval(int flushIntervalInSeconds) {
    --- End diff --
    
    Aren't flushInterval and batchSize mandatory parameters?
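If `batchSize` and `flushIntervalInSeconds` are meant to be mandatory, one option is to keep the fluent `with...` setters but fail fast when the values were never set, e.g. from `prepare()`. The sketch below is a hypothetical, stand-alone config holder that illustrates that check. The class name, the use of `0` as "not set", and the exception type are assumptions, not code from the PR.

```java
/**
 * Hypothetical config holder (not from the PR) that keeps the fluent setters
 * but rejects a missing or non-positive batch size / flush interval.
 */
final class BatchSettingsSketch {
    private int batchSize;              // 0 means "never set"
    private int flushIntervalInSeconds; // 0 means "never set"

    BatchSettingsSketch withBatchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    BatchSettingsSketch withFlushInterval(int flushIntervalInSeconds) {
        this.flushIntervalInSeconds = flushIntervalInSeconds;
        return this;
    }

    /** Intended to be called from prepare(), so a misconfigured topology fails at startup. */
    void validate() {
        if (batchSize <= 0) {
            throw new IllegalStateException("batchSize must be set to a positive value, was " + batchSize);
        }
        if (flushIntervalInSeconds <= 0) {
            throw new IllegalStateException(
                    "flushIntervalInSeconds must be set to a positive value, was " + flushIntervalInSeconds);
        }
    }

    int batchSize() {
        return batchSize;
    }

    int flushIntervalInSeconds() {
        return flushIntervalInSeconds;
    }
}
```

Taking both values as constructor arguments would instead make them mandatory at compile time. The cost is losing the fluent style the bolt currently uses.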


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66929834
  
    --- Diff: external/storm-opentsdb/README.md ---
    @@ -0,0 +1,73 @@
    +# Storm OpenTSDB Bolt and TridentState
    +  
    +OpenTSDB offers a scalable and highly available storage for time series data. It consists of a
    +Time Series Daemon (TSD) servers along with command line utilities. Each TSD connects to the 
    +configured HBase cluster to push/query the data.
    +
    +Time series data point consists of:
    + - a metric name.
    + - a UNIX timestamp (seconds or millisecinds since Epoch).
    + - a value (64 bit integer or single-precision floating point value).
    + - a set of tags (key-value pairs) that describe the time series the point belongs to.
    +
    +Storm bolt and trident state creates the above time series data from a tuple based on the given `TupleMetricPointMapper`
    +  
    +This module provides core Storm and Trident bolt implementations for writing data to OpenTSDB.
    +
    +## Examples
    +
    +### Core Bolt
    +Below example describes the usage of core colt viz `org.apache.stom.opentsdb.bolt.OpenTsdbBolt`
    --- End diff --
    
    Thanks for clarifying.


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66918729
  
    --- Diff: external/storm-opentsdb/README.md ---
    @@ -0,0 +1,73 @@
    +# Storm OpenTSDB Bolt and TridentState
    +  
    +OpenTSDB offers a scalable and highly available storage for time series data. It consists of a
    +Time Series Daemon (TSD) servers along with command line utilities. Each TSD connects to the 
    +configured HBase cluster to push/query the data.
    +
    +Time series data point consists of:
    + - a metric name.
    + - a UNIX timestamp (seconds or millisecinds since Epoch).
    + - a value (64 bit integer or single-precision floating point value).
    + - a set of tags (key-value pairs) that describe the time series the point belongs to.
    +
    +Storm bolt and trident state creates the above time series data from a tuple based on the given `TupleMetricPointMapper`
    +  
    +This module provides core Storm and Trident bolt implementations for writing data to OpenTSDB.
    +
    +## Examples
    +
    +### Core Bolt
    +Below example describes the usage of core colt viz `org.apache.stom.opentsdb.bolt.OpenTsdbBolt`
    --- End diff --
    
    I'm not sure whether these are typos, but I guess colt -> bolt and viz -> via.


[GitHub] storm issue #1484: STORM-1893 OpenTSDB bolt and trident state for storing an...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on the issue:

    https://github.com/apache/storm/pull/1484
  
    Thanks @satishd. I had mostly minor comments, except for the httpclients version upgrade. I would prefer that upgrade to be isolated to the storm-opentsdb module.


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66918553
  
    --- Diff: external/storm-opentsdb/README.md ---
    @@ -0,0 +1,73 @@
    +# Storm OpenTSDB Bolt and TridentState
    +  
    +OpenTSDB offers a scalable and highly available storage for time series data. It consists of a
    +Time Series Daemon (TSD) servers along with command line utilities. Each TSD connects to the 
    +configured HBase cluster to push/query the data.
    +
    +Time series data point consists of:
    + - a metric name.
    + - a UNIX timestamp (seconds or millisecinds since Epoch).
    --- End diff --
    
    millisecinds -> milliseconds


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66954356
  
    --- Diff: external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java ---
    @@ -0,0 +1,70 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.opentsdb;
    +
    +import org.apache.stom.opentsdb.bolt.OpenTsdbBolt;
    +import org.apache.stom.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
    +import org.apache.stom.opentsdb.client.OpenTsdbClient;
    +import org.apache.storm.Config;
    +import org.apache.storm.LocalCluster;
    +import org.apache.storm.StormSubmitter;
    +import org.apache.storm.topology.TopologyBuilder;
    +
    +import java.util.Collections;
    +
    +/**
    + * Sample application to use OpenTSDB bolt.
    + */
    +public class SampleOpenTsdbBoltTopology {
    --- End diff --
    
    I feel that this class and MetricGenSpout should rather be moved to examples/storm-starter.


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66950359
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/OpenTsdbMetricDatapoint.java ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb;
    +
    +import java.io.Serializable;
    +import java.util.Collections;
    +import java.util.Map;
    +
    +/**
    + * This class represents a metric data point in OpenTSDB's format.
    + */
    +public class OpenTsdbMetricDatapoint implements Serializable {
    +
    +    // metric name
    +    private final String metric;
    +
    +    // map of tag value pairs
    +    private final Map<String, String> tags;
    +
    +    // timestamp either in milliseconds or seconds at which this metric is occurred.
    +    private final long timestamp;
    +
    +    // value of the metric
    +    private final Object value;
    --- End diff --
    
    +1


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66955350
  
    --- Diff: external/storm-opentsdb/src/test/java/org/apache/storm/opentsdb/SampleOpenTsdbBoltTopology.java ---
    @@ -0,0 +1,70 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.opentsdb;
    +
    +import org.apache.stom.opentsdb.bolt.OpenTsdbBolt;
    +import org.apache.stom.opentsdb.bolt.TupleOpenTsdbDatapointMapper;
    +import org.apache.stom.opentsdb.client.OpenTsdbClient;
    +import org.apache.storm.Config;
    +import org.apache.storm.LocalCluster;
    +import org.apache.storm.StormSubmitter;
    +import org.apache.storm.topology.TopologyBuilder;
    +
    +import java.util.Collections;
    +
    +/**
    + * Sample application to use OpenTSDB bolt.
    + */
    +public class SampleOpenTsdbBoltTopology {
    --- End diff --
    
    storm-opentsdb's Jersey-related dependencies collided with some example topologies. That is why it is kept inside this module for now.


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66922591
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/bolt/OpenTsdbBolt.java ---
    @@ -0,0 +1,172 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb.bolt;
    +
    +import org.apache.stom.opentsdb.OpenTsdbMetricDatapoint;
    +import org.apache.stom.opentsdb.client.ClientResponse;
    +import org.apache.stom.opentsdb.client.OpenTsdbClient;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.utils.BatchHelper;
    +import org.apache.storm.utils.TupleUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * Basic bolt implementation for storing timeseries datapoints to OpenTSDB.
    + *
    + * List of {@link OpenTsdbMetricDatapoint} is generated for each tuple with the configured {@code tupleOpenTsdbDatapointMappers}.
    + * All these datapoints are batched till the given {@code batchSize} or {@code flushIntervalInSeconds} is reached.
    + *
    + * Example topology:
    + *
    + */
    +public class OpenTsdbBolt extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(OpenTsdbBolt.class);
    +
    +    private final OpenTsdbClient.Builder openTsdbClientBuilder;
    +    private final List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers;
    +    private int batchSize;
    +    private int flushIntervalInSeconds;
    +    private boolean failTupleForFailedMetrics;
    +
    +    private BatchHelper batchHelper;
    +    private OpenTsdbClient openTsdbClient;
    +    private Map<OpenTsdbMetricDatapoint, Tuple> metricPointsWithTuple = new HashMap<>();
    +    private OutputCollector collector;
    +
    +    public OpenTsdbBolt(OpenTsdbClient.Builder openTsdbClientBuilder, List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers) {
    +        this.openTsdbClientBuilder = openTsdbClientBuilder;
    +        this.tupleOpenTsdbDatapointMappers = tupleOpenTsdbDatapointMappers;
    +    }
    +
    +    public OpenTsdbBolt withFlushInterval(int flushIntervalInSeconds) {
    +        this.flushIntervalInSeconds = flushIntervalInSeconds;
    +        return this;
    +    }
    +
    +    public OpenTsdbBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        return this;
    +    }
    +
    +    /**
    +     * When it is invoked, this bolt acks only the tuples which have successful metrics stored into OpenTSDB and fails
    +     * the respective tuples of the failed metrics.
    +     *
    +     * @return same insatcne by setting {@code failTupleForFailedMetrics} to true
    +     */
    +    public OpenTsdbBolt failTupleForFailedMetrics() {
    +        this.failTupleForFailedMetrics = true;
    +        return this;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        batchHelper = new BatchHelper(batchSize, collector);
    +        openTsdbClient = openTsdbClientBuilder.build();
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            if (batchHelper.shouldHandle(tuple)) {
    +                final List<OpenTsdbMetricDatapoint> metricDataPoints = getMetricPoints(tuple);
    +                for (OpenTsdbMetricDatapoint metricDataPoint : metricDataPoints) {
    +                    metricPointsWithTuple.put(metricDataPoint, tuple);
    +                }
    +                batchHelper.addBatch(tuple);
    --- End diff --
    
    This counts all the metric datapoints of a tuple as one, since BatchHelper batches by tuple. In fact, each tuple produces as many datapoints as there are configured mappers.
    Since the target of batching is metric datapoints rather than tuples, this behavior seems unintuitive.
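One way to address this concern is to count datapoints rather than tuples when deciding whether to flush. The sketch below is a hypothetical, Storm-independent illustration. The class `DatapointBatch` and its methods are not part of storm-opentsdb or `BatchHelper`. Tuples and datapoints are modeled as type parameters so that the idea can run without Storm.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical batch that is ready to flush once the number of buffered
 * datapoints, not the number of tuples, reaches batchSize.
 */
final class DatapointBatch<T, D> {
    private final int batchSize;
    private final List<T> tuples = new ArrayList<>();
    private final List<D> datapoints = new ArrayList<>();

    DatapointBatch(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /** Buffers one tuple together with all datapoints its mappers produced. */
    void add(T tuple, List<D> tupleDatapoints) {
        tuples.add(tuple);
        datapoints.addAll(tupleDatapoints);
    }

    /** True once the buffered datapoint count reaches the configured batch size. */
    boolean shouldFlush() {
        return datapoints.size() >= batchSize;
    }

    int tupleCount() { return tuples.size(); }

    int datapointCount() { return datapoints.size(); }

    List<T> getTuples() { return Collections.unmodifiableList(tuples); }

    List<D> getDatapoints() { return Collections.unmodifiableList(datapoints); }

    /** Empties the buffer after a write to OpenTSDB has been attempted. */
    void clear() {
        tuples.clear();
        datapoints.clear();
    }
}
```

For example, with a `batchSize` of 4 and two mappers, this batch is ready to flush after the second tuple rather than the fourth. Time-based flushing (the bolt's `flushIntervalInSeconds`) is not modeled here and would still need to be handled separately.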


[GitHub] storm issue #1484: STORM-1893 OpenTSDB bolt and trident state for storing an...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on the issue:

    https://github.com/apache/storm/pull/1484
  
    +1. Thank you @satishd 


[GitHub] storm issue #1484: STORM-1893 OpenTSDB bolt and trident state for storing an...

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the issue:

    https://github.com/apache/storm/pull/1484
  
    @abhishekagarwal87 Added suggested comment and squashed commits.


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66934149
  
    --- Diff: pom.xml ---
    @@ -228,7 +228,7 @@
             <clojure.math.numeric-tower.version>0.0.1</clojure.math.numeric-tower.version>
             <carbonite.version>1.5.0</carbonite.version>
             <snakeyaml.version>1.11</snakeyaml.version>
    -        <httpclient.version>4.3.3</httpclient.version>
    +        <httpclient.version>4.5</httpclient.version>
    --- End diff --
    
    This is needed for storm-opentsdb. The bump is also good for other modules, as the new version has improvements over the existing one.


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66955486
  
    --- Diff: pom.xml ---
    @@ -228,7 +228,7 @@
             <clojure.math.numeric-tower.version>0.0.1</clojure.math.numeric-tower.version>
             <carbonite.version>1.5.0</carbonite.version>
             <snakeyaml.version>1.11</snakeyaml.version>
    -        <httpclient.version>4.3.3</httpclient.version>
    +        <httpclient.version>4.5</httpclient.version>
    --- End diff --
    
    I am not sure about this change. Is this dependency shaded? There was a recent discussion on updating the versions of non-shaded dependencies:
    https://www.mail-archive.com/dev@storm.apache.org/msg32949.html
    If it is not shaded, it is better to upgrade the version only for the storm-opentsdb module.


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66952161
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/client/ClientResponse.java ---
    @@ -0,0 +1,169 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb.client;
    +
    +import org.apache.stom.opentsdb.OpenTsdbMetricDatapoint;
    +
    +import java.io.Serializable;
    +import java.util.List;
    +
    +/**
    + * This class represents the response from OpenTsdb for a request sent.
    + */
    +public interface ClientResponse extends Serializable {
    +
    +
    +    public class Summary implements ClientResponse {
    +        private int failed;
    +        private int success;
    +
    +        public Summary() {
    +        }
    +
    +        public Summary(int failed, int success) {
    +            this.failed = failed;
    +            this.success = success;
    +        }
    +
    +        public int getFailed() {
    +            return failed;
    +        }
    +
    +        public int getSuccess() {
    +            return success;
    +        }
    +
    +        @Override
    +        public boolean equals(Object o) {
    +            if (this == o) return true;
    +            if (!(o instanceof Summary)) return false;
    +
    +            Summary summary = (Summary) o;
    +
    +            if (failed != summary.failed) return false;
    +            return success == summary.success;
    +
    +        }
    +
    +        @Override
    +        public int hashCode() {
    +            int result = failed;
    +            result = 31 * result + success;
    +            return result;
    +        }
    +
    +        @Override
    +        public String toString() {
    +            return "Summary{" +
    +                    "failed=" + failed +
    +                    ", success=" + success +
    +                    '}';
    +        }
    +    }
    +
    +    public class Details extends Summary {
    +        private List<Error> errors;
    +
    +        public Details() {
    +        }
    +
    +        public Details(int failed, int success, List<Error> errors) {
    +            super(failed, success);
    +            this.errors = errors;
    +        }
    +
    +        @Override
    +        public boolean equals(Object o) {
    +            if (this == o) return true;
    +            if (!(o instanceof Details)) return false;
    +            if (!super.equals(o)) return false;
    +
    +            Details details = (Details) o;
    +
    +            return errors.equals(details.errors);
    +
    +        }
    +
    +        public List<Error> getErrors() {
    +            return errors;
    +        }
    +
    +        @Override
    +        public int hashCode() {
    +            int result = super.hashCode();
    +            result = 31 * result + errors.hashCode();
    +            return result;
    +        }
    +
    +        @Override
    +        public String toString() {
    +            return "Details{" +
    +                    "errors=" + errors +
    +                    '}'+super.toString();
    +        }
    +
    +        public static class Error implements Serializable {
    +            private String error;
    +            private OpenTsdbMetricDatapoint datapoint;
    +
    +            public Error() {
    +            }
    +
    +            public Error(String error, OpenTsdbMetricDatapoint datapoint) {
    +                this.error = error;
    +                this.datapoint = datapoint;
    +            }
    +
    +            public String getError() {
    +                return error;
    +            }
    +
    +            public OpenTsdbMetricDatapoint getDatapoint() {
    +                return datapoint;
    +            }
    +
    +            @Override
    +            public boolean equals(Object o) {
    +                if (this == o) return true;
    +                if (!(o instanceof Error)) return false;
    +
    +                Error error1 = (Error) o;
    +
    +                if (!error.equals(error1.error)) return false;
    +                return datapoint.equals(error1.datapoint);
    +
    +            }
    +
    +            @Override
    +            public int hashCode() {
    +                int result = error.hashCode();
    +                result = 31 * result + datapoint.hashCode();
    +                return result;
    +            }
    +
    +            @Override
    +            public String toString() {
    +                return "Error{" +
    +                        "error='" + error + '\'' +
    +                        ", datapoints=" + datapoint +
    --- End diff --
    
    Typo: this should be `datapoint=`.


[GitHub] storm issue #1484: STORM-1893 OpenTSDB bolt and trident state for storing an...

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the issue:

    https://github.com/apache/storm/pull/1484
  
    @harshach Added it to binary.xml.
    
    @HeartSaVioR @abhishekagarwal87 Addressed review comments.


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66956682
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/bolt/OpenTsdbBolt.java ---
    @@ -0,0 +1,172 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb.bolt;
    +
    +import org.apache.stom.opentsdb.OpenTsdbMetricDatapoint;
    +import org.apache.stom.opentsdb.client.ClientResponse;
    +import org.apache.stom.opentsdb.client.OpenTsdbClient;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.utils.BatchHelper;
    +import org.apache.storm.utils.TupleUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * Basic bolt implementation for storing timeseries datapoints to OpenTSDB.
    + *
    + * List of {@link OpenTsdbMetricDatapoint} is generated for each tuple with the configured {@code tupleOpenTsdbDatapointMappers}.
    + * All these datapoints are batched till the given {@code batchSize} or {@code flushIntervalInSeconds} is reached.
    + *
    + * Example topology:
    + *
    + */
    +public class OpenTsdbBolt extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(OpenTsdbBolt.class);
    +
    +    private final OpenTsdbClient.Builder openTsdbClientBuilder;
    +    private final List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers;
    +    private int batchSize;
    +    private int flushIntervalInSeconds;
    +    private boolean failTupleForFailedMetrics;
    +
    +    private BatchHelper batchHelper;
    +    private OpenTsdbClient openTsdbClient;
    +    private Map<OpenTsdbMetricDatapoint, Tuple> metricPointsWithTuple = new HashMap<>();
    +    private OutputCollector collector;
    +
    +    public OpenTsdbBolt(OpenTsdbClient.Builder openTsdbClientBuilder, List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers) {
    +        this.openTsdbClientBuilder = openTsdbClientBuilder;
    +        this.tupleOpenTsdbDatapointMappers = tupleOpenTsdbDatapointMappers;
    +    }
    +
    +    public OpenTsdbBolt withFlushInterval(int flushIntervalInSeconds) {
    +        this.flushIntervalInSeconds = flushIntervalInSeconds;
    +        return this;
    +    }
    +
    +    public OpenTsdbBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        return this;
    +    }
    +
    +    /**
    +     * When it is invoked, this bolt acks only the tuples which have successful metrics stored into OpenTSDB and fails
    +     * the respective tuples of the failed metrics.
    +     *
    +     * @return same instance by setting {@code failTupleForFailedMetrics} to true
    +     */
    +    public OpenTsdbBolt failTupleForFailedMetrics() {
    +        this.failTupleForFailedMetrics = true;
    +        return this;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        batchHelper = new BatchHelper(batchSize, collector);
    +        openTsdbClient = openTsdbClientBuilder.build();
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            if (batchHelper.shouldHandle(tuple)) {
    +                final List<OpenTsdbMetricDatapoint> metricDataPoints = getMetricPoints(tuple);
    +                for (OpenTsdbMetricDatapoint metricDataPoint : metricDataPoints) {
    +                    metricPointsWithTuple.put(metricDataPoint, tuple);
    +                }
    +                batchHelper.addBatch(tuple);
    +            }
    +
    +            if (batchHelper.shouldFlush()) {
    +                LOG.debug("Sending metrics of size [{}]", metricPointsWithTuple.size());
    +
    +                ClientResponse.Details clientResponse = openTsdbClient.writeMetricPoints(metricPointsWithTuple.keySet());
    +
    +                if(failTupleForFailedMetrics && clientResponse.getFailed() > 0) {
    +                    final List<ClientResponse.Details.Error> errors = clientResponse.getErrors();
    +                    LOG.error("Some of the metric points failed with errors: [{}]", clientResponse);
    +                    if(errors != null && !errors.isEmpty()) {
    +
    +                        Set<Tuple> failedTuples = new HashSet<>();
    +                        for (ClientResponse.Details.Error error : errors) {
    +                            final Tuple failedTuple = metricPointsWithTuple.get(error.getDatapoint());
    +                            if(failedTuple != null) {
    +                                failedTuples.add(failedTuple);
    +                            }
    +                        }
    +
    +                        for (Tuple batchedTuple : batchHelper.getBatchTuples()) {
    +                            if(failedTuples.contains(batchedTuple)) {
    +                                collector.fail(batchedTuple);
    +                            } else {
    +                                collector.ack(batchedTuple);
    +                            }
    +                        }
    +
    +                    } else {
    +                        throw new RuntimeException("Some of the metric points failed with details: " + errors);
    +                    }
    +                } else {
    +                    LOG.debug("Acknowledging batched tuples");
    +                    batchHelper.ack();
    +                }
    +                metricPointsWithTuple.clear();
    +            }
    +        } catch (Exception e) {
    --- End diff --
    
    This contract follows the one used by `HBaseBolt`, `HiveBolt`, etc., which also use `BatchHelper`.
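The ack/fail decision in the quoted `execute` can be isolated as a pure function, which makes it easy to unit test. The following is a hypothetical sketch, not code from the PR. `BatchOutcome.partition` and its parameter names are illustrative, and tuples and datapoints are again modeled as type parameters. Like the quoted code, it relies on a map from each datapoint to its source tuple. As a result, equal datapoints emitted by different tuples would collapse into a single entry.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical result of splitting a flushed batch into tuples to ack and tuples to fail. */
final class BatchOutcome<T> {
    final List<T> toAck = new ArrayList<>();
    final List<T> toFail = new ArrayList<>();

    /**
     * Fails every batched tuple that produced at least one rejected datapoint
     * and acks the rest, mirroring the loop over the batched tuples in execute().
     */
    static <T, D> BatchOutcome<T> partition(List<T> batchedTuples,
                                            Map<D, T> datapointToTuple,
                                            List<D> failedDatapoints) {
        // Resolve each rejected datapoint back to the tuple that produced it.
        Set<T> failedTuples = new HashSet<>();
        for (D datapoint : failedDatapoints) {
            T tuple = datapointToTuple.get(datapoint);
            if (tuple != null) {
                failedTuples.add(tuple);
            }
        }

        // Every batched tuple is either acked or failed, never both.
        BatchOutcome<T> outcome = new BatchOutcome<>();
        for (T tuple : batchedTuples) {
            if (failedTuples.contains(tuple)) {
                outcome.toFail.add(tuple);
            } else {
                outcome.toAck.add(tuple);
            }
        }
        return outcome;
    }
}
```

In the bolt, each tuple in `toFail` would go to `collector.fail` and each tuple in `toAck` to `collector.ack`.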


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66954285
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/OpenTsdbMetricDatapoint.java ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb;
    +
    +import java.io.Serializable;
    +import java.util.Collections;
    +import java.util.Map;
    +
    +/**
    + * This class represents a metric data point in OpenTSDB's format.
    + */
    +public class OpenTsdbMetricDatapoint implements Serializable {
    +
    +    // metric name
    +    private final String metric;
    +
    +    // map of tag value pairs
    +    private final Map<String, String> tags;
    +
    +    // timestamp either in milliseconds or seconds at which this metric is occurred.
    +    private final long timestamp;
    +
    +    // value of the metric
    +    private final Object value;
    +
    +    private OpenTsdbMetricDatapoint() {
    +        this(null, null, 0L, null);
    --- End diff --
    
    This is for Jackson serialization, and it is a private constructor.
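By default, Jackson's bean deserialization can instantiate a class through a no-argument constructor, even a private one, and then set its fields reflectively. Below is a minimal sketch of that pattern, with the explanatory comment written out. `Datapoint` is a simplified, hypothetical stand-in for `OpenTsdbMetricDatapoint`. The reflective call in `newViaReflection` only imitates the first step a framework like Jackson performs; Jackson itself is not used.

```java
import java.lang.reflect.Constructor;
import java.util.Collections;
import java.util.Map;

/** Simplified, hypothetical stand-in for OpenTsdbMetricDatapoint (not the PR's code). */
final class Datapoint {
    private final String metric;            // metric name
    private final Map<String, String> tags; // tag name -> tag value
    private final long timestamp;           // seconds or milliseconds since the Epoch
    private final Object value;             // value of the metric

    /**
     * Used only for Jackson deserialization (assumed field-based binding): Jackson
     * creates the instance through this private no-arg constructor and then
     * populates the fields reflectively. Application code should use the
     * four-argument constructor instead.
     */
    private Datapoint() {
        this(null, null, 0L, null);
    }

    Datapoint(String metric, Map<String, String> tags, long timestamp, Object value) {
        this.metric = metric;
        // Guard against null so that the no-arg path does not fail in unmodifiableMap.
        this.tags = tags == null
                ? Collections.<String, String>emptyMap()
                : Collections.unmodifiableMap(tags);
        this.timestamp = timestamp;
        this.value = value;
    }

    String getMetric() { return metric; }

    Map<String, String> getTags() { return tags; }

    long getTimestamp() { return timestamp; }

    Object getValue() { return value; }

    /** Imitates a deserialization framework: invokes the private no-arg constructor reflectively. */
    static Datapoint newViaReflection() {
        try {
            Constructor<Datapoint> ctor = Datapoint.class.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate Datapoint", e);
        }
    }
}
```

Keeping the no-arg constructor private lets the framework create instances without exposing a half-initialized object to application code.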


[GitHub] storm issue #1484: STORM-1893 OpenTSDB bolt and trident state for storing an...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1484
  
    @abhishekagarwal87 can you check whether your comments have been addressed?


[GitHub] storm issue #1484: STORM-1893 OpenTSDB bolt and trident state for storing an...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on the issue:

    https://github.com/apache/storm/pull/1484
  
    @satishd, can you add a comment on the private constructor in OpenTsdbMetricDatapoint explaining that it is there for Jackson deserialization? Please also squash the commits, since this is ready for merge.
    The rest looks good to me.


[GitHub] storm issue #1484: STORM-1893 OpenTSDB bolt and trident state for storing an...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1484
  
    I've looked through the whole change, and it looks good overall.
    I'd like to sponsor this module. Could you add me as a sponsor? Thanks in advance!


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66951119
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/bolt/OpenTsdbBolt.java ---
    @@ -0,0 +1,172 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb.bolt;
    +
    +import org.apache.stom.opentsdb.OpenTsdbMetricDatapoint;
    +import org.apache.stom.opentsdb.client.ClientResponse;
    +import org.apache.stom.opentsdb.client.OpenTsdbClient;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.utils.BatchHelper;
    +import org.apache.storm.utils.TupleUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * Basic bolt implementation for storing timeseries datapoints to OpenTSDB.
    + *
    + * List of {@link OpenTsdbMetricDatapoint} is generated for each tuple with the configured {@code tupleOpenTsdbDatapointMappers}.
    + * All these datapoints are batched till the given {@code batchSize} or {@code flushIntervalInSeconds} is reached.
    + *
    + * Example topology:
    --- End diff --
    
    The link to the example topology is missing.


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66918897
  
    --- Diff: external/storm-opentsdb/README.md ---
    @@ -0,0 +1,73 @@
    +# Storm OpenTSDB Bolt and TridentState
    +  
    +OpenTSDB offers a scalable and highly available storage for time series data. It consists of a
    +Time Series Daemon (TSD) servers along with command line utilities. Each TSD connects to the 
    +configured HBase cluster to push/query the data.
    +
    +Time series data point consists of:
    + - a metric name.
    + - a UNIX timestamp (seconds or millisecinds since Epoch).
    + - a value (64 bit integer or single-precision floating point value).
    + - a set of tags (key-value pairs) that describe the time series the point belongs to.
    +
    +Storm bolt and trident state creates the above time series data from a tuple based on the given `TupleMetricPointMapper`
    +  
    +This module provides core Storm and Trident bolt implementations for writing data to OpenTSDB.
    +
    +## Examples
    +
    +### Core Bolt
    +Below example describes the usage of core colt viz `org.apache.stom.opentsdb.bolt.OpenTsdbBolt`
    +
    +```java
    +
    +        OpenTsdbClient.Builder builder =  OpenTsdbClient.newBuilder(openTsdbUrl).sync(30_000).returnDetails();
    +        final OpenTsdbBolt openTsdbBolt = new OpenTsdbBolt(builder, Collections.singletonList(TupleOpenTsdbDatapointMapper.DEFAULT_MAPPER));
    +        openTsdbBolt.withBatchSize(10).withFlushInterval(2000);
    +        topologyBuilder.setBolt("opentsdb", openTsdbBolt).shuffleGrouping("metric-gen");
    +        
    +```
    +
    +
    +### Trident Bolt
    --- End diff --
    
    Bolt -> State


[GitHub] storm issue #1484: STORM-1893 OpenTSDB bolt and trident state for storing an...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1484
  
    Since the same data points may be written more than once, it might be helpful to mention `tsd.storage.fix_duplicates` for OpenTSDB 2.1.
    It is described in the `Duplicate Data Points` section of http://opentsdb.net/docs/build/html/user_guide/writing.html
    
    Alternatively, we can just mention that points are written at least once, so users need to handle duplicates themselves.
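    If the README goes the at-least-once route, here is a minimal sketch of what handling duplicates could look like on the consumer side. It assumes a duplicate is a point with the same metric, tags and timestamp, and that the last written value should win. As we understand the OpenTSDB docs, this is the behaviour `tsd.storage.fix_duplicates` enables on the server. `DuplicatePointsDemo`, `Point` and `dedupeKeepLast` are hypothetical names, not part of storm-opentsdb.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DuplicatePointsDemo {

    // Hypothetical value type: a point belongs to a series (metric + tags)
    // and is identified within that series by its timestamp.
    static final class Point {
        final String metric;
        final Map<String, String> tags;
        final long timestamp;
        final double value;

        Point(String metric, Map<String, String> tags, long timestamp, double value) {
            this.metric = metric;
            this.tags = new TreeMap<>(tags); // sorted copy, so tag order does not matter
            this.timestamp = timestamp;
            this.value = value;
        }

        // Composite key with value-based equals/hashCode (List, Map and Long all provide them).
        List<Object> key() {
            return List.of(metric, tags, timestamp);
        }
    }

    // Keeps the last value seen for each (series, timestamp); the result
    // preserves the order in which each key was first seen.
    static List<Point> dedupeKeepLast(List<Point> points) {
        Map<List<Object>, Point> byKey = new LinkedHashMap<>();
        for (Point p : points) {
            byKey.put(p.key(), p); // a replayed point overwrites the earlier one
        }
        return new ArrayList<>(byKey.values());
    }
}
```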


[GitHub] storm issue #1484: STORM-1893 OpenTSDB bolt and trident state for storing an...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1484
  
    +1. Overall, this looks good to me. @satishd, can you also add this module here: https://github.com/apache/storm/blob/master/storm-dist/binary/src/main/assembly/binary.xml#L171


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66956202
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/bolt/OpenTsdbBolt.java ---
    @@ -0,0 +1,172 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb.bolt;
    +
    +import org.apache.stom.opentsdb.OpenTsdbMetricDatapoint;
    +import org.apache.stom.opentsdb.client.ClientResponse;
    +import org.apache.stom.opentsdb.client.OpenTsdbClient;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.utils.BatchHelper;
    +import org.apache.storm.utils.TupleUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * Basic bolt implementation for storing timeseries datapoints to OpenTSDB.
    + *
    + * List of {@link OpenTsdbMetricDatapoint} is generated for each tuple with the configured {@code tupleOpenTsdbDatapointMappers}.
    + * All these datapoints are batched till the given {@code batchSize} or {@code flushIntervalInSeconds} is reached.
    + *
    + * Example topology:
    + *
    + */
    +public class OpenTsdbBolt extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(OpenTsdbBolt.class);
    +
    +    private final OpenTsdbClient.Builder openTsdbClientBuilder;
    +    private final List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers;
    +    private int batchSize;
    +    private int flushIntervalInSeconds;
    +    private boolean failTupleForFailedMetrics;
    +
    +    private BatchHelper batchHelper;
    +    private OpenTsdbClient openTsdbClient;
    +    private Map<OpenTsdbMetricDatapoint, Tuple> metricPointsWithTuple = new HashMap<>();
    +    private OutputCollector collector;
    +
    +    public OpenTsdbBolt(OpenTsdbClient.Builder openTsdbClientBuilder, List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers) {
    +        this.openTsdbClientBuilder = openTsdbClientBuilder;
    +        this.tupleOpenTsdbDatapointMappers = tupleOpenTsdbDatapointMappers;
    +    }
    +
    +    public OpenTsdbBolt withFlushInterval(int flushIntervalInSeconds) {
    +        this.flushIntervalInSeconds = flushIntervalInSeconds;
    +        return this;
    +    }
    +
    +    public OpenTsdbBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        return this;
    +    }
    +
    +    /**
    +     * When it is invoked, this bolt acks only the tuples which have successful metrics stored into OpenTSDB and fails
    +     * the respective tuples of the failed metrics.
    +     *
    +     * @return same instance by setting {@code failTupleForFailedMetrics} to true
    +     */
    +    public OpenTsdbBolt failTupleForFailedMetrics() {
    +        this.failTupleForFailedMetrics = true;
    +        return this;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        batchHelper = new BatchHelper(batchSize, collector);
    +        openTsdbClient = openTsdbClientBuilder.build();
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            if (batchHelper.shouldHandle(tuple)) {
    +                final List<OpenTsdbMetricDatapoint> metricDataPoints = getMetricPoints(tuple);
    +                for (OpenTsdbMetricDatapoint metricDataPoint : metricDataPoints) {
    +                    metricPointsWithTuple.put(metricDataPoint, tuple);
    +                }
    +                batchHelper.addBatch(tuple);
    +            }
    +
    +            if (batchHelper.shouldFlush()) {
    +                LOG.debug("Sending metrics of size [{}]", metricPointsWithTuple.size());
    +
    +                ClientResponse.Details clientResponse = openTsdbClient.writeMetricPoints(metricPointsWithTuple.keySet());
    +
    +                if(failTupleForFailedMetrics && clientResponse.getFailed() > 0) {
    +                    final List<ClientResponse.Details.Error> errors = clientResponse.getErrors();
    +                    LOG.error("Some of the metric points failed with errors: [{}]", clientResponse);
    +                    if(errors != null && !errors.isEmpty()) {
    +
    +                        Set<Tuple> failedTuples = new HashSet<>();
    +                        for (ClientResponse.Details.Error error : errors) {
    +                            final Tuple failedTuple = metricPointsWithTuple.get(error.getDatapoint());
    +                            if(failedTuple != null) {
    +                                failedTuples.add(failedTuple);
    +                            }
    +                        }
    +
    +                        for (Tuple batchedTuple : batchHelper.getBatchTuples()) {
    +                            if(failedTuples.contains(batchedTuple)) {
    +                                collector.fail(batchedTuple);
    +                            } else {
    +                                collector.ack(batchedTuple);
    +                            }
    +                        }
    +
    +                    } else {
    +                        throw new RuntimeException("Some of the metric points failed with details: " + errors);
    +                    }
    +                } else {
    +                    LOG.debug("Acknowledging batched tuples");
    +                    batchHelper.ack();
    +                }
    +                metricPointsWithTuple.clear();
    +            }
    +        } catch (Exception e) {
    --- End diff --
    
    `batchHelper.shouldFlush()` only checks whether the batch should be flushed. `batchHelper.fail(e)` fails the whole batch of tuples.
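    To make the two failure paths concrete: `batchHelper.fail(e)` fails every tuple in the batch, while with `failTupleForFailedMetrics()` enabled the bolt fails only the tuples whose datapoints were rejected and acks the rest. The partial path from the diff can be sketched in plain Java with generic stand-ins for `Tuple` and `OpenTsdbMetricDatapoint`. `BatchOutcomeDemo` and `partition` are hypothetical names, and no Storm classes are used.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BatchOutcomeDemo {

    // Result of a flush: which batched tuples to ack and which to fail.
    static final class Outcome<T> {
        final List<T> acked = new ArrayList<>();
        final List<T> failed = new ArrayList<>();
    }

    /**
     * Mirrors the per-tuple branch in OpenTsdbBolt.execute(): a tuple is failed
     * if any datapoint derived from it was rejected, otherwise it is acked.
     *
     * @param batch        tuples in the current batch (stand-in for batchHelper.getBatchTuples())
     * @param pointToTuple datapoint to originating tuple (stand-in for metricPointsWithTuple)
     * @param failedPoints datapoints reported in the response errors
     */
    static <P, T> Outcome<T> partition(List<T> batch, Map<P, T> pointToTuple, List<P> failedPoints) {
        Set<T> failedTuples = new HashSet<>();
        for (P point : failedPoints) {
            T tuple = pointToTuple.get(point);
            if (tuple != null) {
                failedTuples.add(tuple);
            }
        }
        Outcome<T> outcome = new Outcome<>();
        for (T tuple : batch) {
            if (failedTuples.contains(tuple)) {
                outcome.failed.add(tuple);
            } else {
                outcome.acked.add(tuple);
            }
        }
        return outcome;
    }
}
```

    One consequence of this design: `pointToTuple.get(point)` only finds a match if the datapoint type has value-based `equals`/`hashCode`. The datapoint reported in an error is presumably a separate instance deserialized from the OpenTSDB response.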


[GitHub] storm issue #1484: STORM-1893 OpenTSDB bolt and trident state for storing an...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1484
  
    +1
    Thanks @satishd for the great work!


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66950323
  
    --- Diff: external/storm-opentsdb/pom.xml ---
    @@ -0,0 +1,97 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    + Licensed to the Apache Software Foundation (ASF) under one or more
    + contributor license agreements.  See the NOTICE file distributed with
    + this work for additional information regarding copyright ownership.
    + The ASF licenses this file to You under the Apache License, Version 2.0
    + (the "License"); you may not use this file except in compliance with
    + the License.  You may obtain a copy of the License at
    +
    +     http://www.apache.org/licenses/LICENSE-2.0
    +
    + Unless required by applicable law or agreed to in writing, software
    + distributed under the License is distributed on an "AS IS" BASIS,
    + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + See the License for the specific language governing permissions and
    + limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <artifactId>storm</artifactId>
    +        <groupId>org.apache.storm</groupId>
    +        <version>2.0.0-SNAPSHOT</version>
    +        <relativePath>../../pom.xml</relativePath>
    +    </parent>
    +
    +    <artifactId>storm-opentsdb</artifactId>
    +
    +    <developers>
    +        <developer>
    +            <id>satishd</id>
    +            <name>Satish Duggana</name>
    +            <email>satish.duggana@gmail.com</email>
    +        </developer>
    +    </developers>
    +
    +    <properties>
    +        <jersey.version>2.23</jersey.version>
    +    </properties>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.storm</groupId>
    +            <artifactId>storm-core</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>com.google.guava</groupId>
    +            <artifactId>guava</artifactId>
    +            <version>18.0</version>
    --- End diff --
    
    Should this version come from the parent POM?


[GitHub] storm issue #1484: STORM-1893 OpenTSDB bolt and trident state for storing an...

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the issue:

    https://github.com/apache/storm/pull/1484
  
    Removed the HTTP client version upgrade from this change; the upgrade is now isolated to storm-opentsdb only.


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66953314
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/bolt/OpenTsdbBolt.java ---
    @@ -0,0 +1,172 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb.bolt;
    +
    +import org.apache.stom.opentsdb.OpenTsdbMetricDatapoint;
    +import org.apache.stom.opentsdb.client.ClientResponse;
    +import org.apache.stom.opentsdb.client.OpenTsdbClient;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.utils.BatchHelper;
    +import org.apache.storm.utils.TupleUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +/**
    + * Basic bolt implementation for storing timeseries datapoints to OpenTSDB.
    + *
    + * List of {@link OpenTsdbMetricDatapoint} is generated for each tuple with the configured {@code tupleOpenTsdbDatapointMappers}.
    + * All these datapoints are batched till the given {@code batchSize} or {@code flushIntervalInSeconds} is reached.
    + *
    + * Example topology:
    + *
    + */
    +public class OpenTsdbBolt extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(OpenTsdbBolt.class);
    +
    +    private final OpenTsdbClient.Builder openTsdbClientBuilder;
    +    private final List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers;
    +    private int batchSize;
    +    private int flushIntervalInSeconds;
    +    private boolean failTupleForFailedMetrics;
    +
    +    private BatchHelper batchHelper;
    +    private OpenTsdbClient openTsdbClient;
    +    private Map<OpenTsdbMetricDatapoint, Tuple> metricPointsWithTuple = new HashMap<>();
    +    private OutputCollector collector;
    +
    +    public OpenTsdbBolt(OpenTsdbClient.Builder openTsdbClientBuilder, List<TupleOpenTsdbDatapointMapper> tupleOpenTsdbDatapointMappers) {
    +        this.openTsdbClientBuilder = openTsdbClientBuilder;
    +        this.tupleOpenTsdbDatapointMappers = tupleOpenTsdbDatapointMappers;
    +    }
    +
    +    public OpenTsdbBolt withFlushInterval(int flushIntervalInSeconds) {
    +        this.flushIntervalInSeconds = flushIntervalInSeconds;
    +        return this;
    +    }
    +
    +    public OpenTsdbBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        return this;
    +    }
    +
    +    /**
    +     * When it is invoked, this bolt acks only the tuples which have successful metrics stored into OpenTSDB and fails
    +     * the respective tuples of the failed metrics.
    +     *
    +     * @return same instance by setting {@code failTupleForFailedMetrics} to true
    +     */
    +    public OpenTsdbBolt failTupleForFailedMetrics() {
    +        this.failTupleForFailedMetrics = true;
    +        return this;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        batchHelper = new BatchHelper(batchSize, collector);
    +        openTsdbClient = openTsdbClientBuilder.build();
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            if (batchHelper.shouldHandle(tuple)) {
    +                final List<OpenTsdbMetricDatapoint> metricDataPoints = getMetricPoints(tuple);
    +                for (OpenTsdbMetricDatapoint metricDataPoint : metricDataPoints) {
    +                    metricPointsWithTuple.put(metricDataPoint, tuple);
    +                }
    +                batchHelper.addBatch(tuple);
    +            }
    +
    +            if (batchHelper.shouldFlush()) {
    +                LOG.debug("Sending metrics of size [{}]", metricPointsWithTuple.size());
    +
    +                ClientResponse.Details clientResponse = openTsdbClient.writeMetricPoints(metricPointsWithTuple.keySet());
    +
    +                if(failTupleForFailedMetrics && clientResponse.getFailed() > 0) {
    +                    final List<ClientResponse.Details.Error> errors = clientResponse.getErrors();
    +                    LOG.error("Some of the metric points failed with errors: [{}]", clientResponse);
    +                    if(errors != null && !errors.isEmpty()) {
    +
    +                        Set<Tuple> failedTuples = new HashSet<>();
    +                        for (ClientResponse.Details.Error error : errors) {
    +                            final Tuple failedTuple = metricPointsWithTuple.get(error.getDatapoint());
    +                            if(failedTuple != null) {
    +                                failedTuples.add(failedTuple);
    +                            }
    +                        }
    +
    +                        for (Tuple batchedTuple : batchHelper.getBatchTuples()) {
    +                            if(failedTuples.contains(batchedTuple)) {
    +                                collector.fail(batchedTuple);
    +                            } else {
    +                                collector.ack(batchedTuple);
    +                            }
    +                        }
    +
    +                    } else {
    +                        throw new RuntimeException("Some of the metric points failed with details: " + errors);
    +                    }
    +                } else {
    +                    LOG.debug("Acknowledging batched tuples");
    +                    batchHelper.ack();
    +                }
    +                metricPointsWithTuple.clear();
    +            }
    +        } catch (Exception e) {
    --- End diff --
    
    Should this catch be inside the `batchHelper.shouldFlush()` block, since the whole batch is failed?
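    One way to read this suggestion is to narrow the try blocks. A failure while mapping a single tuple would then fail only that tuple, and a failure during the write would fail the whole batch, as `batchHelper.fail(e)` does. This plain-Java sketch uses hypothetical stand-ins (`CatchScopeDemo`, string tuples, a `Function` mapper and a `Consumer` writer) rather than Storm's `Tuple`, `BatchHelper` or `OpenTsdbClient`, and it ignores the flush interval.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class CatchScopeDemo {
    final int batchSize;
    final Function<String, List<String>> mapper; // tuple -> datapoints (stand-in for the mappers)
    final Consumer<List<String>> writer;         // sends datapoints, may throw (stand-in for the client)

    final List<String> batch = new ArrayList<>();   // tuples waiting to be acked or failed
    final List<String> pending = new ArrayList<>(); // datapoints waiting to be written
    final List<String> acked = new ArrayList<>();
    final List<String> failed = new ArrayList<>();

    CatchScopeDemo(int batchSize, Function<String, List<String>> mapper, Consumer<List<String>> writer) {
        this.batchSize = batchSize;
        this.mapper = mapper;
        this.writer = writer;
    }

    void execute(String tuple) {
        // Scope 1: mapping. An error here concerns only the current tuple.
        try {
            pending.addAll(mapper.apply(tuple));
            batch.add(tuple);
        } catch (RuntimeException e) {
            failed.add(tuple);
            return;
        }

        // Scope 2: flushing. An error here affects every tuple in the batch.
        if (batch.size() >= batchSize) {
            try {
                writer.accept(new ArrayList<>(pending));
                acked.addAll(batch);
            } catch (RuntimeException e) {
                failed.addAll(batch);
            } finally {
                batch.clear();
                pending.clear();
            }
        }
    }
}
```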


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66929422
  
    --- Diff: external/storm-opentsdb/README.md ---
    @@ -0,0 +1,73 @@
    +# Storm OpenTSDB Bolt and TridentState
    +  
    +OpenTSDB offers a scalable and highly available storage for time series data. It consists of a
    +Time Series Daemon (TSD) servers along with command line utilities. Each TSD connects to the 
    +configured HBase cluster to push/query the data.
    +
    +Time series data point consists of:
    + - a metric name.
    + - a UNIX timestamp (seconds or millisecinds since Epoch).
    + - a value (64 bit integer or single-precision floating point value).
    + - a set of tags (key-value pairs) that describe the time series the point belongs to.
    +
    +Storm bolt and trident state creates the above time series data from a tuple based on the given `TupleMetricPointMapper`
    +  
    +This module provides core Storm and Trident bolt implementations for writing data to OpenTSDB.
    +
    +## Examples
    +
    +### Core Bolt
    +Below example describes the usage of core colt viz `org.apache.stom.opentsdb.bolt.OpenTsdbBolt`
    --- End diff --
    
    Here, viz means `which is`.


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by abhishekagarwal87 <gi...@git.apache.org>.
Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1484#discussion_r66956501
  
    --- Diff: external/storm-opentsdb/src/main/java/org/apache/stom/opentsdb/OpenTsdbMetricDatapoint.java ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.stom.opentsdb;
    +
    +import java.io.Serializable;
    +import java.util.Collections;
    +import java.util.Map;
    +
    +/**
    + * This class represents a metric data point in OpenTSDB's format.
    + */
    +public class OpenTsdbMetricDatapoint implements Serializable {
    +
    +    // metric name
    +    private final String metric;
    +
    +    // map of tag value pairs
    +    private final Map<String, String> tags;
    +
    +    // timestamp either in milliseconds or seconds at which this metric is occurred.
    +    private final long timestamp;
    +
    +    // value of the metric
    +    private final Object value;
    +
    +    private OpenTsdbMetricDatapoint() {
    +        this(null, null, 0L, null);
    --- End diff --
    
    Thanks for the explanation, Satish. Can you add this as a comment on the constructor?


[GitHub] storm pull request #1484: STORM-1893 OpenTSDB bolt and trident state for sto...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1484

