Posted to users@apex.apache.org by Thomas Weise <th...@apache.org> on 2016/10/06 20:02:55 UTC

Re: Datatorrent fault tolerance

Hi,

It would be necessary to know a bit more about your application for
specific recommendations, but from what I see above, a few things don't
look right.

It appears that you are setting the processing mode on the input operator,
which only reads from Kafka. Exactly-once is important for operators that
affect the state of external systems, for example when writing to a
database, or, producing messages to Kafka.
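
For illustration, a minimal wiring sketch along these lines (the application and operator names are illustrative, and the port names and byte[] tuple type are assumed from the Malhar operators' usual signatures, not taken from the application discussed here): the Kafka input stays at the default at-least-once processing mode, and the operator writing to the external system carries the exactly-once behavior.

    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;
    import com.datatorrent.api.annotation.ApplicationAnnotation;

    import org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator;
    import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;

    @ApplicationAnnotation(name = "ExactlyOnceOutputSketch")
    public class ExactlyOnceOutputSketch implements StreamingApplication {

      @Override
      public void populateDAG(DAG dag, Configuration conf) {
        // Kafka input: left at the default (at-least-once) processing mode;
        // setting PROCESSING_MODE here does not give end-to-end exactly-once.
        KafkaSinglePortInputOperator in =
            dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());

        // The operator that writes to the external system is where duplicates
        // are suppressed on recovery.
        KafkaSinglePortExactlyOnceOutputOperator<byte[]> out =
            dag.addOperator("kafkaOut", new KafkaSinglePortExactlyOnceOutputOperator<byte[]>());

        dag.addStream("messages", in.outputPort, out.inputPort);
      }
    }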

There is also a problem with the documentation that may contribute to the
confusion. The processing mode you are trying to use isn't really
exactly-once and probably will be deprecated in a future release. Here are
some better resources on fault tolerance and processing semantics:

http://apex.apache.org/docs.html
https://www.youtube.com/watch?v=FCMY6Ii89Nw
http://www.slideshare.net/ApacheApexOrganizer/webinar-fault-toleranceandprocessingsemantics
https://www.datatorrent.com/blog/end-to-end-exactly-once-with-apache-apex/

HTH,
Thomas


On Thu, Oct 6, 2016 at 11:26 AM, Bandaru, Srinivas <
srinivas.bandaru@optum.com> wrote:

> Hi, Need some help!
>
> Need your advice on implementing the fault-tolerance mechanism using
> DataTorrent. Currently we are trying to implement the EXACTLY_ONCE
> scenario.
>
> We are referring to the documentation below. The highlighted line says that if
> we use the EXACTLY_ONCE processing mode, the downstream operators
> should have AT_MOST_ONCE. We added the operator-level attributes in
> properties.xml (attached below), and when we launch the application no
> messages come through the last operator *topicUpdate*, but we do see the
> messages when we comment out those attributes.
>
> *Please have a look at it; either we are specifying the attributes
> incorrectly, or there is an additional step to be done*.
>
>
>
>
>
> ·         *PROCESSING_MODE*
>
> static final Attribute<Operator.ProcessingMode> PROCESSING_MODE
> (see https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Attribute.html and
> https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Operator.ProcessingMode.html)
>
> The payload processing mode for this operator - at most once, exactly
> once, or default at least once. If the processing mode for an operator is
> specified as AT_MOST_ONCE and no processing mode is specified for the
> downstream operators if any, the processing mode of the downstream
> operators is automatically set to AT_MOST_ONCE. If a different processing
> mode is specified for the downstream operators it will result in an error. If
> the processing mode for an operator is specified as EXACTLY_ONCE then the
> processing mode for all downstream operators should be specified as
> AT_MOST_ONCE otherwise it will result in an error.
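
For reference, operator-level attributes like the ones described in that passage would look roughly as follows in properties.xml. This is only a sketch: the dt.operator...attr syntax is assumed from Apex's configuration conventions, and "kafkaInput" is a hypothetical operator name ("topicUpdate" is the operator named above). As noted at the top of this reply, setting these attributes is not what produces end-to-end exactly-once results.

    <!-- Sketch only: attribute syntax and the "kafkaInput" operator name are assumptions -->
    <property>
      <name>dt.operator.kafkaInput.attr.PROCESSING_MODE</name>
      <value>EXACTLY_ONCE</value>
    </property>
    <property>
      <name>dt.operator.topicUpdate.attr.PROCESSING_MODE</name>
      <value>AT_MOST_ONCE</value>
    </property>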
>
>
>
>
>
>
>
>
>
>
>
>
>
> Thanks,
>
> Srinivas
>
>
>

Re: Datatorrent fault tolerance

Posted by hs...@gmail.com.
The KafkaSinglePortExactlyOnceOutputOperator takes whatever output comes from the previous operator and writes it to Kafka.

Sent from my iPhone

> On Oct 7, 2016, at 07:59, Jaspal Singh <ja...@gmail.com> wrote:
> 
> Hi Thomas,
> 
> I have a question: when we are using KafkaSinglePortExactlyOnceOutputOperator to write results into a MapR Streams topic, will it be able to read messages from the previous operator?
> 
> 
> Thanks
> Jaspal
> 
>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org> wrote:
>> For recovery you need to set the window data manager like so:
>> 
>> https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
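
The relevant line in that example looks roughly like this (a sketch, assuming the FSWindowDataManager from the Malhar library and a hypothetical variable name for the Kafka input operator):

    import org.apache.apex.malhar.lib.wal.FSWindowDataManager;

    // "kafkaIn" is a hypothetical reference to the Kafka input operator added to the DAG.
    // Persisting per-window offset metadata lets the input replay the same tuples in the
    // same streaming windows after a failure or relaunch (idempotent input).
    kafkaIn.setWindowDataManager(new FSWindowDataManager());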
>> 
>> That will also apply to stateful restart of the entire application (relaunch from previous instance's checkpointed state).
>> 
>> For cold restart, you would need to consider the property you mention and decide what is applicable to your use case.
>> 
>> Thomas
>> 
>> 
>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <ja...@gmail.com> wrote:
>>> OK, now I get it. Thanks for the nice explanation!!
>>> 
>>> One more thing: you mentioned checkpointing the offset ranges to replay in the same order from Kafka.
>>>
>>> Is there any property we need to configure to do that, like initialOffset set to APPLICATION_OR_LATEST?
>>> 
>>> 
>>> Thanks
>>> Jaspal
>>> 
>>> 
>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com> wrote:
>>>> What you want is the effect of exactly-once output (that's why we also call it end-to-end exactly-once). There is no such thing as exactly-once processing in a distributed system; in this case it would rather be "produce exactly-once". Upstream operators, on failure, will recover to checkpointed state and re-process the stream from there. This is at-least-once, the default behavior. Because the input operator is configured to replay in the same order from Kafka (this is done by checkpointing the offset ranges), the computation in the DAG is idempotent and the output operator can discard the results that were already published instead of producing duplicates. 
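
As a concrete illustration of the replay configuration mentioned here (a sketch only; the "kafkaInput" operator name and the property syntax are assumptions), the Kafka input operator's initialOffset property can be set in properties.xml:

    <!-- Sketch: "kafkaInput" is a hypothetical operator name. APPLICATION_OR_LATEST
         resumes from offsets saved by the application when they exist, falling back
         to the latest offsets otherwise. -->
    <property>
      <name>dt.operator.kafkaInput.prop.initialOffset</name>
      <value>APPLICATION_OR_LATEST</value>
    </property>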
>>>> 
>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <ja...@gmail.com> wrote:
>>>>> I think this is a customized operator implementation that takes care of exactly-once processing at the output.
>>>>> 
>>>>> What if any previous operators fail? How can we make sure they also recover using the EXACTLY_ONCE processing mode?
>>>>> 
>>>>> 
>>>>> Thanks
>>>>> Jaspal
>>>>> 
>>>>> 
>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com> wrote:
>>>>>> In that case please have a look at:
>>>>>> 
>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>> 
>>>>>> The operator will ensure that messages are not duplicated, under the stated assumptions.
>>>>>> 
>>>>>> 
>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <ja...@gmail.com> wrote:
>>>>>>> Hi Thomas,
>>>>>>> 
>>>>>>> In our case we are writing the results back to maprstreams topic based on some validations.
>>>>>>> 
>>>>>>> 
>>>>>>> Thanks
>>>>>>> Jaspal
>>>>>>> 
>>>>>>> 
>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org> wrote:
>>>>>>>> Hi,
>>>>>>>> 
>>>>>>>> which operators in your application are writing to external systems?
>>>>>>>> 
>>>>>>>> When you look at the example from the blog (https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once), there is Kafka input, which is configured to be idempotent. The results are written to JDBC. That operator by itself supports exactly-once through transactions (in conjunction with idempotent input), hence there is no need to configure the processing mode at all.
>>>>>>>> 
>>>>>>>> Thomas
>>>>>>>> 
>>>> 
>> 
> 

Re: Datatorrent fault tolerance

Posted by "hsy541@gmail.com" <hs...@gmail.com>.
Oh I see, you want to send to different topics. Well, then you have to give
some dummy value to the topic property on the operator.
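
For example, a properties.xml entry along these lines would satisfy the constraint (a sketch: the "kafkaOut" operator name appears later in this thread, the stream path is the one used in the posted code, and the property syntax is assumed):

    <!-- Any non-null value satisfies the "may not be null" check on the topic property.
         Here the full MapR stream path form is used; a dummy value also works when the
         operator picks the actual topic in code, as in the snippet quoted below. -->
    <property>
      <name>dt.operator.kafkaOut.prop.topic</name>
      <value>/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate</value>
    </property>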

Regards,
Siyuan

On Fri, Oct 7, 2016 at 11:38 AM, Jaspal Singh <ja...@gmail.com>
wrote:

> Siyuan,
>
> So for the output operator, we have specified it as a part of our logic
> itself.
>
> public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOutputOperator {
>
>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>
>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>
>         Gson gson = new Gson();
>
>         @Override
>         public void process(Tenant tenant) {
>
>             try {
>                 Producer<String, String> producer = getKafkaProducer();
>                 //ObjectMapper mapper = new ObjectMapper();
>                 long now = System.currentTimeMillis();
>                 //Configuration conf = HBaseConfiguration.create();
>                 //TenantDao dao = new TenantDao(conf);
>                 //ArrayList<Put> puts = new ArrayList<>();
>                 if (tenant != null) {
>                     //Tenant tenant = tenant.next();
>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>                         //puts.add(dao.mkPut(tenant));
>                     } else {
>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>
>                     }
>                     producer.flush();
>                 }
>             }
>
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:34 PM, hsy541@gmail.com <hs...@gmail.com> wrote:
>
>> Jaspal,
>>
>> I think you miss the kafkaOut  :)
>>
>> Regards,
>> Siyuan
>>
>> On Fri, Oct 7, 2016 at 11:32 AM, Jaspal Singh <jaspal.singh1404@gmail.com
>> > wrote:
>>
>>> Siyuan,
>>>
>>> That's how we have given it in properties file:
>>>
>>> [image: Inline image 1]
>>>
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 1:27 PM, hsy541@gmail.com <hs...@gmail.com>
>>> wrote:
>>>
>>>> Jaspal,
>>>>
>>>> Topic is a mandatory property you have to set. In MapR, the value
>>>> should be set to the full stream path, for example: /your/stream/path:streamname
>>>>
>>>> Regards,
>>>> Siyuan
>>>>
>>>> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <
>>>> jaspal.singh1404@gmail.com> wrote:
>>>>
>>>>> After making the change, we are getting the below error while
>>>>> application launch:
>>>>>
>>>>> *An error occurred trying to launch the application. Server message:
>>>>> javax.validation.ConstraintViolationException: Operator kafkaOut violates
>>>>> constraints
>>>>> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
>>>>> propertyPath='topic', message='may not be null', *
>>>>>
>>>>>
>>>>>
>>>>> Thanks!!
>>>>>
>>>>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <
>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>
>>>>>> So I just changed the malhar-kafka version to 3.5.0 and was able to
>>>>>> import the AbstractOutputOperator. Let me try to launch it now.
>>>>>>
>>>>>> Thanks for your inputs !!
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <
>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>
>>>>>>> Should we use malhar-library version 3.5 then ?
>>>>>>>
>>>>>>>
>>>>>>> Thanks!!
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <th...@apache.org> wrote:
>>>>>>>
>>>>>>>> Please make sure to depend on version 3.5 of malhar-kafka in
>>>>>>>> pom.xml. This operator is not in malhar-library, it's a separate module.
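
For reference, the dependency would look roughly like this in pom.xml (a sketch; the Maven coordinates are assumed to be the standard Apache Apex Malhar ones):

    <dependency>
      <groupId>org.apache.apex</groupId>
      <artifactId>malhar-kafka</artifactId>
      <version>3.5.0</version>
    </dependency>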
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Siyuan,
>>>>>>>>>
>>>>>>>>> I am using the same Kafka producer as you mentioned. But I am not
>>>>>>>>> seeing the AbstractKafkaOutputOperator in malhar library while import.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks!!
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy541@gmail.com <
>>>>>>>>> hsy541@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Also, which Kafka output operator are you using?
>>>>>>>>>> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator
>>>>>>>>>> instead of com.datatorrent.contrib.kafka.AbstractOutputOperator.
>>>>>>>>>> Only the org.apache.apex.malhar.kafka.AbstractOutputOperator
>>>>>>>>>> works with MapR Streams; the latter only works with Kafka 0.8.* or 0.9.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Siyuan
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy541@gmail.com <
>>>>>>>>>> hsy541@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey Jaspal,
>>>>>>>>>>>
>>>>>>>>>>> Did you add any code to existing KafkaSinglePortExactlyOnceOutputOperator
>>>>>>>>>>> from malhar?  If so please make sure the producer you use here
>>>>>>>>>>> is org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>>>>>> kafka.javaapi.producer.Producer.  That is the old API and it is
>>>>>>>>>>> not supported by MapR Streams.
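
To make the distinction concrete, a minimal standalone sketch using the new-API producer (the broker address, serializers, and stream path below are placeholders, not values from this thread):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class NewApiProducerSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // org.apache.kafka.clients.producer.KafkaProducer is the new client API that
        // MapR Streams supports; kafka.javaapi.producer.Producer is the legacy 0.8-era API.
        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("/your/stream/path:streamname", "key", "value"));
        producer.flush();
        producer.close();
      }
    }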
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Siyuan
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>>>>>> operator is getting an object of the Tenant class from the upstream operator.
>>>>>>>>>>>>
>>>>>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>>>>>>
>>>>>>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>>>>>
>>>>>>>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>>>>>>
>>>>>>>>>>>>         Gson gson = new Gson();
>>>>>>>>>>>>
>>>>>>>>>>>>         @Override
>>>>>>>>>>>>         public void process(Tenant tenant) {
>>>>>>>>>>>>
>>>>>>>>>>>>             try {
>>>>>>>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>>>>>>>                 long now = System.currentTimeMillis();
>>>>>>>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>>>>>>                 if (tenant != null) {
>>>>>>>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>>>>>>>                     } else {
>>>>>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>>>
>>>>>>>>>>>>                     }
>>>>>>>>>>>>                     producer.flush();
>>>>>>>>>>>>                 }
>>>>>>>>>>>>             }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> After building the application, it throws an error during launch:
>>>>>>>>>>>>
>>>>>>>>>>>> An error occurred trying to launch the application. Server
>>>>>>>>>>>> message: java.lang.NoClassDefFoundError:
>>>>>>>>>>>> Lkafka/javaapi/producer/Producer; at
>>>>>>>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>>>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>>>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thomas,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I was trying to refer to the input from the previous operator.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Another thing: when we extend the AbstractKafkaOutputOperator,
>>>>>>>>>>>>> do we need to specify <String, T>? Since we are getting an object of a class
>>>>>>>>>>>>> type from the previous operator.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Are you referring to the upstream operator in the DAG or the
>>>>>>>>>>>>>> state of the previous application after relaunch? Since the data is stored
>>>>>>>>>>>>>> in MapR streams, an operator that is a producer can also act as a consumer.
>>>>>>>>>>>>>> Please clarify your question.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a question, so when we are using
>>>>>>>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results
>>>>>>>>>>>>>>> into maprstream topic will it be able to read messgaes from the previous
>>>>>>>>>>>>>>> operator ?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <thw@apache.org
>>>>>>>>>>>>>>> > wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For recovery you need to set the window data manager like
>>>>>>>>>>>>>>>> so:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://github.com/DataTorrent
>>>>>>>>>>>>>>>> /examples/blob/master/tutorial
>>>>>>>>>>>>>>>> s/exactly-once/src/main/java/c
>>>>>>>>>>>>>>>> om/example/myapexapp/Application.java#L33
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>>>>>>> application (relaunch from previous instance's checkpointed state).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For cold restart, you would need to consider the property
>>>>>>>>>>>>>>>> you mention and decide what is applicable to your use case.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Ok now I get it. Thanks for the nice explaination !!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> One more thing, so you mentioned about checkpointing the
>>>>>>>>>>>>>>>>> offset ranges to replay in same order from kafka.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is there any property we need to configure to do that?
>>>>>>>>>>>>>>>>> like initialOffset set to APPLICATION_OR_LATEST.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What you want is the effect of exactly-once output
>>>>>>>>>>>>>>>>>> (that's why we call it also end-to-end exactly-once). There is no such
>>>>>>>>>>>>>>>>>> thing as exactly-once processing in a distributed system. In this case it
>>>>>>>>>>>>>>>>>> would be rather "produce exactly-once. Upstream operators, on failure, will
>>>>>>>>>>>>>>>>>> recover to checkpointed state and re-process the stream from there. This is
>>>>>>>>>>>>>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>>>>>>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>>>>>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>>>>>>>>>>>> instead of producing duplicates.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I think this is something called a customized operator
>>>>>>>>>>>>>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What if any previous operators fail ? How we can make
>>>>>>>>>>>>>>>>>>> sure they also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> https://github.com/apache/apex
>>>>>>>>>>>>>>>>>>>> -malhar/blob/master/kafka/src/
>>>>>>>>>>>>>>>>>>>> main/java/org/apache/apex/malh
>>>>>>>>>>>>>>>>>>>> ar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The operator will ensure that messages are not
>>>>>>>>>>>>>>>>>>>> duplicated, under the stated assumptions.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> In our case we are writing the results back to
>>>>>>>>>>>>>>>>>>>>> maprstreams topic based on some validations.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>>>>> thw@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> which operators in your application are writing to
>>>>>>>>>>>>>>>>>>>>>> external systems?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>>>>>>>>>>> https://github.com/DataTorren
>>>>>>>>>>>>>>>>>>>>>> t/examples/tree/master/tutorials/exactly-once),
>>>>>>>>>>>>>>>>>>>>>> there is Kafka input, which is configured to be idempotent. The results are
>>>>>>>>>>>>>>>>>>>>>> written to JDBC. That operator by itself supports exactly-once through
>>>>>>>>>>>>>>>>>>>>>> transactions (in conjunction with idempotent input), hence there is no need
>>>>>>>>>>>>>>>>>>>>>> to configure the processing mode at all.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by Thomas Weise <th...@apache.org>.
Since the operator expects it as part of the configuration, you could just
supply a dummy value that won't have any further effect.

On Fri, Oct 7, 2016 at 11:38 AM, Jaspal Singh <ja...@gmail.com>
wrote:

> Siyuan,
>
> So for the output operator, we have specified it as a part of our logic
> itself.
>
> public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOutputOperator {
>
>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>
>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>
>         Gson gson = new Gson();
>
>         @Override
>         public void process(Tenant tenant) {
>
>             try {
>                 Producer<String, String> producer = getKafkaProducer();
>                 //ObjectMapper mapper = new ObjectMapper();
>                 long now = System.currentTimeMillis();
>                 //Configuration conf = HBaseConfiguration.create();
>                 //TenantDao dao = new TenantDao(conf);
>                 //ArrayList<Put> puts = new ArrayList<>();
>                 if (tenant != null) {
>                     //Tenant tenant = tenant.next();
>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>                         //puts.add(dao.mkPut(tenant));
>                     } else {
>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>
>                     }
>                     producer.flush();
>                 }
>             }
>
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:34 PM, hsy541@gmail.com <hs...@gmail.com> wrote:
>
>> Jaspal,
>>
>> I think you miss the kafkaOut  :)
>>
>> Regards,
>> Siyuan
>>
>> On Fri, Oct 7, 2016 at 11:32 AM, Jaspal Singh <jaspal.singh1404@gmail.com
>> > wrote:
>>
>>> Siyuan,
>>>
>>> That's how we have given it in properties file:
>>>
>>> [image: Inline image 1]
>>>
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 1:27 PM, hsy541@gmail.com <hs...@gmail.com>
>>> wrote:
>>>
>>>> Jaspal,
>>>>
>>>> Topic is a mandatory property you have to set. In mapr, the value
>>>> should be set to the full stream path example:  /your/stream/path:streamname
>>>>
>>>> Regards,
>>>> Siyuan
>>>>
>>>> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <
>>>> jaspal.singh1404@gmail.com> wrote:
>>>>
>>>>> After making the change, we are getting the below error while
>>>>> application launch:
>>>>>
>>>>> *An error occurred trying to launch the application. Server message:
>>>>> javax.validation.ConstraintViolationException: Operator kafkaOut violates
>>>>> constraints
>>>>> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
>>>>> propertyPath='topic', message='may not be null', *
>>>>>
>>>>>
>>>>>
>>>>> Thanks!!
>>>>>
>>>>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <
>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>
>>>>>> So I just changes the malhar-kafka version to 3.5.0, I was able to
>>>>>> import the AbstractOutputOperator. Let me try to launch it now.
>>>>>>
>>>>>> Thanks for your inputs !!
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <
>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>
>>>>>>> Should we use malhar-library version 3.5 then ?
>>>>>>>
>>>>>>>
>>>>>>> Thanks!!
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <th...@apache.org> wrote:
>>>>>>>
>>>>>>>> Please make sure to depend on version 3.5 of malhar-kafka in
>>>>>>>> pom.xml. This operator is not in malhar-library, it's a separate module.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Siyuan,
>>>>>>>>>
>>>>>>>>> I am using the same Kafka producer as you mentioned. But I am not
>>>>>>>>> seeing the AbstractKafkaOutputOperator in malhar library while import.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks!!
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy541@gmail.com <
>>>>>>>>> hsy541@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Also which kafka output operator you are using?
>>>>>>>>>> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator
>>>>>>>>>> instead of com.datatorrent.contrib.kafka.AbstractOutputOperator.
>>>>>>>>>> Only the org.apache.apex.malhar.kafka.AbstractOutputOperator
>>>>>>>>>> works with MapR stream, the latter only works with kafka 0.8.* or 0.9
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Siyuan
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy541@gmail.com <
>>>>>>>>>> hsy541@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey Jaspal,
>>>>>>>>>>>
>>>>>>>>>>> Did you add any code to existing KafkaSinglePortExactlyOnceOutputOperator
>>>>>>>>>>> from malhar?  If so please make sure the producer you use here
>>>>>>>>>>> is org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>>>>>> kafka.javaapi.producer.Producer.  That is old api and that is
>>>>>>>>>>> not supported by MapR stream.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Siyuan
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>>>>>> operator is getting an object of tenant class from updtream operator.
>>>>>>>>>>>>
>>>>>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>>>>>>
>>>>>>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>>>>>
>>>>>>>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>>>>>>
>>>>>>>>>>>>         Gson gson = new Gson();
>>>>>>>>>>>>
>>>>>>>>>>>>         @Override
>>>>>>>>>>>>         public void process(Tenant tenant) {
>>>>>>>>>>>>
>>>>>>>>>>>>             try {
>>>>>>>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>>>>>>>                 long now = System.currentTimeMillis();
>>>>>>>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>>>>>>                 if (tenant != null) {
>>>>>>>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>>>>>>>                     } else {
>>>>>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>>>
>>>>>>>>>>>>                     }
>>>>>>>>>>>>                     producer.flush();
>>>>>>>>>>>>                 }
>>>>>>>>>>>>             }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> After building the application, it throws error during launch:
>>>>>>>>>>>>
>>>>>>>>>>>> An error occurred trying to launch the application. Server
>>>>>>>>>>>> message: java.lang.NoClassDefFoundError:
>>>>>>>>>>>> Lkafka/javaapi/producer/Producer; at
>>>>>>>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>>>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>>>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thomas,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I was trying to refer to the input from previous operator.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Another thing when we extend the AbstractKafkaOutputOperator,
>>>>>>>>>>>>> do we need to specify <String, T> ? Since we are getting an object of class
>>>>>>>>>>>>> type from previous operator.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Are you referring to the upstream operator in the DAG or the
>>>>>>>>>>>>>> state of the previous application after relaunch? Since the data is stored
>>>>>>>>>>>>>> in MapR streams, an operator that is a producer can also act as a consumer.
>>>>>>>>>>>>>> Please clarify your question.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a question, so when we are using
>>>>>>>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results
>>>>>>>>>>>>>>> into maprstream topic will it be able to read messgaes from the previous
>>>>>>>>>>>>>>> operator ?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <thw@apache.org
>>>>>>>>>>>>>>> > wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For recovery you need to set the window data manager like
>>>>>>>>>>>>>>>> so:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://github.com/DataTorrent
>>>>>>>>>>>>>>>> /examples/blob/master/tutorial
>>>>>>>>>>>>>>>> s/exactly-once/src/main/java/c
>>>>>>>>>>>>>>>> om/example/myapexapp/Application.java#L33
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>>>>>>> application (relaunch from previous instance's checkpointed state).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For cold restart, you would need to consider the property
>>>>>>>>>>>>>>>> you mention and decide what is applicable to your use case.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Ok now I get it. Thanks for the nice explaination !!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> One more thing, so you mentioned about checkpointing the
>>>>>>>>>>>>>>>>> offset ranges to replay in same order from kafka.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is there any property we need to configure to do that?
>>>>>>>>>>>>>>>>> like initialOffset set to APPLICATION_OR_LATEST.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What you want is the effect of exactly-once output
>>>>>>>>>>>>>>>>>> (that's why we call it also end-to-end exactly-once). There is no such
>>>>>>>>>>>>>>>>>> thing as exactly-once processing in a distributed system. In this case it
>>>>>>>>>>>>>>>>>> would be rather "produce exactly-once. Upstream operators, on failure, will
>>>>>>>>>>>>>>>>>> recover to checkpointed state and re-process the stream from there. This is
>>>>>>>>>>>>>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>>>>>>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>>>>>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>>>>>>>>>>>> instead of producing duplicates.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I think this is something called a customized operator
>>>>>>>>>>>>>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What if any previous operators fail ? How we can make
>>>>>>>>>>>>>>>>>>> sure they also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> https://github.com/apache/apex
>>>>>>>>>>>>>>>>>>>> -malhar/blob/master/kafka/src/
>>>>>>>>>>>>>>>>>>>> main/java/org/apache/apex/malh
>>>>>>>>>>>>>>>>>>>> ar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The operator will ensure that messages are not
>>>>>>>>>>>>>>>>>>>> duplicated, under the stated assumptions.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> In our case we are writing the results back to
>>>>>>>>>>>>>>>>>>>>> maprstreams topic based on some validations.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>>>>> thw@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> which operators in your application are writing to
>>>>>>>>>>>>>>>>>>>>>> external systems?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>>>>>>>>>>> https://github.com/DataTorren
>>>>>>>>>>>>>>>>>>>>>> t/examples/tree/master/tutorials/exactly-once),
>>>>>>>>>>>>>>>>>>>>>> there is Kafka input, which is configured to be idempotent. The results are
>>>>>>>>>>>>>>>>>>>>>> written to JDBC. That operator by itself supports exactly-once through
>>>>>>>>>>>>>>>>>>>>>> transactions (in conjunction with idempotent input), hence there is no need
>>>>>>>>>>>>>>>>>>>>>> to configure the processing mode at all.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by Jaspal Singh <ja...@gmail.com>.
Siyuan,

So for the output operator, we have specified it as a part of our logic
itself.

public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOutputOperator {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);

    public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {

        Gson gson = new Gson();

        @Override
        public void process(Tenant tenant) {

            try {
                Producer<String, String> producer = getKafkaProducer();
                //ObjectMapper mapper = new ObjectMapper();
                long now = System.currentTimeMillis();
                //Configuration conf = HBaseConfiguration.create();
                //TenantDao dao = new TenantDao(conf);
                //ArrayList<Put> puts = new ArrayList<>();
                if (tenant != null) {
                    //Tenant tenant = tenant.next();
                    if (StringUtils.isNotEmpty(tenant.getGl())) {
                        producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
                        //puts.add(dao.mkPut(tenant));
                    } else {
                        producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
                    }
                    producer.flush();
                }
            } catch (Exception e) {
                // Assumed completion: the posted snippet ends inside the try block, so this
                // catch and the closing braces are illustrative only.
                LOG.error("Failed to publish tuple to the stream", e);
                throw new RuntimeException(e);
            }
        }
    };
}



Thanks!!

On Fri, Oct 7, 2016 at 1:34 PM, hsy541@gmail.com <hs...@gmail.com> wrote:

> Jaspal,
>
> I think you miss the kafkaOut  :)
>
> Regards,
> Siyuan
>
> On Fri, Oct 7, 2016 at 11:32 AM, Jaspal Singh <ja...@gmail.com>
> wrote:
>
>> Siyuan,
>>
>> That's how we have given it in properties file:
>>
>> [image: Inline image 1]
>>
>>
>> Thanks!!
>>
>> On Fri, Oct 7, 2016 at 1:27 PM, hsy541@gmail.com <hs...@gmail.com>
>> wrote:
>>
>>> Jaspal,
>>>
>>> Topic is a mandatory property you have to set. In mapr, the value should
>>> be set to the full stream path example:  /your/stream/path:streamname
>>>
>>> Regards,
>>> Siyuan
>>>
>>> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <
>>> jaspal.singh1404@gmail.com> wrote:
>>>
>>>> After making the change, we are getting the below error while
>>>> application launch:
>>>>
>>>> *An error occurred trying to launch the application. Server message:
>>>> javax.validation.ConstraintViolationException: Operator kafkaOut violates
>>>> constraints
>>>> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
>>>> propertyPath='topic', message='may not be null', *
>>>>
>>>>
>>>>
>>>> Thanks!!
>>>>
>>>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <
>>>> jaspal.singh1404@gmail.com> wrote:
>>>>
>>>>> So I just changes the malhar-kafka version to 3.5.0, I was able to
>>>>> import the AbstractOutputOperator. Let me try to launch it now.
>>>>>
>>>>> Thanks for your inputs !!
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <
>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>
>>>>>> Should we use malhar-library version 3.5 then ?
>>>>>>
>>>>>>
>>>>>> Thanks!!
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <th...@apache.org> wrote:
>>>>>>
>>>>>>> Please make sure to depend on version 3.5 of malhar-kafka in
>>>>>>> pom.xml. This operator is not in malhar-library, it's a separate module.
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Siyuan,
>>>>>>>>
>>>>>>>> I am using the same Kafka producer as you mentioned. But I am not
>>>>>>>> seeing the AbstractKafkaOutputOperator in malhar library while import.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks!!
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy541@gmail.com <hsy541@gmail.com
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> Also which kafka output operator you are using?
>>>>>>>>> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator
>>>>>>>>> instead of com.datatorrent.contrib.kafka.AbstractOutputOperator.
>>>>>>>>> Only the org.apache.apex.malhar.kafka.AbstractOutputOperator
>>>>>>>>> works with MapR stream, the latter only works with kafka 0.8.* or 0.9
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Siyuan
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy541@gmail.com <
>>>>>>>>> hsy541@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Jaspal,
>>>>>>>>>>
>>>>>>>>>> Did you add any code to existing KafkaSinglePortExactlyOnceOutputOperator
>>>>>>>>>> from malhar?  If so please make sure the producer you use here is
>>>>>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>>>>> kafka.javaapi.producer.Producer.  That is old api and that is
>>>>>>>>>> not supported by MapR stream.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Siyuan
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thomas,
>>>>>>>>>>>
>>>>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>>>>> operator is getting an object of tenant class from updtream operator.
>>>>>>>>>>>
>>>>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>>>>>
>>>>>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>>>>
>>>>>>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>>>>>
>>>>>>>>>>>         Gson gson = new Gson();
>>>>>>>>>>>
>>>>>>>>>>>         @Override
>>>>>>>>>>>         public void process(Tenant tenant) {
>>>>>>>>>>>
>>>>>>>>>>>             try {
>>>>>>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>>>>>>                 long now = System.currentTimeMillis();
>>>>>>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>>>>>                 if (tenant != null) {
>>>>>>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>>>>>>                     } else {
>>>>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>>
>>>>>>>>>>>                     }
>>>>>>>>>>>                     producer.flush();
>>>>>>>>>>>                 }
>>>>>>>>>>>             }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> After building the application, it throws error during launch:
>>>>>>>>>>>
>>>>>>>>>>> An error occurred trying to launch the application. Server
>>>>>>>>>>> message: java.lang.NoClassDefFoundError:
>>>>>>>>>>> Lkafka/javaapi/producer/Producer; at
>>>>>>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Jaspal
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> I was trying to refer to the input from previous operator.
>>>>>>>>>>>>
>>>>>>>>>>>> Another thing when we extend the AbstractKafkaOutputOperator,
>>>>>>>>>>>> do we need to specify <String, T> ? Since we are getting an object of class
>>>>>>>>>>>> type from previous operator.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Are you referring to the upstream operator in the DAG or the
>>>>>>>>>>>>> state of the previous application after relaunch? Since the data is stored
>>>>>>>>>>>>> in MapR streams, an operator that is a producer can also act as a consumer.
>>>>>>>>>>>>> Please clarify your question.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have a question, so when we are using
>>>>>>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results
>>>>>>>>>>>>>> into maprstream topic will it be able to read messgaes from the previous
>>>>>>>>>>>>>> operator ?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>>>>>>>>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>>>>>>>>>>>> on.java#L33
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>>>>>> application (relaunch from previous instance's checkpointed state).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For cold restart, you would need to consider the property
>>>>>>>>>>>>>>> you mention and decide what is applicable to your use case.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Ok now I get it. Thanks for the nice explaination !!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> One more thing, so you mentioned about checkpointing the
>>>>>>>>>>>>>>>> offset ranges to replay in same order from kafka.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Is there any property we need to configure to do that? like
>>>>>>>>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What you want is the effect of exactly-once output (that's
>>>>>>>>>>>>>>>>> why we call it also end-to-end exactly-once). There is no such thing as
>>>>>>>>>>>>>>>>> exactly-once processing in a distributed system. In this case it would be
>>>>>>>>>>>>>>>>> rather "produce exactly-once. Upstream operators, on failure, will recover
>>>>>>>>>>>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>>>>>>>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>>>>>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>>>>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>>>>>>>>>>> instead of producing duplicates.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I think this is something called a customized operator
>>>>>>>>>>>>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What if any previous operators fail ? How we can make
>>>>>>>>>>>>>>>>>> sure they also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> https://github.com/apache/apex
>>>>>>>>>>>>>>>>>>> -malhar/blob/master/kafka/src/
>>>>>>>>>>>>>>>>>>> main/java/org/apache/apex/malh
>>>>>>>>>>>>>>>>>>> ar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The operator will ensure that messages are not
>>>>>>>>>>>>>>>>>>> duplicated, under the stated assumptions.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> In our case we are writing the results back to
>>>>>>>>>>>>>>>>>>>> maprstreams topic based on some validations.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>>>> thw@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> which operators in your application are writing to
>>>>>>>>>>>>>>>>>>>>> external systems?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>>>>>>>>>> https://github.com/DataTorren
>>>>>>>>>>>>>>>>>>>>> t/examples/tree/master/tutorials/exactly-once), there
>>>>>>>>>>>>>>>>>>>>> is Kafka input, which is configured to be idempotent. The results are
>>>>>>>>>>>>>>>>>>>>> written to JDBC. That operator by itself supports exactly-once through
>>>>>>>>>>>>>>>>>>>>> transactions (in conjunction with idempotent input), hence there is no need
>>>>>>>>>>>>>>>>>>>>> to configure the processing mode at all.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by "hsy541@gmail.com" <hs...@gmail.com>.
Jaspal,

I think you miss the kafkaOut  :)

Regards,
Siyuan

On Fri, Oct 7, 2016 at 11:32 AM, Jaspal Singh <ja...@gmail.com>
wrote:

> Siyuan,
>
> That's how we have given it in properties file:
>
> [image: Inline image 1]
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:27 PM, hsy541@gmail.com <hs...@gmail.com> wrote:
>
>> Jaspal,
>>
>> Topic is a mandatory property you have to set. In mapr, the value should
>> be set to the full stream path example:  /your/stream/path:streamname
>>
>> Regards,
>> Siyuan
>>
>> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <jaspal.singh1404@gmail.com
>> > wrote:
>>
>>> After making the change, we are getting the below error while
>>> application launch:
>>>
>>> *An error occurred trying to launch the application. Server message:
>>> javax.validation.ConstraintViolationException: Operator kafkaOut violates
>>> constraints
>>> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
>>> propertyPath='topic', message='may not be null', *
>>>
>>>
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <jaspal.singh1404@gmail.com
>>> > wrote:
>>>
>>>> So I just changes the malhar-kafka version to 3.5.0, I was able to
>>>> import the AbstractOutputOperator. Let me try to launch it now.
>>>>
>>>> Thanks for your inputs !!
>>>>
>>>>
>>>>
>>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <
>>>> jaspal.singh1404@gmail.com> wrote:
>>>>
>>>>> Should we use malhar-library version 3.5 then ?
>>>>>
>>>>>
>>>>> Thanks!!
>>>>>
>>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <th...@apache.org> wrote:
>>>>>
>>>>>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
>>>>>> This operator is not in malhar-library, it's a separate module.
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Siyuan,
>>>>>>>
>>>>>>> I am using the same Kafka producer as you mentioned. But I am not
>>>>>>> seeing the AbstractKafkaOutputOperator in malhar library while import.
>>>>>>>
>>>>>>>
>>>>>>> Thanks!!
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy541@gmail.com <hs...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Also which kafka output operator you are using?
>>>>>>>> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator
>>>>>>>> instead of com.datatorrent.contrib.kafka.AbstractOutputOperator.
>>>>>>>> Only the org.apache.apex.malhar.kafka.AbstractOutputOperator works
>>>>>>>> with MapR stream, the latter only works with kafka 0.8.* or 0.9
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Siyuan
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy541@gmail.com <hsy541@gmail.com
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> Hey Jaspal,
>>>>>>>>>
>>>>>>>>> Did you add any code to existing KafkaSinglePortExactlyOnceOutputOperator
>>>>>>>>> from malhar?  If so please make sure the producer you use here is
>>>>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>>>> kafka.javaapi.producer.Producer.  That is old api and that is not
>>>>>>>>> supported by MapR stream.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Siyuan
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thomas,
>>>>>>>>>>
>>>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>>>> operator is getting an object of tenant class from updtream operator.
>>>>>>>>>>
>>>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>>>>
>>>>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>>>
>>>>>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>>>>
>>>>>>>>>>         Gson gson = new Gson();
>>>>>>>>>>
>>>>>>>>>>         @Override
>>>>>>>>>>         public void process(Tenant tenant) {
>>>>>>>>>>
>>>>>>>>>>             try {
>>>>>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>>>>>                 long now = System.currentTimeMillis();
>>>>>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>>>>                 if (tenant != null) {
>>>>>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>>>>>                     } else {
>>>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>>
>>>>>>>>>>                     }
>>>>>>>>>>                     producer.flush();
>>>>>>>>>>                 }
>>>>>>>>>>             }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> After building the application, it throws error during launch:
>>>>>>>>>>
>>>>>>>>>> An error occurred trying to launch the application. Server
>>>>>>>>>> message: java.lang.NoClassDefFoundError:
>>>>>>>>>> Lkafka/javaapi/producer/Producer; at
>>>>>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Jaspal
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thomas,
>>>>>>>>>>>
>>>>>>>>>>> I was trying to refer to the input from previous operator.
>>>>>>>>>>>
>>>>>>>>>>> Another thing when we extend the AbstractKafkaOutputOperator, do
>>>>>>>>>>> we need to specify <String, T> ? Since we are getting an object of class
>>>>>>>>>>> type from previous operator.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Jaspal
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Are you referring to the upstream operator in the DAG or the
>>>>>>>>>>>> state of the previous application after relaunch? Since the data is stored
>>>>>>>>>>>> in MapR streams, an operator that is a producer can also act as a consumer.
>>>>>>>>>>>> Please clarify your question.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a question, so when we are using
>>>>>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results
>>>>>>>>>>>>> into maprstream topic, will it be able to read messages from the previous
>>>>>>>>>>>>> operator ?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>>>>>>>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>>>>>>>>>>> on.java#L33
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>>>>> application (relaunch from previous instance's checkpointed state).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For cold restart, you would need to consider the property you
>>>>>>>>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Ok now I get it. Thanks for the nice explanation !!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> One more thing, so you mentioned about checkpointing the
>>>>>>>>>>>>>>> offset ranges to replay in same order from kafka.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Is there any property we need to configure to do that? like
>>>>>>>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What you want is the effect of exactly-once output (that's
>>>>>>>>>>>>>>>> why we call it also end-to-end exactly-once). There is no such thing as
>>>>>>>>>>>>>>>> exactly-once processing in a distributed system. In this case it would be
>>>>>>>>>>>>>>>> rather "produce exactly-once". Upstream operators, on failure, will recover
>>>>>>>>>>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>>>>>>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>>>>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>>>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>>>>>>>>>> instead of producing duplicates.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think this is something called a customized operator
>>>>>>>>>>>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What if any previous operators fail ? How we can make sure
>>>>>>>>>>>>>>>>> they also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> https://github.com/apache/apex
>>>>>>>>>>>>>>>>>> -malhar/blob/master/kafka/src/
>>>>>>>>>>>>>>>>>> main/java/org/apache/apex/malh
>>>>>>>>>>>>>>>>>> ar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The operator will ensure that messages are not
>>>>>>>>>>>>>>>>>> duplicated, under the stated assumptions.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> In our case we are writing the results back to
>>>>>>>>>>>>>>>>>>> maprstreams topic based on some validations.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>>> thw@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> which operators in your application are writing to
>>>>>>>>>>>>>>>>>>>> external systems?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>>>>>>>>> https://github.com/DataTorren
>>>>>>>>>>>>>>>>>>>> t/examples/tree/master/tutorials/exactly-once), there
>>>>>>>>>>>>>>>>>>>> is Kafka input, which is configured to be idempotent. The results are
>>>>>>>>>>>>>>>>>>>> written to JDBC. That operator by itself supports exactly-once through
>>>>>>>>>>>>>>>>>>>> transactions (in conjunction with idempotent input), hence there is no need
>>>>>>>>>>>>>>>>>>>> to configure the processing mode at all.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by Jaspal Singh <ja...@gmail.com>.
Siyuan,

That's how we have specified it in the properties file:

[image: Inline image 1]


Thanks!!

On Fri, Oct 7, 2016 at 1:27 PM, hsy541@gmail.com <hs...@gmail.com> wrote:

> Jaspal,
>
> Topic is a mandatory property you have to set. In MapR, the value should
> be set to the full stream path, for example: /your/stream/path:streamname
>
> Regards,
> Siyuan
>
> On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <ja...@gmail.com>
> wrote:
>
>> After making the change, we are getting the below error during
>> application launch:
>>
>> *An error occurred trying to launch the application. Server message:
>> javax.validation.ConstraintViolationException: Operator kafkaOut violates
>> constraints
>> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
>> propertyPath='topic', message='may not be null', *
>>
>>
>>
>> Thanks!!
>>
>> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <ja...@gmail.com>
>> wrote:
>>
>>> So I just changed the malhar-kafka version to 3.5.0, and I was able to
>>> import the AbstractOutputOperator. Let me try to launch it now.
>>>
>>> Thanks for your inputs !!
>>>
>>>
>>>
>>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <jaspal.singh1404@gmail.com
>>> > wrote:
>>>
>>>> Should we use malhar-library version 3.5 then ?
>>>>
>>>>
>>>> Thanks!!
>>>>
>>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <th...@apache.org> wrote:
>>>>
>>>>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
>>>>> This operator is not in malhar-library, it's a separate module.
>>>>>
>>>>>
>>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>
>>>>>> Hi Siyuan,
>>>>>>
>>>>>> I am using the same Kafka producer as you mentioned. But I am not
>>>>>> seeing the AbstractKafkaOutputOperator in malhar library while import.
>>>>>>
>>>>>>
>>>>>> Thanks!!
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy541@gmail.com <hs...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Also which kafka output operator you are using?
>>>>>>> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator
>>>>>>> instead of com.datatorrent.contrib.kafka.AbstractOutputOperator.
>>>>>>> Only the org.apache.apex.malhar.kafka.AbstractOutputOperator works
>>>>>>> with MapR stream, the latter only works with kafka 0.8.* or 0.9
>>>>>>>
>>>>>>> Regards,
>>>>>>> Siyuan
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy541@gmail.com <hs...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey Jaspal,
>>>>>>>>
>>>>>>>> Did you add any code to existing KafkaSinglePortExactlyOnceOutputOperator
>>>>>>>> from malhar?  If so please make sure the producer you use here is
>>>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>>> kafka.javaapi.producer.Producer.  That is old api and that is not
>>>>>>>> supported by MapR stream.
>>>>>>>>
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Siyuan
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thomas,
>>>>>>>>>
>>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>>> operator is getting an object of the Tenant class from the upstream operator.
>>>>>>>>>
>>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>>>
>>>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>>
>>>>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>>>
>>>>>>>>>         Gson gson = new Gson();
>>>>>>>>>
>>>>>>>>>         @Override
>>>>>>>>>         public void process(Tenant tenant) {
>>>>>>>>>
>>>>>>>>>             try {
>>>>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>>>>                 long now = System.currentTimeMillis();
>>>>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>>>                 if (tenant != null) {
>>>>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>>>>                     } else {
>>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>>
>>>>>>>>>                     }
>>>>>>>>>                     producer.flush();
>>>>>>>>>                 }
>>>>>>>>>             }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> After building the application, it throws error during launch:
>>>>>>>>>
>>>>>>>>> An error occurred trying to launch the application. Server
>>>>>>>>> message: java.lang.NoClassDefFoundError:
>>>>>>>>> Lkafka/javaapi/producer/Producer; at
>>>>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Jaspal
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thomas,
>>>>>>>>>>
>>>>>>>>>> I was trying to refer to the input from previous operator.
>>>>>>>>>>
>>>>>>>>>> Another thing when we extend the AbstractKafkaOutputOperator, do
>>>>>>>>>> we need to specify <String, T> ? Since we are getting an object of class
>>>>>>>>>> type from previous operator.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Jaspal
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you referring to the upstream operator in the DAG or the
>>>>>>>>>>> state of the previous application after relaunch? Since the data is stored
>>>>>>>>>>> in MapR streams, an operator that is a producer can also act as a consumer.
>>>>>>>>>>> Please clarify your question.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> I have a question, so when we are using
>>>>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results
>>>>>>>>>>>> into maprstream topic, will it be able to read messages from the previous
>>>>>>>>>>>> operator ?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>>>>>>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>>>>>>>>>> on.java#L33
>>>>>>>>>>>>>
>>>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>>>> application (relaunch from previous instance's checkpointed state).
>>>>>>>>>>>>>
>>>>>>>>>>>>> For cold restart, you would need to consider the property you
>>>>>>>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ok now I get it. Thanks for the nice explanation !!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> One more thing, so you mentioned about checkpointing the
>>>>>>>>>>>>>> offset ranges to replay in same order from kafka.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Is there any property we need to configure to do that? like
>>>>>>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What you want is the effect of exactly-once output (that's
>>>>>>>>>>>>>>> why we call it also end-to-end exactly-once). There is no such thing as
>>>>>>>>>>>>>>> exactly-once processing in a distributed system. In this case it would be
>>>>>>>>>>>>>>> rather "produce exactly-once". Upstream operators, on failure, will recover
>>>>>>>>>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>>>>>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>>>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>>>>>>>>> instead of producing duplicates.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think this is something called a customized operator
>>>>>>>>>>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What if any previous operators fail ? How we can make sure
>>>>>>>>>>>>>>>> they also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> https://github.com/apache/apex
>>>>>>>>>>>>>>>>> -malhar/blob/master/kafka/src/
>>>>>>>>>>>>>>>>> main/java/org/apache/apex/malh
>>>>>>>>>>>>>>>>> ar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The operator will ensure that messages are not duplicated,
>>>>>>>>>>>>>>>>> under the stated assumptions.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> In our case we are writing the results back to
>>>>>>>>>>>>>>>>>> maprstreams topic based on some validations.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>>>>> thw@apache.org> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> which operators in your application are writing to
>>>>>>>>>>>>>>>>>>> external systems?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>>>>>>>> https://github.com/DataTorren
>>>>>>>>>>>>>>>>>>> t/examples/tree/master/tutorials/exactly-once), there
>>>>>>>>>>>>>>>>>>> is Kafka input, which is configured to be idempotent. The results are
>>>>>>>>>>>>>>>>>>> written to JDBC. That operator by itself supports exactly-once through
>>>>>>>>>>>>>>>>>>> transactions (in conjunction with idempotent input), hence there is no need
>>>>>>>>>>>>>>>>>>> to configure the processing mode at all.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by "hsy541@gmail.com" <hs...@gmail.com>.
Jaspal,

Topic is a mandatory property you have to set. In MapR, the value should
be set to the full stream path, for example: /your/stream/path:streamname
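
For illustration, here is a minimal sketch of setting it from the
application's populateDAG(). It assumes your operator keeps the topic
property it inherits from AbstractKafkaOutputOperator; the operator and
stream names below are placeholders:

    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      // Illustrative sketch only: add the custom exactly-once output operator
      KafkaSinglePortExactlyOnceOutputOperator kafkaOut =
          dag.addOperator("kafkaOut", new KafkaSinglePortExactlyOnceOutputOperator());
      // For MapR Streams the topic is the full stream path plus the topic name
      kafkaOut.setTopic("/your/stream/path:streamname");
    }

The same value can also be supplied in properties.xml (as
dt.operator.kafkaOut.prop.topic). If it is left unset, the validation on
the topic property is what raises the "may not be null" constraint
violation at launch.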

Regards,
Siyuan

On Fri, Oct 7, 2016 at 11:22 AM, Jaspal Singh <ja...@gmail.com>
wrote:

> After making the change, we are getting the below error during
> application launch:
>
> *An error occurred trying to launch the application. Server message:
> javax.validation.ConstraintViolationException: Operator kafkaOut violates
> constraints
> [ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
> propertyPath='topic', message='may not be null', *
>
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <ja...@gmail.com>
> wrote:
>
>> So I just changed the malhar-kafka version to 3.5.0, and I was able to import
>> the AbstractOutputOperator. Let me try to launch it now.
>>
>> Thanks for your inputs !!
>>
>>
>>
>> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <ja...@gmail.com>
>> wrote:
>>
>>> Should we use malhar-library version 3.5 then ?
>>>
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <th...@apache.org> wrote:
>>>
>>>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
>>>> This operator is not in malhar-library, it's a separate module.
>>>>
>>>>
>>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>>> jaspal.singh1404@gmail.com> wrote:
>>>>
>>>>> Hi Siyuan,
>>>>>
>>>>> I am using the same Kafka producer as you mentioned. But I am not
>>>>> seeing the AbstractKafkaOutputOperator in malhar library while import.
>>>>>
>>>>>
>>>>> Thanks!!
>>>>>
>>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy541@gmail.com <hs...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Also which kafka output operator you are using?
>>>>>> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator
>>>>>> instead of com.datatorrent.contrib.kafka.AbstractOutputOperator.
>>>>>> Only the org.apache.apex.malhar.kafka.AbstractOutputOperator works
>>>>>> with MapR stream, the latter only works with kafka 0.8.* or 0.9
>>>>>>
>>>>>> Regards,
>>>>>> Siyuan
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy541@gmail.com <hs...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Jaspal,
>>>>>>>
>>>>>>> Did you add any code to existing KafkaSinglePortExactlyOnceOutputOperator
>>>>>>> from malhar?  If so please make sure the producer you use here is
>>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>>> kafka.javaapi.producer.Producer.  That is old api and that is not
>>>>>>> supported by MapR stream.
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Siyuan
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thomas,
>>>>>>>>
>>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>>> operator is getting an object of the Tenant class from the upstream operator.
>>>>>>>>
>>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>>
>>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>>
>>>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>>
>>>>>>>>         Gson gson = new Gson();
>>>>>>>>
>>>>>>>>         @Override
>>>>>>>>         public void process(Tenant tenant) {
>>>>>>>>
>>>>>>>>             try {
>>>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>>>                 long now = System.currentTimeMillis();
>>>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>>                 if (tenant != null) {
>>>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>>>                     } else {
>>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>>
>>>>>>>>                     }
>>>>>>>>                     producer.flush();
>>>>>>>>                 }
>>>>>>>>             }
>>>>>>>>
>>>>>>>>
>>>>>>>> After building the application, it throws error during launch:
>>>>>>>>
>>>>>>>> An error occurred trying to launch the application. Server message:
>>>>>>>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
>>>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jaspal
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thomas,
>>>>>>>>>
>>>>>>>>> I was trying to refer to the input from previous operator.
>>>>>>>>>
>>>>>>>>> Another thing when we extend the AbstractKafkaOutputOperator, do
>>>>>>>>> we need to specify <String, T> ? Since we are getting an object of class
>>>>>>>>> type from previous operator.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Jaspal
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Are you referring to the upstream operator in the DAG or the
>>>>>>>>>> state of the previous application after relaunch? Since the data is stored
>>>>>>>>>> in MapR streams, an operator that is a producer can also act as a consumer.
>>>>>>>>>> Please clarify your question.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>
>>>>>>>>>>> I have a question, so when we are using
>>>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results
>>>>>>>>>>> into maprstream topic, will it be able to read messages from the previous
>>>>>>>>>>> operator ?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Jaspal
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>>>>>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>>>>>>>>> on.java#L33
>>>>>>>>>>>>
>>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>>> application (relaunch from previous instance's checkpointed state).
>>>>>>>>>>>>
>>>>>>>>>>>> For cold restart, you would need to consider the property you
>>>>>>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>>>>>>
>>>>>>>>>>>> Thomas
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Ok now I get it. Thanks for the nice explanation !!
>>>>>>>>>>>>>
>>>>>>>>>>>>> One more thing, so you mentioned about checkpointing the
>>>>>>>>>>>>> offset ranges to replay in same order from kafka.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is there any property we need to configure to do that? like
>>>>>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> What you want is the effect of exactly-once output (that's
>>>>>>>>>>>>>> why we call it also end-to-end exactly-once). There is no such thing as
>>>>>>>>>>>>>> exactly-once processing in a distributed system. In this case it would be
>>>>>>>>>>>>>> rather "produce exactly-once". Upstream operators, on failure, will recover
>>>>>>>>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>>>>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>>>>>>>> instead of producing duplicates.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think this is something called a customized operator
>>>>>>>>>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What if any previous operators fail ? How we can make sure
>>>>>>>>>>>>>>> they also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://github.com/apache/apex
>>>>>>>>>>>>>>>> -malhar/blob/master/kafka/src/
>>>>>>>>>>>>>>>> main/java/org/apache/apex/malh
>>>>>>>>>>>>>>>> ar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The operator will ensure that messages are not duplicated,
>>>>>>>>>>>>>>>> under the stated assumptions.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In our case we are writing the results back to
>>>>>>>>>>>>>>>>> maprstreams topic based on some validations.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> which operators in your application are writing to
>>>>>>>>>>>>>>>>>> external systems?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>>>>>>> https://github.com/DataTorren
>>>>>>>>>>>>>>>>>> t/examples/tree/master/tutorials/exactly-once), there is
>>>>>>>>>>>>>>>>>> Kafka input, which is configured to be idempotent. The results are written
>>>>>>>>>>>>>>>>>> to JDBC. That operator by itself supports exactly-once through transactions
>>>>>>>>>>>>>>>>>> (in conjunction with idempotent input), hence there is no need to configure
>>>>>>>>>>>>>>>>>> the processing mode at all.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by Jaspal Singh <ja...@gmail.com>.
After making the change, we are getting the below error during
application launch:

*An error occurred trying to launch the application. Server message:
javax.validation.ConstraintViolationException: Operator kafkaOut violates
constraints
[ConstraintViolationImpl{rootBean=com.example.datatorrent.KafkaSinglePortExactlyOnceOutputOperator@4726f93f,
propertyPath='topic', message='may not be null', *



Thanks!!

On Fri, Oct 7, 2016 at 1:13 PM, Jaspal Singh <ja...@gmail.com>
wrote:

> So I just changed the malhar-kafka version to 3.5.0, and I was able to import
> the AbstractOutputOperator. Let me try to launch it now.
>
> Thanks for your inputs !!
>
>
>
> On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <ja...@gmail.com>
> wrote:
>
>> Should we use malhar-library version 3.5 then ?
>>
>>
>> Thanks!!
>>
>> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <th...@apache.org> wrote:
>>
>>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
>>> This operator is not in malhar-library, it's a separate module.
>>>
>>>
>>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <
>>> jaspal.singh1404@gmail.com> wrote:
>>>
>>>> Hi Siyuan,
>>>>
>>>> I am using the same Kafka producer as you mentioned. But I am not
>>>> seeing the AbstractKafkaOutputOperator in malhar library while import.
>>>>
>>>>
>>>> Thanks!!
>>>>
>>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy541@gmail.com <hs...@gmail.com>
>>>> wrote:
>>>>
>>>>> Also which kafka output operator you are using?
>>>>> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator
>>>>> instead of com.datatorrent.contrib.kafka.AbstractOutputOperator.
>>>>> Only the org.apache.apex.malhar.kafka.AbstractOutputOperator works
>>>>> with MapR stream, the latter only works with kafka 0.8.* or 0.9
>>>>>
>>>>> Regards,
>>>>> Siyuan
>>>>>
>>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy541@gmail.com <hs...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hey Jaspal,
>>>>>>
>>>>>> Did you add any code to existing KafkaSinglePortExactlyOnceOutputOperator
>>>>>> from malhar?  If so please make sure the producer you use here is
>>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>>> kafka.javaapi.producer.Producer.  That is old api and that is not
>>>>>> supported by MapR stream.
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Siyuan
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>
>>>>>>> Thomas,
>>>>>>>
>>>>>>> Below is the operator implementation we are trying to run. This
>>>>>>> operator is getting an object of the Tenant class from the upstream operator.
>>>>>>>
>>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>>
>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>>
>>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>>
>>>>>>>         Gson gson = new Gson();
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public void process(Tenant tenant) {
>>>>>>>
>>>>>>>             try {
>>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>>                 long now = System.currentTimeMillis();
>>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>>                 if (tenant != null) {
>>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>>                     } else {
>>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>>
>>>>>>>                     }
>>>>>>>                     producer.flush();
>>>>>>>                 }
>>>>>>>             }
>>>>>>>
>>>>>>>
>>>>>>> After building the application, it throws error during launch:
>>>>>>>
>>>>>>> An error occurred trying to launch the application. Server message:
>>>>>>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
>>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>> Jaspal
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thomas,
>>>>>>>>
>>>>>>>> I was trying to refer to the input from previous operator.
>>>>>>>>
>>>>>>>> Another thing when we extend the AbstractKafkaOutputOperator, do we
>>>>>>>> need to specify <String, T> ? Since we are getting an object of class type
>>>>>>>> from previous operator.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jaspal
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Are you referring to the upstream operator in the DAG or the state
>>>>>>>>> of the previous application after relaunch? Since the data is stored in
>>>>>>>>> MapR streams, an operator that is a producer can also act as a consumer.
>>>>>>>>> Please clarify your question.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Thomas,
>>>>>>>>>>
>>>>>>>>>> I have a question, so when we are using
>>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
>>>>>>>>>> maprstream topic, will it be able to read messages from the previous
>>>>>>>>>> operator ?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Jaspal
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>>>>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>>>>>>>> on.java#L33
>>>>>>>>>>>
>>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>>> application (relaunch from previous instance's checkpointed state).
>>>>>>>>>>>
>>>>>>>>>>> For cold restart, you would need to consider the property you
>>>>>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>>>>>
>>>>>>>>>>> Thomas
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ok now I get it. Thanks for the nice explanation !!
>>>>>>>>>>>>
>>>>>>>>>>>> One more thing, so you mentioned about checkpointing the offset
>>>>>>>>>>>> ranges to replay in same order from kafka.
>>>>>>>>>>>>
>>>>>>>>>>>> Is there any property we need to configure to do that? like
>>>>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> What you want is the effect of exactly-once output (that's why
>>>>>>>>>>>>> we call it also end-to-end exactly-once). There is no such thing as
>>>>>>>>>>>>> exactly-once processing in a distributed system. In this case it would be
>>>>>>>>>>>>> rather "produce exactly-once". Upstream operators, on failure, will recover
>>>>>>>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>>>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>>>>>>> instead of producing duplicates.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think this is something called a customized operator
>>>>>>>>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What if any previous operators fail ? How we can make sure
>>>>>>>>>>>>>> they also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>>>>>>>>>>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>>>>>>>>>>>>> yOnceOutputOperator.java
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The operator will ensure that messages are not duplicated,
>>>>>>>>>>>>>>> under the stated assumptions.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In our case we are writing the results back to
>>>>>>>>>>>>>>>> maprstreams topic based on some validations.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> which operators in your application are writing to
>>>>>>>>>>>>>>>>> external systems?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>>>>>> https://github.com/DataTorren
>>>>>>>>>>>>>>>>> t/examples/tree/master/tutorials/exactly-once), there is
>>>>>>>>>>>>>>>>> Kafka input, which is configured to be idempotent. The results are written
>>>>>>>>>>>>>>>>> to JDBC. That operator by itself supports exactly-once through transactions
>>>>>>>>>>>>>>>>> (in conjunction with idempotent input), hence there is no need to configure
>>>>>>>>>>>>>>>>> the processing mode at all.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by Jaspal Singh <ja...@gmail.com>.
So I just changed the malhar-kafka version to 3.5.0, and I was able to import
the AbstractOutputOperator. Let me try to launch it now.

Thanks for your inputs !!



On Fri, Oct 7, 2016 at 1:09 PM, Jaspal Singh <ja...@gmail.com>
wrote:

> Should we use malhar-library version 3.5 then ?
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <th...@apache.org> wrote:
>
>> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml.
>> This operator is not in malhar-library, it's a separate module.
>>
>>
>> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <jaspal.singh1404@gmail.com
>> > wrote:
>>
>>> Hi Siyuan,
>>>
>>> I am using the same Kafka producer as you mentioned. But I am not seeing
>>> the AbstractKafkaOutputOperator in malhar library while import.
>>>
>>>
>>> Thanks!!
>>>
>>> On Fri, Oct 7, 2016 at 12:52 PM, hsy541@gmail.com <hs...@gmail.com>
>>> wrote:
>>>
>>>> Also which kafka output operator you are using?
>>>> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator instead
>>>> of com.datatorrent.contrib.kafka.AbstractOutputOperator.
>>>> Only the org.apache.apex.malhar.kafka.AbstractOutputOperator works
>>>> with MapR stream, the latter only works with kafka 0.8.* or 0.9
>>>>
>>>> Regards,
>>>> Siyuan
>>>>
>>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy541@gmail.com <hs...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hey Jaspal,
>>>>>
>>>>> Did you add any code to existing KafkaSinglePortExactlyOnceOutputOperator
>>>>> from malhar?  If so please make sure the producer you use here is
>>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>>> kafka.javaapi.producer.Producer.  That is old api and that is not
>>>>> supported by MapR stream.
>>>>>
>>>>>
>>>>> Regards,
>>>>> Siyuan
>>>>>
>>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>
>>>>>> Thomas,
>>>>>>
>>>>>> Below is the operator implementation we are trying to run. This
>>>>>> operator is getting an object of the Tenant class from the upstream operator.
>>>>>>
>>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>>
>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>>
>>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>>
>>>>>>         Gson gson = new Gson();
>>>>>>
>>>>>>         @Override
>>>>>>         public void process(Tenant tenant) {
>>>>>>
>>>>>>             try {
>>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>>                 long now = System.currentTimeMillis();
>>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>>                 if (tenant != null) {
>>>>>>                     //Tenant tenant = tenant.next();
>>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>>                     } else {
>>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>>
>>>>>>                     }
>>>>>>                     producer.flush();
>>>>>>                 }
>>>>>>             }
>>>>>>
>>>>>>
>>>>>> After building the application, it throws error during launch:
>>>>>>
>>>>>> An error occurred trying to launch the application. Server message:
>>>>>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
>>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Jaspal
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>
>>>>>>> Thomas,
>>>>>>>
>>>>>>> I was trying to refer to the input from previous operator.
>>>>>>>
>>>>>>> Another thing when we extend the AbstractKafkaOutputOperator, do we
>>>>>>> need to specify <String, T> ? Since we are getting an object of class type
>>>>>>> from previous operator.
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>> Jaspal
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Are you referring to the upstream operator in the DAG or the state
>>>>>>>> of the previous application after relaunch? Since the data is stored in
>>>>>>>> MapR streams, an operator that is a producer can also act as a consumer.
>>>>>>>> Please clarify your question.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Thomas,
>>>>>>>>>
>>>>>>>>> I have a question, so when we are using
>>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
>>>>>>>>> maprstream topic, will it be able to read messages from the previous
>>>>>>>>> operator ?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Jaspal
>>>>>>>>>
>>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>>
>>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>>>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>>>>>>> on.java#L33
>>>>>>>>>>
>>>>>>>>>> That will also apply to stateful restart of the entire
>>>>>>>>>> application (relaunch from previous instance's checkpointed state).
>>>>>>>>>>
>>>>>>>>>> For cold restart, you would need to consider the property you
>>>>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>>>>
>>>>>>>>>> Thomas
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ok now I get it. Thanks for the nice explanation !!
>>>>>>>>>>>
>>>>>>>>>>> One more thing, so you mentioned about checkpointing the offset
>>>>>>>>>>> ranges to replay in same order from kafka.
>>>>>>>>>>>
>>>>>>>>>>> Is there any property we need to configure to do that? like
>>>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Jaspal
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What you want is the effect of exactly-once output (that's why
>>>>>>>>>>>> we call it also end-to-end exactly-once). There is no such thing as
>>>>>>>>>>>> exactly-once processing in a distributed system. In this case it would be
>>>>>>>>>>>> rather "produce exactly-once". Upstream operators, on failure, will recover
>>>>>>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>>>>>> instead of producing duplicates.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I think this is something called a customized operator
>>>>>>>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What if any previous operators fail ? How we can make sure
>>>>>>>>>>>>> they also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>>>>>>>>>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>>>>>>>>>>>> yOnceOutputOperator.java
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The operator will ensure that messages are not duplicated,
>>>>>>>>>>>>>> under the stated assumptions.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In our case we are writing the results back to
>>>>>>>>>>>>>>> maprstreams topic based on some validations.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> which operators in your application are writing to external
>>>>>>>>>>>>>>>> systems?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>>>>>>>>>>>>> ls/exactly-once), there is Kafka input, which is
>>>>>>>>>>>>>>>> configured to be idempotent. The results are written to JDBC. That operator
>>>>>>>>>>>>>>>> by itself supports exactly-once through transactions (in conjunction with
>>>>>>>>>>>>>>>> idempotent input), hence there is no need to configure the processing mode
>>>>>>>>>>>>>>>> at all.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by Jaspal Singh <ja...@gmail.com>.
Should we use malhar-library version 3.5 then ?


Thanks!!

On Fri, Oct 7, 2016 at 1:04 PM, Thomas Weise <th...@apache.org> wrote:

> Please make sure to depend on version 3.5 of malhar-kafka in pom.xml. This
> operator is not in malhar-library, it's a separate module.
>
>
> On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <ja...@gmail.com>
> wrote:
>
>> Hi Siyuan,
>>
>> I am using the same Kafka producer as you mentioned. But I am not seeing
>> the AbstractKafkaOutputOperator in malhar library while import.
>>
>>
>> Thanks!!
>>
>> On Fri, Oct 7, 2016 at 12:52 PM, hsy541@gmail.com <hs...@gmail.com>
>> wrote:
>>
>>> Also which kafka output operator you are using?
>>> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator instead
>>> of com.datatorrent.contrib.kafka.AbstractOutputOperator.
>>> Only the org.apache.apex.malhar.kafka.AbstractOutputOperator works with
>>> MapR stream, the latter only works with kafka 0.8.* or 0.9
>>>
>>> Regards,
>>> Siyuan
>>>
>>> On Fri, Oct 7, 2016 at 10:38 AM, hsy541@gmail.com <hs...@gmail.com>
>>> wrote:
>>>
>>>> Hey Jaspal,
>>>>
>>>> Did you add any code to existing KafkaSinglePortExactlyOnceOutputOperator
>>>> from malhar?  If so please make sure the producer you use here is
>>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>>> kafka.javaapi.producer.Producer.  That is old api and that is not
>>>> supported by MapR stream.
>>>>
>>>>
>>>> Regards,
>>>> Siyuan
>>>>
>>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <
>>>> jaspal.singh1404@gmail.com> wrote:
>>>>
>>>>> Thomas,
>>>>>
>>>>> Below is the operator implementation we are trying to run. This
>>>>> operator is getting an object of the Tenant class from the upstream operator.
>>>>>
>>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>>
>>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>>
>>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>>
>>>>>         Gson gson = new Gson();
>>>>>
>>>>>         @Override
>>>>>         public void process(Tenant tenant) {
>>>>>
>>>>>             try {
>>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>>                 long now = System.currentTimeMillis();
>>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>>                 if (tenant != null) {
>>>>>                     //Tenant tenant = tenant.next();
>>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>                         //puts.add(dao.mkPut(tenant));
>>>>>                     } else {
>>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>>
>>>>>                     }
>>>>>                     producer.flush();
>>>>>                 }
>>>>>             }
>>>>>
>>>>>
>>>>> After building the application, it throws error during launch:
>>>>>
>>>>> An error occurred trying to launch the application. Server message:
>>>>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
>>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>>
>>>>>
>>>>> Thanks
>>>>> Jaspal
>>>>>
>>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>
>>>>>> Thomas,
>>>>>>
>>>>>> I was trying to refer to the input from previous operator.
>>>>>>
>>>>>> Another thing when we extend the AbstractKafkaOutputOperator, do we
>>>>>> need to specify <String, T> ? Since we are getting an object of class type
>>>>>> from previous operator.
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Jaspal
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org> wrote:
>>>>>>
>>>>>>> Are you referring to the upstream operator in the DAG or the state
>>>>>>> of the previous application after relaunch? Since the data is stored in
>>>>>>> MapR streams, an operator that is a producer can also act as a consumer.
>>>>>>> Please clarify your question.
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Thomas,
>>>>>>>>
>>>>>>>> I have a question, so when we are using
>>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
>>>>>>>> maprstream topic will it be able to read messgaes from the previous
>>>>>>>> operator ?
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jaspal
>>>>>>>>
>>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>>
>>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>>>>>> on.java#L33
>>>>>>>>>
>>>>>>>>> That will also apply to stateful restart of the entire application
>>>>>>>>> (relaunch from previous instance's checkpointed state).
>>>>>>>>>
>>>>>>>>> For cold restart, you would need to consider the property you
>>>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>>>
>>>>>>>>> Thomas
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Ok now I get it. Thanks for the nice explaination !!
>>>>>>>>>>
>>>>>>>>>> One more thing, so you mentioned about checkpointing the offset
>>>>>>>>>> ranges to replay in same order from kafka.
>>>>>>>>>>
>>>>>>>>>> Is there any property we need to configure to do that? like
>>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Jaspal
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> What you want is the effect of exactly-once output (that's why
>>>>>>>>>>> we call it also end-to-end exactly-once). There is no such thing as
>>>>>>>>>>> exactly-once processing in a distributed system. In this case it would be
>>>>>>>>>>> rather "produce exactly-once. Upstream operators, on failure, will recover
>>>>>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>>>>> instead of producing duplicates.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I think this is something called a customized operator
>>>>>>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>>>>>>
>>>>>>>>>>>> What if any previous operators fail ? How we can make sure they
>>>>>>>>>>>> also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>>>>>>>>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>>>>>>>>>>> yOnceOutputOperator.java
>>>>>>>>>>>>>
>>>>>>>>>>>>> The operator will ensure that messages are not duplicated,
>>>>>>>>>>>>> under the stated assumptions.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In our case we are writing the results back to
>>>>>>>>>>>>>> maprstreams topic based on some validations.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> which operators in your application are writing to external
>>>>>>>>>>>>>>> systems?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>>>>>>>>>>>> ls/exactly-once), there is Kafka input, which is configured
>>>>>>>>>>>>>>> to be idempotent. The results are written to JDBC. That operator by itself
>>>>>>>>>>>>>>> supports exactly-once through transactions (in conjunction with idempotent
>>>>>>>>>>>>>>> input), hence there is no need to configure the processing mode at all.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by Thomas Weise <th...@apache.org>.
Please make sure to depend on version 3.5 of malhar-kafka in pom.xml. This
operator is not in malhar-library; it is a separate module.


On Fri, Oct 7, 2016 at 11:01 AM, Jaspal Singh <ja...@gmail.com>
wrote:

> Hi Siyuan,
>
> I am using the same Kafka producer as you mentioned. But I am not seeing
> the AbstractKafkaOutputOperator in malhar library while import.
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 12:52 PM, hsy541@gmail.com <hs...@gmail.com>
> wrote:
>
>> Also which kafka output operator you are using?
>> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator instead
>> of com.datatorrent.contrib.kafka.AbstractOutputOperator.
>> Only the org.apache.apex.malhar.kafka.AbstractOutputOperator works with
>> MapR stream, the latter only works with kafka 0.8.* or 0.9
>>
>> Regards,
>> Siyuan
>>
>> On Fri, Oct 7, 2016 at 10:38 AM, hsy541@gmail.com <hs...@gmail.com>
>> wrote:
>>
>>> Hey Jaspal,
>>>
>>> Did you add any code to existing KafkaSinglePortExactlyOnceOutputOperator
>>> from malhar?  If so please make sure the producer you use here is
>>> org.apache.kafka.clients.producer.KafkaProducer instead of
>>> kafka.javaapi.producer.Producer.  That is old api and that is not
>>> supported by MapR stream.
>>>
>>>
>>> Regards,
>>> Siyuan
>>>
>>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <jaspal.singh1404@gmail.com
>>> > wrote:
>>>
>>>> Thomas,
>>>>
>>>> Below is the operator implementation we are trying to run. This
>>>> operator is getting an object of tenant class from updtream operator.
>>>>
>>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>>
>>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>>
>>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>>
>>>>         Gson gson = new Gson();
>>>>
>>>>         @Override
>>>>         public void process(Tenant tenant) {
>>>>
>>>>             try {
>>>>                 Producer<String, String> producer = getKafkaProducer();
>>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>>                 long now = System.currentTimeMillis();
>>>>                 //Configuration conf = HBaseConfiguration.create();
>>>>                 //TenantDao dao = new TenantDao(conf);
>>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>>                 if (tenant != null) {
>>>>                     //Tenant tenant = tenant.next();
>>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>                         //puts.add(dao.mkPut(tenant));
>>>>                     } else {
>>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>>
>>>>                     }
>>>>                     producer.flush();
>>>>                 }
>>>>             }
>>>>
>>>>
>>>> After building the application, it throws error during launch:
>>>>
>>>> An error occurred trying to launch the application. Server message:
>>>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
>>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>>
>>>>
>>>> Thanks
>>>> Jaspal
>>>>
>>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>>> jaspal.singh1404@gmail.com> wrote:
>>>>
>>>>> Thomas,
>>>>>
>>>>> I was trying to refer to the input from previous operator.
>>>>>
>>>>> Another thing when we extend the AbstractKafkaOutputOperator, do we
>>>>> need to specify <String, T> ? Since we are getting an object of class type
>>>>> from previous operator.
>>>>>
>>>>>
>>>>> Thanks
>>>>> Jaspal
>>>>>
>>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org> wrote:
>>>>>
>>>>>> Are you referring to the upstream operator in the DAG or the state of
>>>>>> the previous application after relaunch? Since the data is stored in MapR
>>>>>> streams, an operator that is a producer can also act as a consumer. Please
>>>>>> clarify your question.
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Thomas,
>>>>>>>
>>>>>>> I have a question, so when we are using
>>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
>>>>>>> maprstream topic will it be able to read messgaes from the previous
>>>>>>> operator ?
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>> Jaspal
>>>>>>>
>>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org> wrote:
>>>>>>>
>>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>>
>>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>>>>> on.java#L33
>>>>>>>>
>>>>>>>> That will also apply to stateful restart of the entire application
>>>>>>>> (relaunch from previous instance's checkpointed state).
>>>>>>>>
>>>>>>>> For cold restart, you would need to consider the property you
>>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>>
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Ok now I get it. Thanks for the nice explaination !!
>>>>>>>>>
>>>>>>>>> One more thing, so you mentioned about checkpointing the offset
>>>>>>>>> ranges to replay in same order from kafka.
>>>>>>>>>
>>>>>>>>> Is there any property we need to configure to do that? like
>>>>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Jaspal
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> What you want is the effect of exactly-once output (that's why we
>>>>>>>>>> call it also end-to-end exactly-once). There is no such thing as
>>>>>>>>>> exactly-once processing in a distributed system. In this case it would be
>>>>>>>>>> rather "produce exactly-once. Upstream operators, on failure, will recover
>>>>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>>>> instead of producing duplicates.
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I think this is something called a customized operator
>>>>>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>>>>>
>>>>>>>>>>> What if any previous operators fail ? How we can make sure they
>>>>>>>>>>> also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Jaspal
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>>>>>>>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>>>>>>>>>> yOnceOutputOperator.java
>>>>>>>>>>>>
>>>>>>>>>>>> The operator will ensure that messages are not duplicated,
>>>>>>>>>>>> under the stated assumptions.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>>
>>>>>>>>>>>>> In our case we are writing the results back to
>>>>>>>>>>>>> maprstreams topic based on some validations.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> which operators in your application are writing to external
>>>>>>>>>>>>>> systems?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>>>>>>>>>>> ls/exactly-once), there is Kafka input, which is configured
>>>>>>>>>>>>>> to be idempotent. The results are written to JDBC. That operator by itself
>>>>>>>>>>>>>> supports exactly-once through transactions (in conjunction with idempotent
>>>>>>>>>>>>>> input), hence there is no need to configure the processing mode at all.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by Jaspal Singh <ja...@gmail.com>.
Hi Siyuan,

I am using the same Kafka producer as you mentioned, but I am not seeing
the AbstractKafkaOutputOperator in the malhar library when I try to import it.


Thanks!!

On Fri, Oct 7, 2016 at 12:52 PM, hsy541@gmail.com <hs...@gmail.com> wrote:

> Also which kafka output operator you are using?
> Please use org.apache.apex.malhar.kafka.AbstractOutputOperator instead
> of com.datatorrent.contrib.kafka.AbstractOutputOperator.
> Only the org.apache.apex.malhar.kafka.AbstractOutputOperator works with
> MapR stream, the latter only works with kafka 0.8.* or 0.9
>
> Regards,
> Siyuan
>
> On Fri, Oct 7, 2016 at 10:38 AM, hsy541@gmail.com <hs...@gmail.com>
> wrote:
>
>> Hey Jaspal,
>>
>> Did you add any code to existing KafkaSinglePortExactlyOnceOutputOperator
>> from malhar?  If so please make sure the producer you use here is
>> org.apache.kafka.clients.producer.KafkaProducer instead of
>> kafka.javaapi.producer.Producer.  That is old api and that is not
>> supported by MapR stream.
>>
>>
>> Regards,
>> Siyuan
>>
>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <ja...@gmail.com>
>> wrote:
>>
>>> Thomas,
>>>
>>> Below is the operator implementation we are trying to run. This operator
>>> is getting an object of tenant class from updtream operator.
>>>
>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>
>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>
>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>
>>>         Gson gson = new Gson();
>>>
>>>         @Override
>>>         public void process(Tenant tenant) {
>>>
>>>             try {
>>>                 Producer<String, String> producer = getKafkaProducer();
>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>                 long now = System.currentTimeMillis();
>>>                 //Configuration conf = HBaseConfiguration.create();
>>>                 //TenantDao dao = new TenantDao(conf);
>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>                 if (tenant != null) {
>>>                     //Tenant tenant = tenant.next();
>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>                         //puts.add(dao.mkPut(tenant));
>>>                     } else {
>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>
>>>                     }
>>>                     producer.flush();
>>>                 }
>>>             }
>>>
>>>
>>> After building the application, it throws error during launch:
>>>
>>> An error occurred trying to launch the application. Server message:
>>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>
>>>
>>> Thanks
>>> Jaspal
>>>
>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>> jaspal.singh1404@gmail.com> wrote:
>>>
>>>> Thomas,
>>>>
>>>> I was trying to refer to the input from previous operator.
>>>>
>>>> Another thing when we extend the AbstractKafkaOutputOperator, do we
>>>> need to specify <String, T> ? Since we are getting an object of class type
>>>> from previous operator.
>>>>
>>>>
>>>> Thanks
>>>> Jaspal
>>>>
>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org> wrote:
>>>>
>>>>> Are you referring to the upstream operator in the DAG or the state of
>>>>> the previous application after relaunch? Since the data is stored in MapR
>>>>> streams, an operator that is a producer can also act as a consumer. Please
>>>>> clarify your question.
>>>>>
>>>>>
>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>
>>>>>> Hi Thomas,
>>>>>>
>>>>>> I have a question, so when we are using
>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
>>>>>> maprstream topic will it be able to read messgaes from the previous
>>>>>> operator ?
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Jaspal
>>>>>>
>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org> wrote:
>>>>>>
>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>
>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>>>> on.java#L33
>>>>>>>
>>>>>>> That will also apply to stateful restart of the entire application
>>>>>>> (relaunch from previous instance's checkpointed state).
>>>>>>>
>>>>>>> For cold restart, you would need to consider the property you
>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>
>>>>>>>> Ok now I get it. Thanks for the nice explaination !!
>>>>>>>>
>>>>>>>> One more thing, so you mentioned about checkpointing the offset
>>>>>>>> ranges to replay in same order from kafka.
>>>>>>>>
>>>>>>>> Is there any property we need to configure to do that? like
>>>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jaspal
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> What you want is the effect of exactly-once output (that's why we
>>>>>>>>> call it also end-to-end exactly-once). There is no such thing as
>>>>>>>>> exactly-once processing in a distributed system. In this case it would be
>>>>>>>>> rather "produce exactly-once. Upstream operators, on failure, will recover
>>>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>>> instead of producing duplicates.
>>>>>>>>>
>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I think this is something called a customized operator
>>>>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>>>>
>>>>>>>>>> What if any previous operators fail ? How we can make sure they
>>>>>>>>>> also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Jaspal
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>>>>>>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>>>>>>>>> yOnceOutputOperator.java
>>>>>>>>>>>
>>>>>>>>>>> The operator will ensure that messages are not duplicated, under
>>>>>>>>>>> the stated assumptions.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> In our case we are writing the results back to
>>>>>>>>>>>> maprstreams topic based on some validations.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> which operators in your application are writing to external
>>>>>>>>>>>>> systems?
>>>>>>>>>>>>>
>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>>>>>>>>>> ls/exactly-once), there is Kafka input, which is configured
>>>>>>>>>>>>> to be idempotent. The results are written to JDBC. That operator by itself
>>>>>>>>>>>>> supports exactly-once through transactions (in conjunction with idempotent
>>>>>>>>>>>>> input), hence there is no need to configure the processing mode at all.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by "hsy541@gmail.com" <hs...@gmail.com>.
Also, which Kafka output operator are you using?
Please use org.apache.apex.malhar.kafka.AbstractOutputOperator instead
of com.datatorrent.contrib.kafka.AbstractOutputOperator.
Only the org.apache.apex.malhar.kafka.AbstractOutputOperator works with
MapR Streams; the latter only works with Kafka 0.8.* or 0.9.
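
For illustration, here is a minimal sketch of an output operator built on the
malhar-kafka module. It assumes the base class meant above is the
AbstractKafkaOutputOperator referenced elsewhere in this thread, that it takes
key and value type parameters, and that Tenant is the class emitted by the
upstream operator; the class name TenantStreamWriter is a placeholder.

// Old operator (Kafka 0.8.x only; per this thread it does not work with MapR Streams):
//   import com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator;
// New operator from the malhar-kafka module (built on the 0.9 client API):
import org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator;

import com.datatorrent.api.DefaultInputPort;

public class TenantStreamWriter extends AbstractKafkaOutputOperator<String, String>
{
  // Receives Tenant objects from the upstream operator and writes them out.
  public final transient DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>()
  {
    @Override
    public void process(Tenant tenant)
    {
      // send logic as in the KafkaSinglePortExactlyOnceOutputOperator posted
      // earlier in this thread
    }
  };
}

In most cases the main change relative to the old operator is which package the
base class comes from; the port and send logic can stay the same.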

Regards,
Siyuan

On Fri, Oct 7, 2016 at 10:38 AM, hsy541@gmail.com <hs...@gmail.com> wrote:

> Hey Jaspal,
>
> Did you add any code to existing KafkaSinglePortExactlyOnceOutputOperator from
> malhar?  If so please make sure the producer you use here
> is org.apache.kafka.clients.producer.KafkaProducer instead of
> kafka.javaapi.producer.Producer.  That is old api and that is not
> supported by MapR stream.
>
>
> Regards,
> Siyuan
>
> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <ja...@gmail.com>
> wrote:
>
>> Thomas,
>>
>> Below is the operator implementation we are trying to run. This operator
>> is getting an object of tenant class from updtream operator.
>>
>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>
>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>
>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>
>>         Gson gson = new Gson();
>>
>>         @Override
>>         public void process(Tenant tenant) {
>>
>>             try {
>>                 Producer<String, String> producer = getKafkaProducer();
>>                 //ObjectMapper mapper = new ObjectMapper();
>>                 long now = System.currentTimeMillis();
>>                 //Configuration conf = HBaseConfiguration.create();
>>                 //TenantDao dao = new TenantDao(conf);
>>                 //ArrayList<Put> puts = new ArrayList<>();
>>                 if (tenant != null) {
>>                     //Tenant tenant = tenant.next();
>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>                         //puts.add(dao.mkPut(tenant));
>>                     } else {
>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>
>>                     }
>>                     producer.flush();
>>                 }
>>             }
>>
>>
>> After building the application, it throws error during launch:
>>
>> An error occurred trying to launch the application. Server message:
>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
>> java.lang.Class.getDeclaredFields0(Native Method) at
>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>
>>
>> Thanks
>> Jaspal
>>
>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1404@gmail.com
>> > wrote:
>>
>>> Thomas,
>>>
>>> I was trying to refer to the input from previous operator.
>>>
>>> Another thing when we extend the AbstractKafkaOutputOperator, do we need
>>> to specify <String, T> ? Since we are getting an object of class type from
>>> previous operator.
>>>
>>>
>>> Thanks
>>> Jaspal
>>>
>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org> wrote:
>>>
>>>> Are you referring to the upstream operator in the DAG or the state of
>>>> the previous application after relaunch? Since the data is stored in MapR
>>>> streams, an operator that is a producer can also act as a consumer. Please
>>>> clarify your question.
>>>>
>>>>
>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>> jaspal.singh1404@gmail.com> wrote:
>>>>
>>>>> Hi Thomas,
>>>>>
>>>>> I have a question, so when we are using
>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
>>>>> maprstream topic will it be able to read messgaes from the previous
>>>>> operator ?
>>>>>
>>>>>
>>>>> Thanks
>>>>> Jaspal
>>>>>
>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org> wrote:
>>>>>
>>>>>> For recovery you need to set the window data manager like so:
>>>>>>
>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>>> on.java#L33
>>>>>>
>>>>>> That will also apply to stateful restart of the entire application
>>>>>> (relaunch from previous instance's checkpointed state).
>>>>>>
>>>>>> For cold restart, you would need to consider the property you mention
>>>>>> and decide what is applicable to your use case.
>>>>>>
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>
>>>>>>> Ok now I get it. Thanks for the nice explaination !!
>>>>>>>
>>>>>>> One more thing, so you mentioned about checkpointing the offset
>>>>>>> ranges to replay in same order from kafka.
>>>>>>>
>>>>>>> Is there any property we need to configure to do that? like
>>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>> Jaspal
>>>>>>>
>>>>>>>
>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> What you want is the effect of exactly-once output (that's why we
>>>>>>>> call it also end-to-end exactly-once). There is no such thing as
>>>>>>>> exactly-once processing in a distributed system. In this case it would be
>>>>>>>> rather "produce exactly-once. Upstream operators, on failure, will recover
>>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>> instead of producing duplicates.
>>>>>>>>
>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I think this is something called a customized operator
>>>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>>>
>>>>>>>>> What if any previous operators fail ? How we can make sure they
>>>>>>>>> also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Jaspal
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>>>>>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>>>>>>>> yOnceOutputOperator.java
>>>>>>>>>>
>>>>>>>>>> The operator will ensure that messages are not duplicated, under
>>>>>>>>>> the stated assumptions.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>
>>>>>>>>>>> In our case we are writing the results back to maprstreams topic
>>>>>>>>>>> based on some validations.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Jaspal
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> which operators in your application are writing to external
>>>>>>>>>>>> systems?
>>>>>>>>>>>>
>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>>>>>>>>> ls/exactly-once), there is Kafka input, which is configured to
>>>>>>>>>>>> be idempotent. The results are written to JDBC. That operator by itself
>>>>>>>>>>>> supports exactly-once through transactions (in conjunction with idempotent
>>>>>>>>>>>> input), hence there is no need to configure the processing mode at all.
>>>>>>>>>>>>
>>>>>>>>>>>> Thomas
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by Munagala Ramanath <ra...@datatorrent.com>.
Please post the following:

1. Entire pom.xml
2. output of "mvn dependency:tree"
3. output of "jar tvf " run on your application package file (with .apa
extension)

Ram

On Fri, Oct 7, 2016 at 10:10 AM, Jaspal Singh <ja...@gmail.com>
wrote:

> Thomas,
>
> We have added the dependency in pom.xml for lafka client API and also for
> malhar kafka. Please highlight if you are specifying some other dependency
> that we need to add.
>
> <dependency>
>   <groupId>org.apache.apex</groupId>
>   <artifactId>malhar-kafka</artifactId>
>   <version>${malhar.version}</version>
>   <exclusions>
>     <exclusion>
>       <groupId>org.apache.kafka</groupId>
>       <artifactId>kafka-clients</artifactId>
>     </exclusion>
>   </exclusions>
> </dependency>
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>0.9.0.0-mapr-1602-streams-5.1.0</version>
>
> </dependency>
>
>
> Thanks!!
>
> On Fri, Oct 7, 2016 at 12:04 PM, Thomas Weise <th...@apache.org> wrote:
>
>> It looks like the Kafka API dependency is missing. Can you please check
>> it is part of the .apa file?
>>
>> To your previous question: The records/tuples/objects are moved by the
>> Apex engine through the stream from operator to operator. There is nothing
>> you need to do beyond connecting the operator ports with addStream when you
>> specify the DAG.
>>
>>
>> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <ja...@gmail.com>
>> wrote:
>>
>>> Thomas,
>>>
>>> Below is the operator implementation we are trying to run. This operator
>>> is getting an object of tenant class from updtream operator.
>>>
>>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>>
>>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>>
>>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>>
>>>         Gson gson = new Gson();
>>>
>>>         @Override
>>>         public void process(Tenant tenant) {
>>>
>>>             try {
>>>                 Producer<String, String> producer = getKafkaProducer();
>>>                 //ObjectMapper mapper = new ObjectMapper();
>>>                 long now = System.currentTimeMillis();
>>>                 //Configuration conf = HBaseConfiguration.create();
>>>                 //TenantDao dao = new TenantDao(conf);
>>>                 //ArrayList<Put> puts = new ArrayList<>();
>>>                 if (tenant != null) {
>>>                     //Tenant tenant = tenant.next();
>>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>>                         //puts.add(dao.mkPut(tenant));
>>>                     } else {
>>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>>
>>>                     }
>>>                     producer.flush();
>>>                 }
>>>             }
>>>
>>>
>>> After building the application, it throws error during launch:
>>>
>>> An error occurred trying to launch the application. Server message:
>>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
>>> java.lang.Class.getDeclaredFields0(Native Method) at
>>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>>
>>>
>>> Thanks
>>> Jaspal
>>>
>>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <
>>> jaspal.singh1404@gmail.com> wrote:
>>>
>>>> Thomas,
>>>>
>>>> I was trying to refer to the input from previous operator.
>>>>
>>>> Another thing when we extend the AbstractKafkaOutputOperator, do we
>>>> need to specify <String, T> ? Since we are getting an object of class type
>>>> from previous operator.
>>>>
>>>>
>>>> Thanks
>>>> Jaspal
>>>>
>>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org> wrote:
>>>>
>>>>> Are you referring to the upstream operator in the DAG or the state of
>>>>> the previous application after relaunch? Since the data is stored in MapR
>>>>> streams, an operator that is a producer can also act as a consumer. Please
>>>>> clarify your question.
>>>>>
>>>>>
>>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>
>>>>>> Hi Thomas,
>>>>>>
>>>>>> I have a question, so when we are using
>>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
>>>>>> maprstream topic will it be able to read messgaes from the previous
>>>>>> operator ?
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Jaspal
>>>>>>
>>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org> wrote:
>>>>>>
>>>>>>> For recovery you need to set the window data manager like so:
>>>>>>>
>>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>>>> on.java#L33
>>>>>>>
>>>>>>> That will also apply to stateful restart of the entire application
>>>>>>> (relaunch from previous instance's checkpointed state).
>>>>>>>
>>>>>>> For cold restart, you would need to consider the property you
>>>>>>> mention and decide what is applicable to your use case.
>>>>>>>
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>
>>>>>>>> Ok now I get it. Thanks for the nice explaination !!
>>>>>>>>
>>>>>>>> One more thing, so you mentioned about checkpointing the offset
>>>>>>>> ranges to replay in same order from kafka.
>>>>>>>>
>>>>>>>> Is there any property we need to configure to do that? like
>>>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jaspal
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> What you want is the effect of exactly-once output (that's why we
>>>>>>>>> call it also end-to-end exactly-once). There is no such thing as
>>>>>>>>> exactly-once processing in a distributed system. In this case it would be
>>>>>>>>> rather "produce exactly-once. Upstream operators, on failure, will recover
>>>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>>> instead of producing duplicates.
>>>>>>>>>
>>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I think this is something called a customized operator
>>>>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>>>>
>>>>>>>>>> What if any previous operators fail ? How we can make sure they
>>>>>>>>>> also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Jaspal
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <
>>>>>>>>>> thomas.weise@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>>>>>>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>>>>>>>>> yOnceOutputOperator.java
>>>>>>>>>>>
>>>>>>>>>>> The operator will ensure that messages are not duplicated, under
>>>>>>>>>>> the stated assumptions.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> In our case we are writing the results back to
>>>>>>>>>>>> maprstreams topic based on some validations.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Jaspal
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> which operators in your application are writing to external
>>>>>>>>>>>>> systems?
>>>>>>>>>>>>>
>>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>>>>>>>>>> ls/exactly-once), there is Kafka input, which is configured
>>>>>>>>>>>>> to be idempotent. The results are written to JDBC. That operator by itself
>>>>>>>>>>>>> supports exactly-once through transactions (in conjunction with idempotent
>>>>>>>>>>>>> input), hence there is no need to configure the processing mode at all.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by Jaspal Singh <ja...@gmail.com>.
Thomas,

We have added the dependency in pom.xml for the Kafka client API and also for
malhar-kafka. Please point out if there is some other dependency that we need
to add.

<dependency>
  <groupId>org.apache.apex</groupId>
  <artifactId>malhar-kafka</artifactId>
  <version>${malhar.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
    </exclusion>
  </exclusions>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.9.0.0-mapr-1602-streams-5.1.0</version>

</dependency>


Thanks!!

On Fri, Oct 7, 2016 at 12:04 PM, Thomas Weise <th...@apache.org> wrote:

> It looks like the Kafka API dependency is missing. Can you please check it
> is part of the .apa file?
>
> To your previous question: The records/tuples/objects are moved by the
> Apex engine through the stream from operator to operator. There is nothing
> you need to do beyond connecting the operator ports with addStream when you
> specify the DAG.
>
>
> On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <ja...@gmail.com>
> wrote:
>
>> Thomas,
>>
>> Below is the operator implementation we are trying to run. This operator
>> is getting an object of tenant class from updtream operator.
>>
>> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>>
>>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>>
>>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>>
>>         Gson gson = new Gson();
>>
>>         @Override
>>         public void process(Tenant tenant) {
>>
>>             try {
>>                 Producer<String, String> producer = getKafkaProducer();
>>                 //ObjectMapper mapper = new ObjectMapper();
>>                 long now = System.currentTimeMillis();
>>                 //Configuration conf = HBaseConfiguration.create();
>>                 //TenantDao dao = new TenantDao(conf);
>>                 //ArrayList<Put> puts = new ArrayList<>();
>>                 if (tenant != null) {
>>                     //Tenant tenant = tenant.next();
>>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>>                         //puts.add(dao.mkPut(tenant));
>>                     } else {
>>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>>
>>                     }
>>                     producer.flush();
>>                 }
>>             }
>>
>>
>> After building the application, it throws error during launch:
>>
>> An error occurred trying to launch the application. Server message:
>> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
>> java.lang.Class.getDeclaredFields0(Native Method) at
>> java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
>> java.lang.Class.getDeclaredFields(Class.java:1916) at
>>
>>
>> Thanks
>> Jaspal
>>
>> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <jaspal.singh1404@gmail.com
>> > wrote:
>>
>>> Thomas,
>>>
>>> I was trying to refer to the input from previous operator.
>>>
>>> Another thing when we extend the AbstractKafkaOutputOperator, do we need
>>> to specify <String, T> ? Since we are getting an object of class type from
>>> previous operator.
>>>
>>>
>>> Thanks
>>> Jaspal
>>>
>>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org> wrote:
>>>
>>>> Are you referring to the upstream operator in the DAG or the state of
>>>> the previous application after relaunch? Since the data is stored in MapR
>>>> streams, an operator that is a producer can also act as a consumer. Please
>>>> clarify your question.
>>>>
>>>>
>>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <
>>>> jaspal.singh1404@gmail.com> wrote:
>>>>
>>>>> Hi Thomas,
>>>>>
>>>>> I have a question, so when we are using
>>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
>>>>> maprstream topic will it be able to read messgaes from the previous
>>>>> operator ?
>>>>>
>>>>>
>>>>> Thanks
>>>>> Jaspal
>>>>>
>>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org> wrote:
>>>>>
>>>>>> For recovery you need to set the window data manager like so:
>>>>>>
>>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>>> on.java#L33
>>>>>>
>>>>>> That will also apply to stateful restart of the entire application
>>>>>> (relaunch from previous instance's checkpointed state).
>>>>>>
>>>>>> For cold restart, you would need to consider the property you mention
>>>>>> and decide what is applicable to your use case.
>>>>>>
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>
>>>>>>> Ok now I get it. Thanks for the nice explaination !!
>>>>>>>
>>>>>>> One more thing, so you mentioned about checkpointing the offset
>>>>>>> ranges to replay in same order from kafka.
>>>>>>>
>>>>>>> Is there any property we need to configure to do that? like
>>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>> Jaspal
>>>>>>>
>>>>>>>
>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> What you want is the effect of exactly-once output (that's why we
>>>>>>>> call it also end-to-end exactly-once). There is no such thing as
>>>>>>>> exactly-once processing in a distributed system. In this case it would be
>>>>>>>> rather "produce exactly-once. Upstream operators, on failure, will recover
>>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>>> and the output operator can discard the results that were already published
>>>>>>>> instead of producing duplicates.
>>>>>>>>
>>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I think this is something called a customized operator
>>>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>>>
>>>>>>>>> What if any previous operators fail ? How we can make sure they
>>>>>>>>> also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Jaspal
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> In that case please have a look at:
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>>>>>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>>>>>>>> yOnceOutputOperator.java
>>>>>>>>>>
>>>>>>>>>> The operator will ensure that messages are not duplicated, under
>>>>>>>>>> the stated assumptions.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>>
>>>>>>>>>>> In our case we are writing the results back to maprstreams topic
>>>>>>>>>>> based on some validations.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Jaspal
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> which operators in your application are writing to external
>>>>>>>>>>>> systems?
>>>>>>>>>>>>
>>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>>>>>>>>> ls/exactly-once), there is Kafka input, which is configured to
>>>>>>>>>>>> be idempotent. The results are written to JDBC. That operator by itself
>>>>>>>>>>>> supports exactly-once through transactions (in conjunction with idempotent
>>>>>>>>>>>> input), hence there is no need to configure the processing mode at all.
>>>>>>>>>>>>
>>>>>>>>>>>> Thomas
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by "hsy541@gmail.com" <hs...@gmail.com>.
Hey Jaspal,

Did you add any code to the existing KafkaSinglePortExactlyOnceOutputOperator from
malhar?  If so, please make sure the producer you use here
is org.apache.kafka.clients.producer.KafkaProducer instead of
kafka.javaapi.producer.Producer.  The latter is the old API and it is not supported
by MapR Streams.
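
To make the API difference concrete, below is a minimal standalone sketch that
uses only the new producer API (org.apache.kafka.clients.producer.KafkaProducer).
The stream:topic path is the one from this thread; the key, the value and the
commented-out broker setting are placeholders, and MapR Streams specifics may
differ from plain Kafka.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class NewApiProducerSketch
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    // props.put("bootstrap.servers", "broker1:9092");  // needed for plain Kafka brokers
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    // New 0.9 client API -- no kafka.javaapi.producer.Producer anywhere.
    Producer<String, String> producer = new KafkaProducer<String, String>(props);
    producer.send(new ProducerRecord<String, String>(
        "/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate",
        "someKey", "someValue"));
    producer.flush();
    producer.close();
  }
}

If kafka.javaapi.producer.Producer still appears in the NoClassDefFoundError
after switching, the old class is most likely still referenced somewhere in the
operator code or pulled in by another dependency.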


Regards,
Siyuan

On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <ja...@gmail.com>
wrote:

> Thomas,
>
> Below is the operator implementation we are trying to run. This operator
> is getting an object of tenant class from updtream operator.
>
> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>
>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>
>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>
>         Gson gson = new Gson();
>
>         @Override
>         public void process(Tenant tenant) {
>
>             try {
>                 Producer<String, String> producer = getKafkaProducer();
>                 //ObjectMapper mapper = new ObjectMapper();
>                 long now = System.currentTimeMillis();
>                 //Configuration conf = HBaseConfiguration.create();
>                 //TenantDao dao = new TenantDao(conf);
>                 //ArrayList<Put> puts = new ArrayList<>();
>                 if (tenant != null) {
>                     //Tenant tenant = tenant.next();
>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>                         //puts.add(dao.mkPut(tenant));
>                     } else {
>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>
>                     }
>                     producer.flush();
>                 }
>             }
>
>
> After building the application, it throws error during launch:
>
> An error occurred trying to launch the application. Server message:
> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
> java.lang.Class.getDeclaredFields0(Native Method) at java.lang.Class.
> privateGetDeclaredFields(Class.java:2583) at java.lang.Class.
> getDeclaredFields(Class.java:1916) at
>
>
> Thanks
> Jaspal
>
> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <ja...@gmail.com>
> wrote:
>
>> Thomas,
>>
>> I was trying to refer to the input from previous operator.
>>
>> Another thing when we extend the AbstractKafkaOutputOperator, do we need
>> to specify <String, T> ? Since we are getting an object of class type from
>> previous operator.
>>
>>
>> Thanks
>> Jaspal
>>
>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org> wrote:
>>
>>> Are you referring to the upstream operator in the DAG or the state of
>>> the previous application after relaunch? Since the data is stored in MapR
>>> streams, an operator that is a producer can also act as a consumer. Please
>>> clarify your question.
>>>
>>>
>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <jaspal.singh1404@gmail.com
>>> > wrote:
>>>
>>>> Hi Thomas,
>>>>
>>>> I have a question, so when we are using
>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
>>>> maprstream topic will it be able to read messgaes from the previous
>>>> operator ?
>>>>
>>>>
>>>> Thanks
>>>> Jaspal
>>>>
>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org> wrote:
>>>>
>>>>> For recovery you need to set the window data manager like so:
>>>>>
>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>> on.java#L33
>>>>>
>>>>> That will also apply to stateful restart of the entire application
>>>>> (relaunch from previous instance's checkpointed state).
>>>>>
>>>>> For cold restart, you would need to consider the property you mention
>>>>> and decide what is applicable to your use case.
>>>>>
>>>>> Thomas
>>>>>
>>>>>
>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>
>>>>>> Ok now I get it. Thanks for the nice explaination !!
>>>>>>
>>>>>> One more thing, so you mentioned about checkpointing the offset
>>>>>> ranges to replay in same order from kafka.
>>>>>>
>>>>>> Is there any property we need to configure to do that? like
>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Jaspal
>>>>>>
>>>>>>
>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> What you want is the effect of exactly-once output (that's why we
>>>>>>> call it also end-to-end exactly-once). There is no such thing as
>>>>>>> exactly-once processing in a distributed system. In this case it would be
>>>>>>> rather "produce exactly-once. Upstream operators, on failure, will recover
>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>> and the output operator can discard the results that were already published
>>>>>>> instead of producing duplicates.
>>>>>>>
>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>
>>>>>>>> I think this is something called a customized operator
>>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>>
>>>>>>>> What if any previous operators fail ? How we can make sure they
>>>>>>>> also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jaspal
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> In that case please have a look at:
>>>>>>>>>
>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>>>>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>>>>>>> yOnceOutputOperator.java
>>>>>>>>>
>>>>>>>>> The operator will ensure that messages are not duplicated, under
>>>>>>>>> the stated assumptions.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Thomas,
>>>>>>>>>>
>>>>>>>>>> In our case we are writing the results back to maprstreams topic
>>>>>>>>>> based on some validations.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Jaspal
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> which operators in your application are writing to external
>>>>>>>>>>> systems?
>>>>>>>>>>>
>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>>>>>>>> ls/exactly-once), there is Kafka input, which is configured to
>>>>>>>>>>> be idempotent. The results are written to JDBC. That operator by itself
>>>>>>>>>>> supports exactly-once through transactions (in conjunction with idempotent
>>>>>>>>>>> input), hence there is no need to configure the processing mode at all.
>>>>>>>>>>>
>>>>>>>>>>> Thomas
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by Thomas Weise <th...@apache.org>.
It looks like the Kafka API dependency is missing. Can you please check it
is part of the .apa file?

To your previous question: The records/tuples/objects are moved by the Apex
engine through the stream from operator to operator. There is nothing you
need to do beyond connecting the operator ports with addStream when you
specify the DAG.
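
As a hypothetical sketch of that wiring, the application below connects a Kafka
input operator, an intermediate operator and the output operator from this thread
with addStream. TenantParser and its port names are placeholders for the real
upstream operator, and the outputPort name on the Malhar input operator is an
assumption.

import org.apache.hadoop.conf.Configuration;

import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;

@ApplicationAnnotation(name = "TenantValidation")
public class Application implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Operators are added to the DAG by name ...
    KafkaSinglePortInputOperator reader =
        dag.addOperator("kafkaInput", new KafkaSinglePortInputOperator());
    TenantParser parser =
        dag.addOperator("parser", new TenantParser());  // placeholder upstream operator
    KafkaSinglePortExactlyOnceOutputOperator writer =
        dag.addOperator("kafkaOutput", new KafkaSinglePortExactlyOnceOutputOperator());

    // ... and the engine moves the tuples along the streams declared here;
    // no hand-off code is needed inside the operators themselves.
    dag.addStream("rawMessages", reader.outputPort, parser.input);
    dag.addStream("tenants", parser.output, writer.in);
  }
}

Per-operator settings (for example the initialOffset discussed earlier) can then
be set in properties.xml against these operator names.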


On Fri, Oct 7, 2016 at 9:01 AM, Jaspal Singh <ja...@gmail.com>
wrote:

> Thomas,
>
> Below is the operator implementation we are trying to run. This operator
> is getting an object of tenant class from updtream operator.
>
> public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {
>
>     private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
>
>     public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {
>
>         Gson gson = new Gson();
>
>         @Override
>         public void process(Tenant tenant) {
>
>             try {
>                 Producer<String, String> producer = getKafkaProducer();
>                 //ObjectMapper mapper = new ObjectMapper();
>                 long now = System.currentTimeMillis();
>                 //Configuration conf = HBaseConfiguration.create();
>                 //TenantDao dao = new TenantDao(conf);
>                 //ArrayList<Put> puts = new ArrayList<>();
>                 if (tenant != null) {
>                     //Tenant tenant = tenant.next();
>                     if (StringUtils.isNotEmpty(tenant.getGl())) {
>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
>                         //puts.add(dao.mkPut(tenant));
>                     } else {
>                         producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));
>
>                     }
>                     producer.flush();
>                 }
>             }
>
>
> After building the application, it throws error during launch:
>
> An error occurred trying to launch the application. Server message:
> java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
> java.lang.Class.getDeclaredFields0(Native Method) at java.lang.Class.
> privateGetDeclaredFields(Class.java:2583) at java.lang.Class.
> getDeclaredFields(Class.java:1916) at
>
>
> Thanks
> Jaspal
>
> On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <ja...@gmail.com>
> wrote:
>
>> Thomas,
>>
>> I was trying to refer to the input from previous operator.
>>
>> Another thing when we extend the AbstractKafkaOutputOperator, do we need
>> to specify <String, T> ? Since we are getting an object of class type from
>> previous operator.
>>
>>
>> Thanks
>> Jaspal
>>
>> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org> wrote:
>>
>>> Are you referring to the upstream operator in the DAG or the state of
>>> the previous application after relaunch? Since the data is stored in MapR
>>> streams, an operator that is a producer can also act as a consumer. Please
>>> clarify your question.
>>>
>>>
>>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <jaspal.singh1404@gmail.com
>>> > wrote:
>>>
>>>> Hi Thomas,
>>>>
>>>> I have a question, so when we are using
>>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
>>>> maprstream topic will it be able to read messgaes from the previous
>>>> operator ?
>>>>
>>>>
>>>> Thanks
>>>> Jaspal
>>>>
>>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org> wrote:
>>>>
>>>>> For recovery you need to set the window data manager like so:
>>>>>
>>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>>> s/exactly-once/src/main/java/com/example/myapexapp/Applicati
>>>>> on.java#L33
>>>>>
>>>>> That will also apply to stateful restart of the entire application
>>>>> (relaunch from previous instance's checkpointed state).
>>>>>
>>>>> For cold restart, you would need to consider the property you mention
>>>>> and decide what is applicable to your use case.
>>>>>
>>>>> Thomas
>>>>>
>>>>>
>>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>
>>>>>> Ok now I get it. Thanks for the nice explaination !!
>>>>>>
>>>>>> One more thing, so you mentioned about checkpointing the offset
>>>>>> ranges to replay in same order from kafka.
>>>>>>
>>>>>> Is there any property we need to configure to do that? like
>>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Jaspal
>>>>>>
>>>>>>
>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> What you want is the effect of exactly-once output (that's why we
>>>>>>> call it also end-to-end exactly-once). There is no such thing as
>>>>>>> exactly-once processing in a distributed system. In this case it would be
>>>>>>> rather "produce exactly-once. Upstream operators, on failure, will recover
>>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>>> and the output operator can discard the results that were already published
>>>>>>> instead of producing duplicates.
>>>>>>>
>>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>
>>>>>>>> I think this is something called a customized operator
>>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>>
>>>>>>>> What if any previous operators fail ? How we can make sure they
>>>>>>>> also recover using EXACTLY_ONCE processing mode ?
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jaspal
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> In that case please have a look at:
>>>>>>>>>
>>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>>>>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>>>>>>> yOnceOutputOperator.java
>>>>>>>>>
>>>>>>>>> The operator will ensure that messages are not duplicated, under
>>>>>>>>> the stated assumptions.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Thomas,
>>>>>>>>>>
>>>>>>>>>> In our case we are writing the results back to maprstreams topic
>>>>>>>>>> based on some validations.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Jaspal
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> which operators in your application are writing to external
>>>>>>>>>>> systems?
>>>>>>>>>>>
>>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>>>>>>>> ls/exactly-once), there is Kafka input, which is configured to
>>>>>>>>>>> be idempotent. The results are written to JDBC. That operator by itself
>>>>>>>>>>> supports exactly-once through transactions (in conjunction with idempotent
>>>>>>>>>>> input), hence there is no need to configure the processing mode at all.
>>>>>>>>>>>
>>>>>>>>>>> Thomas
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by Jaspal Singh <ja...@gmail.com>.
Thomas,

Below is the operator implementation we are trying to run. This operator is
getting an object of the Tenant class from the upstream operator.

public class KafkaSinglePortExactlyOnceOutputOperator extends AbstractKafkaOutputOperator {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);

    public transient final DefaultInputPort<Tenant> in = new DefaultInputPort<Tenant>() {

        Gson gson = new Gson();

        @Override
        public void process(Tenant tenant) {

            try {
                Producer<String, String> producer = getKafkaProducer();
                //ObjectMapper mapper = new ObjectMapper();
                long now = System.currentTimeMillis();
                //Configuration conf = HBaseConfiguration.create();
                //TenantDao dao = new TenantDao(conf);
                //ArrayList<Put> puts = new ArrayList<>();
                if (tenant != null) {
                    //Tenant tenant = tenant.next();
                    if (StringUtils.isNotEmpty(tenant.getGl())) {
                        producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:certUpdate", tenant.getVolumeName(), gson.toJson(tenant)));
                        //puts.add(dao.mkPut(tenant));
                    } else {
                        producer.send(new ProducerRecord<String, String>("/datalake/corporate/ses_dlpoc/stream_fabbddev/jsingh73/sample-stream:error", tenant.getVolumeName(), gson.toJson(tenant)));

                    }
                    producer.flush();
                }
            }


After building the application, it throws an error during launch:

An error occurred trying to launch the application. Server message:
java.lang.NoClassDefFoundError: Lkafka/javaapi/producer/Producer; at
java.lang.Class.getDeclaredFields0(Native Method) at
java.lang.Class.privateGetDeclaredFields(Class.java:2583) at
java.lang.Class.getDeclaredFields(Class.java:1916) at


Thanks
Jaspal

On Fri, Oct 7, 2016 at 10:42 AM, Jaspal Singh <ja...@gmail.com>
wrote:

> Thomas,
>
> I was trying to refer to the input from previous operator.
>
> Another thing when we extend the AbstractKafkaOutputOperator, do we need
> to specify <String, T> ? Since we are getting an object of class type from
> previous operator.
>
>
> Thanks
> Jaspal
>
> On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org> wrote:
>
>> Are you referring to the upstream operator in the DAG or the state of the
>> previous application after relaunch? Since the data is stored in MapR
>> streams, an operator that is a producer can also act as a consumer. Please
>> clarify your question.
>>
>>
>> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <ja...@gmail.com>
>> wrote:
>>
>>> Hi Thomas,
>>>
>>> I have a question, so when we are using
>>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
>>> maprstream topic will it be able to read messgaes from the previous
>>> operator ?
>>>
>>>
>>> Thanks
>>> Jaspal
>>>
>>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org> wrote:
>>>
>>>> For recovery you need to set the window data manager like so:
>>>>
>>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>>> s/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>>>>
>>>> That will also apply to stateful restart of the entire application
>>>> (relaunch from previous instance's checkpointed state).
>>>>
>>>> For cold restart, you would need to consider the property you mention
>>>> and decide what is applicable to your use case.
>>>>
>>>> Thomas
>>>>
>>>>
>>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <
>>>> jaspal.singh1404@gmail.com> wrote:
>>>>
>>>>> Ok now I get it. Thanks for the nice explaination !!
>>>>>
>>>>> One more thing, so you mentioned about checkpointing the offset ranges
>>>>> to replay in same order from kafka.
>>>>>
>>>>> Is there any property we need to configure to do that? like
>>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>>
>>>>>
>>>>> Thanks
>>>>> Jaspal
>>>>>
>>>>>
>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> What you want is the effect of exactly-once output (that's why we
>>>>>> call it also end-to-end exactly-once). There is no such thing as
>>>>>> exactly-once processing in a distributed system. In this case it would be
>>>>>> rather "produce exactly-once. Upstream operators, on failure, will recover
>>>>>> to checkpointed state and re-process the stream from there. This is
>>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>>> configured to replay in the same order from Kafka (this is done by
>>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>>> and the output operator can discard the results that were already published
>>>>>> instead of producing duplicates.
>>>>>>
>>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>
>>>>>>> I think this is something called a customized operator
>>>>>>> implementation that is taking care of exactly once processing at output.
>>>>>>>
>>>>>>> What if any previous operators fail ? How we can make sure they also
>>>>>>> recover using EXACTLY_ONCE processing mode ?
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>> Jaspal
>>>>>>>
>>>>>>>
>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> In that case please have a look at:
>>>>>>>>
>>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>>>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>>>>>> yOnceOutputOperator.java
>>>>>>>>
>>>>>>>> The operator will ensure that messages are not duplicated, under
>>>>>>>> the stated assumptions.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Thomas,
>>>>>>>>>
>>>>>>>>> In our case we are writing the results back to maprstreams topic based
>>>>>>>>> on some validations.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Jaspal
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> which operators in your application are writing to external
>>>>>>>>>> systems?
>>>>>>>>>>
>>>>>>>>>> When you look at the example from the blog (
>>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>>>>>>> ls/exactly-once), there is Kafka input, which is configured to
>>>>>>>>>> be idempotent. The results are written to JDBC. That operator by itself
>>>>>>>>>> supports exactly-once through transactions (in conjunction with idempotent
>>>>>>>>>> input), hence there is no need to configure the processing mode at all.
>>>>>>>>>>
>>>>>>>>>> Thomas
>>>>>>>>>>
>>>>>>>>>>
>>>>>>
>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by Jaspal Singh <ja...@gmail.com>.
Thomas,

I was trying to refer to the input from the previous operator.

Another thing: when we extend the AbstractKafkaOutputOperator, do we need to
specify <String, T>? Since we are getting an object of a custom class type
from the previous operator.


Thanks
Jaspal

On Fri, Oct 7, 2016 at 10:12 AM, Thomas Weise <th...@apache.org> wrote:

> Are you referring to the upstream operator in the DAG or the state of the
> previous application after relaunch? Since the data is stored in MapR
> streams, an operator that is a producer can also act as a consumer. Please
> clarify your question.
>
>
> On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <ja...@gmail.com>
> wrote:
>
>> Hi Thomas,
>>
>> I have a question, so when we are using
>> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
>> maprstream topic will it be able to read messgaes from the previous
>> operator ?
>>
>>
>> Thanks
>> Jaspal
>>
>> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org> wrote:
>>
>>> For recovery you need to set the window data manager like so:
>>>
>>> https://github.com/DataTorrent/examples/blob/master/tutorial
>>> s/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
>>>
>>> That will also apply to stateful restart of the entire application
>>> (relaunch from previous instance's checkpointed state).
>>>
>>> For cold restart, you would need to consider the property you mention
>>> and decide what is applicable to your use case.
>>>
>>> Thomas
>>>
>>>
>>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <jaspal.singh1404@gmail.com
>>> > wrote:
>>>
>>>> Ok now I get it. Thanks for the nice explaination !!
>>>>
>>>> One more thing, so you mentioned about checkpointing the offset ranges
>>>> to replay in same order from kafka.
>>>>
>>>> Is there any property we need to configure to do that? like
>>>> initialOffset set to APPLICATION_OR_LATEST.
>>>>
>>>>
>>>> Thanks
>>>> Jaspal
>>>>
>>>>
>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>>> wrote:
>>>>
>>>>> What you want is the effect of exactly-once output (that's why we call
>>>>> it also end-to-end exactly-once). There is no such thing as exactly-once
>>>>> processing in a distributed system. In this case it would be rather
>>>>> "produce exactly-once. Upstream operators, on failure, will recover to
>>>>> checkpointed state and re-process the stream from there. This is
>>>>> at-least-once, the default behavior. Because in the input operator you have
>>>>> configured to replay in the same order from Kafka (this is done by
>>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>>> and the output operator can discard the results that were already published
>>>>> instead of producing duplicates.
>>>>>
>>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>
>>>>>> I think this is something called a customized operator implementation
>>>>>> that is taking care of exactly once processing at output.
>>>>>>
>>>>>> What if any previous operators fail ? How we can make sure they also
>>>>>> recover using EXACTLY_ONCE processing mode ?
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Jaspal
>>>>>>
>>>>>>
>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> In that case please have a look at:
>>>>>>>
>>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>>>>> yOnceOutputOperator.java
>>>>>>>
>>>>>>> The operator will ensure that messages are not duplicated, under the
>>>>>>> stated assumptions.
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Thomas,
>>>>>>>>
>>>>>>>> In our case we are writing the results back to maprstreams topic based
>>>>>>>> on some validations.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jaspal
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> which operators in your application are writing to external
>>>>>>>>> systems?
>>>>>>>>>
>>>>>>>>> When you look at the example from the blog (
>>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>>>>>> ls/exactly-once), there is Kafka input, which is configured to be
>>>>>>>>> idempotent. The results are written to JDBC. That operator by itself
>>>>>>>>> supports exactly-once through transactions (in conjunction with idempotent
>>>>>>>>> input), hence there is no need to configure the processing mode at all.
>>>>>>>>>
>>>>>>>>> Thomas
>>>>>>>>>
>>>>>>>>>
>>>>>
>>>
>>
>

Re: Datatorrent fault tolerance

Posted by Thomas Weise <th...@apache.org>.
Are you referring to the upstream operator in the DAG or the state of the
previous application after relaunch? Since the data is stored in MapR
streams, an operator that is a producer can also act as a consumer. Please
clarify your question.


On Fri, Oct 7, 2016 at 7:59 AM, Jaspal Singh <ja...@gmail.com>
wrote:

> Hi Thomas,
>
> I have a question, so when we are using
> *KafkaSinglePortExactlyOnceOutputOperator* to write results into
> maprstream topic will it be able to read messgaes from the previous
> operator ?
>
>
> Thanks
> Jaspal
>
> On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org> wrote:
>
>> For recovery you need to set the window data manager like so:
>>
>> https://github.com/DataTorrent/examples/blob/master/
>> tutorials/exactly-once/src/main/java/com/example/myapexap
>> p/Application.java#L33
>>
>> That will also apply to stateful restart of the entire application
>> (relaunch from previous instance's checkpointed state).
>>
>> For cold restart, you would need to consider the property you mention and
>> decide what is applicable to your use case.
>>
>> Thomas
>>
>>
>> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <ja...@gmail.com>
>> wrote:
>>
>>> Ok now I get it. Thanks for the nice explaination !!
>>>
>>> One more thing, so you mentioned about checkpointing the offset ranges
>>> to replay in same order from kafka.
>>>
>>> Is there any property we need to configure to do that? like
>>> initialOffset set to APPLICATION_OR_LATEST.
>>>
>>>
>>> Thanks
>>> Jaspal
>>>
>>>
>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>> wrote:
>>>
>>>> What you want is the effect of exactly-once output (that's why we call
>>>> it also end-to-end exactly-once). There is no such thing as exactly-once
>>>> processing in a distributed system. In this case it would be rather
>>>> "produce exactly-once. Upstream operators, on failure, will recover to
>>>> checkpointed state and re-process the stream from there. This is
>>>> at-least-once, the default behavior. Because in the input operator you have
>>>> configured to replay in the same order from Kafka (this is done by
>>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>>> and the output operator can discard the results that were already published
>>>> instead of producing duplicates.
>>>>
>>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <
>>>> jaspal.singh1404@gmail.com> wrote:
>>>>
>>>>> I think this is something called a customized operator implementation
>>>>> that is taking care of exactly once processing at output.
>>>>>
>>>>> What if any previous operators fail ? How we can make sure they also
>>>>> recover using EXACTLY_ONCE processing mode ?
>>>>>
>>>>>
>>>>> Thanks
>>>>> Jaspal
>>>>>
>>>>>
>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> In that case please have a look at:
>>>>>>
>>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>>>> yOnceOutputOperator.java
>>>>>>
>>>>>> The operator will ensure that messages are not duplicated, under the
>>>>>> stated assumptions.
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Thomas,
>>>>>>>
>>>>>>> In our case we are writing the results back to maprstreams topic based
>>>>>>> on some validations.
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>> Jaspal
>>>>>>>
>>>>>>>
>>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> which operators in your application are writing to external systems?
>>>>>>>>
>>>>>>>> When you look at the example from the blog (
>>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>>>>> ls/exactly-once), there is Kafka input, which is configured to be
>>>>>>>> idempotent. The results are written to JDBC. That operator by itself
>>>>>>>> supports exactly-once through transactions (in conjunction with idempotent
>>>>>>>> input), hence there is no need to configure the processing mode at all.
>>>>>>>>
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>>
>>>>
>>
>

Re: Datatorrent fault tolerance

Posted by Jaspal Singh <ja...@gmail.com>.
Hi Thomas,

I have a question: when we are using
*KafkaSinglePortExactlyOnceOutputOperator* to write results into a MapR
Streams topic, will it be able to read messages from the previous operator?


Thanks
Jaspal

On Thu, Oct 6, 2016 at 6:28 PM, Thomas Weise <th...@apache.org> wrote:

> For recovery you need to set the window data manager like so:
>
> https://github.com/DataTorrent/examples/blob/
> master/tutorials/exactly-once/src/main/java/com/example/
> myapexapp/Application.java#L33
>
> That will also apply to stateful restart of the entire application
> (relaunch from previous instance's checkpointed state).
>
> For cold restart, you would need to consider the property you mention and
> decide what is applicable to your use case.
>
> Thomas
>
>
> On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <ja...@gmail.com>
> wrote:
>
>> Ok now I get it. Thanks for the nice explaination !!
>>
>> One more thing, so you mentioned about checkpointing the offset ranges
>> to replay in same order from kafka.
>>
>> Is there any property we need to configure to do that? like initialOffset
>> set to APPLICATION_OR_LATEST.
>>
>>
>> Thanks
>> Jaspal
>>
>>
>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>> wrote:
>>
>>> What you want is the effect of exactly-once output (that's why we call
>>> it also end-to-end exactly-once). There is no such thing as exactly-once
>>> processing in a distributed system. In this case it would be rather
>>> "produce exactly-once. Upstream operators, on failure, will recover to
>>> checkpointed state and re-process the stream from there. This is
>>> at-least-once, the default behavior. Because in the input operator you have
>>> configured to replay in the same order from Kafka (this is done by
>>> checkpointing the offset ranges), the computation in the DAG is idempotent
>>> and the output operator can discard the results that were already published
>>> instead of producing duplicates.
>>>
>>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <jaspal.singh1404@gmail.com
>>> > wrote:
>>>
>>>> I think this is something called a customized operator implementation
>>>> that is taking care of exactly once processing at output.
>>>>
>>>> What if any previous operators fail ? How we can make sure they also
>>>> recover using EXACTLY_ONCE processing mode ?
>>>>
>>>>
>>>> Thanks
>>>> Jaspal
>>>>
>>>>
>>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>>> wrote:
>>>>
>>>>> In that case please have a look at:
>>>>>
>>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>>> yOnceOutputOperator.java
>>>>>
>>>>> The operator will ensure that messages are not duplicated, under the
>>>>> stated assumptions.
>>>>>
>>>>>
>>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>>> jaspal.singh1404@gmail.com> wrote:
>>>>>
>>>>>> Hi Thomas,
>>>>>>
>>>>>> In our case we are writing the results back to maprstreams topic based
>>>>>> on some validations.
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Jaspal
>>>>>>
>>>>>>
>>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> which operators in your application are writing to external systems?
>>>>>>>
>>>>>>> When you look at the example from the blog (
>>>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>>>> ls/exactly-once), there is Kafka input, which is configured to be
>>>>>>> idempotent. The results are written to JDBC. That operator by itself
>>>>>>> supports exactly-once through transactions (in conjunction with idempotent
>>>>>>> input), hence there is no need to configure the processing mode at all.
>>>>>>>
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>
>

Re: Datatorrent fault tolerance

Posted by Thomas Weise <th...@apache.org>.
For recovery you need to set the window data manager like so:

https://github.com/DataTorrent/examples/blob/master/tutorials/exactly-once/src/main/java/com/example/myapexapp/Application.java#L33
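
As a minimal sketch of what that line sets up (assuming the Malhar Kafka input operator exposes the windowDataManager and initialOffset properties the way the linked tutorial uses them; exact class and property names can differ between Malhar versions):

// Inside populateDAG(), after adding the Kafka input operator:
KafkaSinglePortInputOperator input =
    dag.addOperator("kafkaInput", new KafkaSinglePortInputOperator());

// Idempotent replay: after an operator failure or a stateful application relaunch,
// every streaming window is re-read with exactly the same offset ranges.
input.setWindowDataManager(new FSWindowDataManager());

// Cold restart (no previous application state): decide where reading should start.
input.setInitialOffset("APPLICATION_OR_LATEST");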

That will also apply to stateful restart of the entire application
(relaunch from previous instance's checkpointed state).

For cold restart, you would need to consider the property you mention and
decide what is applicable to your use case.

Thomas


On Thu, Oct 6, 2016 at 4:16 PM, Jaspal Singh <ja...@gmail.com>
wrote:

> Ok now I get it. Thanks for the nice explaination !!
>
> One more thing, so you mentioned about checkpointing the offset ranges
> to replay in same order from kafka.
>
> Is there any property we need to configure to do that? like initialOffset
> set to APPLICATION_OR_LATEST.
>
>
> Thanks
> Jaspal
>
>
> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com> wrote:
>
>> What you want is the effect of exactly-once output (that's why we call it
>> also end-to-end exactly-once). There is no such thing as exactly-once
>> processing in a distributed system. In this case it would be rather
>> "produce exactly-once. Upstream operators, on failure, will recover to
>> checkpointed state and re-process the stream from there. This is
>> at-least-once, the default behavior. Because in the input operator you have
>> configured to replay in the same order from Kafka (this is done by
>> checkpointing the offset ranges), the computation in the DAG is idempotent
>> and the output operator can discard the results that were already published
>> instead of producing duplicates.
>>
>> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <ja...@gmail.com>
>> wrote:
>>
>>> I think this is something called a customized operator implementation
>>> that is taking care of exactly once processing at output.
>>>
>>> What if any previous operators fail ? How we can make sure they also
>>> recover using EXACTLY_ONCE processing mode ?
>>>
>>>
>>> Thanks
>>> Jaspal
>>>
>>>
>>> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com>
>>> wrote:
>>>
>>>> In that case please have a look at:
>>>>
>>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>>> yOnceOutputOperator.java
>>>>
>>>> The operator will ensure that messages are not duplicated, under the
>>>> stated assumptions.
>>>>
>>>>
>>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <
>>>> jaspal.singh1404@gmail.com> wrote:
>>>>
>>>>> Hi Thomas,
>>>>>
>>>>> In our case we are writing the results back to maprstreams topic based
>>>>> on some validations.
>>>>>
>>>>>
>>>>> Thanks
>>>>> Jaspal
>>>>>
>>>>>
>>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> which operators in your application are writing to external systems?
>>>>>>
>>>>>> When you look at the example from the blog (
>>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>>> ls/exactly-once), there is Kafka input, which is configured to be
>>>>>> idempotent. The results are written to JDBC. That operator by itself
>>>>>> supports exactly-once through transactions (in conjunction with idempotent
>>>>>> input), hence there is no need to configure the processing mode at all.
>>>>>>
>>>>>> Thomas
>>>>>>
>>>>>>
>>

Re: Datatorrent fault tolerance

Posted by Jaspal Singh <ja...@gmail.com>.
Ok, now I get it. Thanks for the nice explanation!

One more thing: you mentioned checkpointing the offset ranges to replay in
the same order from Kafka.

Is there any property we need to configure to do that, like initialOffset
set to APPLICATION_OR_LATEST?


Thanks
Jaspal


On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com> wrote:

> What you want is the effect of exactly-once output (that's why we call it
> also end-to-end exactly-once). There is no such thing as exactly-once
> processing in a distributed system. In this case it would be rather
> "produce exactly-once. Upstream operators, on failure, will recover to
> checkpointed state and re-process the stream from there. This is
> at-least-once, the default behavior. Because in the input operator you have
> configured to replay in the same order from Kafka (this is done by
> checkpointing the offset ranges), the computation in the DAG is idempotent
> and the output operator can discard the results that were already published
> instead of producing duplicates.
>
> On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <jaspal.singh1404@gmail.com> wrote:
>
>> I think this is something called a customized operator implementation
>> that is taking care of exactly once processing at output.
>>
>> What if any previous operators fail ? How we can make sure they also
>> recover using EXACTLY_ONCE processing mode ?
>>
>>
>> Thanks
>> Jaspal
>>
>>
>> On Thursday, October 6, 2016, Thomas Weise <thomas.weise@gmail.com> wrote:
>>
>>> In that case please have a look at:
>>>
>>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>>> yOnceOutputOperator.java
>>>
>>> The operator will ensure that messages are not duplicated, under the
>>> stated assumptions.
>>>
>>>
>>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <jaspal.singh1404@gmail.com
>>> > wrote:
>>>
>>>> Hi Thomas,
>>>>
>>>> In our case we are writing the results back to maprstreams topic based
>>>> on some validations.
>>>>
>>>>
>>>> Thanks
>>>> Jaspal
>>>>
>>>>
>>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> which operators in your application are writing to external systems?
>>>>>
>>>>> When you look at the example from the blog (
>>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>>> ls/exactly-once), there is Kafka input, which is configured to be
>>>>> idempotent. The results are written to JDBC. That operator by itself
>>>>> supports exactly-once through transactions (in conjunction with idempotent
>>>>> input), hence there is no need to configure the processing mode at all.
>>>>>
>>>>> Thomas
>>>>>
>>>>>
>

Re: Datatorrent fault tolerance

Posted by Thomas Weise <th...@gmail.com>.
What you want is the effect of exactly-once output (that's why we call it
also end-to-end exactly-once). There is no such thing as exactly-once
processing in a distributed system. In this case it would rather be
"produce exactly-once". Upstream operators, on failure, will recover to
checkpointed state and re-process the stream from there. This is
at-least-once, the default behavior. Because you have configured the input
operator to replay in the same order from Kafka (this is done by
checkpointing the offset ranges), the computation in the DAG is idempotent
and the output operator can discard the results that were already published
instead of producing duplicates.
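
To make the "discard instead of re-publish" idea concrete, here is a stripped-down sketch. It is not the Malhar implementation (the real KafkaSinglePortExactlyOnceOutputOperator determines what was already written by reading the output topic back during recovery); the helper methods below are placeholders:

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.common.util.BaseOperator;

public class DiscardOnReplayOutputOperator extends BaseOperator
{
  private transient long largestPublishedWindowId; // newest window already in the sink
  private transient long currentWindowId;

  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
  {
    @Override
    public void process(String tuple)
    {
      // Replayed window whose results were already published before the failure: discard.
      if (currentWindowId <= largestPublishedWindowId) {
        return;
      }
      publish(tuple);
    }
  };

  @Override
  public void setup(Context.OperatorContext context)
  {
    // Placeholder: inspect the external sink (e.g. read the output topic back) to find
    // the newest window whose results already made it out before the failure.
    largestPublishedWindowId = findLargestWindowInSink();
  }

  @Override
  public void beginWindow(long windowId)
  {
    currentWindowId = windowId;
  }

  private long findLargestWindowInSink()
  {
    return -1; // placeholder; entirely dependent on the external system
  }

  private void publish(String tuple)
  {
    // placeholder for the actual write to the MapR Streams topic, database, etc.
  }
}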

On Thu, Oct 6, 2016 at 3:57 PM, Jaspal Singh <ja...@gmail.com>
wrote:

> I think this is something called a customized operator implementation that
> is taking care of exactly once processing at output.
>
> What if any previous operators fail ? How we can make sure they also
> recover using EXACTLY_ONCE processing mode ?
>
>
> Thanks
> Jaspal
>
>
> On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com> wrote:
>
>> In that case please have a look at:
>>
>> https://github.com/apache/apex-malhar/blob/master/kafka/src/
>> main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactl
>> yOnceOutputOperator.java
>>
>> The operator will ensure that messages are not duplicated, under the
>> stated assumptions.
>>
>>
>> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <ja...@gmail.com>
>> wrote:
>>
>>> Hi Thomas,
>>>
>>> In our case we are writing the results back to maprstreams topic based
>>> on some validations.
>>>
>>>
>>> Thanks
>>> Jaspal
>>>
>>>
>>> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> which operators in your application are writing to external systems?
>>>>
>>>> When you look at the example from the blog (
>>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>>> ls/exactly-once), there is Kafka input, which is configured to be
>>>> idempotent. The results are written to JDBC. That operator by itself
>>>> supports exactly-once through transactions (in conjunction with idempotent
>>>> input), hence there is no need to configure the processing mode at all.
>>>>
>>>> Thomas
>>>>
>>>>

Re: Datatorrent fault tolerance

Posted by Jaspal Singh <ja...@gmail.com>.
I think this is something like a customized operator implementation that
takes care of exactly-once processing at the output.

What if any previous operators fail? How can we make sure they also
recover using the EXACTLY_ONCE processing mode?


Thanks
Jaspal

On Thursday, October 6, 2016, Thomas Weise <th...@gmail.com> wrote:

> In that case please have a look at:
>
> https://github.com/apache/apex-malhar/blob/master/kafka/
> src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutp
> utOperator.java
>
> The operator will ensure that messages are not duplicated, under the
> stated assumptions.
>
>
> On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <jaspal.singh1404@gmail.com> wrote:
>
>> Hi Thomas,
>>
>> In our case we are writing the results back to maprstreams topic based
>> on some validations.
>>
>>
>> Thanks
>> Jaspal
>>
>>
>> On Thursday, October 6, 2016, Thomas Weise <thw@apache.org> wrote:
>>
>>> Hi,
>>>
>>> which operators in your application are writing to external systems?
>>>
>>> When you look at the example from the blog (
>>> https://github.com/DataTorrent/examples/tree/master/tutoria
>>> ls/exactly-once), there is Kafka input, which is configured to be
>>> idempotent. The results are written to JDBC. That operator by itself
>>> supports exactly-once through transactions (in conjunction with idempotent
>>> input), hence there is no need to configure the processing mode at all.
>>>
>>> Thomas
>>>
>>>

Re: Datatorrent fault tolerance

Posted by Thomas Weise <th...@gmail.com>.
In that case please have a look at:

https://github.com/apache/apex-malhar/blob/master/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java

The operator will ensure that messages are not duplicated, under the stated
assumptions.


On Thu, Oct 6, 2016 at 3:37 PM, Jaspal Singh <ja...@gmail.com>
wrote:

> Hi Thomas,
>
> In our case we are writing the results back to maprstreams topic based on
> some validations.
>
>
> Thanks
> Jaspal
>
>
> On Thursday, October 6, 2016, Thomas Weise <th...@apache.org> wrote:
>
>> Hi,
>>
>> which operators in your application are writing to external systems?
>>
>> When you look at the example from the blog (https://github.com/DataTorren
>> t/examples/tree/master/tutorials/exactly-once), there is Kafka input,
>> which is configured to be idempotent. The results are written to JDBC. That
>> operator by itself supports exactly-once through transactions (in
>> conjunction with idempotent input), hence there is no need to configure the
>> processing mode at all.
>>
>> Thomas
>>
>>

Re: Datatorrent fault tolerance

Posted by Jaspal Singh <ja...@gmail.com>.
Hi Thomas,

In our case we are writing the results back to a MapR Streams topic based on
some validations.


Thanks
Jaspal

On Thursday, October 6, 2016, Thomas Weise <th...@apache.org> wrote:

> Hi,
>
> which operators in your application are writing to external systems?
>
> When you look at the example from the blog (https://github.com/
> DataTorrent/examples/tree/master/tutorials/exactly-once), there is Kafka
> input, which is configured to be idempotent. The results are written to
> JDBC. That operator by itself supports exactly-once through transactions
> (in conjunction with idempotent input), hence there is no need to configure
> the processing mode at all.
>
> Thomas
>
>
> On Thu, Oct 6, 2016 at 1:52 PM, Bandaru, Srinivas <srinivas.bandaru@optum.com> wrote:
>
>> Thanks Thomas. We have set the set all three operator properties, Kafka
>> input operator has set to “Exactly_once” and downstream two operators set
>> as AT_MOST_ONCE same as mentioned in documentation.  We have also referred
>> the documentation you have shared but couldn’t find the right properties
>> for exactly_once scenario. Is there anything we are missing or need to add
>> additional properties if someone else used this functionality.
>>
>>
>>
>> *Any help would be greatly appreciated??*
>>
>>
>>
>> We tried with different combinations and trying to set all three
>> properties with “EXACTLY_ONCE”, But application throwing an with error
>> saying *“If the processing mode for an operator is specified as
>> EXACTLY_ONCE then the processing mode for all downstream operators should
>> be specified as AT_MOST_ONCE otherwise it will result in an error.*”
>> Application behavior is same as the Datatorrent documentation but it’s not
>> working.
>>
>>
>>
>> Thanks,
>>
>> Srinivas
>>
>>
>>
>>
>>
>> *From:* Thomas Weise [mailto:thw@apache.org]
>> *Sent:* Thursday, October 06, 2016 3:03 PM
>> *To:* users@apex.apache.org
>> *Subject:* Re: Datatorrent fault tolerance
>>
>>
>>
>> Hi,
>>
>>
>>
>> It would be necessary to know a bit more about your application for
>> specific recommendations, but from what I see above, a few things don't
>> look right.
>>
>>
>>
>> It appears that you are setting the processing mode on the input
>> operator, which only reads from Kafka. Exactly-once is important for
>> operators that affect the state of external systems, for example when
>> writing to a database, or, producing messages to Kafka.
>>
>>
>>
>> There is also a problem with the documentation that may contribute to the
>> confusion. The processing mode you are trying to use isn't really
>> exactly-once and probably will be deprecated in a future release. Here are
>> some better resources on fault tolerance and processing semantics:
>>
>>
>>
>> http://apex.apache.org/docs.html
>>
>> https://www.youtube.com/watch?v=FCMY6Ii89Nw
>>
>> http://www.slideshare.net/ApacheApexOrganizer/webinar-fault-
>> toleranceandprocessingsemantics
>>
>> https://www.datatorrent.com/blog/end-to-end-exactly-once-wit
>> h-apache-apex/
>>
>>
>>
>> HTH,
>>
>> Thomas
>>
>>
>>
>>
>>
>> On Thu, Oct 6, 2016 at 11:26 AM, Bandaru, Srinivas <srinivas.bandaru@optum.com> wrote:
>>
>> Hi, Need some help!
>>
>> Need your advice in implementing the fault tolerance mechanism using
>> datatorrent. Currently we are trying to implement with EXACTLY_ONCE
>> scenario.
>>
>> We are referring the below documentation. The highlighted line says if we
>> need to use the EXACTLY_ONCE processing mode, the downstream operators
>> should have AT_MOST_ONCE. We added the operator level attributes in the
>> properties.xml (attached below) and when we launch the application, no
>> messages are coming through the last operator *topicUpdate *but we are
>> able to see the messages when we comment those attributes.
>>
>> *Please have a look at it, either we are specifying the attributes
>> incorrectly or is there any additional step to be done*.
>>
>>
>>
>>
>>
>> ·         *PROCESSING_MODE*
>>
>> static final Attribute <https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Attribute.html><Operator.ProcessingMode <https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Operator.ProcessingMode.html>> PROCESSING_MODE
>>
>> The payload processing mode for this operator - at most once, exactly
>> once, or default at least once. If the processing mode for an operator is
>> specified as AT_MOST_ONCE and no processing mode is specified for the
>> downstream operators if any, the processing mode of the downstream
>> operators is automatically set to AT_MOST_ONCE. If a different processing
>> mode is specified for the downstream operators it will result in an error. If
>> the processing mode for an operator is specified as EXACTLY_ONCE then the
>> processing mode for all downstream operators should be specified as
>> AT_MOST_ONCE otherwise it will result in an error.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Thanks,
>>
>> Srinivas
>>
>>
>> This e-mail, including attachments, may include confidential and/or
>> proprietary information, and may be used only by the person or entity
>> to which it is addressed. If the reader of this e-mail is not the intended
>> recipient or his or her authorized agent, the reader is hereby notified
>> that any dissemination, distribution or copying of this e-mail is
>> prohibited. If you have received this e-mail in error, please notify the
>> sender by replying to this message and delete this e-mail immediately.
>>
>>
>>
>>
>> This e-mail, including attachments, may include confidential and/or
>> proprietary information, and may be used only by the person or entity
>> to which it is addressed. If the reader of this e-mail is not the intended
>> recipient or his or her authorized agent, the reader is hereby notified
>> that any dissemination, distribution or copying of this e-mail is
>> prohibited. If you have received this e-mail in error, please notify the
>> sender by replying to this message and delete this e-mail immediately.
>>
>
>

Re: Datatorrent fault tolerance

Posted by Thomas Weise <th...@apache.org>.
Hi,

which operators in your application are writing to external systems?

When you look at the example from the blog (
https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once),
there is Kafka input, which is configured to be idempotent. The results are
written to JDBC. That operator by itself supports exactly-once through
transactions (in conjunction with idempotent input), hence there is no need
to configure the processing mode at all.
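
As a rough illustration of that transactional principle (plain JDBC, not the Malhar JDBC operator; the table and column names are invented): the rows of a window and the id of that window are committed in one database transaction, so with idempotent input a replayed window can be recognized and skipped after recovery.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class TransactionalWindowWriter
{
  private final Connection con;
  private long committedWindowId; // largest window already stored in the database
  private long currentWindowId;

  public TransactionalWindowWriter(String jdbcUrl) throws SQLException
  {
    con = DriverManager.getConnection(jdbcUrl);
    con.setAutoCommit(false);
    // Recover the last committed window id from the meta table.
    try (Statement st = con.createStatement();
         ResultSet rs = st.executeQuery("SELECT win_id FROM app_meta")) {
      committedWindowId = rs.next() ? rs.getLong(1) : -1L;
    }
  }

  public void beginWindow(long windowId)
  {
    currentWindowId = windowId;
  }

  public void write(String value) throws SQLException
  {
    if (currentWindowId <= committedWindowId) {
      return; // replayed window: its rows were committed before the failure
    }
    try (PreparedStatement ps = con.prepareStatement("INSERT INTO results(value) VALUES (?)")) {
      ps.setString(1, value);
      ps.executeUpdate();
    }
  }

  public void endWindow() throws SQLException
  {
    if (currentWindowId <= committedWindowId) {
      return;
    }
    try (PreparedStatement ps = con.prepareStatement("UPDATE app_meta SET win_id = ?")) {
      ps.setLong(1, currentWindowId);
      ps.executeUpdate();
    }
    con.commit(); // rows and window id become visible atomically
    committedWindowId = currentWindowId;
  }
}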

Thomas


On Thu, Oct 6, 2016 at 1:52 PM, Bandaru, Srinivas <
srinivas.bandaru@optum.com> wrote:

> Thanks Thomas. We have set the set all three operator properties, Kafka
> input operator has set to “Exactly_once” and downstream two operators set
> as AT_MOST_ONCE same as mentioned in documentation.  We have also referred
> the documentation you have shared but couldn’t find the right properties
> for exactly_once scenario. Is there anything we are missing or need to add
> additional properties if someone else used this functionality.
>
>
>
> *Any help would be greatly appreciated??*
>
>
>
> We tried with different combinations and trying to set all three
> properties with “EXACTLY_ONCE”, But application throwing an with error
> saying *“If the processing mode for an operator is specified as
> EXACTLY_ONCE then the processing mode for all downstream operators should
> be specified as AT_MOST_ONCE otherwise it will result in an error.*”
> Application behavior is same as the Datatorrent documentation but it’s not
> working.
>
>
>
> Thanks,
>
> Srinivas
>
>
>
>
>
> *From:* Thomas Weise [mailto:thw@apache.org]
> *Sent:* Thursday, October 06, 2016 3:03 PM
> *To:* users@apex.apache.org
> *Subject:* Re: Datatorrent fault tolerance
>
>
>
> Hi,
>
>
>
> It would be necessary to know a bit more about your application for
> specific recommendations, but from what I see above, a few things don't
> look right.
>
>
>
> It appears that you are setting the processing mode on the input operator,
> which only reads from Kafka. Exactly-once is important for operators that
> affect the state of external systems, for example when writing to a
> database, or, producing messages to Kafka.
>
>
>
> There is also a problem with the documentation that may contribute to the
> confusion. The processing mode you are trying to use isn't really
> exactly-once and probably will be deprecated in a future release. Here are
> some better resources on fault tolerance and processing semantics:
>
>
>
> http://apex.apache.org/docs.html
>
> https://www.youtube.com/watch?v=FCMY6Ii89Nw
>
> http://www.slideshare.net/ApacheApexOrganizer/webinar-fault-
> toleranceandprocessingsemantics
>
> https://www.datatorrent.com/blog/end-to-end-exactly-once-with-apache-apex/
>
>
>
> HTH,
>
> Thomas
>
>
>
>
>
> On Thu, Oct 6, 2016 at 11:26 AM, Bandaru, Srinivas <
> srinivas.bandaru@optum.com> wrote:
>
> Hi, Need some help!
>
> Need your advice in implementing the fault tolerance mechanism using
> datatorrent. Currently we are trying to implement with EXACTLY_ONCE
> scenario.
>
> We are referring the below documentation. The highlighted line says if we
> need to use the EXACTLY_ONCE processing mode, the downstream operators
> should have AT_MOST_ONCE. We added the operator level attributes in the
> properties.xml (attached below) and when we launch the application, no
> messages are coming through the last operator *topicUpdate *but we are
> able to see the messages when we comment those attributes.
>
> *Please have a look at it, either we are specifying the attributes
> incorrectly or is there any additional step to be done*.
>
>
>
>
>
> ·         *PROCESSING_MODE*
>
> static final Attribute <https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Attribute.html><Operator.ProcessingMode <https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Operator.ProcessingMode.html>> PROCESSING_MODE
>
> The payload processing mode for this operator - at most once, exactly
> once, or default at least once. If the processing mode for an operator is
> specified as AT_MOST_ONCE and no processing mode is specified for the
> downstream operators if any, the processing mode of the downstream
> operators is automatically set to AT_MOST_ONCE. If a different processing
> mode is specified for the downstream operators it will result in an error. If
> the processing mode for an operator is specified as EXACTLY_ONCE then the
> processing mode for all downstream operators should be specified as
> AT_MOST_ONCE otherwise it will result in an error.
>
>
>
>
>
>
>
>
>
>
>
>
>
> Thanks,
>
> Srinivas
>
>
> This e-mail, including attachments, may include confidential and/or
> proprietary information, and may be used only by the person or entity
> to which it is addressed. If the reader of this e-mail is not the intended
> recipient or his or her authorized agent, the reader is hereby notified
> that any dissemination, distribution or copying of this e-mail is
> prohibited. If you have received this e-mail in error, please notify the
> sender by replying to this message and delete this e-mail immediately.
>
>
>
>
> This e-mail, including attachments, may include confidential and/or
> proprietary information, and may be used only by the person or entity
> to which it is addressed. If the reader of this e-mail is not the intended
> recipient or his or her authorized agent, the reader is hereby notified
> that any dissemination, distribution or copying of this e-mail is
> prohibited. If you have received this e-mail in error, please notify the
> sender by replying to this message and delete this e-mail immediately.
>

RE: Datatorrent fault tolerance

Posted by "Bandaru, Srinivas" <sr...@optum.com>.
Thanks Thomas. We have set all three operator properties: the Kafka input operator is set to EXACTLY_ONCE and the two downstream operators are set to AT_MOST_ONCE, as mentioned in the documentation. We have also referred to the documentation you shared but couldn't find the right properties for the exactly-once scenario. Is there anything we are missing, or do we need to add additional properties? Has someone else used this functionality?

Any help would be greatly appreciated!

We tried different combinations; when we try to set all three properties to EXACTLY_ONCE, the application throws an error saying "If the processing mode for an operator is specified as EXACTLY_ONCE then the processing mode for all downstream operators should be specified as AT_MOST_ONCE otherwise it will result in an error." The application behaves as the Datatorrent documentation describes, but it's not working.

Thanks,
Srinivas


From: Thomas Weise [mailto:thw@apache.org]
Sent: Thursday, October 06, 2016 3:03 PM
To: users@apex.apache.org
Subject: Re: Datatorrent fault tolerance

Hi,

It would be necessary to know a bit more about your application for specific recommendations, but from what I see above, a few things don't look right.

It appears that you are setting the processing mode on the input operator, which only reads from Kafka. Exactly-once is important for operators that affect the state of external systems, for example when writing to a database, or, producing messages to Kafka.

There is also a problem with the documentation that may contribute to the confusion. The processing mode you are trying to use isn't really exactly-once and probably will be deprecated in a future release. Here are some better resources on fault tolerance and processing semantics:

http://apex.apache.org/docs.html
https://www.youtube.com/watch?v=FCMY6Ii89Nw
http://www.slideshare.net/ApacheApexOrganizer/webinar-fault-toleranceandprocessingsemantics
https://www.datatorrent.com/blog/end-to-end-exactly-once-with-apache-apex/

HTH,
Thomas


On Thu, Oct 6, 2016 at 11:26 AM, Bandaru, Srinivas <sr...@optum.com>> wrote:
Hi, Need some help!
Need your advice in implementing the fault tolerance mechanism using datatorrent. Currently we are trying to implement with EXACTLY_ONCE scenario.
We are referring the below documentation. The highlighted line says if we need to use the EXACTLY_ONCE processing mode, the downstream operators should have AT_MOST_ONCE. We added the operator level attributes in the properties.xml (attached below) and when we launch the application, no messages are coming through the last operator topicUpdate but we are able to see the messages when we comment those attributes.
Please have a look at it, either we are specifying the attributes incorrectly or is there any additional step to be done.


•         PROCESSING_MODE

static final Attribute<https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Attribute.html><Operator.ProcessingMode<https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Operator.ProcessingMode.html>> PROCESSING_MODE
The payload processing mode for this operator - at most once, exactly once, or default at least once. If the processing mode for an operator is specified as AT_MOST_ONCE and no processing mode is specified for the downstream operators if any, the processing mode of the downstream operators is automatically set to AT_MOST_ONCE. If a different processing mode is specified for the downstream operators it will result in an error. If the processing mode for an operator is specified as EXACTLY_ONCE then the processing mode for all downstream operators should be specified as AT_MOST_ONCE otherwise it will result in an error.






Thanks,
Srinivas

This e-mail, including attachments, may include confidential and/or
proprietary information, and may be used only by the person or entity
to which it is addressed. If the reader of this e-mail is not the intended
recipient or his or her authorized agent, the reader is hereby notified
that any dissemination, distribution or copying of this e-mail is
prohibited. If you have received this e-mail in error, please notify the
sender by replying to this message and delete this e-mail immediately.


This e-mail, including attachments, may include confidential and/or
proprietary information, and may be used only by the person or entity
to which it is addressed. If the reader of this e-mail is not the intended
recipient or his or her authorized agent, the reader is hereby notified
that any dissemination, distribution or copying of this e-mail is
prohibited. If you have received this e-mail in error, please notify the
sender by replying to this message and delete this e-mail immediately.