Posted to users@apex.apache.org by Vivek Bhide <vi...@target.com> on 2018/03/15 21:49:31 UTC
Need help on achieving end to end exactly once with KafkaIn and KafkaOut
This is a continuation of my previous question about
KafkaSinglePortExactlyOnceOutputOperator. I am trying to achieve end-to-end
exactly-once processing, with data received from one Kafka topic and
finally posted to another Kafka topic. As Pramod mentioned in one of the
presentations, three things are needed to achieve this:
1. Idempotent input operator
2. Stateful operator recovery (something that Apex provides out of the box)
3. Action by the output operator (similar to what AbstractFileOutputOperator
does)
I am using the KafkaSinglePortInputOperator from the
org.apache.apex.malhar.kafka package, which is not idempotent by default.
I made it idempotent as below:
public abstract class CustomAbstractKafkaInputOperator extends AbstractKafkaInputOperator {

    @Override
    public void setup(OperatorContext context) {
        super.setWindowDataManager(new FSWindowDataManager());
        super.setup(context);
    }
}

public class CustomKafkaSinglePortInputOperator extends CustomAbstractKafkaInputOperator {

    public final transient DefaultOutputPort<byte[]> outputPort = new DefaultOutputPort<>();

    @Override
    public AbstractKafkaConsumer createConsumer(Properties properties) {
        return new KafkaConsumer09(properties);
    }

    @Override
    protected void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message) {
        outputPort.emit(message.value());
    }
}
For the output operator I am using KafkaSinglePortExactlyOnceOutputOperator from
the org.apache.apex.malhar.kafka package. With reference to my previous
question, I made sure that all the applicable conditions mentioned there are
satisfied, and even so I am receiving the error below:
2018-03-15 12:28:49,485 INFO com.datatorrent.stram.StreamingContainerParent: child msg: Stopped running due to an exception.
java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
    at org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator.endWindow(KafkaSinglePortExactlyOnceOutputOperator.java:190)
    at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:153)
    at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
    at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
context: PTContainer[id=4(container_1521139241132_0003_01_000013),state=ACTIVE,operators=[PTOperator[id=4,name=kafkaOutputOperator,state=PENDING_DEPLOY]]]
My questions are:
1. Is the way I made the input operator idempotent correct, or am I missing
anything?
2. Do I need to make each and every operator in the pipeline idempotent to
achieve this? As per my understanding, no, because once the mapping between
tuples and windows is established at the first operator, it doesn't change
anywhere further down the pipeline.
Regards
Vivek
--
Sent from: http://apache-apex-users-list.78494.x6.nabble.com/
Re: Need help on achieving end to end exactly once with KafkaIn and KafkaOut
Posted by Vivek Bhide <vi...@target.com>.
Hi Pramod,
I tried it, but I am not getting consistent results. It worked a few times, but
then it failed again with the same error. Is there anything else that you would
recommend validating?
Regards
Vivek
Re: Need help on achieving end to end exactly once with KafkaIn and KafkaOut
Posted by Vivek Bhide <vi...@target.com>.
Can someone please confirm our findings? It is very critical for us to solve
this issue, since the whole pipeline's functionality is at stake.
Regards
Vivek
Re: Need help on achieving end to end exactly once with KafkaIn and KafkaOut
Posted by Sandesh Hegde <sa...@datatorrent.com>.
If you are looking for an output operator that supports Kafka 0.11, one has
been implemented by DataTorrent and will be contributed back to the community
in the coming weeks. As far as the bug in the earlier versions of the operator
is concerned, it is an open item that needs to be picked up by the
community.
On Wed, Mar 21, 2018 at 4:39 PM Vivek Bhide <vi...@target.com> wrote:
> Thanks Sandesh for confirmation.. Can you point us to updated version of
> this
> Output operator?
>
> Regards
> Vivek
Re: Need help on achieving end to end exactly once with KafkaIn and KafkaOut
Posted by Vivek Bhide <vi...@target.com>.
Thanks, Sandesh, for the confirmation. Can you point us to the updated version
of this output operator?
Regards
Vivek
Re: Need help on achieving end to end exactly once with KafkaIn and KafkaOut
Posted by Sandesh Hegde <sa...@datatorrent.com>.
The Kafka exactly-once output operator assumes that the data of a partial
window will arrive in the same window after recovery. This is a bug that needs
to be fixed. It doesn't affect Kafka 0.11, as a different mechanism is used
there to achieve exactly once.
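To make the assumption described above concrete, here is a minimal sketch in plain Java. It is a toy model, not the Malhar operator's code: the class PartialWindowCheck and the method processWindowStrict are hypothetical names, and tuples are reduced to strings. It assumes the operator has rebuilt the set of tuples it had already written for the window that was open at the crash, and that it then insists on seeing all of them again within a single window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a strict partial-window check; hypothetical, not Malhar code. */
final class PartialWindowCheck {

  // Tuples already written for the window that was open at the crash,
  // kept as a multiset so that repeated values are counted.
  private final Map<String, Integer> pending = new HashMap<>();

  PartialWindowCheck(List<String> alreadyWritten) {
    for (String tuple : alreadyWritten) {
      pending.merge(tuple, 1, Integer::sum);
    }
  }

  /**
   * Processes the first window after recovery. Tuples that match a pending
   * entry are skipped as duplicates; the rest are returned for writing.
   * Fails if any pending tuple did not show up in this same window.
   */
  List<String> processWindowStrict(List<String> tuples) {
    List<String> toWrite = new ArrayList<>();
    for (String tuple : tuples) {
      Integer left = pending.get(tuple);
      if (left == null) {
        toWrite.add(tuple);
      } else if (left == 1) {
        pending.remove(tuple);
      } else {
        pending.put(tuple, left - 1);
      }
    }
    if (!pending.isEmpty()) {
      throw new IllegalStateException(
          "Violates exactly once: " + pending.size() + " distinct tuple(s) not received in this window");
    }
    return toWrite;
  }

  public static void main(String[] args) {
    List<String> written = Arrays.asList("id=145", "id=146", "id=147");

    // The partial window's tuples arrive again in the same window: duplicates are skipped.
    System.out.println(new PartialWindowCheck(written)
        .processWindowStrict(Arrays.asList("id=145", "id=146", "id=147", "id=148")));

    // The input operator emits nothing for that window after restart: the strict check fails.
    try {
      new PartialWindowCheck(written).processWindowStrict(Collections.<String>emptyList());
    } catch (IllegalStateException e) {
      System.out.println("failed: " + e.getMessage());
    }
  }
}
```

The second case mirrors the situation reported in this thread, where the window that was open at the failure comes back with 0 tuples and the same tuples only arrive in a later window.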
On Wed, Mar 21, 2018 at 11:04 AM Pramod Immaneni <pr...@datatorrent.com>
wrote:
> Any window that was not complete by the time the operator died is not
> replayed by definition (as we don't have all the data in the window) and
> the output operators should also not expect that. In your case if the
> operator died during window ..24 then on restart you can expect that the
> input operator with the data manager will replay all windows from
> checkpoint till and including the window prior to failure, in an idempotent
> fashion, but not the window during which failure happened. Also in
> idempotent replay, the window is treated as the replay unit, so the exact
> data within windows is replayed but order is not guaranteed generally
> because of partitioning the data can arrive in different order than the
> previous run at the output operators. Typically the output operators in the
> library that do exactly once do understand and work with these definitions,
> so not sure exactly why the kafka output operator is reporting exactly once
> violation for an incomplete window. Maybe somebody who is well versed with
> the output operator code can comment?
>
> Thanks
Re: Need help on achieving end to end exactly once with KafkaIn and KafkaOut
Posted by Pramod Immaneni <pr...@datatorrent.com>.
Any window that was not complete by the time the operator died is not
replayed, by definition (as we don't have all the data in the window), and
the output operators should not expect that either. In your case, if the
operator died during window ..24, then on restart you can expect the
input operator with the data manager to replay all windows from the
checkpoint up to and including the window prior to the failure, in an
idempotent fashion, but not the window during which the failure happened.
Also, in idempotent replay the window is treated as the replay unit: the exact
data within each window is replayed, but order is generally not guaranteed,
because with partitioning the data can arrive at the output operators in a
different order than in the previous run. Typically, the output operators in
the library that do exactly once understand and work with these definitions,
so I am not sure exactly why the Kafka output operator is reporting an
exactly-once violation for an incomplete window. Maybe somebody who is well
versed in the output operator code can comment?
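The replay rule described above can be sketched with a toy model in plain Java. Everything here is hypothetical (the ReplaySketch class, the offset-range store, the window numbers); it is not the Apex API, and how FSWindowDataManager actually stores its data is not shown in this thread. The only point is the contract: a window that reached its end is replayed with exactly the same tuples, while a window that was open at the failure is not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

/** Toy model of window-level idempotent replay; hypothetical, not the Apex API. */
final class ReplaySketch {

  private final List<String> source;
  // Offset range [start, end) of every window that reached its end.
  // Stands in for state that survives an operator failure.
  private final TreeMap<Long, int[]> store;
  private int nextOffset;

  ReplaySketch(List<String> source, TreeMap<Long, int[]> store) {
    this.source = source;
    this.store = store;
    // After a restart, new data starts right after the last completed window.
    this.nextOffset = store.isEmpty() ? 0 : store.lastEntry().getValue()[1];
  }

  /**
   * Emits the tuples of one window. A window found in the store is replayed
   * exactly; any other window takes up to newTuples fresh tuples, and its
   * range is recorded only if the window completes.
   */
  List<String> window(long windowId, int newTuples, boolean completes) {
    int[] saved = store.get(windowId);
    if (saved != null) {
      return new ArrayList<>(source.subList(saved[0], saved[1]));
    }
    int start = nextOffset;
    int end = Math.min(source.size(), start + newTuples);
    nextOffset = end;
    if (completes) {
      store.put(windowId, new int[] {start, end});
    }
    return new ArrayList<>(source.subList(start, end));
  }

  public static void main(String[] args) {
    List<String> topic = Arrays.asList("t0", "t1", "t2", "t3", "t4", "t5", "t6", "t7");
    TreeMap<Long, int[]> store = new TreeMap<>();

    ReplaySketch first = new ReplaySketch(topic, store);
    System.out.println("22: " + first.window(22, 3, true));
    System.out.println("23: " + first.window(23, 3, true));
    System.out.println("24 (operator dies mid-window): " + first.window(24, 1, false));

    ReplaySketch recovered = new ReplaySketch(topic, store);
    System.out.println("22 replayed: " + recovered.window(22, 0, true));
    System.out.println("23 replayed: " + recovered.window(23, 0, true));
    System.out.println("24 after restart: " + recovered.window(24, 0, true));
    System.out.println("25 after restart: " + recovered.window(25, 2, true));
  }
}
```

In this run, windows 22 and 23 come back unchanged, while t6, which was emitted in window 24 before the failure, reappears in a later window after the restart.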
Thanks
On Tue, Mar 20, 2018 at 6:16 PM, Vivek Bhide <vi...@target.com> wrote:
> If you look at the KafkaIn logs before and after the failure, the last
> window that the operator completed is 6535189514237771823, and it was killed
> while processing 6535189514237771824. You can also see that the first tuple
> of window 6535189514237771824 is {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}.
> When the operator recovers, it replays the tuples correctly up to
> 6535189514237771823, but then it sends 0 tuples for window ids
> 6535189514237771824 and 6535189514237771825, and then sends all the
> accumulated tuples in 6535189514237771826, with the first tuple being
> {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}.
>
> As per my understanding, this is not idempotent behavior, since the
> tuple-to-window assignment from before the failure changed after recovery.
> Please correct me if I am wrong. We believe this is causing the failure in
> the output operator, because we see that it recovers correctly with the
> partially processed window 6535189514237771824 (please refer to the logs).
> We also verified this by adding a consumer on the output topic.
>
> Could you please confirm whether this is an issue that needs a fix, and
> suggest one if possible?
>
> Regards
> Vivek
Re: Need help on achieving end to end exactly once with KafkaIn and KafkaOut
Posted by Vivek Bhide <vi...@target.com>.
Hi Pramod,
We did some more research by adding more logging to the KafkaInput operator,
and below are our findings.
Application setup:
1. The WindowDataManager for the KafkaInput operator is set to FSWindowDataManager.
2. The application's streaming window is set to 5 seconds instead of 0.5 seconds,
to make the issue easier to reproduce.
3. Two custom classes were created, for the input and output operators, only for
the purpose of adding debugging logs.
Logs for KafkaIn before operator failure:
--------------------------------------------
2018-03-20 19:36:49,494 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771822 : 48
2018-03-20 19:36:49,599 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771823 : {"id":97,"name":"RWOSFMVV0MY7OIXGV2XD"}
2018-03-20 19:36:54,496 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771823 : 48
2018-03-20 19:36:54,578 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771824 : {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}
Logs for KafkaIn after recovery:
------------------------------------
CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771822 : 48
2018-03-20 19:37:06,664 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771823 : {"id":97,"name":"RWOSFMVV0MY7OIXGV2XD"}
2018-03-20 19:37:06,665 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771823 : 48
2018-03-20 19:37:06,720 INFO util.AsyncFSStorageAgent (AsyncFSStorageAgent.java:save(91)) - using /grid/5/hadoop/yarn/local/usercache/SVDATHDP/appcache/application_1519410901484_172884/container_e3125_1519410901484_172884_01_000005/tmp/chkp4360474156134593331 as the basepath for checkpointing.
2018-03-20 19:37:06,727 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771824 : 0
2018-03-20 19:37:06,768 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771825 : 0
2018-03-20 19:37:06,810 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771826 : {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}
Logs of the KafkaOutput operator:
-----------------------------------
2018-03-20 19:37:06,616 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) - Current Window : 6535189514237771822
2018-03-20 19:37:06,617 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:rebuildPartialWindow(203)) - Rebuild the partial window after 6535189514237771823
2018-03-20 19:37:07,943 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:rebuildPartialWindow(304)) - Partitial Window tuples :
    {id=145, name=GTNQLMEVGRWRHZQANCVM, randomVar=10=1, id=147,
    name=RVRY4ERRU7UR26J9EL3F, randomVar=10=1, id=148,
    name=6LE2ZNZ4Z0S2TGJWO1JW, randomVar=10=1, id=149,
    name=PPR4FS85MTMT6WZFSICS, randomVar=10=1, id=146,
    name=YCZ2QKLYEJN8ZNW1LAIT, randomVar=10=1}
2018-03-20 19:37:07,944 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) - Current Window : 6535189514237771823
2018-03-20 19:37:07,944 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) - Current Window : 6535189514237771824
2018-03-20 19:37:07,945 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(116)) - Partitial window content :
    {id=145, name=GTNQLMEVGRWRHZQANCVM, randomVar=10=1, id=147,
    name=RVRY4ERRU7UR26J9EL3F, randomVar=10=1, id=148,
    name=6LE2ZNZ4Z0S2TGJWO1JW, randomVar=10=1, id=149,
    name=PPR4FS85MTMT6WZFSICS, randomVar=10=1, id=146,
    name=YCZ2QKLYEJN8ZNW1LAIT, randomVar=10=1}
2018-03-20 19:37:07,946 ERROR engine.StreamingContainer (StreamingContainer.java:run(1456)) - Operator set [OperatorDeployInfo[id=3,name=kafkaOutputOperator,type=GENERIC,checkpoint={5ab1a83d00000029, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=inputPort,streamId=output,sourceNodeId=2,sourcePortName=output,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
    at com.tgt.outputdeduptest.kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator.endWindow(CustomKafkaSinglePortExatclyOnceOutputOperator.java:117)
    at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:153)
    at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
    at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
2018-03-20 19:37:07,964 INFO producer.KafkaProducer (KafkaProducer.java:close(613)) - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-03-20 19:37:08,515 INFO engine.StreamingContainer (StreamingContainer.java:processHeartbeatResponse(808)) - Undeploy request: [3]
If you look at the KafkaIn logs before and after the failure, the last window
that the operator completed is 6535189514237771823, and it was killed while
processing 6535189514237771824. You can also see that the first tuple
of window 6535189514237771824 is {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}.
When the operator recovers, it replays the tuples correctly up to
6535189514237771823, but then it sends 0 tuples for window ids
6535189514237771824 and 6535189514237771825, and then sends all the
accumulated tuples in 6535189514237771826, with the first tuple being
{"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}.
As per my understanding, this is not idempotent behavior, since the
tuple-to-window assignment from before the failure changed after recovery.
Please correct me if I am wrong. We believe this is causing the failure in the
output operator, because we see that it recovers correctly with the partially
processed window 6535189514237771824 (please refer to the logs). We also
verified this by adding a consumer on the output topic.
Could you please confirm whether this is an issue that needs a fix, and
suggest one if possible?
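The before/after comparison of the input operator logs can also be done mechanically. The following is a small sketch in plain Java; the class WindowCountDiff and its methods are hypothetical helpers written for this message format only, and they compare tuple counts, not tuple contents, so equal counts do not by themselves prove that a window was replayed identically.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that compares per-window tuple counts from two log excerpts. */
final class WindowCountDiff {

  private static final Pattern COUNT =
      Pattern.compile("Total tuples processed in window (\\d+) : (\\d+)");

  /** Extracts windowId -> tuple count; line wrapping inside a log entry is tolerated. */
  static Map<Long, Integer> counts(String log) {
    String flat = log.replaceAll("\\s+", " ");
    Map<Long, Integer> result = new TreeMap<>();
    Matcher m = COUNT.matcher(flat);
    while (m.find()) {
      result.put(Long.parseLong(m.group(1)), Integer.parseInt(m.group(2)));
    }
    return result;
  }

  /** Windows that completed in both runs but with different counts. */
  static Map<Long, String> mismatches(Map<Long, Integer> before, Map<Long, Integer> after) {
    Map<Long, String> diff = new TreeMap<>();
    for (Map.Entry<Long, Integer> e : before.entrySet()) {
      Integer replayed = after.get(e.getKey());
      if (replayed != null && !replayed.equals(e.getValue())) {
        diff.put(e.getKey(), e.getValue() + " -> " + replayed);
      }
    }
    return diff;
  }

  public static void main(String[] args) {
    // Count lines taken from the input operator logs in this thread.
    String beforeLog = "Total tuples\nprocessed in window 6535189514237771822 : 48\n"
        + "Total tuples\nprocessed in window 6535189514237771823 : 48\n";
    String afterLog = "Total tuples\nprocessed in window 6535189514237771822 : 48\n"
        + "Total tuples\nprocessed in window 6535189514237771823 : 48\n"
        + "Total tuples\nprocessed in window 6535189514237771824 : 0\n"
        + "Total tuples\nprocessed in window 6535189514237771825 : 0\n";

    Map<Long, Integer> before = counts(beforeLog);
    Map<Long, Integer> after = counts(afterLog);

    Set<Long> onlyAfter = new TreeSet<>(after.keySet());
    onlyAfter.removeAll(before.keySet());

    System.out.println("mismatched completed windows: " + mismatches(before, after));
    System.out.println("windows completed only after recovery: " + onlyAfter);
  }
}
```

For these excerpts the two windows that completed before the failure (..22 and ..23) report the same counts after recovery, and windows ..24 and ..25 have an end-of-window count only in the recovered run.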
Regards
Vivek
Re: Need help on achieving end to end exactly once with KafkaIn and KafkaOut
Posted by Pramod Immaneni <pr...@datatorrent.com>.
Have you tried instantiating FSWindowDataManager and setting it on the
operator instance in populateDAG? There may be state associated with the
window manager which will be lost if you set a new instance each time in
setup. You should be able to use KafkaSinglePortInputOperator directly.
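The concern about losing state can be illustrated with a toy model in plain Java. This is only a sketch under assumptions: ToyManager and ToyOperator are hypothetical stand-ins, Java serialization stands in for the platform's checkpointing, and whether FSWindowDataManager really carries such in-memory state is not confirmed in this thread. The sketch relies on setup being called again after an operator is restored from a checkpoint.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Toy model of operator state across a checkpoint/restore cycle; hypothetical names. */
final class SetupStateSketch {

  /** Stand-in for a manager object that accumulates state while the operator runs. */
  static class ToyManager implements Serializable {
    private static final long serialVersionUID = 1L;
    final List<Long> savedWindows = new ArrayList<>();
  }

  static class ToyOperator implements Serializable {
    private static final long serialVersionUID = 1L;
    ToyManager manager = new ToyManager();
    final boolean replaceInSetup;

    ToyOperator(boolean replaceInSetup) {
      this.replaceInSetup = replaceInSetup;
    }

    /** Called at launch and again after every restore. */
    void setup() {
      if (replaceInSetup) {
        manager = new ToyManager(); // discards whatever the checkpoint restored
      }
    }

    void endWindow(long windowId) {
      manager.savedWindows.add(windowId);
    }
  }

  /** Serializes the operator, reads it back, and calls setup() on the copy. */
  static ToyOperator checkpointAndRestore(ToyOperator op) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(op);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        ToyOperator restored = (ToyOperator) in.readObject();
        restored.setup();
        return restored;
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    for (boolean replace : new boolean[] {false, true}) {
      ToyOperator op = new ToyOperator(replace);
      op.setup();
      op.endWindow(22L);
      op.endWindow(23L);
      ToyOperator restored = checkpointAndRestore(op);
      System.out.println("replaceInSetup=" + replace
          + " -> windows known after restore: " + restored.manager.savedWindows);
    }
  }
}
```

When the manager is assigned once, outside setup (in Apex terms, on the operator instance in populateDAG), the restored copy still knows its windows; when setup replaces it, the restored copy starts empty.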