Posted to users@apex.apache.org by Vivek Bhide <vi...@target.com> on 2018/03/15 21:49:31 UTC

Need help on achieving end to end exactly once with KafkaIn and KafkaOut

This is a continuation of my previous question about
KafkaSinglePortExactlyOnceOutputOperator. I am trying to achieve end-to-end
exactly-once processing, receiving data from one Kafka topic and finally
posting it to another Kafka topic. As Pramod mentioned in one of the
presentations, three things are needed to achieve this:

1. Idempotent input operator
2. Stateful operator recovery (something that Apex provides out of the box)
3. Action by the output operator (similar to what AbstractFileOutputOperator
does)

I am using the KafkaSinglePortInputOperator from the
org.apache.apex.malhar.kafka package, which is not idempotent by default.
I made it idempotent as below:

public abstract class CustomAbstractKafkaInputOperator extends AbstractKafkaInputOperator {

	@Override
	public void setup(OperatorContext context) {
		super.setWindowDataManager(new FSWindowDataManager());
		super.setup(context);
	}

}

public class CustomKafkaSinglePortInputOperator extends CustomAbstractKafkaInputOperator {

	public final transient DefaultOutputPort<byte[]> outputPort = new DefaultOutputPort<>();

	@Override
	public AbstractKafkaConsumer createConsumer(Properties properties) {
		return new KafkaConsumer09(properties);
	}

	@Override
	protected void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message) {
		outputPort.emit(message.value());
	}
}

For the output operator I am using KafkaSinglePortExactlyOnceOutputOperator
from the org.apache.apex.malhar.kafka package. With reference to my previous
question, I made sure that all applicable conditions mentioned there are
satisfied, and even so I am receiving the error below:

2018-03-15 12:28:49,485 INFO com.datatorrent.stram.StreamingContainerParent: child msg: Stopped running due to an exception. java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
	at org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator.endWindow(KafkaSinglePortExactlyOnceOutputOperator.java:190)
	at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:153)
	at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
	at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
 context: PTContainer[id=4(container_1521139241132_0003_01_000013),state=ACTIVE,operators=[PTOperator[id=4,name=kafkaOutputOperator,state=PENDING_DEPLOY]]]

My questions are:

1. Is the way I made the input operator idempotent correct, or am I missing
anything?
2. Do I need to make each and every operator in the pipeline idempotent to
achieve this? As per my understanding, no, because once the mapping between
tuple and window is established at the first operator, it doesn't change
anywhere further in the pipeline.

Regards
Vivek



--
Sent from: http://apache-apex-users-list.78494.x6.nabble.com/

Re: Need help on achieving end to end exactly once with KafkaIn and KafkaOut

Posted by Vivek Bhide <vi...@target.com>.
Hi Pramod,

I tried it but I am not getting consistent results. It worked a few times but
then failed again with the same error. Is there anything else that you would
recommend validating?

Regards
Vivek




Re: Need help on achieving end to end exactly once with KafkaIn and KafkaOut

Posted by Vivek Bhide <vi...@target.com>.
Can someone please confirm our findings? It is very critical for us to solve
this issue since the whole pipeline's functionality is at stake.

Regards
Vivek




Re: Need help on achieving end to end exactly once with KafkaIn and KafkaOut

Posted by Sandesh Hegde <sa...@datatorrent.com>.
If you are looking for the output operator that supports Kafka 0.11, it has
been implemented by DataTorrent and will be contributed back to the community
in the coming weeks. As for the bug in the earlier versions of the operator,
it is an open item that needs to be picked up by the community.

On Wed, Mar 21, 2018 at 4:39 PM Vivek Bhide <vi...@target.com> wrote:

> Thanks Sandesh for confirmation.. Can you point us to updated version of
> this Output operator?

Re: Need help on achieving end to end exactly once with KafkaIn and KafkaOut

Posted by Vivek Bhide <vi...@target.com>.
Thanks Sandesh for the confirmation. Can you point us to the updated version
of this output operator?

Regards
Vivek




Re: Need help on achieving end to end exactly once with KafkaIn and KafkaOut

Posted by Sandesh Hegde <sa...@datatorrent.com>.
The Kafka exactly-once output operator assumes that partial window data
will arrive in the same window after recovery. This is a bug which needs to
be fixed. It doesn't affect Kafka 0.11, as a different mechanism is used
there to achieve exactly once.
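
To illustrate the assumption described above, here is a minimal sketch of a
deduplication step that does not care which window the previously written
tuples come back in. It is plain Java with no Apex or Kafka dependency, and
PartialWindowDeduper and its methods are hypothetical names used only for
illustration; this is not the operator's code and not the fix that was
eventually contributed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not part of Malhar: skips tuples already written before a failure. */
public class PartialWindowDeduper {
  // Multiset of tuples that reached the output topic during the incomplete window.
  private final Map<String, Integer> alreadyWritten = new HashMap<>();

  public PartialWindowDeduper(List<String> partialWindowTuples) {
    for (String t : partialWindowTuples) {
      alreadyWritten.merge(t, 1, Integer::sum);
    }
  }

  /** Returns true if the tuple still has to be written, whatever window it arrives in. */
  public boolean shouldWrite(String tuple) {
    Integer remaining = alreadyWritten.get(tuple);
    if (remaining == null) {
      return true;
    }
    if (remaining == 1) {
      alreadyWritten.remove(tuple);
    } else {
      alreadyWritten.put(tuple, remaining - 1);
    }
    return false;
  }

  /** Filters one window's tuples down to the ones that still need writing. */
  public List<String> filter(List<String> windowTuples) {
    List<String> out = new ArrayList<>();
    for (String t : windowTuples) {
      if (shouldWrite(t)) {
        out.add(t);
      }
    }
    return out;
  }

  /** True once every previously written tuple has been seen again. */
  public boolean isDrained() {
    return alreadyWritten.isEmpty();
  }
}
```

With the five partial-window tuples from the logs in this thread (ids 145 to
149), two empty windows followed by a window that carries those tuples plus
new ones would result in only the new ones being written. The sketch assumes
tuples can be compared by value and ignores partitioning.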

On Wed, Mar 21, 2018 at 11:04 AM Pramod Immaneni <pr...@datatorrent.com>
wrote:

> Any window that was not complete by the time the operator died is not
> replayed, by definition (as we don't have all the data in the window), and
> the output operators should not expect that either. In your case, if the
> operator died during window ..24, then on restart you can expect that the
> input operator with the data manager will replay all windows from the
> checkpoint up to and including the window prior to the failure, in an
> idempotent fashion, but not the window during which the failure happened.
> Also, in idempotent replay the window is treated as the replay unit, so the
> exact data within each window is replayed, but order is generally not
> guaranteed: because of partitioning, the data can arrive at the output
> operators in a different order than in the previous run. Typically the
> output operators in the library that do exactly once understand and work
> with these definitions, so I am not sure exactly why the Kafka output
> operator is reporting an exactly-once violation for an incomplete window.
> Maybe somebody who is well versed with the output operator code can comment?
>
> Thanks

Re: Need help on achieving end to end exactly once with KafkaIn and KafkaOut

Posted by Pramod Immaneni <pr...@datatorrent.com>.
Any window that was not complete by the time the operator died is not
replayed, by definition (as we don't have all the data in the window), and
the output operators should not expect that either. In your case, if the
operator died during window ..24, then on restart you can expect that the
input operator with the data manager will replay all windows from the
checkpoint up to and including the window prior to the failure, in an
idempotent fashion, but not the window during which the failure happened.
Also, in idempotent replay the window is treated as the replay unit, so the
exact data within each window is replayed, but order is generally not
guaranteed: because of partitioning, the data can arrive at the output
operators in a different order than in the previous run. Typically the
output operators in the library that do exactly once understand and work
with these definitions, so I am not sure exactly why the Kafka output
operator is reporting an exactly-once violation for an incomplete window.
Maybe somebody who is well versed with the output operator code can comment?
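
The replay rule described above can be sketched as a toy model. This is plain
Java with no Apex dependency; ReplaySketch, its methods, and the use of
numeric offsets are assumptions made for illustration and do not reflect how
the window data manager is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy model of window-level idempotent replay; all names are illustrative. */
public class ReplaySketch {
  // windowId -> [startOffset, endOffset), recorded only when a window completes.
  private final TreeMap<Long, long[]> completed = new TreeMap<>();

  /** Called at end of window: remember which offsets the window covered. */
  public void recordCompletedWindow(long windowId, long startOffset, long endOffset) {
    completed.put(windowId, new long[] {startOffset, endOffset});
  }

  /** Largest window id that has a recorded range, or -1 if none. */
  public long largestCompletedWindow() {
    return completed.isEmpty() ? -1L : completed.lastKey();
  }

  /** Offsets to re-emit for a window during recovery; empty if it never completed. */
  public List<Long> replay(long windowId) {
    List<Long> offsets = new ArrayList<>();
    long[] range = completed.get(windowId);
    if (range != null) {
      for (long o = range[0]; o < range[1]; o++) {
        offsets.add(o);
      }
    }
    return offsets;
  }
}
```

In this model, a window that was in progress when the operator died has no
recorded range, so nothing is replayed for it and its data is simply read
again as new input in whatever window comes next.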

Thanks

On Tue, Mar 20, 2018 at 6:16 PM, Vivek Bhide <vi...@target.com> wrote:

> Hi Pramod,
>
> We did some more research by adding more logging to the KafkaInput operator
> and below are our findings.

Re: Need help on achieving end to end exactly once with KafkaIn and KafkaOut

Posted by Vivek Bhide <vi...@target.com>.
Hi Pramod,

We did some more research by adding more logging to the KafkaInput operator,
and below are our findings.

Application Setup:
1. The WindowDataManager for KafkaInputOperator is set to FSWindowDataManager
2. The streaming window for the application is changed from 0.5 seconds to
5 seconds to make the issue easy to reproduce
3. Created 2 custom classes for the input and output operators, only for the
purpose of adding debugging logs

Logs for KafkaIn before operator failure :
--------------------------------------------
2018-03-20 19:36:49,494 INFO  kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771822 : 48
2018-03-20 19:36:49,599 INFO  kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771823 : {"id":97,"name":"RWOSFMVV0MY7OIXGV2XD"}
2018-03-20 19:36:54,496 INFO  kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771823 : 48
2018-03-20 19:36:54,578 INFO  kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771824 : {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}

Logs of the KafKaIn after recovery :
------------------------------------
CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771822 : 48
2018-03-20 19:37:06,664 INFO  kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771823 : {"id":97,"name":"RWOSFMVV0MY7OIXGV2XD"}
2018-03-20 19:37:06,665 INFO  kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771823 : 48
2018-03-20 19:37:06,720 INFO  util.AsyncFSStorageAgent (AsyncFSStorageAgent.java:save(91)) - using /grid/5/hadoop/yarn/local/usercache/SVDATHDP/appcache/application_1519410901484_172884/container_e3125_1519410901484_172884_01_000005/tmp/chkp4360474156134593331 as the basepath for checkpointing.
2018-03-20 19:37:06,727 INFO  kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771824 : 0
2018-03-20 19:37:06,768 INFO  kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771825 : 0
2018-03-20 19:37:06,810 INFO  kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771826 : {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}

Logs of the KafkaOutput operator :
-----------------------------------

2018-03-20 19:37:06,616 INFO  kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) - Current Window : 6535189514237771822
2018-03-20 19:37:06,617 INFO  kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:rebuildPartialWindow(203)) - Rebuild the partial window after 6535189514237771823
2018-03-20 19:37:07,943 INFO  kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:rebuildPartialWindow(304)) - Partitial Window tuples : {id=145, name=GTNQLMEVGRWRHZQANCVM, randomVar=10=1, id=147, name=RVRY4ERRU7UR26J9EL3F, randomVar=10=1, id=148, name=6LE2ZNZ4Z0S2TGJWO1JW, randomVar=10=1, id=149, name=PPR4FS85MTMT6WZFSICS, randomVar=10=1, id=146, name=YCZ2QKLYEJN8ZNW1LAIT, randomVar=10=1}
2018-03-20 19:37:07,944 INFO  kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) - Current Window : 6535189514237771823
2018-03-20 19:37:07,944 INFO  kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) - Current Window : 6535189514237771824
2018-03-20 19:37:07,945 INFO  kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(116)) - Partitial window content : {id=145, name=GTNQLMEVGRWRHZQANCVM, randomVar=10=1, id=147, name=RVRY4ERRU7UR26J9EL3F, randomVar=10=1, id=148, name=6LE2ZNZ4Z0S2TGJWO1JW, randomVar=10=1, id=149, name=PPR4FS85MTMT6WZFSICS, randomVar=10=1, id=146, name=YCZ2QKLYEJN8ZNW1LAIT, randomVar=10=1}
2018-03-20 19:37:07,946 ERROR engine.StreamingContainer (StreamingContainer.java:run(1456)) - Operator set [OperatorDeployInfo[id=3,name=kafkaOutputOperator,type=GENERIC,checkpoint={5ab1a83d00000029, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=inputPort,streamId=output,sourceNodeId=2,sourcePortName=output,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
	at com.tgt.outputdeduptest.kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator.endWindow(CustomKafkaSinglePortExatclyOnceOutputOperator.java:117)
	at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:153)
	at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
	at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
2018-03-20 19:37:07,964 INFO  producer.KafkaProducer (KafkaProducer.java:close(613)) - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-03-20 19:37:08,515 INFO  engine.StreamingContainer (StreamingContainer.java:processHeartbeatResponse(808)) - Undeploy request: [3]


If you look at the KafkaIn logs from before and after the failure, the last
window that the operator fully processed is 6535189514237771823, and it got
killed while processing 6535189514237771824. You can also see that the first
tuple from window 6535189514237771824 is
{"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}.
When the operator recovers, it replays the tuples correctly up to
6535189514237771823, but then it sends 0 tuples for window ids
6535189514237771824 and 6535189514237771825, and then sends the complete
accumulated tuples in 6535189514237771826, with the first tuple being
{"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}.

As per my understanding, this is not idempotent behavior, since the
tuple-to-window assignment from before the failure changed after recovery.
Please correct me if I am wrong. We believe this is causing the failure in
the output operator, because we see that it recovers the partially processed
window 6535189514237771824 correctly (please refer to the logs). We also
verified it by adding a consumer on the output topic.

Could you please confirm whether this is an issue that needs a fix, and
suggest one if possible?
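
One way to make this comparison concrete is to tabulate the per-window tuple
counts from the two runs. The sketch below is a hypothetical helper (the class
and method names are assumptions, not Apex APIs) that uses the counts from the
KafkaIn logs above. It only covers windows that completed before the failure,
since no end-of-window count was logged for 6535189514237771824 before the
operator died.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative check, not an Apex API: compares per-window tuple counts across two runs. */
public class WindowAssignmentCheck {

  /** Returns ids of windows completed before the failure whose count differs after recovery. */
  public static List<Long> countMismatches(Map<Long, Integer> beforeFailure,
                                           Map<Long, Integer> afterRecovery) {
    List<Long> mismatched = new ArrayList<>();
    for (Map.Entry<Long, Integer> e : beforeFailure.entrySet()) {
      Integer replayed = afterRecovery.get(e.getKey());
      // A window missing from the second run also counts as a mismatch.
      if (!e.getValue().equals(replayed)) {
        mismatched.add(e.getKey());
      }
    }
    return mismatched;
  }

  /** Runs the check on the counts reported in the KafkaIn logs of this message. */
  public static List<Long> checkLoggedRun() {
    Map<Long, Integer> before = new LinkedHashMap<>();
    before.put(6535189514237771822L, 48);
    before.put(6535189514237771823L, 48);

    Map<Long, Integer> after = new LinkedHashMap<>();
    after.put(6535189514237771822L, 48);
    after.put(6535189514237771823L, 48);
    after.put(6535189514237771824L, 0);
    after.put(6535189514237771825L, 0);
    return countMismatches(before, after);
  }
}
```

For the logged run the completed windows agree on tuple counts; the difference
is confined to the window that was in progress at the time of the failure,
whose tuples show up again in 6535189514237771826.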

Regards
Vivek




Re: Need help on achieving end to end exactly once with KafkaIn and KafkaOut

Posted by Pramod Immaneni <pr...@datatorrent.com>.
Have you tried instantiating FSWindowDataManager and setting it on the
operator instance in populateDAG? There may be state associated with the
window manager which will be lost if you set a new instance each time in
setup. You should be able to use KafkaSinglePortInputOperator directly.
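
The concern about losing state can be illustrated with a toy model. This is
plain Java with no Apex dependency: ToyWindowManager and ToyOperator are
hypothetical stand-ins, and the model assumes that the operator's fields are
restored from a checkpoint before setup runs. Whether FSWindowDataManager
really keeps such state in the instance is not confirmed here.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of "restore, then setup"; all classes here are illustrative stand-ins. */
public class SetupStateSketch {

  /** Stand-in for a window data manager that carries state of its own. */
  public static class ToyWindowManager {
    public final Map<Long, String> state = new HashMap<>();
  }

  /** Stand-in for an operator whose manager field is restored before setup() is called. */
  public static class ToyOperator {
    public ToyWindowManager manager;
    private final boolean replaceInSetup;

    public ToyOperator(ToyWindowManager restoredManager, boolean replaceInSetup) {
      this.manager = restoredManager;
      this.replaceInSetup = replaceInSetup;
    }

    public void setup() {
      if (replaceInSetup) {
        // Mirrors calling setWindowDataManager(new ...) inside setup.
        manager = new ToyWindowManager();
      }
    }
  }

  /** Simulates restore-then-setup and reports whether the manager's state survived. */
  public static boolean stateSurvives(boolean replaceInSetup) {
    ToyWindowManager restored = new ToyWindowManager();
    restored.state.put(23L, "data recorded for window 23");
    ToyOperator op = new ToyOperator(restored, replaceInSetup);
    op.setup();
    return op.manager.state.containsKey(23L);
  }
}
```

In the toy model the state survives only when setup leaves the restored
manager alone, which is the reason for assigning the manager once, when the
operator is created in populateDAG.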

On Thu, Mar 15, 2018 at 2:49 PM, Vivek Bhide <vi...@target.com> wrote:

> This is a continuation of my previous question about
> KafkaSinglePortExactlyOnceOutputOperator. I am trying to achieve end-to-end
> exactly-once processing, receiving data from one Kafka topic and finally
> posting it to another Kafka topic. As Pramod mentioned in one of the
> presentations, three things are needed to achieve this:
>
> 1. Idempotent input operator
> 2. Stateful operator recovery (something that Apex provides out of the box)
> 3. Action by the output operator (similar to what AbstractFileOutputOperator
> does)
>
> I am using the KafkaSinglePortInputOperator from the
> org.apache.apex.malhar.kafka package, which is not idempotent by default.
> I made it idempotent as below:
>
> public abstract class CustomAbstractKafkaInputOperator extends
> AbstractKafkaInputOperator {
>
>         @Override
>         public void setup(OperatorContext context) {
>                 super.setWindowDataManager(new FSWindowDataManager());
>                 super.setup(context);
>         }
>
> }