Posted to user@beam.apache.org by wang Wu <fa...@gmail.com> on 2020/07/18 11:11:06 UTC

Custom Cassandra IO

I notice that the standard CassandraIO sets up the Cluster with basic settings only. Is it possible to implement a custom Cassandra IO in which I can customise the DataStax driver? Any sample code would be helpful. Thanks

Re: Custom Cassandra IO

Posted by Alexey Romanenko <ar...@gmail.com>.
Please see my answers inline.

> On 23 Jul 2020, at 17:10, wang Wu <fa...@gmail.com> wrote:
> 
> Thank you. It works with the code below. Just 2 questions:
> 1. Why, if I initialise the session inside the FeatureToCassandraDoFn constructor, is it null every time I access it inside processElement?

Because it’s declared “transient”. The DoFn instance is created on the machine that builds the pipeline, then serialised and deserialised on each worker before it processes any elements. Java serialisation skips transient fields, so anything assigned to them in the constructor is null in the deserialised copy. That is why it’s recommended to initialise non-serialisable members in a @Setup method, which is called on the worker after deserialisation. Also note that the same DoFn instance can be used to process one or more bundles on the same worker, but there is no guarantee of how many.

I’d recommend taking a look at the “DoFn lifecycle” sequence diagram on the ParDo documentation page for more details [1].
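To make this concrete, here is a JDK-only sketch (no Beam or driver classes) that mimics what happens to a DoFn: the object is serialised and a deserialised copy is used. FakeSession, WriterFn and roundTrip are hypothetical stand-ins for the driver Session, your DoFn and the runner's serialisation step, and setup() plays the role of an @Setup method.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientRoundTrip {

  // Hypothetical stand-in for a non-serialisable client such as the driver's Session.
  static final class FakeSession {
    final String contactPoint;

    FakeSession(String contactPoint) {
      this.contactPoint = contactPoint;
    }
  }

  // Hypothetical stand-in for a DoFn: plain config is serialised, the client is transient.
  static final class WriterFn implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String host;             // serialisable config, survives the round trip
    private transient FakeSession session; // skipped by Java serialisation

    WriterFn(String host) {
      this.host = host;
      this.session = new FakeSession(host); // set on the client side only
    }

    // Plays the role of @Setup: runs on the deserialised copy and rebuilds the client.
    void setup() {
      session = new FakeSession(host);
    }

    FakeSession session() {
      return session;
    }
  }

  // Serialise and deserialise, roughly what happens when a DoFn is shipped to a worker.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    WriterFn onClient = new WriterFn("10.0.0.1");
    WriterFn onWorker = roundTrip(onClient);
    System.out.println("after deserialisation: " + onWorker.session()); // prints "after deserialisation: null"
    onWorker.setup();
    System.out.println("after setup: " + onWorker.session().contactPoint); // prints "after setup: 10.0.0.1"
  }
}
```

The constructor-assigned session is lost in the copy, while the serialisable host string survives and is enough for setup() to rebuild the client.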


> 2. Is this function applied in parallel to the elements of an unbounded collection? The pipeline looks like this:

Yes. The elements of the input PCollection (bounded or unbounded) are divided into bundles, and each bundle is processed on a worker by a DoFn instance, so different bundles can be processed in parallel. The ParDo Javadoc explains this in more detail [2].

> 
> KafkaIO.read().apply(TransformKafkaMessageToFeature).apply(FeatureToCassandraDoFn)
> 
> I am wondering whether we write to Cassandra in parallel for all elements of the unbounded collection. If so, how can we control the parallelism?

If you mean the number of parallel DoFn instances, that is the responsibility of the data processing engine and the runner. As far as I know, Beam doesn't expose it to the user.
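Even if the number of DoFn instances is not under your control, each instance can cap how many writes it has in flight. The JDK-only sketch below shows one way to do that with a Semaphore. It assumes a hypothetical asyncWrite function that wraps something like session.executeAsync and returns a CompletableFuture (the 3.x driver actually returns a ResultSetFuture, so an adapter would be needed). The idea is to call write() from @ProcessElement and flush() from @FinishBundle, so a failed write fails the bundle instead of being silently dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class BoundedAsyncWriter {
  private final Semaphore permits;
  // Hypothetical async write; in a real DoFn this would wrap something like session.executeAsync.
  private final Function<String, CompletableFuture<Void>> asyncWrite;
  // Futures issued since the last flush (i.e. during the current bundle).
  private final List<CompletableFuture<Void>> pending = new ArrayList<>();

  public BoundedAsyncWriter(int maxInFlight, Function<String, CompletableFuture<Void>> asyncWrite) {
    this.permits = new Semaphore(maxInFlight);
    this.asyncWrite = asyncWrite;
  }

  // Intended for @ProcessElement: blocks while maxInFlight writes are still outstanding.
  public void write(String statement) throws InterruptedException {
    permits.acquire();
    CompletableFuture<Void> future;
    try {
      future = asyncWrite.apply(statement);
    } catch (RuntimeException e) {
      permits.release(); // the write was never issued, so give the permit back
      throw e;
    }
    // Release the permit when the write completes, successfully or not.
    pending.add(future.whenComplete((result, error) -> permits.release()));
  }

  // Intended for @FinishBundle: waits for all writes; join() throws if any of them failed.
  public void flush() {
    try {
      CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
    } finally {
      pending.clear();
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(8);
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger maxSeen = new AtomicInteger();
    // Simulated async write that records how many writes are running at once.
    BoundedAsyncWriter writer = new BoundedAsyncWriter(3, stmt -> CompletableFuture.runAsync(() -> {
      maxSeen.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      inFlight.decrementAndGet();
    }, pool));
    for (int i = 0; i < 20; i++) {
      writer.write("statement " + i);
    }
    writer.flush();
    pool.shutdown();
    System.out.println("max concurrent writes: " + maxSeen.get()); // never exceeds 3
  }
}
```

The limit is per DoFn instance, so the total load on Cassandra is still roughly this limit multiplied by the number of instances the runner chooses to run.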


[1] https://beam.apache.org/documentation/programming-guide/#pardo
[2] https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/transforms/ParDo.html

Alexey



Re: Custom Cassandra IO

Posted by wang Wu <fa...@gmail.com>.
Thank you. It works with the code below. Just 2 questions:
1. Why, if I initialise the session inside the FeatureToCassandraDoFn constructor, is it null every time I access it inside processElement?
2. Is this function applied in parallel to the elements of an unbounded collection? The pipeline looks like this:

KafkaIO.read().apply(TransformKafkaMessageToFeature).apply(FeatureToCassandraDoFn)

I am wondering whether we write to Cassandra in parallel for all elements of the unbounded collection. If so, how can we control the parallelism?

Regards
Dinh

---

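import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

public class FeatureToCassandraDoFn extends DoFn<FeatureRow, Void> {

  // Plain configuration: serialised with the DoFn and shipped to the workers.
  private final List<String> hosts;
  private final int port;

  // Driver objects are not serialisable: transient, and created per instance in @Setup.
  private transient Cluster cluster;
  private transient Session session;
  private transient PreparedStatement ps;

  public FeatureToCassandraDoFn(CassandraConfig cassandraConfig) {
    this.port = cassandraConfig.getPort();
    this.hosts = Arrays.asList(cassandraConfig.getBootstrapHosts().split(","));
  }

  @Setup
  public void setup() {
    // Cassandra
    this.cluster = getCluster(this.hosts, this.port);
    this.session = cluster.newSession();
    this.ps = session.prepare(
        "insert into metis.store (entities, feature, value) values(?,?,?) using TTL ?");
  }

  @ProcessElement
  public void processElement(ProcessContext context) {
    FeatureRow row = context.element();
    BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.UNLOGGED);
    // Add rows to batchStatement ...

    // Execute it asynchronously; the returned future is not awaited here.
    ResultSetFuture resultSetFuture = this.session.executeAsync(batchStatement);
  }

  @Teardown
  public void teardown() {
    // Release driver resources when this DoFn instance is discarded.
    if (session != null) {
      session.close();
    }
    if (cluster != null) {
      cluster.close();
    }
  }

  private Cluster getCluster(List<String> hosts, int port) {...}
}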

> On 20 Jul BE 2563, at 22:10, Alexey Romanenko <ar...@gmail.com> wrote:
> 
> You can make them “transient” and instantiate them in the @Setup method of your DoFn (similar to what the current CassandraIO WriteFn does [1]).
> 
> [1] https://github.com/apache/beam/blob/8b84720631c8f454881d20fc1aa7cec2bc380edf/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java#L1139


Re: Custom Cassandra IO

Posted by Alexey Romanenko <ar...@gmail.com>.
You can make them “transient” and instantiate them in the @Setup method of your DoFn (similar to what the current CassandraIO WriteFn does [1]).

[1] https://github.com/apache/beam/blob/8b84720631c8f454881d20fc1aa7cec2bc380edf/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java#L1139

> On 20 Jul 2020, at 12:49, wang Wu <fa...@gmail.com> wrote:
> 
> But unfortunately that approach will not work, as Session and Cluster are not serialisable.
> 
> Regards
> Dinh


Re: Custom Cassandra IO

Posted by wang Wu <fa...@gmail.com>.
But unfortunately that approach will not work, as Session and Cluster are not serialisable.

Regards
Dinh
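The failure described above can be reproduced without Beam or the driver. In this JDK-only sketch, FakeCluster is a hypothetical stand-in for the driver's Cluster/Session (which do not implement Serializable). EagerFn mirrors creating the client in the constructor, as CustomCassandraWriteFn does, and LazyFn marks the field transient instead. Serialising EagerFn throws NotSerializableException, which is roughly what happens when the pipeline tries to serialise such a DoFn.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class NonSerializableField {

  // Hypothetical stand-in for the driver's Cluster/Session: deliberately not Serializable.
  static final class FakeCluster {}

  // Mirrors CustomCassandraWriteFn: the client is created eagerly and kept in a normal field.
  static final class EagerFn implements Serializable {
    private static final long serialVersionUID = 1L;
    final FakeCluster cluster = new FakeCluster();
  }

  // Same field marked transient: serialisation succeeds, but the field must be rebuilt later.
  static final class LazyFn implements Serializable {
    private static final long serialVersionUID = 1L;
    transient FakeCluster cluster = new FakeCluster();
  }

  static boolean canSerialise(Object fn) throws IOException {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(fn);
      return true;
    } catch (NotSerializableException e) {
      return false; // thrown because a non-transient field's class is not Serializable
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println("EagerFn serialisable: " + canSerialise(new EagerFn())); // false
    System.out.println("LazyFn serialisable: " + canSerialise(new LazyFn()));   // true
  }
}
```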

> On 20 Jul BE 2563, at 17:42, wang Wu <fa...@gmail.com> wrote:
> 
> Hi,
> 
> We are thinking of tuning connection pooling as described here:
> https://docs.datastax.com/en/developer/java-driver/3.4/manual/pooling/
> 
> I agree that the current CassandraIO code is not open to this kind of modification/extension, so we are trying to use a DoFn instead.


Re: Custom Cassandra IO

Posted by wang Wu <fa...@gmail.com>.
Hi,

We are thinking of tuning connection pooling as described here:
https://docs.datastax.com/en/developer/java-driver/3.4/manual/pooling/

I agree that the current CassandraIO code is not open to this kind of modification/extension, so we are trying to use a DoFn instead.

public class CustomCassandraWriteFn extends DoFn<CassandraBatch, Void> {
  Cluster cluster;
  Session session;

  public CustomCassandraWriteFn(CassandraConfig cassandraConfig) {
    PoolingOptions poolingOptions = new PoolingOptions();
    this.cluster = getCluster(cassandraConfig, poolingOptions);
    this.session = this.cluster.newSession();
  }

  @ProcessElement
  public void processElement(ProcessContext context) {
    CassandraBatch batch = context.element();
    for (CassandraMutation o : batch.rows) {
      this.session.executeAsync("xxx");
    }
  }
}


Regards
Dinh

> On 20 Jul BE 2563, at 17:34, Alexey Romanenko <ar...@gmail.com> wrote:
> 
> Hi,
> 
> Could you tell me what kind of driver customisation you’d like to implement?
> 
> Looking at the current implementation of CassandraIO, I think one option could be simply to add another configuration method, “withSomeOption(...)”, and pass its value to the method that initialises the new Cluster instance.
> 
> Another, more sophisticated option is to implement a “withClusterProvider(…)” method, which would allow users to implement and provide a custom Cluster instance with all the required configuration.
> 
> In both cases, it will require CassandraIO modification.


Re: Custom Cassandra IO

Posted by Alexey Romanenko <ar...@gmail.com>.
Hi,

Could you tell me what kind of driver customisation you’d like to implement?

Looking at the current implementation of CassandraIO, I think one option could be simply to add another configuration method, “withSomeOption(...)”, and pass its value to the method that initialises the new Cluster instance.

Another, more sophisticated option is to implement a “withClusterProvider(…)” method, which would allow users to implement and provide a custom Cluster instance with all the required configuration.

In both cases, it will require CassandraIO modification.
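For illustration only, the withClusterProvider(…) idea could look roughly like the JDK-only sketch below. The user supplies a small serialisable factory, it travels with the DoFn, and the non-serialisable client is built from it on the worker. ClientProvider, FakeCluster and WriteFn are hypothetical names, not part of CassandraIO or the DataStax driver.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class ProviderPattern {

  // Hypothetical provider type: the factory is serialisable, the client it creates need not be.
  @FunctionalInterface
  public interface ClientProvider<T> extends Supplier<T>, Serializable {}

  // Hypothetical stand-in for a customised Cluster (pooling etc.); deliberately not Serializable.
  public static final class FakeCluster {
    final int maxConnectionsPerHost;

    FakeCluster(int maxConnectionsPerHost) {
      this.maxConnectionsPerHost = maxConnectionsPerHost;
    }
  }

  // Hypothetical write DoFn stand-in: it carries the provider, never the client itself.
  public static final class WriteFn implements Serializable {
    private static final long serialVersionUID = 1L;
    private final ClientProvider<FakeCluster> provider;
    private transient FakeCluster cluster;

    public WriteFn(ClientProvider<FakeCluster> provider) {
      this.provider = provider;
    }

    // Equivalent of @Setup: each deserialised copy builds its own client from the provider.
    public void setup() {
      cluster = provider.get();
    }

    public FakeCluster cluster() {
      return cluster;
    }
  }

  // Serialise and deserialise an object, roughly what happens when a DoFn is shipped to a worker.
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    int maxConnections = 8; // user-chosen driver setting, captured by the provider lambda
    WriteFn fn = new WriteFn(() -> new FakeCluster(maxConnections));
    WriteFn onWorker = roundTrip(fn);
    onWorker.setup();
    System.out.println(onWorker.cluster().maxConnectionsPerHost); // prints 8
  }
}
```

The lambda is serialisable only because its target type, ClientProvider, extends Serializable. Anything it captures, here an int, must be serialisable too.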


> On 18 Jul 2020, at 13:11, wang Wu <fa...@gmail.com> wrote:
> 
> I notice that the standard CassandraIO sets up the Cluster with basic settings only. Is it possible to implement a custom Cassandra IO in which I can customise the DataStax driver? Any sample code would be helpful. Thanks