Posted to dev@storm.apache.org by fhussonnois <gi...@git.apache.org> on 2015/10/28 12:21:22 UTC

[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

GitHub user fhussonnois opened a pull request:

    https://github.com/apache/storm/pull/827

    STORM-1075 add external module storm-cassandra

    https://issues.apache.org/jira/browse/STORM-1075

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fhussonnois/storm master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/827.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #827
    
----
commit db148647bd28f16e796d2b6640657d9ea1adf3d2
Author: Florian Hussonnois <fl...@zenika.com>
Date:   2015-10-28T11:08:58Z

    STORM-1075 add external module storm-cassandra

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by fhussonnois <gi...@git.apache.org>.
Github user fhussonnois commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-158930102
  
    @satishd I think the fluent API handles the common use cases. Its purpose is to ease Cassandra integration, because creating a CQL statement from a tuple is cumbersome. For specific needs, developers can still implement their own CQLStatementTupleMapper. So I think it would be a mistake to remove that API; otherwise there is a risk that developers who use the connector will have to re-develop a DSL themselves.
    I'm not sure, but I don't think the Cassandra connector API changes much. DataStax seems to focus its development on Cassandra's internals rather than on query language support.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-158351858
  
    @fhussonnois I am working on the Select statement and Clause query implementation as part of STORM-1211. You can incorporate the remaining comments in your code, or let me know if you need any clarification.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-153885605
  
    @satishd can you please review this?



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by fhussonnois <gi...@git.apache.org>.
Github user fhussonnois commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-161103792
  
    @harshach @satishd I added an ignore annotation to skip these tests.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by fhussonnois <gi...@git.apache.org>.
Github user fhussonnois commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45364971
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java ---
    @@ -0,0 +1,152 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra.executor;
    +
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.Statement;
    +import com.google.common.util.concurrent.*;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +
    +/**
    + * Service to asynchronously executes cassandra statements.
    + */
    +public class AsyncExecutor<T> implements Serializable {
    +
    +    private final static Logger LOG = LoggerFactory.getLogger(AsyncExecutor.class);
    +
    +    protected Session session;
    +
    +    protected ExecutorService executorService;
    +
    +    protected AsyncResultHandler<T> handler;
    +
    +    private Map<SettableFuture<T>, Boolean> pending = new ConcurrentHashMap<>( );
    +
    +    /**
    +     * Creates a new {@link AsyncExecutor} instance.
    +     */
    +    protected AsyncExecutor(Session session, AsyncResultHandler<T> handler) {
    +        this(session, newSingleThreadExecutor(), handler);
    +    }
    +
    +    /**
    +     * Creates a new {@link AsyncExecutor} instance.
    +     *
    +     * @param session The cassandra session.
    +     * @param executorService The executor service responsible to execute handler.
    +     */
    +    private AsyncExecutor(Session session, ExecutorService executorService, AsyncResultHandler<T> handler) {
    +        this.session   = session;
    +        this.executorService = executorService;
    +        this.handler = handler;
    +    }
    +
    +    protected static ExecutorService newSingleThreadExecutor() {
    +        return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("cassandra-async-handler-%d").build());
    +    }
    +
    +    /**
    +     * Asynchronously executes all statements associated to the specified input. The input will be passed to
    +     * the {@link #handler} once all queries succeed or failed.
    --- End diff --
    
    Yes, there is an issue with the comment. This behavior is implemented with Futures.allAsList:
    http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/util/concurrent/Futures.html#allAsList(java.lang.Iterable)
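
For readers without Guava at hand, the all-or-nothing behavior referred to here can be approximated with the JDK's CompletableFuture. This is a minimal sketch, not the code from the pull request: AllAsListSketch and its allAsList method are hypothetical names. One difference to keep in mind: Guava documents Futures.allAsList as failing as soon as one input fails, whereas CompletableFuture.allOf waits for every input to finish before completing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper: a JDK-only analogue of Guava's Futures.allAsList.
final class AllAsListSketch {

    // Combines the futures into one that yields all results in input order,
    // or completes exceptionally if any input completed exceptionally.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        return all.thenApply(ignored -> {
            List<T> results = new ArrayList<>(futures.size());
            for (CompletableFuture<T> future : futures) {
                results.add(future.join()); // every input is already complete here
            }
            return results;
        });
    }
}
```

With this shape, a handler attached to the combined future sees either the full list of results or a single failure, which is what the Javadoc comment under review is trying to describe.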



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45045556
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java ---
    @@ -0,0 +1,192 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra.bolt;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.utils.Time;
    +import com.datastax.driver.core.Statement;
    +import org.apache.storm.cassandra.executor.AsyncResultHandler;
    +import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler;
    +import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
    +
    +    private final static Logger LOG = LoggerFactory.getLogger(BatchCassandraWriterBolt.class);
    +
    +    public static final int DEFAULT_EMIT_FREQUENCY = 2;
    +
    +    private static final int QUEUE_MAX_SIZE = 1000;
    +
    +    private LinkedBlockingQueue<Tuple> queue;
    +    
    +    private int tickFrequencyInSeconds;
    +    
    +    private long lastModifiedTimesMillis;
    +
    +    private String componentID;
    +
    +    private AsyncResultHandler<List<Tuple>> asyncResultHandler;
    +
    +    /**
    +     * Creates a new {@link CassandraWriterBolt} instance.
    +     *
    +     * @param tupleMapper
    +     */
    +    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper) {
    +        this(tupleMapper, DEFAULT_EMIT_FREQUENCY);
    +    }
    +
    +    /**
    +     * Creates a new {@link CassandraWriterBolt} instance.
    +     *
    +     * @param tupleMapper
    +     */
    +    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper, int tickFrequencyInSeconds) {
    +        super(tupleMapper);
    +        this.tickFrequencyInSeconds = tickFrequencyInSeconds;
    +    }
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
    +        super.prepare(stormConfig, topologyContext, outputCollector);
    +        this.componentID = topologyContext.getThisComponentId();
    +        this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
    --- End diff --
    
    The queue size should be configurable on the bolt, e.g. via withQueueSize or withBatchSize.
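
One way to do this is a fluent setter whose value is applied when the queue is created in prepare(). The following is a minimal sketch that does not depend on Storm: BatchBufferSketch and its methods are hypothetical and only mirror the shape of the bolt in the diff above.

```java
import java.io.Serializable;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the bolt: only the queue configuration is shown.
class BatchBufferSketch implements Serializable {

    static final int DEFAULT_QUEUE_SIZE = 1000;

    private int queueSize = DEFAULT_QUEUE_SIZE;

    // Created in prepare(), after the fluent setters have run.
    private transient LinkedBlockingQueue<Object> queue;

    // Fluent setter in the style suggested in the review comment.
    BatchBufferSketch withQueueSize(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("queue size must be positive: " + size);
        }
        this.queueSize = size;
        return this;
    }

    void prepare() {
        this.queue = new LinkedBlockingQueue<>(queueSize);
    }

    // Returns false when the queue is full, i.e. when a batch should be flushed.
    boolean offer(Object tuple) {
        return queue.offer(tuple);
    }
}
```

A caller would write something like new BatchBufferSketch().withQueueSize(500) before the component is prepared; the default stays in place when the setter is not called.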



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-159151876
  
    @fhussonnois let's use CQL as @satishd suggested and provide a mapper interface like the one in storm-jdbc (https://github.com/apache/storm/tree/master/external/storm-jdbc), which can map tuple fields to table columns. Using a mapper interface across all the connectors will make things easier for users.
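
A rough sketch of that shape: the user supplies a plain CQL string, and a mapper turns tuple fields into column values. Everything below is hypothetical (ColumnSketch, TupleColumnMapper, SimpleColumnMapper); a Map stands in for a Storm tuple, and neither the DataStax driver nor the actual storm-jdbc API is used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical column holder: a column name and the value to bind.
final class ColumnSketch {
    final String name;
    final Object value;

    ColumnSketch(String name, Object value) {
        this.name = name;
        this.value = value;
    }
}

// Hypothetical mapper interface: tuple fields in, table columns out.
interface TupleColumnMapper {
    List<ColumnSketch> getColumns(Map<String, Object> tuple);
}

// Maps the listed tuple fields to columns of the same name, in order.
final class SimpleColumnMapper implements TupleColumnMapper {
    private final List<String> fields;

    SimpleColumnMapper(List<String> fields) {
        this.fields = new ArrayList<>(fields);
    }

    @Override
    public List<ColumnSketch> getColumns(Map<String, Object> tuple) {
        List<ColumnSketch> columns = new ArrayList<>(fields.size());
        for (String field : fields) {
            if (!tuple.containsKey(field)) {
                throw new IllegalArgumentException("Missing tuple field: " + field);
            }
            columns.add(new ColumnSketch(field, tuple.get(field)));
        }
        return columns;
    }
}
```

The column values would then be bound, in order, to the placeholders of a user-supplied statement such as INSERT INTO weather (stn_id, temperature) VALUES (?, ?), where the table and column names are again only illustrative.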



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by jnioche <gi...@git.apache.org>.
Github user jnioche commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45130329
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/CassandraConf.java ---
    @@ -0,0 +1,146 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra.client;
    +
    +import com.datastax.driver.core.ConsistencyLevel;
    +import com.google.common.base.Objects;
    +
    +import java.io.Serializable;
    +import java.util.Map;
    +
    +/**
    + * Configuration used by cassandra storm components.
    + */
    +public class CassandraConf implements Serializable {
    +    
    +    public static final String CASSANDRA_USERNAME           = "cassandra.username";
    +    public static final String CASSANDRA_PASSWORD           = "cassandra.password";
    +    public static final String CASSANDRA_KEYSPACE           = "cassandra.keyspace";
    +    public static final String CASSANDRA_CONSISTENCY_LEVEL  = "cassandra.output.consistencyLevel";
    +    public static final String CASSANDRA_NODES              = "cassandra.nodes";
    +    public static final String CASSANDRA_PORT               = "cassandra.port";
    +    public static final String CASSANDRA_BATCH_SIZE_ROWS    = "cassandra.batch.size.rows";
    +
    +    /**
    +     * The authorized cassandra username.
    +     */
    +    private String username;
    +    /**
    +     * The authorized cassandra password
    +     */
    +    private String password;
    +    /**
    +     * The cassandra keyspace.
    +     */
    +    private String keyspace;
    +    /**
    +     * List of contacts nodes.
    +     */
    +    private String[] nodes = {"localhost"};
    +
    +    /**
    +     * The port used to connect to nodes.
    +     */
    +    private int port = 9092;
    +
    +    /**
    +     * Consistency level used to write statements.
    +     */
    +    private ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
    +    /**
    +     * The maximal numbers of rows per batch.
    +     */
    +    private int batchSizeRows       = 100;
    +    
    +    /**
    +     * Creates a new {@link CassandraConf} instance.
    +     */
    +    public CassandraConf() {
    +        super();
    +    }
    +
    +    /**
    +     * Creates a new {@link CassandraConf} instance.
    +     *
    +     * @param conf The storm configuration.
    +     */
    +    public CassandraConf(Map<String, Object> conf) {
    +        this.username = getOrElse(conf, CASSANDRA_USERNAME, null);
    +        this.password = getOrElse(conf, CASSANDRA_PASSWORD, null);
    +        this.keyspace = get(conf, CASSANDRA_KEYSPACE);
    +        this.consistencyLevel = ConsistencyLevel.valueOf(getOrElse(conf, CASSANDRA_CONSISTENCY_LEVEL, ConsistencyLevel.ONE.name()));
    +        this.nodes    = getOrElse(conf, CASSANDRA_NODES, "localhost").split(",");
    +        this.batchSizeRows = getOrElse(conf, CASSANDRA_BATCH_SIZE_ROWS, 100);
    +        this.port = conf.get(CASSANDRA_PORT) != null ? Integer.valueOf((String)conf.get(CASSANDRA_PORT)) : 9042;
    +    }
    +
    +    public String getUsername() {
    +        return username;
    +    }
    +
    +    public String getPassword() {
    +        return password;
    +    }
    +
    +    public String getKeyspace() {
    +        return keyspace;
    +    }
    +
    +    public String[] getNodes() {
    +        return nodes;
    +    }
    +
    +    public ConsistencyLevel getConsistencyLevel() {
    +        return consistencyLevel;
    +    }
    +
    +    public int getBatchSizeRows() {
    +        return batchSizeRows;
    +    }
    +
    +    public int getPort() {
    +        return this.port;
    +    }
    +
    +    private <T> T get(Map<String, Object> conf, String key) {
    +        Object o = conf.get(key);
    +        if(o == null) {
    +            throw new IllegalArgumentException("No '" + key + "' value found in configuration!");
    +        }
    +        return (T)o;
    +    }
    +
    +    private <T> T getOrElse(Map<String, Object> conf, String key, T def) {
    --- End diff --
    
    That does pretty much the same as [https://github.com/apache/storm/blob/768a85926373355c15cc139fd86268916abc6850/storm-core/src/jvm/backtype/storm/utils/Utils.java#L344], doesn't it?
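
For reference, a lookup-with-default helper of the kind being compared here usually looks like the sketch below. It is an illustration rather than a copy of either implementation; ConfSketch is a hypothetical class name.

```java
import java.util.Map;

final class ConfSketch {

    // Returns the configured value for key, or def when the key is absent
    // or mapped to null.
    @SuppressWarnings("unchecked")
    static <T> T getOrElse(Map<String, Object> conf, String key, T def) {
        Object value = conf.get(key);
        return value != null ? (T) value : def;
    }
}
```

Because the cast is unchecked, a configuration value of the wrong type is not detected inside the helper; it surfaces later as a ClassCastException at the call site.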



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r44860034
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java ---
    @@ -0,0 +1,192 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra.bolt;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.utils.Time;
    +import com.datastax.driver.core.Statement;
    +import org.apache.storm.cassandra.executor.AsyncResultHandler;
    +import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler;
    +import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
    +
    +    private final static Logger LOG = LoggerFactory.getLogger(BatchCassandraWriterBolt.class);
    +
    +    public static final int DEFAULT_EMIT_FREQUENCY = 2;
    +
    +    private static final int QUEUE_MAX_SIZE = 1000;
    +
    +    private LinkedBlockingQueue<Tuple> queue;
    +    
    +    private int tickFrequencyInSeconds;
    +    
    +    private long lastModifiedTimesMillis;
    +
    +    private String componentID;
    +
    +    private AsyncResultHandler<List<Tuple>> asyncResultHandler;
    +
    +    /**
    +     * Creates a new {@link CassandraWriterBolt} instance.
    +     *
    +     * @param tupleMapper
    +     */
    +    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper) {
    +        this(tupleMapper, DEFAULT_EMIT_FREQUENCY);
    +    }
    +
    +    /**
    +     * Creates a new {@link CassandraWriterBolt} instance.
    +     *
    +     * @param tupleMapper
    +     */
    +    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper, int tickFrequencyInSeconds) {
    +        super(tupleMapper);
    +        this.tickFrequencyInSeconds = tickFrequencyInSeconds;
    +    }
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
    +        super.prepare(stormConfig, topologyContext, outputCollector);
    +        this.componentID = topologyContext.getThisComponentId();
    +        this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
    +        this.lastModifiedTimesMillis = now();
    +    }
    +
    +    @Override
    +    protected AsyncResultHandler<List<Tuple>> getAsyncHandler() {
    +        if( asyncResultHandler == null) {
    +            asyncResultHandler = new BatchAsyncResultHandler(getResultHandler());
    +        }
    +        return asyncResultHandler;
    +    }
    +
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    protected void process(Tuple input) {
    +        if( ! queue.offer(input) ) {
    +            LOG.info(logPrefix() + "The message queue is full, preparing batch statement...");
    +            prepareAndExecuteStatement();
    +            queue.add(input);
    +        }
    +    }
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    protected void tick() {
    --- End diff --
    
    Can we rename this method to something other than tick?



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-159349948
  
    @fhussonnois looks good to me as well.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-159341539
  
    @fhussonnois That was the idea; it looks good to me. I assume you still have field("stn_id").as("weatherstation_id") along with the APIs you mentioned.
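
The field("stn_id").as("weatherstation_id") form mentioned here maps a tuple field to a differently named column. Below is a minimal sketch of how such a selector could be built. FieldSelectorSketch and its methods are hypothetical and are not the API from the pull request; a Map stands in for a Storm tuple.

```java
import java.util.Map;

// Hypothetical selector: reads one tuple field and exposes it under a column name.
final class FieldSelectorSketch {
    private final String field;
    private final String column;

    private FieldSelectorSketch(String field, String column) {
        this.field = field;
        this.column = column;
    }

    // By default the column has the same name as the tuple field.
    static FieldSelectorSketch field(String name) {
        return new FieldSelectorSketch(name, name);
    }

    // Returns a new selector that targets a differently named column.
    FieldSelectorSketch as(String columnName) {
        return new FieldSelectorSketch(field, columnName);
    }

    String column() {
        return column;
    }

    Object valueFrom(Map<String, Object> tuple) {
        return tuple.get(field);
    }
}
```

Keeping the selector immutable means a shared field("stn_id") instance can be reused with different aliases without one caller affecting another.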



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-159350084
  
    @fhussonnois I already have some of that mapper code and the Trident-related code with me. I can work on the mappers and integrate them with the field-tuple/column fluent API. Can you refactor your API to remove the insert/update builder and push those changes? I can take over from there to complete the remaining changes.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by fhussonnois <gi...@git.apache.org>.
Github user fhussonnois commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45070726
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java ---
    @@ -0,0 +1,192 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra.bolt;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.utils.Time;
    +import com.datastax.driver.core.Statement;
    +import org.apache.storm.cassandra.executor.AsyncResultHandler;
    +import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler;
    +import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
    +
    +    private final static Logger LOG = LoggerFactory.getLogger(BatchCassandraWriterBolt.class);
    +
    +    public static final int DEFAULT_EMIT_FREQUENCY = 2;
    +
    +    private static final int QUEUE_MAX_SIZE = 1000;
    +
    +    private LinkedBlockingQueue<Tuple> queue;
    +    
    +    private int tickFrequencyInSeconds;
    +    
    +    private long lastModifiedTimesMillis;
    +
    +    private String componentID;
    +
    +    private AsyncResultHandler<List<Tuple>> asyncResultHandler;
    +
    +    /**
    +     * Creates a new {@link CassandraWriterBolt} instance.
    +     *
    +     * @param tupleMapper
    +     */
    +    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper) {
    +        this(tupleMapper, DEFAULT_EMIT_FREQUENCY);
    +    }
    +
    +    /**
    +     * Creates a new {@link CassandraWriterBolt} instance.
    +     *
    +     * @param tupleMapper
    +     */
    +    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper, int tickFrequencyInSeconds) {
    +        super(tupleMapper);
    +        this.tickFrequencyInSeconds = tickFrequencyInSeconds;
    +    }
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
    +        super.prepare(stormConfig, topologyContext, outputCollector);
    +        this.componentID = topologyContext.getThisComponentId();
    +        this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
    +        this.lastModifiedTimesMillis = now();
    +    }
    +
    +    @Override
    +    protected AsyncResultHandler<List<Tuple>> getAsyncHandler() {
    +        if( asyncResultHandler == null) {
    +            asyncResultHandler = new BatchAsyncResultHandler(getResultHandler());
    +        }
    +        return asyncResultHandler;
    +    }
    +
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    protected void process(Tuple input) {
    +        if( ! queue.offer(input) ) {
    +            LOG.info(logPrefix() + "The message queue is full, preparing batch statement...");
    +            prepareAndExecuteStatement();
    +            queue.add(input);
    +        }
    +    }
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    protected void tick() {
    --- End diff --
    
    What name should be used: onTickTuple, onSystemTuple...?
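
Whatever name is chosen, the method is the hook that the base bolt calls when it receives a tick tuple rather than a data tuple. Here is a Storm-free sketch of that dispatch, using the onTickTuple name floated above. TupleSketch and TickDispatchSketch are hypothetical, and the "__system" and "__tick" identifiers are assumed to match Storm's system component id and tick stream id.

```java
// Hypothetical minimal tuple: only the source component and stream are modelled.
final class TupleSketch {
    final String sourceComponent;
    final String sourceStreamId;

    TupleSketch(String sourceComponent, String sourceStreamId) {
        this.sourceComponent = sourceComponent;
        this.sourceStreamId = sourceStreamId;
    }
}

// Hypothetical base class that routes tick tuples and data tuples to separate hooks.
abstract class TickDispatchSketch {

    // Assumed values of Storm's system component id and tick stream id.
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    static boolean isTick(TupleSketch tuple) {
        return SYSTEM_COMPONENT_ID.equals(tuple.sourceComponent)
                && SYSTEM_TICK_STREAM_ID.equals(tuple.sourceStreamId);
    }

    // Entry point: decides which hook handles the incoming tuple.
    final void execute(TupleSketch input) {
        if (isTick(input)) {
            onTickTuple();
        } else {
            process(input);
        }
    }

    protected abstract void process(TupleSketch input);

    protected abstract void onTickTuple();
}
```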



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-161028584
  
    @fhussonnois @satishd I feel like the tests are flaky, and I can't make them all pass.
    Latest run:
    CassandraWriterBoltTest.shouldAsyncInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields:40->executeAndAssertWith:56->BaseTopologyTest.runLocalTopologyAndWait:50 expected:<61234> but was:<95083> . 
    We can't really expect these to pass every time when the unit tests rely on sleep(time).
    
    And another run:
    
     BatchCassandraWriterBoltTest.shouldBatchInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields:41->executeAndAssertWith:57->BaseTopologyTest.runLocalTopologyAndWait:50 expected:<11627> but was:<15791>
      CassandraWriterBoltTest.shouldAsyncInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields:40->executeAndAssertWith:56->BaseTopologyTest.runLocalTopologyAndWait:50 expected:<66740> but was:<81138>
    
    
    @fhussonnois Let's remove these tests for now; we can address them in another JIRA.
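
    For reference, one common way to avoid a fixed sleep(time) in this kind of test is to poll for the expected condition until a deadline. The sketch below is only an illustration: `AwaitSketch` and `awaitCondition` are hypothetical names, not part of the PR or of Storm, and the background thread merely simulates asynchronous writes.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical test helper, not part of storm-cassandra.
class AwaitSketch {

    // Polls the condition until it holds or the timeout elapses.
    // Returns true if the condition held before the deadline.
    static boolean awaitCondition(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // deadline reached, condition never held
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        AtomicInteger written = new AtomicInteger();
        // Simulates asynchronous writes that finish at an unpredictable time.
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                written.incrementAndGet();
            }
        });
        writer.start();
        boolean done = awaitCondition(() -> written.get() == 5, 5_000, 10);
        System.out.println("all rows written: " + done);
    }
}
```

    The test then waits only as long as needed and fails with a clear timeout instead of depending on how fast the machine happens to be.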



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/827



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-158807186
  
    @fhussonnois I think we should accept only CQL strings for now instead of providing a fluent API for building queries. A fluent API requires implementing every kind of query supported by CQL just to map column names. The Cassandra connector API should be agnostic about CQL and simply use the DataStax driver. That way we avoid having to implement each new feature added to CQL. We should support only a simple/prepared statement builder.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by jnioche <gi...@git.apache.org>.
Github user jnioche commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45129715
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java ---
    @@ -0,0 +1,192 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra.bolt;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.utils.Time;
    +import com.datastax.driver.core.Statement;
    +import org.apache.storm.cassandra.executor.AsyncResultHandler;
    +import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler;
    +import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
    +
    +    private final static Logger LOG = LoggerFactory.getLogger(BatchCassandraWriterBolt.class);
    +
    +    public static final int DEFAULT_EMIT_FREQUENCY = 2;
    +
    +    private static final int QUEUE_MAX_SIZE = 1000;
    +
    +    private LinkedBlockingQueue<Tuple> queue;
    +    
    +    private int tickFrequencyInSeconds;
    +    
    +    private long lastModifiedTimesMillis;
    +
    +    private String componentID;
    +
    +    private AsyncResultHandler<List<Tuple>> asyncResultHandler;
    +
    +    /**
    +     * Creates a new {@link CassandraWriterBolt} instance.
    +     *
    +     * @param tupleMapper
    +     */
    +    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper) {
    +        this(tupleMapper, DEFAULT_EMIT_FREQUENCY);
    +    }
    +
    +    /**
    +     * Creates a new {@link CassandraWriterBolt} instance.
    +     *
    +     * @param tupleMapper
    +     */
    +    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper, int tickFrequencyInSeconds) {
    +        super(tupleMapper);
    +        this.tickFrequencyInSeconds = tickFrequencyInSeconds;
    +    }
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
    +        super.prepare(stormConfig, topologyContext, outputCollector);
    +        this.componentID = topologyContext.getThisComponentId();
    +        this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
    +        this.lastModifiedTimesMillis = now();
    +    }
    +
    +    @Override
    +    protected AsyncResultHandler<List<Tuple>> getAsyncHandler() {
    +        if( asyncResultHandler == null) {
    +            asyncResultHandler = new BatchAsyncResultHandler(getResultHandler());
    +        }
    +        return asyncResultHandler;
    +    }
    +
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    protected void process(Tuple input) {
    +        if( ! queue.offer(input) ) {
    +            LOG.info(logPrefix() + "The message queue is full, preparing batch statement...");
    +            prepareAndExecuteStatement();
    +            queue.add(input);
    +        }
    +    }
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    protected void tick() {
    +        prepareAndExecuteStatement();
    +    }
    +
    +    public void prepareAndExecuteStatement() {
    +        int size = queue.size();
    +        if( size > 0 ) {
    +            List<Tuple> inputs = new ArrayList<>(size);
    +            queue.drainTo(inputs);
    +            try {
    +                List<PairStatementTuple> psl = buildStatement(inputs);
    +
    +                int sinceLastModified = updateAndGetSecondsSinceLastModified();
    +                LOG.debug(logPrefix() + String.format("Execute cql batches with %s statements after %s seconds", size, sinceLastModified));
    --- End diff --
    
    Using SLF4J's {} placeholders would be more elegant and also more efficient; see [http://www.slf4j.org/faq.html#logging_performance]
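
    To illustrate why placeholders are cheaper, the sketch below uses a hypothetical stand-in logger (`SketchLogger`, which is NOT the SLF4J API) so that it runs without any dependency. It mimics `{}` substitution and only builds the message when debug is enabled, whereas `logPrefix() + String.format(...)` builds the string before the logger is even called. With SLF4J itself, the call would look like `LOG.debug("{}Execute cql batches with {} statements after {} seconds", logPrefix(), size, sinceLastModified)`.

```java
// Hypothetical stand-in for a logger with SLF4J-style "{}" placeholders.
// It is NOT the SLF4J API; it only demonstrates deferred formatting.
class SketchLogger {
    private final boolean debugEnabled;
    int formatCount = 0; // how many times a message was actually built

    SketchLogger(boolean debugEnabled) {
        this.debugEnabled = debugEnabled;
    }

    // Substitutes each "{}" with the next argument, but only when debug is on.
    // Returns the formatted message, or null when nothing was formatted.
    String debug(String pattern, Object... args) {
        if (!debugEnabled) {
            return null; // level disabled: no string work is done
        }
        formatCount++;
        StringBuilder sb = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int idx = pattern.indexOf("{}", from);
            if (idx < 0) {
                break; // more arguments than placeholders
            }
            sb.append(pattern, from, idx).append(arg);
            from = idx + 2;
        }
        sb.append(pattern.substring(from));
        return sb.toString();
    }

    public static void main(String[] args) {
        String pattern = "Execute cql batches with {} statements after {} seconds";
        SketchLogger off = new SketchLogger(false);
        SketchLogger on = new SketchLogger(true);
        System.out.println(off.debug(pattern, 10, 2)); // message never built
        System.out.println(on.debug(pattern, 10, 2));
    }
}
```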



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-157326983
  
    STORM-1211 and STORM-1212 have been filed for Trident state/query support and synchronous bolt support. I have partially worked on these as part of this JIRA. I will update those JIRAs with pull requests once I have them ready.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-159353605
  
    @satishd I think that work can be done in follow-up JIRAs after this PR is merged. 



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r44860048
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java ---
    @@ -0,0 +1,192 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra.bolt;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.utils.Time;
    +import com.datastax.driver.core.Statement;
    +import org.apache.storm.cassandra.executor.AsyncResultHandler;
    +import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler;
    +import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
    +
    +    private final static Logger LOG = LoggerFactory.getLogger(BatchCassandraWriterBolt.class);
    +
    +    public static final int DEFAULT_EMIT_FREQUENCY = 2;
    +
    +    private static final int QUEUE_MAX_SIZE = 1000;
    +
    +    private LinkedBlockingQueue<Tuple> queue;
    +    
    +    private int tickFrequencyInSeconds;
    +    
    +    private long lastModifiedTimesMillis;
    +
    +    private String componentID;
    +
    +    private AsyncResultHandler<List<Tuple>> asyncResultHandler;
    +
    +    /**
    +     * Creates a new {@link CassandraWriterBolt} instance.
    +     *
    +     * @param tupleMapper
    +     */
    +    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper) {
    +        this(tupleMapper, DEFAULT_EMIT_FREQUENCY);
    +    }
    +
    +    /**
    +     * Creates a new {@link CassandraWriterBolt} instance.
    +     *
    +     * @param tupleMapper
    +     */
    +    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper, int tickFrequencyInSeconds) {
    +        super(tupleMapper);
    +        this.tickFrequencyInSeconds = tickFrequencyInSeconds;
    +    }
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
    +        super.prepare(stormConfig, topologyContext, outputCollector);
    +        this.componentID = topologyContext.getThisComponentId();
    +        this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
    +        this.lastModifiedTimesMillis = now();
    +    }
    +
    +    @Override
    +    protected AsyncResultHandler<List<Tuple>> getAsyncHandler() {
    +        if( asyncResultHandler == null) {
    +            asyncResultHandler = new BatchAsyncResultHandler(getResultHandler());
    +        }
    +        return asyncResultHandler;
    +    }
    +
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    protected void process(Tuple input) {
    +        if( ! queue.offer(input) ) {
    +            LOG.info(logPrefix() + "The message queue is full, preparing batch statement...");
    +            prepareAndExecuteStatement();
    +            queue.add(input);
    +        }
    +    }
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    protected void tick() {
    +        prepareAndExecuteStatement();
    +    }
    +
    +    public void prepareAndExecuteStatement() {
    +        int size = queue.size();
    +        if( size > 0 ) {
    +            List<Tuple> inputs = new ArrayList<>(size);
    +            queue.drainTo(inputs);
    +            try {
    +                List<PairStatementTuple> psl = buildStatement(inputs);
    +
    +                int sinceLastModified = updateAndGetSecondsSinceLastModified();
    +                LOG.debug(logPrefix() + String.format("Execute cql batches with %s statements after %s seconds", size, sinceLastModified));
    +
    +                checkTimeElapsedSinceLastExec(sinceLastModified);
    +
    +                GroupingBatchBuilder batchBuilder = new GroupingBatchBuilder(cassandraConfConfig.getBatchSizeRows(), psl);
    +
    +                int batchSize = 0;
    +                for (PairBatchStatementTuples batch : batchBuilder) {
    +                    LOG.debug(logPrefix() + String.format("Writing data to %s in batches of %s rows.", cassandraConfConfig.getKeyspace(), batch.getInputs().size()));
    +                    getAsyncExecutor().execAsync(batch.getStatement(), batch.getInputs());
    +                    batchSize++;
    +                }
    +
    +                int pending = getAsyncExecutor().getPendingExec();
    +                if (pending > batchSize) {
    +                    LOG.warn( logPrefix() + String.format("Currently pending tasks is superior to the number of submit batches(%s) : %s", batchSize, pending));
    +                }
    +                
    +            } catch (Throwable r) {
    +                LOG.error(logPrefix() + "Error(s) occurred while preparing batch statements");
    +                getAsyncHandler().failure(r, inputs);
    +            }
    +        }
    +    }
    +
    +    private List<PairStatementTuple> buildStatement(List<Tuple> inputs) {
    +        List<PairStatementTuple> stmts = new ArrayList<>(inputs.size());
    +
    +        for(Tuple t : inputs) {
    +            List<Statement> sl = getMapper().map(stormConfig, session, t);
    +            for(Statement s : sl)
    +                stmts.add(new PairStatementTuple(t, s) );
    +        }
    +        return stmts;
    +    }
    +
    +    private void checkTimeElapsedSinceLastExec(int sinceLastModified) {
    +        if(sinceLastModified > tickFrequencyInSeconds)
    +            LOG.warn( logPrefix() + String.format("The time elapsed since last execution exceeded tick tuple frequency - %d > %d seconds", sinceLastModified, tickFrequencyInSeconds));
    +    }
    +
    +    private String logPrefix() {
    +        return componentID + " - ";
    +    }
    +
    +    public BatchCassandraWriterBolt withTickFrequency(long time, TimeUnit unit) {
    +        this.tickFrequencyInSeconds = (int)unit.toSeconds(time);
    +        return this;
    +    }
    +
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    public Map<String, Object> getComponentConfiguration() {
    +        Config conf = new Config();
    +        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds);
    --- End diff --
    
    We should also have a default value that is less than topology.message.timeout.secs.
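
    A minimal sketch of such a guard, assuming the Storm configuration is available as a plain map. The two key strings are Storm's config names; `TickFrequencySketch`, `effectiveTickFrequency`, and the fallback of half the timeout are hypothetical choices for illustration, not what the PR does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the fallback policy is only an illustration.
class TickFrequencySketch {
    static final String MESSAGE_TIMEOUT_KEY = "topology.message.timeout.secs";
    static final String TICK_FREQ_KEY = "topology.tick.tuple.freq.secs";

    // Returns the requested frequency if it is below the message timeout,
    // otherwise falls back to half of the timeout (at least 1 second).
    static int effectiveTickFrequency(Map<String, Object> stormConf, int requestedSecs) {
        Object timeout = stormConf.get(MESSAGE_TIMEOUT_KEY);
        if (!(timeout instanceof Number)) {
            return requestedSecs; // timeout unknown: keep the requested value
        }
        int timeoutSecs = ((Number) timeout).intValue();
        if (requestedSecs < timeoutSecs) {
            return requestedSecs;
        }
        return Math.max(1, timeoutSecs / 2);
    }

    public static void main(String[] args) {
        Map<String, Object> stormConf = new HashMap<>();
        stormConf.put(MESSAGE_TIMEOUT_KEY, 30);

        Map<String, Object> componentConf = new HashMap<>();
        componentConf.put(TICK_FREQ_KEY, effectiveTickFrequency(stormConf, 45));
        System.out.println(componentConf);
    }
}
```

    Keeping the tick interval below the message timeout matters here because queued tuples are only flushed (and later acked) on a tick or when the queue fills up, so a longer interval could let tuples time out while they wait in the queue.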



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45037220
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java ---
    @@ -0,0 +1,193 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra.bolt;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.base.BaseRichBolt;
    +import backtype.storm.tuple.Fields;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.utils.TupleUtils;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.exceptions.NoHostAvailableException;
    +import org.apache.storm.cassandra.BaseExecutionResultHandler;
    +import org.apache.storm.cassandra.CassandraContext;
    +import org.apache.storm.cassandra.ExecutionResultHandler;
    +import org.apache.storm.cassandra.client.CassandraConf;
    +import org.apache.storm.cassandra.client.SimpleClient;
    +import org.apache.storm.cassandra.client.SimpleClientProvider;
    +import org.apache.storm.cassandra.executor.AsyncExecutor;
    +import org.apache.storm.cassandra.executor.AsyncExecutorProvider;
    +import org.apache.storm.cassandra.executor.AsyncResultHandler;
    +import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * A base cassandra bolt.
    + *
    + * Default {@link backtype.storm.topology.base.BaseRichBolt}
    + */
    +public abstract class BaseCassandraBolt<T> extends BaseRichBolt {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(BaseCassandraBolt.class);
    +
    +    protected OutputCollector outputCollector;
    +    
    +    protected SimpleClientProvider clientProvider;
    +    protected SimpleClient client;
    +    protected Session session;
    +    protected Map stormConfig;
    +
    +    protected CassandraConf cassandraConfConfig;
    +
    +    private CQLStatementTupleMapper mapper;
    +    private ExecutionResultHandler resultHandler;
    +
    +    transient private  Map<String, Fields> outputsFields = new HashMap<>();
    +
    +    /**
    +     * Creates a new {@link CassandraWriterBolt} instance.
    +     * @param mapper
    +     */
    +    public BaseCassandraBolt(CQLStatementTupleMapper mapper, SimpleClientProvider clientProvider) {
    +        this.mapper = mapper;
    +        this.clientProvider = clientProvider;
    +    }
    +    /**
    +     * Creates a new {@link CassandraWriterBolt} instance.
    +     * @param tupleMapper
    +     */
    +    public BaseCassandraBolt(CQLStatementTupleMapper tupleMapper) {
    +        this(tupleMapper, new CassandraContext());
    +    }
    +
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
    +        this.outputCollector = outputCollector;
    +        this.stormConfig = stormConfig;
    +        this.cassandraConfConfig = new CassandraConf(stormConfig);
    +        this.client = clientProvider.getClient(this.stormConfig);
    +        try {
    +            session = client.connect();
    +        } catch (NoHostAvailableException e) {
    +            outputCollector.reportError(e);
    +        }
    +    }
    +
    +    public BaseCassandraBolt withResultHandler(ExecutionResultHandler resultHandler) {
    +        this.resultHandler = resultHandler;
    +        return this;
    +    }
    +
    +    public BaseCassandraBolt withOutputFields(Fields fields) {
    +        this.outputsFields.put(null, fields);
    --- End diff --
    
    It is not good practice to use null as a map key. It seems this is used to set the fields for the default stream. You should use backtype.storm.utils.Utils.DEFAULT_STREAM_ID as the key instead of null.
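
    A sketch of what that could look like, written without Storm on the classpath: the constant below mirrors the value of Utils.DEFAULT_STREAM_ID ("default"), `List<String>` stands in for Storm's Fields, and `OutputFieldsSketch` with its method names is hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; List<String> stands in for backtype.storm.tuple.Fields.
class OutputFieldsSketch {
    // Mirrors backtype.storm.utils.Utils.DEFAULT_STREAM_ID; redefined here
    // only so the sketch compiles without Storm.
    static final String DEFAULT_STREAM_ID = "default";

    private final Map<String, List<String>> outputsFields = new HashMap<>();

    // Declares fields for the default stream using an explicit key.
    OutputFieldsSketch withOutputFields(String... fields) {
        return withStreamOutputFields(DEFAULT_STREAM_ID, fields);
    }

    // Declares fields for a named stream; a null stream id is rejected.
    OutputFieldsSketch withStreamOutputFields(String stream, String... fields) {
        if (stream == null) {
            throw new IllegalArgumentException("stream id must not be null");
        }
        outputsFields.put(stream, Arrays.asList(fields));
        return this;
    }

    List<String> fieldsFor(String stream) {
        return outputsFields.get(stream);
    }

    public static void main(String[] args) {
        OutputFieldsSketch bolt = new OutputFieldsSketch().withOutputFields("id", "value");
        System.out.println(bolt.fieldsFor(DEFAULT_STREAM_ID));
    }
}
```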



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-159331571
  
    @fhussonnois I think that sounds like a good approach.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45304403
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java ---
    @@ -0,0 +1,192 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra.bolt;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.utils.Time;
    +import com.datastax.driver.core.Statement;
    +import org.apache.storm.cassandra.executor.AsyncResultHandler;
    +import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler;
    +import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> {
    +
    +    private final static Logger LOG = LoggerFactory.getLogger(BatchCassandraWriterBolt.class);
    +
    +    public static final int DEFAULT_EMIT_FREQUENCY = 2;
    +
    +    private static final int QUEUE_MAX_SIZE = 1000;
    +
    +    private LinkedBlockingQueue<Tuple> queue;
    +    
    +    private int tickFrequencyInSeconds;
    +    
    +    private long lastModifiedTimesMillis;
    +
    +    private String componentID;
    +
    +    private AsyncResultHandler<List<Tuple>> asyncResultHandler;
    +
    +    /**
    +     * Creates a new {@link CassandraWriterBolt} instance.
    +     *
    +     * @param tupleMapper
    +     */
    +    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper) {
    +        this(tupleMapper, DEFAULT_EMIT_FREQUENCY);
    +    }
    +
    +    /**
    +     * Creates a new {@link CassandraWriterBolt} instance.
    +     *
    +     * @param tupleMapper
    +     */
    +    public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper, int tickFrequencyInSeconds) {
    +        super(tupleMapper);
    +        this.tickFrequencyInSeconds = tickFrequencyInSeconds;
    +    }
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) {
    +        super.prepare(stormConfig, topologyContext, outputCollector);
    +        this.componentID = topologyContext.getThisComponentId();
    +        this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE);
    +        this.lastModifiedTimesMillis = now();
    +    }
    +
    +    @Override
    +    protected AsyncResultHandler<List<Tuple>> getAsyncHandler() {
    +        if( asyncResultHandler == null) {
    +            asyncResultHandler = new BatchAsyncResultHandler(getResultHandler());
    +        }
    +        return asyncResultHandler;
    +    }
    +
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    protected void process(Tuple input) {
    +        if( ! queue.offer(input) ) {
    +            LOG.info(logPrefix() + "The message queue is full, preparing batch statement...");
    +            prepareAndExecuteStatement();
    +            queue.add(input);
    +        }
    +    }
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    protected void tick() {
    --- End diff --
    
    onTickTuple looks good to me.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45037454
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java ---
    @@ -0,0 +1,152 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra.executor;
    +
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.Statement;
    +import com.google.common.util.concurrent.*;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +
    +/**
    + * Service to asynchronously executes cassandra statements.
    + */
    +public class AsyncExecutor<T> implements Serializable {
    +
    +    private final static Logger LOG = LoggerFactory.getLogger(AsyncExecutor.class);
    +
    +    protected Session session;
    +
    +    protected ExecutorService executorService;
    +
    +    protected AsyncResultHandler<T> handler;
    +
    +    private Map<SettableFuture<T>, Boolean> pending = new ConcurrentHashMap<>( );
    --- End diff --
    
    Do we really need a map for this? An AtomicInteger may be sufficient for now. We can add a map implementation if we need it.
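
    A minimal sketch of the counter-based variant, assuming only the number of in-flight executions is needed. `PendingCounterSketch` and `track` are hypothetical names, and `CompletableFuture` stands in for the driver's `ResultSetFuture` so the example runs on the JDK alone.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; CompletableFuture stands in for the driver's future type.
class PendingCounterSketch {
    private final AtomicInteger pending = new AtomicInteger();

    // Registers an in-flight task and decrements the counter when it
    // completes, whether it succeeds or fails.
    <T> CompletableFuture<T> track(CompletableFuture<T> task) {
        pending.incrementAndGet();
        task.whenComplete((result, error) -> pending.decrementAndGet());
        return task;
    }

    // Number of tasks submitted but not yet completed.
    int getPendingExec() {
        return pending.get();
    }

    public static void main(String[] args) {
        PendingCounterSketch executor = new PendingCounterSketch();
        CompletableFuture<String> first = executor.track(new CompletableFuture<>());
        CompletableFuture<String> second = executor.track(new CompletableFuture<>());
        System.out.println("pending after submit: " + executor.getPendingExec());
        first.complete("ok");
        second.completeExceptionally(new RuntimeException("write failed"));
        System.out.println("pending after completion: " + executor.getPendingExec());
    }
}
```

    The trade-off is that a counter cannot tell which futures are still outstanding, so a map (or set) would only be needed if the executor later has to cancel or inspect individual tasks.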



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45037103
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java ---
    @@ -0,0 +1,123 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra.client.impl;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.Host;
    +import com.datastax.driver.core.Metadata;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.exceptions.NoHostAvailableException;
    +import com.google.common.base.Preconditions;
    +import org.apache.storm.cassandra.client.SimpleClient;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Closeable;
    +import java.io.Serializable;
    +import java.util.Set;
    +
    +/**
    + * Simple class to wrap cassandra {@link com.datastax.driver.core.Cluster} instance.
    + */
    +public class DefaultClient implements SimpleClient, Closeable, Serializable {
    +    
    +    private final static Logger LOG = LoggerFactory.getLogger(DefaultClient.class);
    +    
    +    private String keyspace;
    +
    +    private Cluster cluster;
    +    
    +    private Session session;
    +
    +    /**
    +     * Create a new {@link DefaultClient} instance.
    +     * 
    +     * @param cluster a cassandra cluster client.
    +     */
    +    public DefaultClient(Cluster cluster, String keyspace) {
    +        Preconditions.checkNotNull(cluster, "Cluster cannot be 'null");
    +        this.cluster = cluster;
    +        this.keyspace = keyspace;
    +        
    +    }
    +
    +    public Set<Host> getAllHosts() {
    +        Metadata metadata = getMetadata();
    +        return metadata.getAllHosts();
    +    }
    +
    +    public Metadata getMetadata() {
    +        return cluster.getMetadata();
    +    }
    +
    +
    +    private String getExecutorName() {
    +        Thread thread = Thread.currentThread();
    +        return thread.getName();
    +    }
    +    /**
    +     * {@inheritDoc}
    +     */
    +    @Override
    +    public synchronized Session connect() throws NoHostAvailableException {
    +        if( isDisconnected() ) {
    +            LOG.info(String.format("Connected to cluster: %s", cluster.getClusterName()));
    --- End diff --
    
    Can you replace String.format with LOG.info(String format, Object... arguments)? It does not build the string if the respective logging level is not enabled, which avoids building strings unnecessarily. You may want to refactor the other log usages the same way.
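
The effect described above can be observed with a small, dependency-free sketch. It uses `java.util.logging` as a stand-in for SLF4J, so the placeholder is `{0}` rather than SLF4J's `{}`; the `LazyLoggingDemo` and `CountingName` names are hypothetical. With SLF4J the equivalent call would be `LOG.info("Connected to cluster: {}", cluster.getClusterName())`.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class LazyLoggingDemo {

    /** Counts toString() calls so the cost of eager formatting is observable. */
    static final class CountingName {
        int toStringCalls = 0;

        @Override
        public String toString() {
            toStringCalls++;
            return "test-cluster";
        }
    }

    /** Eager: String.format runs, and calls toString(), even though FINE is disabled. */
    static void logEagerly(Logger log, Object clusterName) {
        log.fine(String.format("Connected to cluster: %s", clusterName));
    }

    /** Deferred: the logger checks the level first and only formats if it is enabled. */
    static void logLazily(Logger log, Object clusterName) {
        log.log(Level.FINE, "Connected to cluster: {0}", clusterName);
    }

    /** Builds a logger with FINE disabled; the logger name is arbitrary. */
    static Logger newInfoLogger() {
        Logger log = Logger.getLogger("lazy-logging-demo");
        log.setLevel(Level.INFO);
        return log;
    }
}
```

The saving only applies when the level is disabled, so it matters most for debug and trace statements on hot paths.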



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by fhussonnois <gi...@git.apache.org>.
Github user fhussonnois commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-160937526
  
    @harshach I can't reproduce the error. All tests pass successfully. Did you run the tests after merging the branch?



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45177772
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java ---
    @@ -0,0 +1,89 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra;
    +
    +import backtype.storm.generated.GlobalStreamId;
    +import backtype.storm.grouping.CustomStreamGrouping;
    +import backtype.storm.task.WorkerTopologyContext;
    +import backtype.storm.topology.FailedException;
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.collect.Lists;
    +import com.google.common.hash.Hashing;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.List;
    +
    +/**
    + *
    + * Simple {@link backtype.storm.grouping.CustomStreamGrouping} that uses Murmur3 algorithm to choose the target task of a tuple.
    + *
    + * This stream grouping may be used to optimise writes to Apache Cassandra.
    + */
    +public class Murmur3StreamGrouping implements CustomStreamGrouping {
    --- End diff --
    
    @fhussonnois Cassandra partitions rows by each row's partition key, according to the configured partitioner. Here, however, the grouping is based on a hash of the tuple values, which may contain values other than the partition key. Can you elaborate on this? Please add more details to the Javadoc.
    
    It may be helpful for users to have information about this grouping in ReadMe.md.
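
To make the concern above concrete, here is a minimal sketch of a grouping that hashes only the partition-key values of a tuple. It is not the patch's implementation: `java.util.zip.CRC32` stands in for Murmur3 to keep the example dependency-free, it does not reproduce Cassandra's partitioner, and the `keyIndexes` parameter is a hypothetical way of naming the partition-key positions.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.CRC32;

final class PartitionKeyGroupingSketch {

    /**
     * Picks a target task from the partition-key values only.
     * keyIndexes (hypothetical) lists the tuple positions that form the partition key.
     */
    static int chooseTask(List<Integer> targetTasks, List<Object> values, int[] keyIndexes) {
        CRC32 crc = new CRC32(); // stand-in hash, not Murmur3 and not Cassandra's partitioner
        for (int index : keyIndexes) {
            byte[] bytes = String.valueOf(values.get(index)).getBytes(StandardCharsets.UTF_8);
            crc.update(bytes, 0, bytes.length);
            crc.update(0); // separator so ("ab", "c") and ("a", "bc") hash differently
        }
        // CRC32.getValue() is a non-negative long, so the remainder is a valid list index.
        int slot = (int) (crc.getValue() % targetTasks.size());
        return targetTasks.get(slot);
    }
}
```

With this shape, two tuples that share a partition key always reach the same task, regardless of their other fields.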



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-158982184
  
    @fhussonnois What I meant is that building the insert or update query (with its where clause, etc.) can be done with the query builder APIs, or users can supply a CQL statement directly. Mapping tuples to columns would still be done with the fluent API, so users do not need to write their own tuple/column mappers and can keep using the user-friendly fluent API.
    
    @harshach What is your opinion on this?
     



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by fhussonnois <gi...@git.apache.org>.
Github user fhussonnois commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-161422378
  
    Thank you @harshach @satishd. I will start working on https://issues.apache.org/jira/browse/STORM-1348 as soon as possible. BTW, will my name be added to the contributors list? ;)



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-151854229
  
    Before getting into the details:
    1. It would be better to set the source/target version to 1.7. Storm has moved its base JDK version to 1.7, but not yet to 1.8. Did you use any syntax or APIs specific to JDK 1.8?
    2. Please remove the author tag ("@author fhussonnois") from all comments.
    3. Please add storm-cassandra to storm-dist/binary/src/main/assembly/binary.xml. You can refer to the other modules to see how to add it.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by fhussonnois <gi...@git.apache.org>.
Github user fhussonnois commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45039507
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java ---
    @@ -0,0 +1,89 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra;
    +
    +import backtype.storm.generated.GlobalStreamId;
    +import backtype.storm.grouping.CustomStreamGrouping;
    +import backtype.storm.task.WorkerTopologyContext;
    +import backtype.storm.topology.FailedException;
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.collect.Lists;
    +import com.google.common.hash.Hashing;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.List;
    +
    +/**
    + *
    + * Simple {@link backtype.storm.grouping.CustomStreamGrouping} that uses Murmur3 algorithm to choose the target task of a tuple.
    + *
    + * This stream grouping may be used to optimise writes to Apache Cassandra.
    + */
    +public class Murmur3StreamGrouping implements CustomStreamGrouping {
    --- End diff --
    
    @harshach, there will be no performance issue whether shuffle, fields, or murmur3 grouping is used. This strategy should be used with the BatchCassandraBolt in order to send all tuples that will be written to the same partition to the same Storm task.
    
    In fact, to route tuples according to Cassandra data location, we would need to retrieve the IP address of each task within the CustomStreamGrouping. I don't know whether that is possible.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-160877676
  
    @fhussonnois I am getting the following errors in the tests. Can you take a look?
    Tests in error: 
      BatchCassandraWriterBoltTest.shouldBatchInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields:41->executeAndAssertWith:57->BaseTopologyTest.runLocalTopologyAndWait:49 » IllegalState
      CassandraWriterBoltTest.shouldAsyncInsertGivenStaticTableNameAndDynamicQueryBuildFromAllTupleFields:40->executeAndAssertWith:56->BaseTopologyTest.runLocalTopologyAndWait:49 » IllegalState



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-159153247
  
    We can use/refactor the existing fluent API for mappers, as I mentioned in my earlier comment. Users can either set their own mappers or use the default mapper implementation with the fluent API.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-159475189
  
    @fhussonnois You may want to squash/rebase these 14 commits into 1 or 2 commits.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-160999243
  
    @harshach I ran the tests multiple times but could not reproduce the issue. Is your ZooKeeper already running before the tests are run?



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r44859970
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java ---
    @@ -0,0 +1,89 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra;
    +
    +import backtype.storm.generated.GlobalStreamId;
    +import backtype.storm.grouping.CustomStreamGrouping;
    +import backtype.storm.task.WorkerTopologyContext;
    +import backtype.storm.topology.FailedException;
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.collect.Lists;
    +import com.google.common.hash.Hashing;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.List;
    +
    +/**
    + *
    + * Simple {@link backtype.storm.grouping.CustomStreamGrouping} that uses Murmur3 algorithm to choose the target task of a tuple.
    + *
    + * This stream grouping may be used to optimise writes to Apache Cassandra.
    + */
    +public class Murmur3StreamGrouping implements CustomStreamGrouping {
    --- End diff --
    
    What happens if users choose shuffle grouping? Will writes perform badly?



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45036785
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java ---
    @@ -0,0 +1,71 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra.client;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PlainTextAuthProvider;
    +import com.datastax.driver.core.QueryOptions;
    +import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
    +import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
    +import com.datastax.driver.core.policies.RoundRobinPolicy;
    +import com.datastax.driver.core.policies.TokenAwarePolicy;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.storm.cassandra.context.BaseBeanFactory;
    +
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Default interface to build cassandra Cluster from the a Storm Topology configuration.
    + */
    +public class ClusterFactory extends BaseBeanFactory<Cluster> {
    +
    +    /**
    +     * Creates a new Cluster based on the specified configuration.
    +     * @param stormConf the storm configuration.
    +     * @return a new a new {@link com.datastax.driver.core.Cluster} instance.
    +     */
    +    @Override
    +    protected Cluster make(Map<String, Object> stormConf) {
    +        CassandraConf cassandraConf = new CassandraConf(stormConf);
    +
    +        Cluster.Builder cluster = Cluster.builder()
    +                .withoutJMXReporting()
    +                .withoutMetrics()
    +                .addContactPoints(cassandraConf.getNodes())
    +                .withPort(cassandraConf.getPort())
    +                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
    --- End diff --
    
    DefaultRetryPolicy, which is conservative, should be the default policy. DowngradingConsistencyRetryPolicy is fairly aggressive and should not be the default setting. You may want to take this policy from conf.
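
A minimal sketch of reading the policy from configuration, with the conservative policy as the fallback. The configuration key and the `RetryPolicyConfSketch` class are hypothetical; in the real factory, each enum value would be mapped to the corresponding DataStax driver instance (`DefaultRetryPolicy.INSTANCE` or `DowngradingConsistencyRetryPolicy.INSTANCE`).

```java
import java.util.Map;

final class RetryPolicyConfSketch {

    /** Hypothetical configuration key; not taken from the patch. */
    static final String RETRY_POLICY_KEY = "cassandra.retryPolicy";

    /** Stand-ins for the two driver policies discussed in the review. */
    enum RetryPolicyChoice { DEFAULT, DOWNGRADING_CONSISTENCY }

    /** Returns the configured policy, or DEFAULT when the key is absent. */
    static RetryPolicyChoice resolve(Map<String, Object> stormConf) {
        Object configured = stormConf.get(RETRY_POLICY_KEY);
        if (configured == null) {
            return RetryPolicyChoice.DEFAULT;
        }
        String name = configured.toString();
        if ("DefaultRetryPolicy".equals(name)) {
            return RetryPolicyChoice.DEFAULT;
        }
        if ("DowngradingConsistencyRetryPolicy".equals(name)) {
            return RetryPolicyChoice.DOWNGRADING_CONSISTENCY;
        }
        // Fail fast on typos rather than silently falling back.
        throw new IllegalArgumentException("Unknown retry policy: " + name);
    }
}
```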



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by fhussonnois <gi...@git.apache.org>.
Github user fhussonnois commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-159316692
  
    @satishd, @harshach OK, perfect. I will refactor the API to remove the Insert/Update builder.
    Then I will add a simple interface to map tuples to columns:
    ```java
    public interface CqlMapper extends Serializable {
        List<Column> getColumns(ITuple tuple);
    }
    ```
    In addition, I will implement a SimpleStatementMapper to build queries as follows:
    
    ```java
    new CassandraWriterBolt(
      async(
        simpleQuery(
          "INSERT INTO weather.temperature (weatherstation_id, event_time, temperature) VALUES(?,?,?)",
          with(field("weatherstation_id"), field("event_time").now(), field("temperature")))
      )
    );
    ```
    The same bolt could be written with QueryBuilder:
    ```java
    new CassandraWriterBolt(
      async(
        simple(
          QueryBuilder.insertInto("weather", "temperature")
            .value("weatherstation_id", "?")
            .value("event_time", "?")
            .value("temperature", "?"),
          with(field("weatherstation_id"), field("event_time").now(), field("temperature")))
      )
    );
    ```
    What do you think about that?



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-161135177
  
    Thanks for the contribution, @fhussonnois, and for keeping up with the reviews. I merged it into master.
    Thanks @satishd for the reviews.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-157723266
  
    @fhussonnois Can you add an example (or an integration test, which can be disabled for now)? It will help users try the Cassandra connector.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-156642074
  
    +1 for adding this.
    
    I've done a lot of work with Cassandra in the past (back when the Thrift API was the only game in town) and have always wanted to see first-class support for it in Storm -- even if it is an optional component.
    
    Moving to the DS Java driver is clearly the only way to go.
    
    I'd be willing to act as a sponsor for this as well.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45037532
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java ---
    @@ -0,0 +1,152 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra.executor;
    +
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.Statement;
    +import com.google.common.util.concurrent.*;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +
    +/**
    + * Service to asynchronously executes cassandra statements.
    + */
    +public class AsyncExecutor<T> implements Serializable {
    +
    +    private final static Logger LOG = LoggerFactory.getLogger(AsyncExecutor.class);
    +
    +    protected Session session;
    +
    +    protected ExecutorService executorService;
    +
    +    protected AsyncResultHandler<T> handler;
    +
    +    private Map<SettableFuture<T>, Boolean> pending = new ConcurrentHashMap<>( );
    +
    +    /**
    +     * Creates a new {@link AsyncExecutor} instance.
    +     */
    +    protected AsyncExecutor(Session session, AsyncResultHandler<T> handler) {
    +        this(session, newSingleThreadExecutor(), handler);
    +    }
    +
    +    /**
    +     * Creates a new {@link AsyncExecutor} instance.
    +     *
    +     * @param session The cassandra session.
    +     * @param executorService The executor service responsible to execute handler.
    +     */
    +    private AsyncExecutor(Session session, ExecutorService executorService, AsyncResultHandler<T> handler) {
    +        this.session   = session;
    +        this.executorService = executorService;
    +        this.handler = handler;
    +    }
    +
    +    protected static ExecutorService newSingleThreadExecutor() {
    +        return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("cassandra-async-handler-%d").build());
    +    }
    +
    +    /**
    +     * Asynchronously executes all statements associated to the specified input. The input will be passed to
    +     * the {@link #handler} once all queries succeed or failed.
    +     */
    +    public List<SettableFuture<T>> execAsync(List<Statement> statements, final T input) {
    +
    +        List<SettableFuture<T>> settableFutures = new ArrayList<>(statements.size());
    +
    +        for(Statement s : statements)
    +            settableFutures.add(execAsync(s, input, AsyncResultHandler.NO_OP_HANDLER));
    +
    +        ListenableFuture<List<T>> allAsList = Futures.allAsList(settableFutures);
    +        Futures.addCallback(allAsList, new FutureCallback<List<T>>(){
    +            @Override
    +            public void onSuccess(List<T> inputs) {
    +                handler.success(input);
    +            }
    +
    +            @Override
    +            public void onFailure(Throwable t) {
    +                handler.failure(t, input);
    +            }
    +        }, executorService);
    +        return settableFutures;
    +    }
    +
    +    /**
    +     * Asynchronously executes the specified batch statement. Inputs will be passed to
    +     * the {@link #handler} once query succeed or failed.
    +     */
    +    public SettableFuture<T> execAsync(final Statement statement, final T inputs) {
    +        return execAsync(statement, inputs, handler);
    +    }
    +    /**
    +     * Asynchronously executes the specified batch statement. Inputs will be passed to
    +     * the {@link #handler} once query succeed or failed.
    +     */
    +    public SettableFuture<T> execAsync(final Statement statement, final T inputs, final AsyncResultHandler<T> handler) {
    +        final SettableFuture<T> settableFuture = SettableFuture.create();
    +        pending.put(settableFuture, true);
    +        ResultSetFuture future = session.executeAsync(statement);
    +        Futures.addCallback(future, new FutureCallback<ResultSet>() {
    +            public void release() {
    +                pending.remove(settableFuture);
    +            }
    +
    +            @Override
    +            public void onSuccess(ResultSet result) {
    +                release();
    +                settableFuture.set(inputs);
    +                handler.success(inputs);
    +            }
    +
    +            @Override
    +            public void onFailure(Throwable t) {
    +                LOG.error(String.format("Failed to execute statement '%s' ", statement), t);
    +                release();
    +                settableFuture.setException(t);
    +                handler.failure(t, inputs);
    +            }
    +        }, executorService);
    +        return settableFuture;
    +    }
    +
    +    /**
    +     * Returns the number of currently executed tasks which are not yet completed.
    +     */
    +    public int getPendingExec( ) {
    --- End diff --
    
    Rename to getPendingTasksSize()



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45302419
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java ---
    @@ -0,0 +1,89 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra;
    +
    +import backtype.storm.generated.GlobalStreamId;
    +import backtype.storm.grouping.CustomStreamGrouping;
    +import backtype.storm.task.WorkerTopologyContext;
    +import backtype.storm.topology.FailedException;
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.collect.Lists;
    +import com.google.common.hash.Hashing;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.List;
    +
    +/**
    + *
    + * Simple {@link backtype.storm.grouping.CustomStreamGrouping} that uses Murmur3 algorithm to choose the target task of a tuple.
    + *
    + * This stream grouping may be used to optimise writes to Apache Cassandra.
    + */
    +public class Murmur3StreamGrouping implements CustomStreamGrouping {
    --- End diff --
    
    It is better to take field names instead of indexes. You can get the declared output fields with WorkerTopologyContext#getComponentOutputFields(GlobalStreamId id) in the prepare method and store the corresponding indexes, as shown below.
    
        public void prepare(WorkerTopologyContext workerTopologyContext, GlobalStreamId globalStreamId, List<Integer> targetTasks) {
            this.targetTasks = targetTasks;
    
            partitionKeyIndexes = new ArrayList<>();
            Fields componentOutputFields = workerTopologyContext.getComponentOutputFields(globalStreamId);
            for (String partitionKeyName : partitionKeyNames) {
                partitionKeyIndexes.add(componentOutputFields.fieldIndex(partitionKeyName));
            }
        }
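
    The name-to-index lookup suggested above can be tried without a Storm cluster. The following is only a sketch under assumptions: the class name PartitionKeyIndexResolver is hypothetical, and a plain List<String> stands in for Storm's Fields object, so it shows the lookup logic rather than the actual grouping code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the PR: resolves partition key names to tuple indexes.
public class PartitionKeyIndexResolver {

    /**
     * Maps each partition key name to its position in the declared output fields.
     * declaredFields is a plain-list stand-in for Storm's Fields object.
     */
    public static List<Integer> resolve(List<String> declaredFields, List<String> partitionKeyNames) {
        List<Integer> indexes = new ArrayList<>(partitionKeyNames.size());
        for (String name : partitionKeyNames) {
            int index = declaredFields.indexOf(name);
            if (index < 0) {
                // Fail when the grouping is prepared rather than on the first tuple.
                throw new IllegalArgumentException("Unknown field: " + name);
            }
            indexes.add(index);
        }
        return indexes;
    }

    public static void main(String[] args) {
        List<String> declared = Arrays.asList("id", "user", "ts", "payload");
        System.out.println(resolve(declared, Arrays.asList("user", "id"))); // prints [1, 0]
    }
}
```

    Resolving the names once in prepare keeps the per-tuple path to a simple index lookup, and a misspelled field name fails early.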




[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by jnioche <gi...@git.apache.org>.
Github user jnioche commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45128717
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java ---
    @@ -0,0 +1,89 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra;
    +
    +import backtype.storm.generated.GlobalStreamId;
    +import backtype.storm.grouping.CustomStreamGrouping;
    +import backtype.storm.task.WorkerTopologyContext;
    +import backtype.storm.topology.FailedException;
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.collect.Lists;
    +import com.google.common.hash.Hashing;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.List;
    +
    +/**
    + *
    + * Simple {@link backtype.storm.grouping.CustomStreamGrouping} that uses Murmur3 algorithm to choose the target task of a tuple.
    + *
    + * This stream grouping may be used to optimise writes to Apache Cassandra.
    + */
    +public class Murmur3StreamGrouping implements CustomStreamGrouping {
    --- End diff --
    
    Would it make sense to provide this class as part of the core module? It was mentioned in a [discussion on the user list](https://mail-archives.apache.org/mod_mbox/incubator-storm-user/201510.mbox/%3CCAC19-6AKWc8cH681muaQ1LsFkwiofQhaffjKESfU04LpVoRcow@mail.gmail.com%3E) not long ago and it sounded like it gives better results in some cases.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-156714120
  
    @fhussonnois sorry for the delay in reviewing this. I have a few minor nitpicks, but overall it looks good to me.
    @satishd also worked on a similar connector, so I would like to get his review in. I am also willing to act as sponsor.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by fhussonnois <gi...@git.apache.org>.
Github user fhussonnois commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-152778819
  
    I have updated the PR to address the remarks.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-161453646
  
    @fhussonnois Sorry for missing that. I just added you as a contributor in README.md.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by fhussonnois <gi...@git.apache.org>.
Github user fhussonnois commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45238310
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java ---
    @@ -0,0 +1,89 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra;
    +
    +import backtype.storm.generated.GlobalStreamId;
    +import backtype.storm.grouping.CustomStreamGrouping;
    +import backtype.storm.task.WorkerTopologyContext;
    +import backtype.storm.topology.FailedException;
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.collect.Lists;
    +import com.google.common.hash.Hashing;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataOutputStream;
    +import java.io.IOException;
    +import java.util.List;
    +
    +/**
    + *
    + * Simple {@link backtype.storm.grouping.CustomStreamGrouping} that uses Murmur3 algorithm to choose the target task of a tuple.
    + *
    + * This stream grouping may be used to optimise writes to Apache Cassandra.
    + */
    +public class Murmur3StreamGrouping implements CustomStreamGrouping {
    --- End diff --
    
    Yes, that's right! Actually, I hadn't even noticed that there is no method like this on the component: customGrouping(componentId, customStreamGrouping, fields...).
    
    So, one solution could be to pass the indexes of the partition keys as follows:
    myBolt.customGrouping("comp", new Murmur3StreamGrouping(0,1)) ?
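
    To make the index-based proposal concrete, here is a sketch of how a grouping could pick a target task from the values at the given indexes. It rests on assumptions: IndexBasedGroupingSketch is a hypothetical class, and Arrays.hashCode is a stand-in hash used only to keep the example dependency-free; the PR itself refers to Murmur3 through Guava's Hashing.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not the PR's Murmur3StreamGrouping.
public class IndexBasedGroupingSketch {

    private final List<Integer> targetTasks;
    private final int[] partitionKeyIndexes;

    public IndexBasedGroupingSketch(List<Integer> targetTasks, int... partitionKeyIndexes) {
        this.targetTasks = targetTasks;
        this.partitionKeyIndexes = partitionKeyIndexes;
    }

    /** Picks a task id using only the tuple values at the partition key indexes. */
    public int chooseTask(List<Object> values) {
        StringBuilder key = new StringBuilder();
        for (int index : partitionKeyIndexes) {
            key.append(values.get(index)).append('|');
        }
        byte[] bytes = key.toString().getBytes(StandardCharsets.UTF_8);
        // Stand-in hash (assumption); a real grouping would apply Murmur3 here.
        int hash = Arrays.hashCode(bytes);
        // floorMod keeps the position non-negative even for negative hashes.
        return targetTasks.get(Math.floorMod(hash, targetTasks.size()));
    }

    public static void main(String[] args) {
        IndexBasedGroupingSketch grouping = new IndexBasedGroupingSketch(Arrays.asList(7, 8, 9), 0, 1);
        int first = grouping.chooseTask(Arrays.<Object>asList("user-1", 42, "payload-a"));
        int second = grouping.chooseTask(Arrays.<Object>asList("user-1", 42, "payload-b"));
        System.out.println(first == second); // prints true: only indexes 0 and 1 are hashed
    }
}
```

    Tuples that share the same partition key values always reach the same task, whatever the other fields contain.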



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-161002334
  
    @fhussonnois @satishd Yes, after merging the branch. I am running mvn clean package inside storm-cassandra or at the top level.
    @satishd what do you mean by zookeeper running before the tests ran? Is that a requirement?



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45166030
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java ---
    @@ -0,0 +1,152 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra.executor;
    +
    +import com.datastax.driver.core.ResultSet;
    +import com.datastax.driver.core.ResultSetFuture;
    +import com.datastax.driver.core.Session;
    +import com.datastax.driver.core.Statement;
    +import com.google.common.util.concurrent.*;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +
    +/**
    + * Service to asynchronously executes cassandra statements.
    + */
    +public class AsyncExecutor<T> implements Serializable {
    +
    +    private final static Logger LOG = LoggerFactory.getLogger(AsyncExecutor.class);
    +
    +    protected Session session;
    +
    +    protected ExecutorService executorService;
    +
    +    protected AsyncResultHandler<T> handler;
    +
    +    private Map<SettableFuture<T>, Boolean> pending = new ConcurrentHashMap<>( );
    +
    +    /**
    +     * Creates a new {@link AsyncExecutor} instance.
    +     */
    +    protected AsyncExecutor(Session session, AsyncResultHandler<T> handler) {
    +        this(session, newSingleThreadExecutor(), handler);
    +    }
    +
    +    /**
    +     * Creates a new {@link AsyncExecutor} instance.
    +     *
    +     * @param session The cassandra session.
    +     * @param executorService The executor service responsible to execute handler.
    +     */
    +    private AsyncExecutor(Session session, ExecutorService executorService, AsyncResultHandler<T> handler) {
    +        this.session   = session;
    +        this.executorService = executorService;
    +        this.handler = handler;
    +    }
    +
    +    protected static ExecutorService newSingleThreadExecutor() {
    +        return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("cassandra-async-handler-%d").build());
    +    }
    +
    +    /**
    +     * Asynchronously executes all statements associated to the specified input. The input will be passed to
    +     * the {@link #handler} once all queries succeed or failed.
    --- End diff --
    
    The input will be passed to handler#onSuccess once all queries succeed or to handler#onFailure if any one of them fails. Is that right?
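
    The all-or-nothing behaviour asked about here can be illustrated with the JDK alone. This is a sketch under assumptions: CompletableFuture.allOf stands in for Guava's Futures.allAsList, and AllOrNothingSketch is a hypothetical class. As far as I know, both combinators fail when any input fails, but allOf waits for every input to complete first, whereas allAsList is documented to fail as soon as one input fails.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration; the PR uses Guava futures, not CompletableFuture.
public class AllOrNothingSketch {

    /** Returns "success" if every future succeeded, "failure" if any failed, "pending" otherwise. */
    public static String outcome(List<CompletableFuture<String>> futures) {
        AtomicReference<String> result = new AtomicReference<>("pending");
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        // The callback runs immediately when the combined future is already complete.
        all.whenComplete((ignored, error) -> result.set(error == null ? "success" : "failure"));
        return result.get();
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("row-1");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("write timeout"));

        System.out.println(outcome(Arrays.asList(ok, ok)));     // prints "success"
        System.out.println(outcome(Arrays.asList(ok, failed))); // prints "failure"
    }
}
```

    A single failed statement is enough to send the combined result down the failure path, which matches the reading of the Javadoc given in this comment.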



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/827#discussion_r45036898
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/ClusterFactory.java ---
    @@ -0,0 +1,71 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra.client;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.PlainTextAuthProvider;
    +import com.datastax.driver.core.QueryOptions;
    +import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
    +import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
    +import com.datastax.driver.core.policies.RoundRobinPolicy;
    +import com.datastax.driver.core.policies.TokenAwarePolicy;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.storm.cassandra.context.BaseBeanFactory;
    +
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Default interface to build cassandra Cluster from the a Storm Topology configuration.
    + */
    +public class ClusterFactory extends BaseBeanFactory<Cluster> {
    +
    +    /**
    +     * Creates a new Cluster based on the specified configuration.
    +     * @param stormConf the storm configuration.
    +     * @return a new a new {@link com.datastax.driver.core.Cluster} instance.
    +     */
    +    @Override
    +    protected Cluster make(Map<String, Object> stormConf) {
    +        CassandraConf cassandraConf = new CassandraConf(stormConf);
    +
    +        Cluster.Builder cluster = Cluster.builder()
    +                .withoutJMXReporting()
    +                .withoutMetrics()
    +                .addContactPoints(cassandraConf.getNodes())
    +                .withPort(cassandraConf.getPort())
    +                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
    +                .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, TimeUnit.MINUTES.toMillis(1)))
    --- End diff --
    
    Better not to have hard-coded timeout values. Create corresponding properties in the config.
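
    One way to follow this suggestion is to read the two reconnection delays from the topology configuration and fall back to the values currently hard-coded in the diff (100 ms and 1 minute). The sketch below is an assumption-laden illustration: the class ReconnectionSettings and both property names are hypothetical and do not come from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical holder for reconnection delays read from a configuration map.
public class ReconnectionSettings {

    // Hypothetical property names, chosen only for this example.
    public static final String BASE_DELAY_MS = "cassandra.reconnectionPolicy.baseDelayMs";
    public static final String MAX_DELAY_MS = "cassandra.reconnectionPolicy.maxDelayMs";

    public final long baseDelayMs;
    public final long maxDelayMs;

    public ReconnectionSettings(Map<String, Object> conf) {
        // Defaults mirror the values hard-coded in the diff under review.
        this.baseDelayMs = getLong(conf, BASE_DELAY_MS, 100L);
        this.maxDelayMs = getLong(conf, MAX_DELAY_MS, TimeUnit.MINUTES.toMillis(1));
    }

    private static long getLong(Map<String, Object> conf, String key, long defaultValue) {
        Object value = conf.get(key);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        // Accept string values such as "250" coming from YAML or command-line overrides.
        return Long.parseLong(value.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(BASE_DELAY_MS, 250);
        ReconnectionSettings settings = new ReconnectionSettings(conf);
        System.out.println(settings.baseDelayMs + " " + settings.maxDelayMs); // prints "250 60000"
    }
}
```

    The two values could then be passed to the ExponentialReconnectionPolicy constructor in place of the literals.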



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-161006346
  
    I ran "mvn clean install" in the storm root directory and it worked fine. I have seen some intermittent failures when zookeeper was already running during "mvn clean install" in storm, because of unexpected state in zookeeper. Zookeeper does not need to be running, and it is better that it is not running while the tests run.



[GitHub] storm pull request: STORM-1075 add external module storm-cassandra

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on the pull request:

    https://github.com/apache/storm/pull/827#issuecomment-159377762
  
    Created https://issues.apache.org/jira/browse/STORM-1348 for refactoring the API to remove the insert/update builder.
    
    +1, these changes look good to me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---