Posted to dev@flink.apache.org by Gregory Melekh <gm...@interwise.com> on 2017/12/24 12:41:04 UTC

Flink 1.3 -- get data from Cassandra as generic ResultSet and convert it to DataSet

Hello Dev team.

I have a StreamExecutionEnvironment job that consumes simple CQL select queries from Kafka. I am trying to handle these queries asynchronously with the following code:

public class GenericCassandraReader extends RichAsyncFunction<UserDefinedType, ResultSet> {
private static final Logger logger = LoggerFactory.getLogger(GenericCassandraReader.class);
private ExecutorService executorService;

private final Properties props;
private Session client;

public ExecutorService getExecutorService() {
    return executorService;
}

public GenericCassandraReader(Properties props, ExecutorService executorService) {
    super();
    this.props = props;
    this.executorService = executorService;
}

@Override
public void open(Configuration parameters) throws Exception {
    client = Cluster.builder().addContactPoint(props.getProperty("cqlHost"))
            .withPort(Integer.parseInt(props.getProperty("cqlPort"))).build()
            .connect(props.getProperty("keyspace"));

}

@Override
public void close() throws Exception {
    client.close();
    synchronized (GenericCassandraReader.class) {
        try {
            if (!getExecutorService().awaitTermination(1000, TimeUnit.MILLISECONDS)) {
                getExecutorService().shutdownNow();
            }
        } catch (InterruptedException e) {
            getExecutorService().shutdownNow();
        }
    }
}

@Override
public void asyncInvoke(final UserDefinedType input, final AsyncCollector<ResultSet> asyncCollector) throws Exception {
    getExecutorService().submit(new Runnable() {
        @Override
        public void run() {
            ListenableFuture<ResultSet> resultSetFuture = client.executeAsync(input.query);

            Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {

                public void onSuccess(ResultSet resultSet) {
                    asyncCollector.collect(Collections.singleton(resultSet));
                }

                public void onFailure(Throwable t) {
                    asyncCollector.collect(t);
                }
            });
        }
    });
}
}

Each response from this code yields a Cassandra ResultSet with a different number of fields.
Any ideas for handling a Cassandra ResultSet in Flink, or should I use another technique to reach my goal?
Thanks in advance for any help!


Re: Flink 1.3 -- get data from Cassandra as generic ResultSet and convert it to DataSet

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Gregory,

1) There is no special support for ResultSet in Flink, so you have to
manually parse it and emit records of the type you want.

2) In your asyncInvoke you don't need to submit to the executorService
since the client is already async and returns a future. You can use
the code in run() directly.
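
A rough sketch of both points together, using only the JDK so it runs on its own. Everything here is a stand-in and an assumption, not the driver or Flink API: FakeRow plays the role of a Cassandra row, CompletableFuture replaces the driver's ListenableFuture, and the two Consumer arguments stand in for AsyncCollector.collect(...). With the DataStax 3.x driver the per-row loop would presumably walk row.getColumnDefinitions() and call row.getObject(name) instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class ResultSetSketch {

    /** Hypothetical stand-in for a driver row: ordered column names plus their values. */
    static final class FakeRow {
        final List<String> columns;
        final List<Object> values;

        FakeRow(List<String> columns, List<Object> values) {
            this.columns = columns;
            this.values = values;
        }
    }

    /** Point 1: turn a row of unknown shape into a generic record keyed by column name. */
    static Map<String, Object> toRecord(FakeRow row) {
        Map<String, Object> record = new LinkedHashMap<>();
        for (int i = 0; i < row.columns.size(); i++) {
            record.put(row.columns.get(i), row.values.get(i));
        }
        return record;
    }

    /** Point 2: hang the callback directly on the future; no extra executor is involved. */
    static void emitWhenDone(CompletableFuture<List<FakeRow>> future,
                             Consumer<Collection<Map<String, Object>>> onRecords,
                             Consumer<Throwable> onError) {
        future.whenComplete((rows, error) -> {
            if (error != null) {
                onError.accept(error);   // mirrors asyncCollector.collect(t)
                return;
            }
            List<Map<String, Object>> records = new ArrayList<>();
            for (FakeRow row : rows) {
                records.add(toRecord(row));
            }
            onRecords.accept(records);   // mirrors asyncCollector.collect(collection)
        });
    }

    public static void main(String[] args) {
        // Two rows with different column sets, as two different queries might return.
        FakeRow users = new FakeRow(Arrays.asList("id", "name"),
                Arrays.<Object>asList(1, "alice"));
        FakeRow events = new FakeRow(Arrays.asList("ts", "kind", "payload"),
                Arrays.<Object>asList(1514119264L, "click", "{}"));

        CompletableFuture<List<FakeRow>> future = new CompletableFuture<>();
        emitWhenDone(future,
                records -> records.forEach(System.out::println),
                error -> System.err.println("query failed: " + error));
        future.complete(Arrays.asList(users, events));
    }
}
```

Emitting Map<String, Object> keeps the operator's output type fixed even though every query returns different columns. One caveat to check against the driver documentation: iterating a large, paged ResultSet inside the callback may trigger blocking fetches of further pages.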

– Ufuk

