Posted to user@cassandra.apache.org by Vinod Joseph <vi...@gmail.com> on 2014/11/25 10:11:08 UTC

Fwd: Issues in moving data from cassandra to elasticsearch in java.

Hi,

I am working on a Java plugin that moves data from Cassandra to
Elasticsearch. The plugin must run on the server every 5 seconds.
The data is being moved, but every time the plugin runs (i.e. every
5 seconds) it moves all the data again, including data that was already
moved into Elasticsearch in the previous iteration. So we end up with
duplicate values in Elasticsearch. How can we avoid this problem?

We are using this plugin to manage logs that are generated during every
online transaction, so we will have millions of transactions.
The table schema is as follows.

CREATE TABLE logs (
  txn_id text,
  logged_at timestamp,
  des text,
  key_name text,
  params text,
  PRIMARY KEY (txn_id, logged_at)
)

The txn_id is a 16-digit number and is not unique. It consists of 6
random digits generated using a random function, followed by the epoch
timestamp in millisec (10 digits).

I want to move only the data that has been generated since the previous
iteration, and not the data that was already moved in the previous
iteration.
I tried to do it with static values, counter variables, comparing the
write_time of each row, and ORDER BY. It is still not working. Please
suggest any ideas.


Thanks and regards
vinod joseph
8050136948

Re: Issues in moving data from cassandra to elasticsearch in java.

Posted by William Arbaugh <wa...@cs.umd.edu>.
Sounds like you're trying to use C* as a message broker. Perhaps try using Kafka or RabbitMQ as a front end. Then have two subscribers: one pulls messages and places them into Elasticsearch, and the other inserts them into C*.

Yes, it is a more complex front end, but it will give you the functionality you want.
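
As a rough illustration of the two-subscriber idea, here is a minimal in-memory sketch in plain Java. It is not Kafka or RabbitMQ code; FanOutTopic is a hypothetical stand-in for a broker topic, and the two lists stand in for the Elasticsearch indexer and the C* writer. The only point it shows is that each log message is published once and delivered to both subscribers, so nothing has to be read back out of C* afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a broker topic (assumption: a real
// deployment would use Kafka or RabbitMQ, which also add durability).
final class FanOutTopic<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    // Register a subscriber; every subscriber sees every message.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // Deliver one message to all registered subscribers, in order.
    void publish(T message) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }

    public static void main(String[] args) {
        List<String> sentToElasticsearch = new ArrayList<>(); // stand-in for the indexer
        List<String> sentToCassandra = new ArrayList<>();     // stand-in for the C* writer

        FanOutTopic<String> logs = new FanOutTopic<>();
        logs.subscribe(sentToElasticsearch::add);
        logs.subscribe(sentToCassandra::add);

        logs.publish("txn log line 1");
        logs.publish("txn log line 2");

        System.out.println(sentToElasticsearch.equals(sentToCassandra)); // prints "true"
    }
}
```

Each message reaches Elasticsearch exactly once per publish, which avoids the repeated full copy described in the original question.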

> On Nov 25, 2014, at 10:14 AM, Eric Stevens <mi...@gmail.com> wrote:
> 
> Consider adding a log_bucket timestamp column and indexing it.  Your data loader can SELECT * FROM logs WHERE log_bucket = ?.  The value you supply there would be the timestamp of the log bucket you're processing - in your case logged_at rounded down to a multiple of 5 seconds, i.e. logged_at - (logged_at % 5000) in epoch milliseconds.
> 
> However, I'll caution against writing data to Cassandra and then trying to reliably read it back immediately after.  You're likely to miss values this way due to eventual consistency unless you read at CL_ALL.  But then your data loader will break whenever you have any node offline.  Writing then immediately reading data is a typical antipattern in any eventually consistent system.
> 
> If using DataStax Java Driver you can use CL_ALL with DowngradingConsistencyRetryPolicy and you would at least strike a nice balance between reasonably strong consistency and loss of resiliency from CL_ALL (but when you have a node offline, your load process may get significantly slower).  This would mitigate but not eliminate the antipattern.


Re: Issues in moving data from cassandra to elasticsearch in java.

Posted by Eric Stevens <mi...@gmail.com>.
Consider adding a log_bucket timestamp column and indexing it.  Your
data loader can SELECT * FROM logs WHERE log_bucket = ?.  The value you
supply there would be the timestamp of the log bucket you're processing -
in your case logged_at rounded down to a multiple of 5 seconds, i.e.
logged_at - (logged_at % 5000) in epoch milliseconds.
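
To make the bucket value concrete, here is a small sketch in plain Java of how a writer and the loader might compute it. The class and method names (LogBuckets, bucketStart, lastClosedBucket) are made up for illustration, and it assumes logged_at is available as epoch milliseconds and that the bucket width equals the 5-second loader interval. The writer would store bucketStart(logged_at) in log_bucket, and the loader would bind lastClosedBucket(now) to the query parameter so that each run reads one finished bucket rather than the whole table.

```java
import java.time.Instant;

// Hypothetical helper; names and the 5-second width are assumptions.
final class LogBuckets {
    // Bucket width in milliseconds, matching the 5-second loader interval.
    static final long BUCKET_MILLIS = 5_000L;

    // Start of the bucket containing the given timestamp, i.e. the
    // timestamp rounded down to a multiple of BUCKET_MILLIS.
    static long bucketStart(long epochMillis) {
        return Math.floorDiv(epochMillis, BUCKET_MILLIS) * BUCKET_MILLIS;
    }

    // Start of the most recent bucket that has fully elapsed at nowMillis.
    // The loader would query this bucket, not the one still being written.
    static long lastClosedBucket(long nowMillis) {
        return bucketStart(nowMillis) - BUCKET_MILLIS;
    }

    public static void main(String[] args) {
        long loggedAt = Instant.parse("2014-11-25T10:11:08.250Z").toEpochMilli();
        // Value the writer would store in log_bucket for this row.
        System.out.println(Instant.ofEpochMilli(bucketStart(loggedAt))); // prints 2014-11-25T10:11:05Z
    }
}
```

Reading only the last closed bucket keeps runs from overlapping, though rows that arrive late for an already-processed bucket would still be missed.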

However, I'll caution against writing data to Cassandra and then trying to
reliably read it back immediately after.  You're likely to miss values this
way due to eventual consistency unless you read at CL_ALL.  But then your
data loader will break whenever you have any node offline.  Writing then
immediately reading data is a typical antipattern in any eventually
consistent system.

If using DataStax Java Driver you can use CL_ALL with
DowngradingConsistencyRetryPolicy and you would at least strike a nice
balance between reasonably strong consistency and loss of resiliency from
CL_ALL (but when you have a node offline, your load process may get
significantly slower).  This would mitigate but not eliminate the
antipattern.

