You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by omkreddy <gi...@git.apache.org> on 2016/07/21 14:31:27 UTC

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

GitHub user omkreddy opened a pull request:

    https://github.com/apache/storm/pull/1583

    STORM-1979: Storm Druid Connector implementation.

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/omkreddy/storm storm-druid

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1583.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1583
    
----
commit c1b01e32f78cb5c75e20c5db20365f62d2ad3ce9
Author: Manikumar Reddy O <ma...@gmail.com>
Date:   2016-07-21T11:57:03Z

    STORM-1979: Storm Druid Connector implementation. This uses Druid's tranquility library.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r72081639
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * This Bolt expects to receive tuples in which the zeroth element is your event type.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private int batchSize;
    +    private Tranquilizer<E> tranquilizer = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory) {
    +        this(beamFactory, 2000);
    +    }
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, int batchSize) {
    +        this.beamFactory = beamFactory;
    +        this.batchSize = batchSize;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        tranquilizer = Tranquilizer.create(
    +                beamFactory.makeBeam(stormConf, context),
    +                batchSize,
    +                Tranquilizer.DefaultMaxPendingBatches(),
    +                Tranquilizer.DefaultLingerMillis());
    +        this.tranquilizer.start();
    +
    +    }
    +
    +    @Override
    +    public void execute(final Tuple tuple) {
    +      Future future = tranquilizer.send((E)tuple.getValue(0));
    +      future.addEventListener(new FutureEventListener() {
    +          @Override
    +          public void onFailure(Throwable cause) {
    +              if(cause instanceof MessageDroppedException) {
    --- End diff --
    
    currently druid logs discarded messages.  Also I need to find way to handle this in Trident bolt. I will do this when need arises. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73549987
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,103 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * This Bolt expects to receive tuples in which the zeroth element is your event type.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private int batchSize;
    +    private Tranquilizer<E> tranquilizer = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, int batchSize) {
    +        this.beamFactory = beamFactory;
    +        this.batchSize = batchSize;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        tranquilizer = Tranquilizer.create(
    +                beamFactory.makeBeam(stormConf, context),
    +                batchSize,
    +                Tranquilizer.DefaultMaxPendingBatches(),
    +                Tranquilizer.DefaultLingerMillis());
    +        this.tranquilizer.start();
    +    }
    +
    +    @Override
    +    public void execute(final Tuple tuple) {
    +      Future future = tranquilizer.send((E)tuple.getValue(0));
    --- End diff --
    
    @omkreddy I need to look into this more to understand. But we shouldn't be saying that the first tuple is the one that goes into druid. Take a look other connectors in hbase, hive see if mapper is something we can use here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73538213
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,103 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * This Bolt expects to receive tuples in which the zeroth element is your event type.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private int batchSize;
    +    private Tranquilizer<E> tranquilizer = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, int batchSize) {
    +        this.beamFactory = beamFactory;
    +        this.batchSize = batchSize;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        tranquilizer = Tranquilizer.create(
    +                beamFactory.makeBeam(stormConf, context),
    +                batchSize,
    +                Tranquilizer.DefaultMaxPendingBatches(),
    +                Tranquilizer.DefaultLingerMillis());
    +        this.tranquilizer.start();
    +    }
    +
    +    @Override
    +    public void execute(final Tuple tuple) {
    +      Future future = tranquilizer.send((E)tuple.getValue(0));
    --- End diff --
    
    Druid tranquility client library take a generic type for the events to send. 
    The bolt need to know the type of the event. By default the bolt expects to receive tuples in which the zeroth element is event type. Now this event position in tuple list is made configurable. Users can specify the event position using DruidConfig.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73744692
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.tuple.Values;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * By default this Bolt expects to receive tuples in which "event" field gives your event type.
    + * This logic can be changed by implementing ITupleDruidEventMapper interface.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private DruidConfig druidConfig = null;
    +    private Tranquilizer<E> tranquilizer = null;
    +    private ITupleDruidEventMapper<E> druidEventMapper = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, ITupleDruidEventMapper<E> druidEventMapper, DruidConfig druidConfig) {
    +        this.beamFactory = beamFactory;
    +        this.druidConfig = druidConfig;
    +        this.druidEventMapper = druidEventMapper;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        tranquilizer = Tranquilizer.builder()
    +                .maxBatchSize(druidConfig.getMaxBatchSize())
    +                .maxPendingBatches(druidConfig.getMaxPendingBatches())
    +                .lingerMillis(druidConfig.getLingerMillis())
    +                .blockOnFull(druidConfig.isBlockOnFull())
    +                .build(beamFactory.makeBeam(stormConf, context));
    +        this.tranquilizer.start();
    +    }
    +
    +    @Override
    +    public void execute(final Tuple tuple) {
    +      Future future = tranquilizer.send((druidEventMapper.getEvent(tuple)));
    +      future.addEventListener(new FutureEventListener() {
    +          @Override
    +          public void onFailure(Throwable cause) {
    +              if (cause instanceof MessageDroppedException) {
    +                  collector.ack(tuple);
    +                  if (druidConfig.isEnableDiscardStream())
    --- End diff --
    
    @satishd  Since this is internal implementation, I decided to use existing boolean variable.  I felt boolean check is better than null check.  Updated the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1583: STORM-1979: Storm Druid Connector implementation.

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on the issue:

    https://github.com/apache/storm/pull/1583
  
    @satishd added you as committer sponsor.  also addressed review comments


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the issue:

    https://github.com/apache/storm/pull/1583
  
    +1 after addressing minor comment in README doc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the issue:

    https://github.com/apache/storm/pull/1583
  
    @omkreddy You can add me as committer sponsor for this module.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73575541
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +
    +import java.io.Serializable;
    +
    +public class DruidConfig implements Serializable {
    +
    +    //Tranquilizer configs for DruidBeamBolt
    +    private int maxBatchSize;
    +    private int maxPendingBatches;
    +    private long lingerMillis;
    +    private boolean blockOnFull;
    +    private boolean enableDiscardStream;
    +    private String discardStreamId;
    +
    +    public int getMaxBatchSize() {
    +        return maxBatchSize;
    +    }
    +
    +    public int getMaxPendingBatches() {
    +        return maxPendingBatches;
    +    }
    +
    +    public long getLingerMillis() {
    +        return lingerMillis;
    +    }
    +
    +    public boolean isBlockOnFull() {
    +        return blockOnFull;
    +    }
    +
    +    public boolean isEnableDiscardStream() {
    +        return enableDiscardStream;
    +    }
    +
    +    public String getDiscardStreamId() {
    +        return discardStreamId;
    +    }
    +
    +    private DruidConfig(Builder builder) {
    +        this.maxBatchSize = builder.maxBatchSize;
    +        this.maxPendingBatches = builder.maxPendingBatches;
    +        this.lingerMillis = builder.lingerMillis;
    +        this.blockOnFull = builder.blockOnFull;
    +        this.enableDiscardStream = builder.enableDiscardStream;
    +        this.discardStreamId = builder.discardStreamId;
    +    }
    +
    +    public static DruidConfig.Builder newBuilder() {
    +        return new Builder();
    +    }
    +
    +    public static class Builder {
    +        private int maxBatchSize = Tranquilizer.DefaultMaxBatchSize();
    +        private int maxPendingBatches = Tranquilizer.DefaultMaxPendingBatches();
    +        private long lingerMillis = Tranquilizer.DefaultLingerMillis();
    +        private boolean blockOnFull =  Tranquilizer.DefaultBlockOnFull();
    +        private boolean enableDiscardStream = false;
    +        private String discardStreamId ="druid-discard-stream";
    --- End diff --
    
    @harshachlet me know your opinion on the above comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r72303978
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * This Bolt expects to receive tuples in which the zeroth element is your event type.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private int batchSize;
    +    private Tranquilizer<E> tranquilizer = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory) {
    +        this(beamFactory, 2000);
    +    }
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, int batchSize) {
    +        this.beamFactory = beamFactory;
    +        this.batchSize = batchSize;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        tranquilizer = Tranquilizer.create(
    +                beamFactory.makeBeam(stormConf, context),
    +                batchSize,
    +                Tranquilizer.DefaultMaxPendingBatches(),
    +                Tranquilizer.DefaultLingerMillis());
    +        this.tranquilizer.start();
    +
    +    }
    +
    +    @Override
    +    public void execute(final Tuple tuple) {
    +      Future future = tranquilizer.send((E)tuple.getValue(0));
    +      future.addEventListener(new FutureEventListener() {
    +          @Override
    +          public void onFailure(Throwable cause) {
    +              if(cause instanceof MessageDroppedException) {
    --- End diff --
    
    I am +1 on @satishd recommendation. Atleast we can drive through configuration i.e druid can discard messages and we can provide discard stream of sorts via config if users configures enableDiscardStream and discardStream="druid-discard-stream" than you write the tuples there. Logging on druid is great but this will allow users to act on them immediately.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1583: STORM-1979: Storm Druid Connector implementation.

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on the issue:

    https://github.com/apache/storm/pull/1583
  
    @ptgoetz Thanks. I added you as committer sponsor.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the issue:

    https://github.com/apache/storm/pull/1583
  
    @omkreddy Let me know once the comments are addressed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73727010
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,111 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.tuple.Values;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * By default this Bolt expects to receive tuples in which "event" field gives your event type.
    + * This logic can be changed by implementing ITupleDruidEventMapper interface.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
    --- End diff --
    
    `LOG` is never used. You can keep it if you want to use some logging in this class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1583: STORM-1979: Storm Druid Connector implementation.

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on the issue:

    https://github.com/apache/storm/pull/1583
  
    @harshach @satishd I have addressed review comments. Pl review.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r72304436
  
    --- Diff: external/storm-druid/pom.xml ---
    @@ -0,0 +1,88 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    + Licensed to the Apache Software Foundation (ASF) under one or more
    + contributor license agreements.  See the NOTICE file distributed with
    + this work for additional information regarding copyright ownership.
    + The ASF licenses this file to You under the Apache License, Version 2.0
    + (the "License"); you may not use this file except in compliance with
    + the License.  You may obtain a copy of the License at
    +
    +     http://www.apache.org/licenses/LICENSE-2.0
    +
    + Unless required by applicable law or agreed to in writing, software
    + distributed under the License is distributed on an "AS IS" BASIS,
    + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + See the License for the specific language governing permissions and
    + limitations under the License.
    +-->
    +
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <parent>
    +        <artifactId>storm</artifactId>
    +        <groupId>org.apache.storm</groupId>
    +        <version>2.0.0-SNAPSHOT</version>
    +        <relativePath>../../pom.xml</relativePath>
    +    </parent>
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <artifactId>storm-druid</artifactId>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.storm</groupId>
    +            <artifactId>storm-core</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>io.druid</groupId>
    +            <artifactId>tranquility-core_2.11</artifactId>
    +            <version>0.8.2</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.scala-lang</groupId>
    +            <artifactId>scala-library</artifactId>
    +            <version>2.11.8</version>
    --- End diff --
    
    I take back the earlier comment. sorry about that,  the dependencies shouldn't be provided as we don't want to package them into the druid jar.
    also can we drive these versions from parent pom.xml so that we can configure better. Also make the scala version 2.10.5 unless druid recommends 2.11.8.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1583: STORM-1979: Storm Druid Connector implementation.

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on the issue:

    https://github.com/apache/storm/pull/1583
  
    @satishd Thanks for the review. addressed review comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73562274
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,103 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * This Bolt expects to receive tuples in which the zeroth element is your event type.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private int batchSize;
    +    private Tranquilizer<E> tranquilizer = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, int batchSize) {
    +        this.beamFactory = beamFactory;
    +        this.batchSize = batchSize;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        tranquilizer = Tranquilizer.create(
    +                beamFactory.makeBeam(stormConf, context),
    +                batchSize,
    +                Tranquilizer.DefaultMaxPendingBatches(),
    +                Tranquilizer.DefaultLingerMillis());
    +        this.tranquilizer.start();
    +    }
    +
    +    @Override
    +    public void execute(final Tuple tuple) {
    +      Future future = tranquilizer.send((E)tuple.getValue(0));
    --- End diff --
    
    @omkreddy I guess we have discussed to add a default mapper implementation to return a field for the give name.  But user can always implement Mapper interface and return respective object to be sent.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r72371043
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.trident;
    +
    +import org.apache.storm.trident.operation.TridentCollector;
    +import org.apache.storm.trident.state.BaseStateUpdater;
    +import org.apache.storm.trident.tuple.TridentTuple;
    +
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +public class DruidBeamStateUpdater<E> extends BaseStateUpdater<DruidBeamState<E>> {
    +
    +    @Override
    +    public void updateState(DruidBeamState<E> state, List<TridentTuple> tuples, TridentCollector collector) {
    +        List<E> list = new LinkedList<>();
    +
    +        for (TridentTuple tuple: tuples) {
    +            list.add((E)tuple.getValue(0));
    --- End diff --
    
    same as above its not clear why are we only picking the first value in the tuple.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r71824642
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * This Bolt expects to receive tuples in which the zeroth element is your event type.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private int batchSize;
    +    private Tranquilizer<E> tranquilizer = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory) {
    +        this(beamFactory, 2000);
    +    }
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, int batchSize) {
    +        this.beamFactory = beamFactory;
    +        this.batchSize = batchSize;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        tranquilizer = Tranquilizer.create(
    +                beamFactory.makeBeam(stormConf, context),
    +                batchSize,
    +                Tranquilizer.DefaultMaxPendingBatches(),
    +                Tranquilizer.DefaultLingerMillis());
    +        this.tranquilizer.start();
    +
    +    }
    +
    +    @Override
    +    public void execute(final Tuple tuple) {
    +      Future future = tranquilizer.send((E)tuple.getValue(0));
    +      future.addEventListener(new FutureEventListener() {
    +          @Override
    +          public void onFailure(Throwable cause) {
    +              if(cause instanceof MessageDroppedException) {
    --- End diff --
    
    > MessageDroppedException: Exception indicating that a message was dropped "on purpose" by the beam. This is not a recoverable exception and so the message must be discarded.
    
    These tuples can be acked and send all these discarded tuples to a given stream so that user can know which tuples are discarded and take an appropriate action. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73777827
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.tuple.Values;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * By default this Bolt expects to receive tuples in which "event" field gives your event type.
    + * This logic can be changed by implementing ITupleDruidEventMapper interface.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private DruidConfig druidConfig = null;
    +    private Tranquilizer<E> tranquilizer = null;
    +    private ITupleDruidEventMapper<E> druidEventMapper = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, ITupleDruidEventMapper<E> druidEventMapper, DruidConfig druidConfig) {
    +        this.beamFactory = beamFactory;
    +        this.druidConfig = druidConfig;
    +        this.druidEventMapper = druidEventMapper;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        tranquilizer = Tranquilizer.builder()
    +                .maxBatchSize(druidConfig.getMaxBatchSize())
    +                .maxPendingBatches(druidConfig.getMaxPendingBatches())
    +                .lingerMillis(druidConfig.getLingerMillis())
    +                .blockOnFull(druidConfig.isBlockOnFull())
    +                .build(beamFactory.makeBeam(stormConf, context));
    +        this.tranquilizer.start();
    +    }
    +
    +    @Override
    +    public void execute(final Tuple tuple) {
    +      Future future = tranquilizer.send((druidEventMapper.getEvent(tuple)));
    +      future.addEventListener(new FutureEventListener() {
    +          @Override
    +          public void onFailure(Throwable cause) {
    +              if (cause instanceof MessageDroppedException) {
    +                  collector.ack(tuple);
    +                  if (druidConfig.isEnableDiscardStream())
    --- End diff --
    
    @omkreddy It is not really an internal implementation. `druidConfig.isEnableDiscardStream()` was still using `enableDiscardStream`, which should be set by user. Idea is to avoid that and I can see latest changes reflect on that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r72305621
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.trident;
    +
    +import com.metamx.tranquility.beam.Beam;
    +import com.twitter.util.Await;
    +import org.apache.storm.trident.state.State;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.collection.JavaConversions;
    +
    +import java.util.List;
    +
    +
    +/**
    + * Trident {@link State} implementation for Druid.
    + */
    +public class DruidBeamState<E> implements State  {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamState.class);
    +
    +    Beam<E> beam = null;
    +
    +    public DruidBeamState(Beam<E> beam) {
    +        this.beam = beam;
    +    }
    +
    +    public void send(List<E> events ) {
    +        try {
    +            LOG.info("Sending %d events", events.size());
    +            Await.result(beam.sendBatch(JavaConversions.collectionAsScalaIterable(events).toList()));
    --- End diff --
    
    looks like indentation missed here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r71719407
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * This Bolt expects to receive tuples in which the zeroth element is your event type.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private int batchSize;
    +    private Tranquilizer<E> tranquilizer = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory) {
    +        this(beamFactory, 2000);
    --- End diff --
    
    lets remove this constructor as users might not notice that we are setting the batchSize to 2000.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73573885
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java ---
    @@ -0,0 +1,48 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.trident;
    +
    +import org.apache.storm.trident.operation.TridentCollector;
    +import org.apache.storm.trident.state.BaseStateUpdater;
    +import org.apache.storm.trident.tuple.TridentTuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +public class DruidBeamStateUpdater<E> extends BaseStateUpdater<DruidBeamState<E>> {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamStateUpdater.class);
    +
    +    @Override
    +    public void updateState(DruidBeamState<E> state, List<TridentTuple> tuples, TridentCollector collector) {
    +        List<E> discardedTuples = state.update(tuples, collector);
    +        processDiscardedTuples(discardedTuples);
    +    }
    +
    +    /**
    +     * Users can override this method to  process the discarded Tuples
    +     * @param discardedTuples
    +     */
    +    protected void processDiscardedTuples(List<E> discardedTuples) {
    +        LOG.debug("discarded messages : %s" , discardedTuples);
    --- End diff --
    
    Logger format convention is {} instead of %s. You may want to have 
    `LOG.debug("discarded messages : [{}]" , discardedTuples)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73778048
  
    --- Diff: external/storm-druid/README.md ---
    @@ -0,0 +1,143 @@
    +# Storm Druid Bolt and TridentState
    +
    +This module provides core Storm and Trident bolt implementations for writing data to [Druid] (http://druid.io/) data store.
    --- End diff --
    
    @omkreddy Remove space between [] and () to make MD generate a hyperlink. Currently it shows as text. Same with other places in this doc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r71824739
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * This Bolt expects to receive tuples in which the zeroth element is your event type.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private int batchSize;
    +    private Tranquilizer<E> tranquilizer = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory) {
    +        this(beamFactory, 2000);
    +    }
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, int batchSize) {
    +        this.beamFactory = beamFactory;
    +        this.batchSize = batchSize;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        tranquilizer = Tranquilizer.create(
    +                beamFactory.makeBeam(stormConf, context),
    +                batchSize,
    +                Tranquilizer.DefaultMaxPendingBatches(),
    +                Tranquilizer.DefaultLingerMillis());
    +        this.tranquilizer.start();
    +
    +    }
    +
    +    @Override
    +    public void execute(final Tuple tuple) {
    +      Future future = tranquilizer.send((E)tuple.getValue(0));
    +      future.addEventListener(new FutureEventListener() {
    +          @Override
    +          public void onFailure(Throwable cause) {
    +              if(cause instanceof MessageDroppedException) {
    +                  synchronized (collector) {
    --- End diff --
    
    Does `IOutputCollector#ack/fail` need synchronization? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r72387042
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.trident;
    +
    +import com.metamx.tranquility.beam.Beam;
    +import com.twitter.util.Await;
    +import org.apache.storm.trident.state.State;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.collection.JavaConversions;
    +
    +import java.util.List;
    +
    +
    +/**
    + * Trident {@link State} implementation for Druid.
    + */
    +public class DruidBeamState<E> implements State  {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamState.class);
    +
    +    Beam<E> beam = null;
    +
    +    public DruidBeamState(Beam<E> beam) {
    +        this.beam = beam;
    +    }
    +
    +    public void send(List<E> events ) {
    +        try {
    +            LOG.info("Sending %d events", events.size());
    +            Await.result(beam.sendBatch(JavaConversions.collectionAsScalaIterable(events).toList()));
    +        } catch (Exception e) {
    +            final String errorMsg = "Failed in writing messages to Druid";
    +            LOG.error(errorMsg, e);
    +            throw new RuntimeException(errorMsg);
    +        }
    +    }
    +
    +    public void close() {
    +        try {
    +            Await.result(beam.close());
    +        } catch (Exception e) {
    +            final String errorMsg = "Error while closing Druid beam client";
    +            LOG.error(errorMsg, e);
    +            throw new RuntimeException(errorMsg);
    +        }
    +    }
    +
    +    @Override
    +    public void beginCommit(Long txid) {
    --- End diff --
    
    @harshach most of the trident states ignore the `beginCommit` and `commit`, because it may be very difficult to handle idempotent updates in a generic state. beginCommit and commit is currently implemented only in HdfsState. The downside is that such states would provide only at-least once semantics.
    
    However key value based stores can provide an `IBackingMap` implementation whereby exactly once semantics can be supported. For e.g. the Redis trident implementation provides a RedisMapState. 
    
    If druid supports key/value based abstraction, the connector should probably provide a MapState implementation. If not the beginCommit and commit should be handled within the DruidBeamState if we are looking to provide exactly once.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r72082307
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * This Bolt expects to receive tuples in which the zeroth element is your event type.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private int batchSize;
    +    private Tranquilizer<E> tranquilizer = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory) {
    +        this(beamFactory, 2000);
    --- End diff --
    
    removed the constructor.  Now users has to pass the batchSize. as of now we have only one config value. will add config class when need arises.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73574354
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,103 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * This Bolt expects to receive tuples in which the zeroth element is your event type.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private int batchSize;
    +    private Tranquilizer<E> tranquilizer = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, int batchSize) {
    +        this.beamFactory = beamFactory;
    +        this.batchSize = batchSize;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        tranquilizer = Tranquilizer.create(
    +                beamFactory.makeBeam(stormConf, context),
    +                batchSize,
    +                Tranquilizer.DefaultMaxPendingBatches(),
    +                Tranquilizer.DefaultLingerMillis());
    +        this.tranquilizer.start();
    +    }
    +
    +    @Override
    +    public void execute(final Tuple tuple) {
    +      Future future = tranquilizer.send((E)tuple.getValue(0));
    --- End diff --
    
    @satishd Thanks. Earlier I misunderstood your suggestion.
    @harshach I added a mapper interface and default implementation.
    Thanks for the  review. Updated the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73741669
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Tuple;
    +import org.apache.storm.tuple.Values;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * By default this Bolt expects to receive tuples in which "event" field gives your event type.
    + * This logic can be changed by implementing ITupleDruidEventMapper interface.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private DruidConfig druidConfig = null;
    +    private Tranquilizer<E> tranquilizer = null;
    +    private ITupleDruidEventMapper<E> druidEventMapper = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, ITupleDruidEventMapper<E> druidEventMapper, DruidConfig druidConfig) {
    +        this.beamFactory = beamFactory;
    +        this.druidConfig = druidConfig;
    +        this.druidEventMapper = druidEventMapper;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        tranquilizer = Tranquilizer.builder()
    +                .maxBatchSize(druidConfig.getMaxBatchSize())
    +                .maxPendingBatches(druidConfig.getMaxPendingBatches())
    +                .lingerMillis(druidConfig.getLingerMillis())
    +                .blockOnFull(druidConfig.isBlockOnFull())
    +                .build(beamFactory.makeBeam(stormConf, context));
    +        this.tranquilizer.start();
    +    }
    +
    +    @Override
    +    public void execute(final Tuple tuple) {
    +      Future future = tranquilizer.send((druidEventMapper.getEvent(tuple)));
    +      future.addEventListener(new FutureEventListener() {
    +          @Override
    +          public void onFailure(Throwable cause) {
    +              if (cause instanceof MessageDroppedException) {
    +                  collector.ack(tuple);
    +                  if (druidConfig.isEnableDiscardStream())
    --- End diff --
    
    You can check `druidConfig.getDiscardStreamId() != null` and emit to that stream, no need to have `isEnableDiscardStream`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1583


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1583: STORM-1979: Storm Druid Connector implementation.

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on the issue:

    https://github.com/apache/storm/pull/1583
  
    @satishd  addressed review comments and squashed the commits.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73572400
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java ---
    @@ -0,0 +1,48 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.trident;
    +
    +import org.apache.storm.trident.operation.TridentCollector;
    +import org.apache.storm.trident.state.BaseStateUpdater;
    +import org.apache.storm.trident.tuple.TridentTuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +public class DruidBeamStateUpdater<E> extends BaseStateUpdater<DruidBeamState<E>> {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamStateUpdater.class);
    +
    +    @Override
    +    public void updateState(DruidBeamState<E> state, List<TridentTuple> tuples, TridentCollector collector) {
    +        List<E> discardedTuples = state.update(tuples, collector);
    +        processDiscardedTuples(discardedTuples);
    +    }
    +
    +    /**
    +     * Users can override this method to  process the discarded Tuples
    +     * @param discardedTuples
    +     */
    +    private void processDiscardedTuples(List<E> discardedTuples) {
    --- End diff --
    
    This should be `protected` as users can extend this and add their own functionality to handle discard tuples.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73721068
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +
    +import java.io.Serializable;
    +
    +public class DruidConfig implements Serializable {
    +
    +    //Tranquilizer configs for DruidBeamBolt
    +    private int maxBatchSize;
    +    private int maxPendingBatches;
    +    private long lingerMillis;
    +    private boolean blockOnFull;
    +    private boolean enableDiscardStream;
    +    private String discardStreamId;
    +
    +    public int getMaxBatchSize() {
    +        return maxBatchSize;
    +    }
    +
    +    public int getMaxPendingBatches() {
    +        return maxPendingBatches;
    +    }
    +
    +    public long getLingerMillis() {
    +        return lingerMillis;
    +    }
    +
    +    public boolean isBlockOnFull() {
    +        return blockOnFull;
    +    }
    +
    +    public boolean isEnableDiscardStream() {
    +        return enableDiscardStream;
    +    }
    +
    +    public String getDiscardStreamId() {
    +        return discardStreamId;
    +    }
    +
    +    private DruidConfig(Builder builder) {
    +        this.maxBatchSize = builder.maxBatchSize;
    +        this.maxPendingBatches = builder.maxPendingBatches;
    +        this.lingerMillis = builder.lingerMillis;
    +        this.blockOnFull = builder.blockOnFull;
    +        this.enableDiscardStream = builder.enableDiscardStream;
    +        this.discardStreamId = builder.discardStreamId;
    +    }
    +
    +    public static DruidConfig.Builder newBuilder() {
    +        return new Builder();
    +    }
    +
    +    public static class Builder {
    +        private int maxBatchSize = Tranquilizer.DefaultMaxBatchSize();
    +        private int maxPendingBatches = Tranquilizer.DefaultMaxPendingBatches();
    +        private long lingerMillis = Tranquilizer.DefaultLingerMillis();
    +        private boolean blockOnFull =  Tranquilizer.DefaultBlockOnFull();
    +        private boolean enableDiscardStream = false;
    +        private String discardStreamId ="druid-discard-stream";
    --- End diff --
    
    @satishd sorry missed it. That makes sense. @omkreddy can you address the above as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73725602
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java ---
    @@ -0,0 +1,42 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import org.apache.storm.tuple.ITuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Converts {@link ITuple} to Event
    + */
    +public final class TupleDruidEventMapper<E> implements ITupleDruidEventMapper {
    +    private static final Logger LOG = LoggerFactory.getLogger(TupleDruidEventMapper.class);
    --- End diff --
    
    @omkreddy You may want to remove this as it is never used.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r71719105
  
    --- Diff: external/storm-druid/pom.xml ---
    @@ -0,0 +1,88 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    + Licensed to the Apache Software Foundation (ASF) under one or more
    + contributor license agreements.  See the NOTICE file distributed with
    + this work for additional information regarding copyright ownership.
    + The ASF licenses this file to You under the Apache License, Version 2.0
    + (the "License"); you may not use this file except in compliance with
    + the License.  You may obtain a copy of the License at
    +
    +     http://www.apache.org/licenses/LICENSE-2.0
    +
    + Unless required by applicable law or agreed to in writing, software
    + distributed under the License is distributed on an "AS IS" BASIS,
    + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + See the License for the specific language governing permissions and
    + limitations under the License.
    +-->
    +
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <parent>
    +        <artifactId>storm</artifactId>
    +        <groupId>org.apache.storm</groupId>
    +        <version>2.0.0-SNAPSHOT</version>
    +        <relativePath>../../pom.xml</relativePath>
    +    </parent>
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <artifactId>storm-druid</artifactId>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.storm</groupId>
    +            <artifactId>storm-core</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>io.druid</groupId>
    +            <artifactId>tranquility-core_2.11</artifactId>
    +            <version>0.8.2</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.scala-lang</groupId>
    +            <artifactId>scala-library</artifactId>
    +            <version>2.11.8</version>
    --- End diff --
    
    we should be making these provided 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r72304830
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.trident;
    +
    +import com.metamx.tranquility.beam.Beam;
    +import com.twitter.util.Await;
    +import org.apache.storm.trident.state.State;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.collection.JavaConversions;
    +
    +import java.util.List;
    +
    +
    +/**
    + * Trident {@link State} implementation for Druid.
    + */
    +public class DruidBeamState<E> implements State  {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamState.class);
    +
    +    Beam<E> beam = null;
    +
    +    public DruidBeamState(Beam<E> beam) {
    +        this.beam = beam;
    +    }
    +
    +    public void send(List<E> events ) {
    +        try {
    +            LOG.info("Sending %d events", events.size());
    +            Await.result(beam.sendBatch(JavaConversions.collectionAsScalaIterable(events).toList()));
    +        } catch (Exception e) {
    +            final String errorMsg = "Failed in writing messages to Druid";
    +            LOG.error(errorMsg, e);
    +            throw new RuntimeException(errorMsg);
    +        }
    +    }
    +
    +    public void close() {
    +        try {
    +            Await.result(beam.close());
    +        } catch (Exception e) {
    +            final String errorMsg = "Error while closing Druid beam client";
    +            LOG.error(errorMsg, e);
    +            throw new RuntimeException(errorMsg);
    --- End diff --
    
    same as above. we shouldn't be throwing run-time exceptions when close is called.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73741418
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +
    +import java.io.Serializable;
    +
    +public class DruidConfig implements Serializable {
    +
    +    public static final String DEFAULT_DISCARD_STREAM_ID = "druid-discard-stream";
    +
    +    //Tranquilizer configs for DruidBeamBolt
    +    private int maxBatchSize;
    +    private int maxPendingBatches;
    +    private long lingerMillis;
    +    private boolean blockOnFull;
    +    private boolean enableDiscardStream;
    --- End diff --
    
    @omkreddy We do not need this as it was suggested in earlier comments. User would give only discardStreamId if discarded events need to be handled.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r72304747
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.trident;
    +
    +import com.metamx.tranquility.beam.Beam;
    +import com.twitter.util.Await;
    +import org.apache.storm.trident.state.State;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.collection.JavaConversions;
    +
    +import java.util.List;
    +
    +
    +/**
    + * Trident {@link State} implementation for Druid.
    + */
    +public class DruidBeamState<E> implements State  {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamState.class);
    +
    +    Beam<E> beam = null;
    +
    +    public DruidBeamState(Beam<E> beam) {
    +        this.beam = beam;
    +    }
    +
    +    public void send(List<E> events ) {
    +        try {
    +            LOG.info("Sending %d events", events.size());
    +            Await.result(beam.sendBatch(JavaConversions.collectionAsScalaIterable(events).toList()));
    +        } catch (Exception e) {
    +            final String errorMsg = "Failed in writing messages to Druid";
    +            LOG.error(errorMsg, e);
    +            throw new RuntimeException(errorMsg);
    --- End diff --
    
    do we want to throw the run-time exception. What if this is a temporary failure and on a re-try we might succeed in those cases throwing run time exception will kill the worker JVM itself.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1583
  
    missed @satishd suggestions above. Once those addressed I am +1. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r71719988
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * This Bolt expects to receive tuples in which the zeroth element is your event type.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private int batchSize;
    +    private Tranquilizer<E> tranquilizer = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory) {
    +        this(beamFactory, 2000);
    +    }
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, int batchSize) {
    +        this.beamFactory = beamFactory;
    +        this.batchSize = batchSize;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        tranquilizer = Tranquilizer.create(
    +                beamFactory.makeBeam(stormConf, context),
    +                batchSize,
    +                Tranquilizer.DefaultMaxPendingBatches(),
    +                Tranquilizer.DefaultLingerMillis());
    +        this.tranquilizer.start();
    +
    +    }
    +
    +    @Override
    +    public void execute(final Tuple tuple) {
    +      Future future = tranquilizer.send((E)tuple.getValue(0));
    +      future.addEventListener(new FutureEventListener() {
    +          @Override
    +          public void onFailure(Throwable cause) {
    +              if(cause instanceof MessageDroppedException) {
    --- End diff --
    
    should we not retry here? instead of calling ack


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1583: STORM-1979: Storm Druid Connector implementation.

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the issue:

    https://github.com/apache/storm/pull/1583
  
    @omkreddy I haven't had a chance to review it fully yet, but you can add me as a committer sponsor.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r71826704
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * This Bolt expects to receive tuples in which the zeroth element is your event type.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private int batchSize;
    +    private Tranquilizer<E> tranquilizer = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory) {
    +        this(beamFactory, 2000);
    +    }
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, int batchSize) {
    +        this.beamFactory = beamFactory;
    +        this.batchSize = batchSize;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        tranquilizer = Tranquilizer.create(
    +                beamFactory.makeBeam(stormConf, context),
    +                batchSize,
    +                Tranquilizer.DefaultMaxPendingBatches(),
    +                Tranquilizer.DefaultLingerMillis());
    +        this.tranquilizer.start();
    +
    +    }
    +
    +    @Override
    +    public void execute(final Tuple tuple) {
    +      Future future = tranquilizer.send((E)tuple.getValue(0));
    +      future.addEventListener(new FutureEventListener() {
    +          @Override
    +          public void onFailure(Throwable cause) {
    +              if(cause instanceof MessageDroppedException) {
    +                  synchronized (collector) {
    --- End diff --
    
    We may not need any synchronization here and at other places as well.
    
    [STORM-841](https://issues.apache.org/jira/browse/STORM-841) confirms it is threadsafe.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r72304984
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.trident;
    +
    +import com.metamx.tranquility.beam.Beam;
    +import com.twitter.util.Await;
    +import org.apache.storm.trident.state.State;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.collection.JavaConversions;
    +
    +import java.util.List;
    +
    +
    +/**
    + * Trident {@link State} implementation for Druid.
    + */
    +public class DruidBeamState<E> implements State  {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamState.class);
    +
    +    Beam<E> beam = null;
    +
    +    public DruidBeamState(Beam<E> beam) {
    +        this.beam = beam;
    +    }
    +
    +    public void send(List<E> events ) {
    +        try {
    +            LOG.info("Sending %d events", events.size());
    +            Await.result(beam.sendBatch(JavaConversions.collectionAsScalaIterable(events).toList()));
    +        } catch (Exception e) {
    +            final String errorMsg = "Failed in writing messages to Druid";
    +            LOG.error(errorMsg, e);
    +            throw new RuntimeException(errorMsg);
    +        }
    +    }
    +
    +    public void close() {
    +        try {
    +            Await.result(beam.close());
    +        } catch (Exception e) {
    +            final String errorMsg = "Error while closing Druid beam client";
    +            LOG.error(errorMsg, e);
    +            throw new RuntimeException(errorMsg);
    +        }
    +    }
    +
    +    @Override
    +    public void beginCommit(Long txid) {
    --- End diff --
    
    @arunmahadevan I remember you recommended that we should make all trident states to implement beginCommit. Can you add details if we can do this here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r71718749
  
    --- Diff: external/storm-druid/README.md ---
    @@ -0,0 +1,125 @@
    +# Storm Druid Bolt and TridentState
    +
    +This module provides core Storm and Trident bolt implementations for writing data to [Druid] (http://druid.io/) data store.
    +This implementation uses Druid's [Tranquility library] (https://github.com/druid-io/tranquility) to send messages to druid.
    +
    +Some of the implementation details are borrowed from existing [Tranquility Storm Bolt] (https://github.com/druid-io/tranquility/blob/master/docs/storm.md).
    +This new Bolt added to support latest storm release and maintain the bolt in the storm repo.
    +
    +### Core Bolt
    +Below example describes the usage of core bolt which is `org.apache.storm.druid.bolt.DruidBeamBolt`
    +This Bolt expects to receive tuples in which the zeroth element is your event type.
    +
    +
    +```java
    +
    +    DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(new SampleDruidBeamFactoryImpl());
    +    topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
    +
    +```
    +
    +
    +### Trident State
    +
    +```java
    +
    +    final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));
    +
    +    stream.peek(new Consumer() {
    +        @Override
    +        public void accept(TridentTuple input) {
    +             LOG.info("########### Received tuple: [{}]", input);
    +        }
    +    }).partitionPersist(new DruidBeamStateFactory(new SampleDruidBeamFactoryImpl()), new Fields("events"), new DruidBeamStateUpdater());
    +
    +```
    +
    +### Sample Beam Factory Implementation
    +Druid bolt must be supplied with a BeamFactory. You can implement one of these using the [DruidBeams builder's] (https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala) "buildBeam()" method.
    +See the [Configuration documentation] (https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details.
    +For more details refer [Tranquility library] (https://github.com/druid-io/tranquility) docs.
    +
    +```java
    +
    +public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
    +
    +    @Override
    +    public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
    +
    +
    +        final String indexService = "druid/overlord"; // Your overlord's druid.service
    --- End diff --
    
    can you add some comments around these two.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the issue:

    https://github.com/apache/storm/pull/1583
  
    Thanks @omkreddy , pushed this change to master and 1.x-branch, added you as a contributor.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r72305132
  
    --- Diff: external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java ---
    @@ -0,0 +1,116 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.druid;
    +
    +import com.google.common.collect.ImmutableList;
    +import com.metamx.common.Granularity;
    +import com.metamx.tranquility.beam.Beam;
    +import com.metamx.tranquility.beam.ClusteredBeamTuning;
    +import com.metamx.tranquility.druid.DruidBeams;
    +import com.metamx.tranquility.druid.DruidDimensions;
    +import com.metamx.tranquility.druid.DruidLocation;
    +import com.metamx.tranquility.druid.DruidRollup;
    +import com.metamx.tranquility.typeclass.Timestamper;
    +import io.druid.data.input.impl.TimestampSpec;
    +import io.druid.granularity.QueryGranularities;
    +import io.druid.query.aggregation.AggregatorFactory;
    +import io.druid.query.aggregation.CountAggregatorFactory;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.CuratorFrameworkFactory;
    +import org.apache.curator.retry.ExponentialBackoffRetry;
    +import org.apache.storm.druid.bolt.DruidBeamFactory;
    +import org.apache.storm.task.IMetricsContext;
    +import org.joda.time.DateTime;
    +import org.joda.time.Period;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Druid bolt must be supplied with a BeamFactory. You can implement one of these using the
    + * [DruidBeams builder's] (https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala)
    + * "buildBeam()" method. See the [Configuration documentation] (https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details.
    + * For more details refer [Tranquility library] (https://github.com/druid-io/tranquility) docs.
    + */
    +public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
    +    Map<String, Object> factoryConf = null;
    +
    +
    +    public SampleDruidBeamFactoryImpl(Map<String, Object> factoryConf) {
    +        this.factoryConf = factoryConf; // This can be used to pass config values
    +    }
    +
    +    @Override
    +    public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
    +
    +
    +        final String indexService = "druid/overlord"; // Your overlord's druid.service
    --- End diff --
    
    should we make these as configurable options ? and alos there is no leading / here 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73535370
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.trident;
    +
    +import com.metamx.tranquility.beam.Beam;
    +import com.twitter.util.Await;
    +import org.apache.storm.trident.state.State;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.collection.JavaConversions;
    +
    +import java.util.List;
    +
    +
    +/**
    + * Trident {@link State} implementation for Druid.
    + */
    +public class DruidBeamState<E> implements State  {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamState.class);
    +
    +    Beam<E> beam = null;
    +
    +    public DruidBeamState(Beam<E> beam) {
    +        this.beam = beam;
    +    }
    +
    +    public void send(List<E> events ) {
    +        try {
    +            LOG.info("Sending %d events", events.size());
    +            Await.result(beam.sendBatch(JavaConversions.collectionAsScalaIterable(events).toList()));
    +        } catch (Exception e) {
    +            final String errorMsg = "Failed in writing messages to Druid";
    +            LOG.error(errorMsg, e);
    +            throw new RuntimeException(errorMsg);
    +        }
    +    }
    +
    +    public void close() {
    +        try {
    +            Await.result(beam.close());
    +        } catch (Exception e) {
    +            final String errorMsg = "Error while closing Druid beam client";
    +            LOG.error(errorMsg, e);
    +            throw new RuntimeException(errorMsg);
    +        }
    +    }
    +
    +    @Override
    +    public void beginCommit(Long txid) {
    --- End diff --
    
    Druid Tranquility library itself  does not support exactly-once semantics and it is not key-value store. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73534734
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java ---
    @@ -0,0 +1,73 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.trident;
    +
    +import com.metamx.tranquility.beam.Beam;
    +import com.twitter.util.Await;
    +import org.apache.storm.trident.state.State;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.collection.JavaConversions;
    +
    +import java.util.List;
    +
    +
    +/**
    + * Trident {@link State} implementation for Druid.
    + */
    +public class DruidBeamState<E> implements State  {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamState.class);
    +
    +    Beam<E> beam = null;
    +
    +    public DruidBeamState(Beam<E> beam) {
    +        this.beam = beam;
    +    }
    +
    +    public void send(List<E> events ) {
    +        try {
    +            LOG.info("Sending %d events", events.size());
    +            Await.result(beam.sendBatch(JavaConversions.collectionAsScalaIterable(events).toList()));
    +        } catch (Exception e) {
    +            final String errorMsg = "Failed in writing messages to Druid";
    +            LOG.error(errorMsg, e);
    +            throw new RuntimeException(errorMsg);
    --- End diff --
    
    I Agree.  Druid library has internal retry mechanism. Now i not throwing  runtime exception.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73555706
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,103 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * This Bolt expects to receive tuples in which the zeroth element is your event type.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private int batchSize;
    +    private Tranquilizer<E> tranquilizer = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, int batchSize) {
    +        this.beamFactory = beamFactory;
    +        this.batchSize = batchSize;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        tranquilizer = Tranquilizer.create(
    +                beamFactory.makeBeam(stormConf, context),
    +                batchSize,
    +                Tranquilizer.DefaultMaxPendingBatches(),
    +                Tranquilizer.DefaultLingerMillis());
    +        this.tranquilizer.start();
    +    }
    +
    +    @Override
    +    public void execute(final Tuple tuple) {
    +      Future future = tranquilizer.send((E)tuple.getValue(0));
    --- End diff --
    
    @harshach I discussed this approach with @satishd  and I felt this way users can send whatever event type they want.  
    
    upmerged the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r72370968
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,103 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * This Bolt expects to receive tuples in which the zeroth element is your event type.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private int batchSize;
    +    private Tranquilizer<E> tranquilizer = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, int batchSize) {
    +        this.beamFactory = beamFactory;
    +        this.batchSize = batchSize;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        tranquilizer = Tranquilizer.create(
    +                beamFactory.makeBeam(stormConf, context),
    +                batchSize,
    +                Tranquilizer.DefaultMaxPendingBatches(),
    +                Tranquilizer.DefaultLingerMillis());
    +        this.tranquilizer.start();
    +    }
    +
    +    @Override
    +    public void execute(final Tuple tuple) {
    +      Future future = tranquilizer.send((E)tuple.getValue(0));
    --- End diff --
    
    we are looking at only the first value in tuple. What is the data format that we are expecting for the user to send. It might be confusing for the user if they send two values in the tuple but we only take the first one. May be we should mapper interface like hbase or hive bolts.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73726170
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java ---
    @@ -0,0 +1,42 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import org.apache.storm.tuple.ITuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Converts {@link ITuple} to Event
    + */
    +public final class TupleDruidEventMapper<E> implements ITupleDruidEventMapper {
    +    private static final Logger LOG = LoggerFactory.getLogger(TupleDruidEventMapper.class);
    +
    +    public static final String DEFAULT_FIELD_NAME = "event";
    +
    +    public TupleDruidEventMapper() {
    +    }
    +
    +    @Override
    +    public E getEvent(ITuple tuple) {
    +        return (E) tuple.getValueByField(DEFAULT_FIELD_NAME);
    --- End diff --
    
    You may want to take the field name in constructor and use it like `tuple.getValueByField(eventFieldName);`. Otherwise, user should always need to extend this class and override `getEvent(ITuple)` method. 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1583
  
    @omkreddy you need to upmerge the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the issue:

    https://github.com/apache/storm/pull/1583
  
    +1. Nice work @omkreddy .


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73535652
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -0,0 +1,115 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.MessageDroppedException;
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +import com.twitter.util.Future;
    +import com.twitter.util.FutureEventListener;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichBolt;
    +import org.apache.storm.tuple.Tuple;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +
    +/**
    + * Basic bolt implementation for storing data to Druid datastore.
    + * <p/>
    + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    + * to send to druid store.
    + * Some of the concepts are borrowed from Tranquility storm connector implementation.
    + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    + *
    + * This Bolt expects to receive tuples in which the zeroth element is your event type.
    + * <p/>
    + *
    + */
    +public class DruidBeamBolt<E> extends BaseRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);
    +
    +    private volatile  OutputCollector collector;
    +    private DruidBeamFactory<E> beamFactory = null;
    +    private int batchSize;
    +    private Tranquilizer<E> tranquilizer = null;
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory) {
    +        this(beamFactory, 2000);
    +    }
    +
    +    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, int batchSize) {
    +        this.beamFactory = beamFactory;
    +        this.batchSize = batchSize;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.collector = collector;
    +        tranquilizer = Tranquilizer.create(
    +                beamFactory.makeBeam(stormConf, context),
    +                batchSize,
    +                Tranquilizer.DefaultMaxPendingBatches(),
    +                Tranquilizer.DefaultLingerMillis());
    +        this.tranquilizer.start();
    +
    +    }
    +
    +    @Override
    +    public void execute(final Tuple tuple) {
    +      Future future = tranquilizer.send((E)tuple.getValue(0));
    +      future.addEventListener(new FutureEventListener() {
    +          @Override
    +          public void onFailure(Throwable cause) {
    +              if(cause instanceof MessageDroppedException) {
    --- End diff --
    
    implemented above discussed mechanism. now uses have option to process dropped/discarded messages.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r71718893
  
    --- Diff: external/storm-druid/README.md ---
    @@ -0,0 +1,125 @@
    +# Storm Druid Bolt and TridentState
    +
    +This module provides core Storm and Trident bolt implementations for writing data to [Druid] (http://druid.io/) data store.
    +This implementation uses Druid's [Tranquility library] (https://github.com/druid-io/tranquility) to send messages to druid.
    +
    +Some of the implementation details are borrowed from existing [Tranquility Storm Bolt] (https://github.com/druid-io/tranquility/blob/master/docs/storm.md).
    +This new Bolt added to support latest storm release and maintain the bolt in the storm repo.
    +
    +### Core Bolt
    +Below example describes the usage of core bolt which is `org.apache.storm.druid.bolt.DruidBeamBolt`
    +This Bolt expects to receive tuples in which the zeroth element is your event type.
    +
    +
    +```java
    +
    +    DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(new SampleDruidBeamFactoryImpl());
    +    topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
    +
    +```
    +
    +
    +### Trident State
    +
    +```java
    +
    +    final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));
    +
    +    stream.peek(new Consumer() {
    +        @Override
    +        public void accept(TridentTuple input) {
    +             LOG.info("########### Received tuple: [{}]", input);
    +        }
    +    }).partitionPersist(new DruidBeamStateFactory(new SampleDruidBeamFactoryImpl()), new Fields("events"), new DruidBeamStateUpdater());
    +
    +```
    +
    +### Sample Beam Factory Implementation
    +Druid bolt must be supplied with a BeamFactory. You can implement one of these using the [DruidBeams builder's] (https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala) "buildBeam()" method.
    +See the [Configuration documentation] (https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details.
    +For more details refer [Tranquility library] (https://github.com/druid-io/tranquility) docs.
    +
    +```java
    +
    +public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
    +
    +    @Override
    +    public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
    +
    +
    +        final String indexService = "druid/overlord"; // Your overlord's druid.service
    +        final String discoveryPath = "/druid/discovery"; // Your overlord's druid.discovery.curator.path
    +        final String dataSource = "test";
    --- End diff --
    
    same here why are we calling dataSource test?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the issue:

    https://github.com/apache/storm/pull/1583
  
    @omkreddy I will merge it in tomorrow w.r.t 24hrs wait policy after last commit. Thanks for your work.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1583: STORM-1979: Storm Druid Connector implementation.

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on the issue:

    https://github.com/apache/storm/pull/1583
  
    This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
    to send to druid store. Some of the concepts are borrowed from Tranquility storm connector implementation (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
    
    @harshach pinging for review


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #1583: STORM-1979: Storm Druid Connector implementation.

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1583#discussion_r73574871
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.druid.bolt;
    +
    +import com.metamx.tranquility.tranquilizer.Tranquilizer;
    +
    +import java.io.Serializable;
    +
    +public class DruidConfig implements Serializable {
    +
    +    //Tranquilizer configs for DruidBeamBolt
    +    private int maxBatchSize;
    +    private int maxPendingBatches;
    +    private long lingerMillis;
    +    private boolean blockOnFull;
    +    private boolean enableDiscardStream;
    +    private String discardStreamId;
    +
    +    public int getMaxBatchSize() {
    +        return maxBatchSize;
    +    }
    +
    +    public int getMaxPendingBatches() {
    +        return maxPendingBatches;
    +    }
    +
    +    public long getLingerMillis() {
    +        return lingerMillis;
    +    }
    +
    +    public boolean isBlockOnFull() {
    +        return blockOnFull;
    +    }
    +
    +    public boolean isEnableDiscardStream() {
    +        return enableDiscardStream;
    +    }
    +
    +    public String getDiscardStreamId() {
    +        return discardStreamId;
    +    }
    +
    +    private DruidConfig(Builder builder) {
    +        this.maxBatchSize = builder.maxBatchSize;
    +        this.maxPendingBatches = builder.maxPendingBatches;
    +        this.lingerMillis = builder.lingerMillis;
    +        this.blockOnFull = builder.blockOnFull;
    +        this.enableDiscardStream = builder.enableDiscardStream;
    +        this.discardStreamId = builder.discardStreamId;
    +    }
    +
    +    public static DruidConfig.Builder newBuilder() {
    +        return new Builder();
    +    }
    +
    +    public static class Builder {
    +        private int maxBatchSize = Tranquilizer.DefaultMaxBatchSize();
    +        private int maxPendingBatches = Tranquilizer.DefaultMaxPendingBatches();
    +        private long lingerMillis = Tranquilizer.DefaultLingerMillis();
    +        private boolean blockOnFull =  Tranquilizer.DefaultBlockOnFull();
    +        private boolean enableDiscardStream = false;
    +        private String discardStreamId ="druid-discard-stream";
    --- End diff --
    
    "druid-discard-stream" can be declared as a public constant like DEFAULT_DISCARD_STREAM_ID in `DruidConfig` class, so that users can use thos stream id while building the topology. 
    
    I prefer to remove `enableDiscardStream` and have only `discardStreamId` which should be given by the user if user wants discarded events to be forwarded to that stream. This API makes it simpler to set only the discardStreamId. 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---