Posted to dev@storm.apache.org by mkoch1 <gi...@git.apache.org> on 2016/11/16 20:16:21 UTC

[GitHub] storm pull request #1781: STORM-1369: Add MapState implementation to storm-c...

GitHub user mkoch1 opened a pull request:

    https://github.com/apache/storm/pull/1781

    STORM-1369: Add MapState implementation to storm-cassandra.

    https://issues.apache.org/jira/browse/STORM-1369

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ef-labs/storm master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1781.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1781
    
----
commit 18014c3212b23f1d91b22d0717700574d4b75e05
Author: Magnus Koch <ma...@ef.com>
Date:   2016-11-16T20:13:31Z

    Feature/storm 1369 on master (#1)
    
    STORM-1369: Add MapState implementation to storm-cassandra.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #1781: STORM-1369: Add MapState implementation to storm-cassandr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1781
  
    @mkoch1 Could you rebase this? After rebasing I'll merge this.

[GitHub] storm issue #1781: STORM-1369: Add MapState implementation to storm-cassandr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1781
  
    +1

[GitHub] storm pull request #1781: STORM-1369: Add MapState implementation to storm-c...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1781

[GitHub] storm pull request #1781: STORM-1369: Add MapState implementation to storm-c...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1781#discussion_r88945391
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraBackingMap.java ---
    @@ -0,0 +1,232 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra.trident.state;
    +
    +import com.datastax.driver.core.*;
    +import com.google.common.base.Preconditions;
    +import org.apache.storm.cassandra.client.SimpleClient;
    +import org.apache.storm.cassandra.client.SimpleClientProvider;
    +import org.apache.storm.cassandra.executor.AsyncExecutor;
    +import org.apache.storm.cassandra.executor.AsyncExecutorProvider;
    +import org.apache.storm.cassandra.query.AyncCQLResultSetValuesMapper;
    +import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
    +import org.apache.storm.topology.FailedException;
    +import org.apache.storm.trident.state.*;
    +import org.apache.storm.trident.state.map.IBackingMap;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.ITuple;
    +import org.apache.storm.tuple.Values;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Serializable;
    +import java.util.*;
    +
    +/**
    + * An IBackingState implementation for Cassandra.
    + *
    + * The implementation stores state as a binary blob in cassandra using a {@link Serializer}.
    + * It supports Opaque, Transactional and NonTransactional states, given a matching serializer.
    + *
    + * Configuration is done with three separate constructs:
    + *  - One tuple mapper for multiGet, which should map keys to a select statement and return {@link Values}.
    + *  - One state mapper, which maps the state to/from a {@link Values} representation, which is used for binding.
    + *  - One tuple mapper for multiPut, which should map {@link Values} to an INSERT or UPDATE statement.
    + *
    + * {@link #multiPut(List, List)} uses Cassandra batch statements (if so configured).
    + * {@link #multiGet(List)} queries Cassandra asynchronously, with a default parallelism of 500.
    + *
    + * @param <T>
    + */
    +public class CassandraBackingMap<T> implements IBackingMap<T> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(CassandraBackingMap.class);
    +
    +    private final Map conf;
    +    private final Options<T> options;
    +    private final Fields allFields;
    +
    +    private SimpleClient client;
    +    private Session session;
    +    private AyncCQLResultSetValuesMapper cqlResultSetValuesMapper;
    +    private AsyncExecutor executor;
    +
    +
    +    protected CassandraBackingMap(Map conf, Options<T> options) {
    +        this.conf = conf;
    +        this.options = options;
    +        List<String> allFields = options.keyFields.toList();
    +        allFields.addAll(options.stateMapper.getStateFields().toList());
    +        this.allFields = new Fields(allFields);
    +    }
    +
    +    @Override
    +    public List<T> multiGet(List<List<Object>> keys) {
    +        LOG.info("multiGet fetching {} values.", keys.size());
    +        List<Statement> selects = new ArrayList<>();
    +        List<ITuple> keyTuples = new ArrayList<>();
    +
    +        for (int i = 0; i < keys.size(); i++) {
    +            SimpleTuple keyTuple = new SimpleTuple(options.keyFields, keys.get(i));
    +            List<Statement> mappedStatements = options.multiGetCqlStatementMapper.map(conf, session, keyTuple);
    +            if (mappedStatements.size() > 1) {
    +                throw new IllegalArgumentException("Only one statement per map state item is supported.");
    +            }
    +            selects.add(mappedStatements.size() == 1 ? mappedStatements.get(0) : null);
    +            keyTuples.add(keyTuple);
    +        }
    +
    +        List<List<Values>> batchRetrieveResult = cqlResultSetValuesMapper
    +                .map(session, selects, keyTuples);
    +
    +        List<T> states = new ArrayList<>();
    +        for (List<Values> values : batchRetrieveResult) {
    +            T state = (T) options.stateMapper.fromValues(values);
    +            states.add(state);
    +        }
    +
    +        return states;
    +
    +    }
    +
    +    @Override
    +    public void multiPut(List<List<Object>> keys, List<T> values) {
    +        LOG.info("multiPut writing {} values.", keys.size());
    --- End diff --
    
    debug
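A note on the multiGet loop in the diff above: it maps each key to at most one statement and adds a null placeholder when the mapper returns none, so result i stays aligned with key i. The following is a minimal, driver-free sketch of that alignment step only. `KeyStatementAligner` and the `Function`-based mapper are hypothetical stand-ins for `CQLStatementTupleMapper`; they are not part of storm-cassandra.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the alignment done in CassandraBackingMap.multiGet:
// one statement (or a null placeholder) per key, in key order.
final class KeyStatementAligner {
    private KeyStatementAligner() {}

    static <K, S> List<S> alignOnePerKey(List<K> keys, Function<K, List<S>> mapper) {
        List<S> selects = new ArrayList<>(keys.size());
        for (K key : keys) {
            List<S> mapped = mapper.apply(key);
            if (mapped.size() > 1) {
                throw new IllegalArgumentException("Only one statement per map state item is supported.");
            }
            // null keeps slot i aligned with key i when the mapper produced no statement
            selects.add(mapped.isEmpty() ? null : mapped.get(0));
        }
        return selects;
    }
}
```

Keeping the null placeholder (rather than skipping the key) is what lets the result list be zipped back against the key list without an index map.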

[GitHub] storm issue #1781: STORM-1369: Add MapState implementation to storm-cassandr...

Posted by mkoch1 <gi...@git.apache.org>.
Github user mkoch1 commented on the issue:

    https://github.com/apache/storm/pull/1781
  
    Squashed. I don't really see how this should cause any conflicts though. The only changes in existing code are a couple of namespace corrections (to avoid wildcard imports, as suggested earlier) and documentation. Other than that, it's all new code in new classes.

[GitHub] storm issue #1781: STORM-1369: Add MapState implementation to storm-cassandr...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the issue:

    https://github.com/apache/storm/pull/1781
  
    Unfortunately this patch uses Cassandra-unit, which is LGPL licensed, so in its current state we can't accept it. For that reason I am -1. We've run into this before, and I've lobbied the Cassandra-unit author to switch to an Apache-compatible license (https://github.com/jsevellec/cassandra-unit/issues/163) to no avail.
    
    We can, however, accept it if the Cassandra-unit dependency and tests that depend on it are removed.
    
    Again, it's unfortunate. If more people encourage the Cassandra-unit author to choose a different license, perhaps he'll consider it.

[GitHub] storm pull request #1781: STORM-1369: Add MapState implementation to storm-c...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1781#discussion_r88944857
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java ---
    @@ -138,6 +139,109 @@ public void onFailure(Throwable t) {
         }
     
         /**
    +     * Asynchronously executes the specified select statements. Results will be passed to the {@link AsyncResultSetHandler}
    +     * once each query has succeeded or failed.
    +     */
    +    public SettableFuture<List<T>> execAsync(final List<Statement> statements, final List<T> inputs, Integer maxParallelRequests, final AsyncResultSetHandler<T> handler) {
    +
    +        final SettableFuture<List<T>> settableFuture = SettableFuture.create();
    +        final AsyncContext<T> asyncContext = new AsyncContext<>(inputs, maxParallelRequests, settableFuture);
    +
    +        for (int i = 0; i < statements.size(); i++) {
    +
    +            // Acquire a slot
    +            if (asyncContext.acquire()) {
    +
    +
    +                try {
    +                    pending.incrementAndGet();
    +                    final int statementIndex = i;
    +                    final T input = inputs.get(i);
    +                    final Statement statement = statements.get(i);
    +                    ResultSetFuture future = session.executeAsync(statement);
    +                    Futures.addCallback(future, new FutureCallback<ResultSet>() {
    +                        @Override
    +                        public void onSuccess(ResultSet result) {
    +                            try {
    +                                handler.success(input, result);
    +                            } catch (Throwable throwable) {
    +                                asyncContext.exception(throwable);
    +                            } finally {
    +                                pending.decrementAndGet();
    +                                asyncContext.release();
    +                            }
    +                        }
    +
    +                        @Override
    +                        public void onFailure(Throwable throwable) {
    +                            handler.failure(throwable, input);
    +                            asyncContext
    +                                    .exception(throwable)
    +                                    .release();
    +                            LOG.error(String.format("Failed to execute statement '%s' ", statement), throwable);
    --- End diff --
    
    do we need a `pending.decrementAndGet();` here too?
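To illustrate the concern, here is a driver-free sketch of bounded parallel execution in which the pending counter and the concurrency slot are released in a single callback that runs on both success and failure. It assumes a `Supplier<CompletableFuture<R>>` in place of `session.executeAsync(statement)` and a plain `Semaphore` in place of `AsyncContext`; `BoundedAsyncRunner` is hypothetical and is not the AsyncExecutor API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical, driver-free sketch. Each Supplier stands in for
// session.executeAsync(statement); the Semaphore stands in for AsyncContext.
final class BoundedAsyncRunner {
    final AtomicInteger pending = new AtomicInteger();

    <R> List<CompletableFuture<R>> runAll(List<Supplier<CompletableFuture<R>>> tasks, int maxParallelRequests) {
        Semaphore slots = new Semaphore(maxParallelRequests);
        List<CompletableFuture<R>> results = new ArrayList<>(tasks.size());
        for (Supplier<CompletableFuture<R>> task : tasks) {
            slots.acquireUninterruptibly(); // wait for a free slot
            pending.incrementAndGet();
            CompletableFuture<R> future;
            try {
                future = task.get();
            } catch (RuntimeException e) {
                // The request never started: turn it into a failed future so the
                // shared cleanup below still runs.
                future = new CompletableFuture<>();
                future.completeExceptionally(e);
            }
            // whenComplete runs on success AND on failure, so pending is always
            // decremented and the slot is always returned.
            results.add(future.whenComplete((value, error) -> {
                pending.decrementAndGet();
                slots.release();
            }));
        }
        return results;
    }
}
```

Putting the bookkeeping in one completion callback makes a missed decrement structurally impossible; with Guava's `FutureCallback`, the equivalent is doing the decrement in a `finally` in both `onSuccess` and `onFailure`.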

[GitHub] storm issue #1781: STORM-1369: Add MapState implementation to storm-cassandr...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/1781
  
    @mkoch1 
    Could you squash the commits into one, if you don't mind? I know it's a change of more than 2000 lines, but it doesn't look like separate commits are needed.
    I would also like to wait for the 1.1.0 RC vote to pass to avoid any conflicts with the release. That means this is not going to be included in 1.1.0.

[GitHub] storm issue #1781: STORM-1369: Add MapState implementation to storm-cassandr...

Posted by mkoch1 <gi...@git.apache.org>.
Github user mkoch1 commented on the issue:

    https://github.com/apache/storm/pull/1781
  
    Please note that multiPut needs an update; I will push a commit today or tomorrow.

[GitHub] storm pull request #1781: STORM-1369: Add MapState implementation to storm-c...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1781#discussion_r88944445
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java ---
    @@ -23,16 +23,16 @@
     import com.datastax.driver.core.Session;
     import com.datastax.driver.core.Statement;
     import com.google.common.util.concurrent.*;
    +import org.apache.storm.cassandra.query.AyncCQLResultSetValuesMapper;
    +import org.apache.storm.topology.FailedException;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.io.Serializable;
    -import java.util.ArrayList;
    -import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.ConcurrentHashMap;
    +import java.util.*;
    --- End diff --
    
    In general we prefer not to use wildcards in imports.
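For reference, the explicit form is simply the four single-type imports that the diff removed (`java.util.ArrayList` through `java.util.concurrent.ConcurrentHashMap`). The small class below exists only to show them in use; `ExplicitImportsExample` is hypothetical.

```java
// Explicit single-type imports in place of `import java.util.*;`
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ExplicitImportsExample {
    private ExplicitImportsExample() {}

    // Copies the keys of a map into a new list.
    static List<String> keysOf(Map<String, Integer> m) {
        return new ArrayList<>(m.keySet());
    }

    // Creates a thread-safe map for counts.
    static Map<String, Integer> newCounts() {
        return new ConcurrentHashMap<>();
    }
}
```

Explicit imports make each dependency visible in review and avoid ambiguity when two wildcard packages export the same simple name.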

[GitHub] storm issue #1781: STORM-1369: Add MapState implementation to storm-cassandr...

Posted by mkoch1 <gi...@git.apache.org>.
Github user mkoch1 commented on the issue:

    https://github.com/apache/storm/pull/1781
  
    Rebase is done, sorry for the delay.

[GitHub] storm issue #1781: STORM-1369: Add MapState implementation to storm-cassandr...

Posted by mkoch1 <gi...@git.apache.org>.
Github user mkoch1 commented on the issue:

    https://github.com/apache/storm/pull/1781
  
    That should do it - the batching option has been removed from MapState, in favor of parallel processing with existing opaque and transactional logic to handle consistency. Cassandra batch statements are more trouble than they're worth.
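The consistency argument relies on Trident's opaque-transactional rule: each stored value records the txid of the batch that last wrote it together with the value from before that batch, so a replayed batch rebuilds from the earlier value instead of applying its update twice. Because the rule is applied per key, independent parallel writes to different keys do not need a Cassandra batch to stay consistent. Below is a simplified model for a long counter, loosely following Trident's `OpaqueValue`; `OpaqueCell` and its methods are hypothetical names, and txid 0 is assumed to mean "never written".

```java
// Hypothetical, simplified model of Trident's opaque-transactional rule for a
// long counter; not the storm-core OpaqueValue class. Txid 0 means "never written".
final class OpaqueCell {
    final long currTxid; // txid of the batch that last wrote this cell
    final long curr;     // value after that batch
    final long prev;     // value before that batch

    OpaqueCell(long currTxid, long curr, long prev) {
        this.currTxid = currTxid;
        this.curr = curr;
        this.prev = prev;
    }

    // Value a batch should build on. If the same txid already wrote this cell,
    // the batch is a replay, so start from prev to avoid double-applying it.
    long baseFor(long txid) {
        return txid == currTxid ? prev : curr;
    }

    // Store the batch's result, remembering the value it was built on.
    OpaqueCell update(long txid, long newVal) {
        if (txid < currTxid) {
            throw new IllegalStateException("batch " + txid + " is behind state at " + currTxid);
        }
        return new OpaqueCell(txid, newVal, baseFor(txid));
    }
}
```

Applying batch 1 (+5) twice and then batch 2 (+3) leaves `curr` at 8 rather than 13, because the replayed write of batch 1 starts from `prev`.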

[GitHub] storm pull request #1781: STORM-1369: Add MapState implementation to storm-c...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1781#discussion_r88945322
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/trident/state/CassandraBackingMap.java ---
    @@ -0,0 +1,232 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.storm.cassandra.trident.state;
    +
    +import com.datastax.driver.core.*;
    +import com.google.common.base.Preconditions;
    +import org.apache.storm.cassandra.client.SimpleClient;
    +import org.apache.storm.cassandra.client.SimpleClientProvider;
    +import org.apache.storm.cassandra.executor.AsyncExecutor;
    +import org.apache.storm.cassandra.executor.AsyncExecutorProvider;
    +import org.apache.storm.cassandra.query.AyncCQLResultSetValuesMapper;
    +import org.apache.storm.cassandra.query.CQLStatementTupleMapper;
    +import org.apache.storm.topology.FailedException;
    +import org.apache.storm.trident.state.*;
    +import org.apache.storm.trident.state.map.IBackingMap;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.ITuple;
    +import org.apache.storm.tuple.Values;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.Serializable;
    +import java.util.*;
    +
    +/**
    + * An IBackingState implementation for Cassandra.
    + *
    + * The implementation stores state as a binary blob in cassandra using a {@link Serializer}.
    + * It supports Opaque, Transactional and NonTransactional states, given a matching serializer.
    + *
    + * Configuration is done with three separate constructs:
    + *  - One tuple mapper for multiGet, which should map keys to a select statement and return {@link Values}.
    + *  - One state mapper, which maps the state to/from a {@link Values} representation, which is used for binding.
    + *  - One tuple mapper for multiPut, which should map {@link Values} to an INSERT or UPDATE statement.
    + *
    + * {@link #multiPut(List, List)} uses Cassandra batch statements (if so configured).
    + * {@link #multiGet(List)} queries Cassandra asynchronously, with a default parallelism of 500.
    + *
    + * @param <T>
    + */
    +public class CassandraBackingMap<T> implements IBackingMap<T> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(CassandraBackingMap.class);
    +
    +    private final Map conf;
    +    private final Options<T> options;
    +    private final Fields allFields;
    +
    +    private SimpleClient client;
    +    private Session session;
    +    private AyncCQLResultSetValuesMapper cqlResultSetValuesMapper;
    +    private AsyncExecutor executor;
    +
    +
    +    protected CassandraBackingMap(Map conf, Options<T> options) {
    +        this.conf = conf;
    +        this.options = options;
    +        List<String> allFields = options.keyFields.toList();
    +        allFields.addAll(options.stateMapper.getStateFields().toList());
    +        this.allFields = new Fields(allFields);
    +    }
    +
    +    @Override
    +    public List<T> multiGet(List<List<Object>> keys) {
    +        LOG.info("multiGet fetching {} values.", keys.size());
    --- End diff --
    
    This should probably be debug.

[GitHub] storm issue #1781: STORM-1369: Add MapState implementation to storm-cassandr...

Posted by mkoch1 <gi...@git.apache.org>.
Github user mkoch1 commented on the issue:

    https://github.com/apache/storm/pull/1781
  
    I did not notice that, sorry. I've now replaced it with an external resource implementation. The code is not all that complex, so it's not a big deal.
    
    One thing I should point out: I borrowed the base implementation from a different project, also under the Apache 2.0 license. I've included their copyright line in the file: https://github.com/apache/storm/pull/1781/commits/e46496337af84a610ef742c8cf5988d8dab19e47#diff-a8ce0c2f5e3ce0193d3ebc49f54301fa
    
    Original code: https://github.com/spring-projects/spring-insight-plugins/blob/master/collection-plugins/cassandra12/src/test/java/com/springsource/insight/plugin/cassandra/embeded/EmbeddedCassandraService.java
    
    Please let me know if there are any issues with this.

[GitHub] storm issue #1781: STORM-1369: Add MapState implementation to storm-cassandr...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the issue:

    https://github.com/apache/storm/pull/1781
  
    @mkoch1 Thanks for addressing the license issue. The way you handled the borrowed code looks fine, the important part is to leave the original copyright statement, which you did. Thanks for pointing that out.
    
    +1

[GitHub] storm pull request #1781: STORM-1369: Add MapState implementation to storm-c...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1781#discussion_r88944588
  
    --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java ---
    @@ -138,6 +139,109 @@ public void onFailure(Throwable t) {
         }
     
         /**
    +     * Asynchronously executes the specified select statements. Results will be passed to the {@link AsyncResultSetHandler}
    +     * once each query has succeeded or failed.
    +     */
    +    public SettableFuture<List<T>> execAsync(final List<Statement> statements, final List<T> inputs, Integer maxParallelRequests, final AsyncResultSetHandler<T> handler) {
    +
    +        final SettableFuture<List<T>> settableFuture = SettableFuture.create();
    +        final AsyncContext<T> asyncContext = new AsyncContext<>(inputs, maxParallelRequests, settableFuture);
    +
    +        for (int i = 0; i < statements.size(); i++) {
    +
    +            // Acquire a slot
    +            if (asyncContext.acquire()) {
    +
    +
    --- End diff --
    
    nit: don't need the extra line