Posted to users@nifi.apache.org by Michal Tomaszewski <Mi...@cca.pl> on 2018/08/20 15:38:48 UTC

PutHive3Streaming concurrent tasks on different nodes

Hello,

I'm using the PutHive3Streaming processor on a 3-node cluster.
The processor writes to a partition that is generated every minute using this pattern in Partition Values:
${now():toNumber():divide(1000):divide(60):multiply(60)}
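For reference, here is a small Python sketch of what that Expression Language chain computes, assuming (as NiFi does) that now():toNumber() yields epoch milliseconds:

```python
def partition_value(epoch_ms: int) -> int:
    """Mirror of ${now():toNumber():divide(1000):divide(60):multiply(60)}."""
    seconds = epoch_ms // 1000   # divide(1000): milliseconds -> seconds
    minutes = seconds // 60      # divide(60): integer division floors to whole minutes
    return minutes * 60          # multiply(60): back to seconds, aligned to the minute

# Every node evaluating this within the same minute gets the same value,
# e.g. the 1534773960 seen in the metastore log below.
print(partition_value(1534773999123))  # -> 1534773960
```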

Every minute, two of the three NiFi nodes trigger an error in the Hive metastore log:

2018-08-20T16:06:26,888 ERROR [pool-6-thread-197]: metastore.RetryingHMSHandler (RetryingHMSHandler.java:invokeInternal(201)) - AlreadyExistsException(message:Partition already exists: Partition(values:[1534773960], dbName:dataflow_presence, tableName:presence_oneminute_flow, createTime:0, lastAccessTime:0, sd:StorageDescriptor(cols:[FieldSchema(name:monitorid, type:int, comment:null), FieldSchema(name:deviceid, type:int, comment:null), FieldSchema(name:mac, type:string, comment:null), FieldSchema(name:mints, type:int, comment:null), FieldSchema(name:maxts, type:int, comment:null), FieldSchema(name:visitorsignal, type:int, comment:null), FieldSchema(name:connected, type:int, comment:null), FieldSchema(name:authenticated, type:int, comment:null), FieldSchema(name:vendor, type:string, comment:null), FieldSchema(name:daydate, type:int, comment:null)], location:hdfs://dataflowhdfscluster/warehouse/tablespace/managed/hive/dataflow_presence.db/presence_oneminute_flow/write_time=1534773960, inputFormat:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat, compressed:false, numBuckets:1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.ql.io.orc.OrcSerde, parameters:{serialization.format=1}), bucketCols:[mac], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), parameters:null, catName:hive))

Configuration of the PutHive3Streaming processor:
[inline image: processor configuration screenshot]

There is a 1-minute MergeRecord step before the PutHive3Streaming processor:
[inline image: MergeRecord configuration screenshot]

The same error appeared when the "Partition Values" configuration field was empty and the partition value was carried inside the data file.
I'm using the current snapshot of NiFi 1.8 and Hortonworks HDP 3.0.

It seems that the PutHive3Streaming processor does not verify the existence of the partition just before streaming. As a result, one node creates the partition and the other nodes then try to create the same partition (even some time later, e.g. 20 seconds afterwards).
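The collision can be sketched as a small simulation (hypothetical names, not NiFi or Hive code): every node derives the same minute-aligned value, so only the first create succeeds and the others hit the conflict:

```python
import threading

partitions = set()   # stands in for the metastore's partition list
errors = []          # conflicts observed, as in the metastore log
lock = threading.Lock()

def create_partition(value):
    # Each "node" tries to create the same minute-aligned partition.
    with lock:
        if value in partitions:
            errors.append(f"Partition already exists: {value}")
        else:
            partitions.add(value)

minute_bucket = 1534773960  # identical on every node within the same minute
threads = [threading.Thread(target=create_partition, args=(minute_bucket,))
           for _ in range(3)]  # three NiFi nodes
for t in threads:
    t.start()
for t in threads:
    t.join()
# One node wins; the other two report the conflict, matching the
# two errors per minute seen across the three nodes.
```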

Is there a workaround for this issue?

Regards,
Mike

________________________________ Note: The information contained in this message may be privileged and confidential and protected from disclosure. If the reader of this message is not the intended recipient, or an employee or agent responsible for delivering this message to the intended recipient, you are hereby notified that any dissemination, distribution or copying of this communication is strictly prohibited. If you have received this communication in error, please notify the sender immediately by replying to the message and deleting it from your computer. Thank you. ________________________________

RE: PutHive3Streaming concurrent tasks on different nodes

Posted by Michal Tomaszewski <Mi...@cca.pl>.
Hi Matt,
Thanks for clarification.
The data is probably going into Hive properly. We did not see data loss, but in our case verification is quite complex. The flow files probably also do not route to the retry queue – we have not noticed that, but I'll try to verify it again under heavy load. If I find anything, I'll let you know.
Thanks again.

Regards,
Mike



Re: PutHive3Streaming concurrent tasks on different nodes

Posted by Matt Burgess <ma...@apache.org>.
Mike,

Did the data appear to go through in spite of the error? I talked to
Prasanth (a Hive Streaming developer), who said:

"HiveStreamingConnection ignores AlreadyExistsException. Maybe it should
use a different add_partition API that adds only if partition does not
exist. The metastore exception can be safely ignored. Regardless,
HiveStreamingConnection should use ifNotExists API instead of addPartition.
I will create an upstream ticket for it."
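The "add only if the partition does not exist" behavior he describes can be sketched like this; the client and exception classes below are illustrative in-memory stand-ins, not the real Hive Metastore API:

```python
class AlreadyExistsException(Exception):
    """Stand-in for the metastore's AlreadyExistsException."""

class FakeMetastore:
    """Illustrative in-memory stand-in for a metastore client."""
    def __init__(self):
        self.partitions = set()

    def add_partition(self, values):
        if values in self.partitions:
            raise AlreadyExistsException(values)
        self.partitions.add(values)

def add_partition_if_not_exists(client, values):
    # Treat a concurrent "already exists" as success: either way the
    # partition is there, so the writer can proceed to stream into it.
    try:
        client.add_partition(values)
    except AlreadyExistsException:
        pass

client = FakeMetastore()
add_partition_if_not_exists(client, ("1534773960",))  # first node creates it
add_partition_if_not_exists(client, ("1534773960",))  # second node: no error raised
```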

If the data is not going through, then although Hive Streaming ignores the
error internally, it may still propagate it out somehow; in that case NiFi
might treat it as a legitimate error and route the flow file to failure or retry.

He filed HIVE-20428 [1] for the improvement. Please let me know if you are
seeing data loss and I will look into it from the NiFi side.

Thanks,
Matt

[1] https://issues.apache.org/jira/browse/HIVE-20428



