Posted to user@beam.apache.org by Leonardo Campos <le...@gameduell.de> on 2019/10/31 09:17:16 UTC

Encoding Problem: Kafka - DataFlow

Hello,

Problem: Special characters such as öüä are being saved to our sinks as "?".
Setup: We read from Kafka using KafkaIO, run the pipeline with the
Dataflow Runner, and save the results to BigQuery and Elasticsearch.

We checked that data is being written to Kafka in UTF-8 (code check). We
also checked that the special characters appear correctly in
kafka-console-consumer.
In addition, in a local setup with Kafka in Docker* and the Direct
Runner, the characters were encoded correctly. *The event was written
using kafka-console-producer.

Reading from Kafka:

pipeline
     .apply(
         "ReadInput",
         KafkaIO.<String, String>read()
             .withBootstrapServers(...)
             .withTopics(...)
             .updateConsumerProperties(...) // only "group.id" and "auto.offset.reset"
             .withValueDeserializer(StringDeserializer.class)
             .withCreateTime(Duration.standardMinutes(10))
             .commitOffsetsInFinalize())

So, any clues on where to investigate? In the meantime, I'm going to add
more logging to the application to see if I can detect where the
characters get "lost" in the pipeline. I will also try writing to a local
Kafka using a Java Kafka producer, where I can be sure the data is
written in UTF-8.

Thank you for the support.




Re: Encoding Problem: Kafka - DataFlow

Posted by Luke Cwik <lc...@google.com>.
The only option I can think of is to combine this hack[1] with a
JvmInitializer[2].

1: https://stackoverflow.com/a/14987992/4368200
2: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/harness/JvmInitializer.java
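
A minimal sketch of the reflection hack from [1], for illustration only. The
class and method names (DefaultCharsetHack, forceUtf8Default) are hypothetical.
The hack depends on JDK internals: it is known to work on Java 8, while newer
JDKs block reflective access to java.base by default, so the sketch reports
whether the default charset ended up as UTF-8 instead of assuming success.
Calling it from the onStartup() method of a JvmInitializer [2] is the assumed
wiring and is not shown, to keep the block free of Beam dependencies.

```java
import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

class DefaultCharsetHack {

    /**
     * Tries to reset the JVM's cached default charset so that it is
     * recomputed from the "file.encoding" system property.
     * Returns true if the default charset is UTF-8 afterwards.
     */
    static boolean forceUtf8Default() {
        try {
            System.setProperty("file.encoding", "UTF-8");
            // Private JDK field; name and behavior are implementation details.
            Field cached = Charset.class.getDeclaredField("defaultCharset");
            cached.setAccessible(true);
            cached.set(null, null); // clear the cache so it is recomputed lazily
        } catch (ReflectiveOperationException | RuntimeException e) {
            // Newer JDKs reject the reflective access; the default stays as it was.
        }
        return StandardCharsets.UTF_8.equals(Charset.defaultCharset());
    }

    public static void main(String[] args) {
        System.out.println("default charset is UTF-8: " + forceUtf8Default());
    }
}
```

In a pipeline, the assumption is that a class implementing JvmInitializer and
registered through ServiceLoader would call forceUtf8Default() in onStartup(),
so the reset happens on each worker JVM before any element is processed.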

On Mon, Nov 4, 2019 at 1:40 AM Leonardo Campos | GameDuell <
leonardo.campos@gameduell.de> wrote:

> Thanks, Eddy.
>
> Just to add to the discussion, I logged the following information:
> Charset.defaultCharset(): US-ASCII
> System.getProperty("file.encoding"): ANSI_X3.4-1968
> OutputStreamWriter writer = new OutputStreamWriter(new
> ByteArrayOutputStream()); writer.getEncoding(): ASCII
>
> In our case, a JSON library seems to be messing things up: at first
> glance I already found a string.getBytes() call in its internals,
> with no way to specify the encoding.
>
> I really wonder if there is any way to change this default in Dataflow.
>
> Cheers
>
> On 04.11.2019 09:58, Eddy G wrote:
> > Adding to what Jeff just pointed out: I'm dealing with the same issue
> > when writing Parquet files with the ParquetIO module on Dataflow. The
> > same thing happens even when I force UTF-8 on all String objects. It
> > may be related to behind-the-scenes decoding/encoding within that
> > module, which causes those characters to be wrongly encoded in the
> > output. I mention it in case you are doing some Parquet processing,
> > or using any other module at the end of the pipeline that may behave
> > similarly.
>
>

Re: Encoding Problem: Kafka - DataFlow

Posted by Leonardo Campos | GameDuell <le...@gameduell.de>.
Thanks, Eddy.

Just to add to the discussion, I logged the following information:
Charset.defaultCharset(): US-ASCII
System.getProperty("file.encoding"): ANSI_X3.4-1968
OutputStreamWriter writer = new OutputStreamWriter(new
ByteArrayOutputStream()); writer.getEncoding(): ASCII
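
For reference, the three checks above can be gathered into one small helper, as
in the sketch below. The class and method names (CharsetDiagnostics, describe)
are hypothetical. Only standard JDK calls are used. To see the worker values,
the assumption is that the helper would be logged from code that runs on the
Dataflow workers, not from the launcher JVM.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;

class CharsetDiagnostics {

    /** Returns a three-line report of the JVM's effective default encoding. */
    static String describe() {
        // An OutputStreamWriter created without a charset uses the default one.
        OutputStreamWriter writer = new OutputStreamWriter(new ByteArrayOutputStream());
        return "Charset.defaultCharset(): " + Charset.defaultCharset()
            + "\nSystem.getProperty(\"file.encoding\"): " + System.getProperty("file.encoding")
            + "\nOutputStreamWriter.getEncoding(): " + writer.getEncoding();
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```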

In our case, a JSON library seems to be messing things up: at first
glance I already found a string.getBytes() call in its internals,
with no way to specify the encoding.

I really wonder if there is any way to change this default in Dataflow.

Cheers

On 04.11.2019 09:58, Eddy G wrote:
> Adding to what Jeff just pointed out: I'm dealing with the same issue
> when writing Parquet files with the ParquetIO module on Dataflow. The
> same thing happens even when I force UTF-8 on all String objects. It
> may be related to behind-the-scenes decoding/encoding within that
> module, which causes those characters to be wrongly encoded in the
> output. I mention it in case you are doing some Parquet processing,
> or using any other module at the end of the pipeline that may behave
> similarly.


Re: Encoding Problem: Kafka - DataFlow

Posted by Eddy G <ka...@gmail.com>.
Adding to what Jeff just pointed out: I'm dealing with the same issue when writing Parquet files with the ParquetIO module on Dataflow. The same thing happens even when I force UTF-8 on all String objects. It may be related to behind-the-scenes decoding/encoding within that module, which causes those characters to be wrongly encoded in the output. I mention it in case you are doing some Parquet processing, or using any other module at the end of the pipeline that may behave similarly.

Re: Encoding Problem: Kafka - DataFlow

Posted by Jeff Klukas <jk...@mozilla.com>.
I ran into exactly this same problem of finding some accented characters
getting replaced with "?" in a pipeline only when running on Dataflow and
not when using the Direct Runner. KafkaIO was not involved, but I'd bet the
root cause is the same.

In my case, the input turned out to be properly UTF-8 encoded and the
problem was that we were calling String#getBytes() without specifying a
charset. Locally, the default charset was UTF-8, but it looks like the
Dataflow workers must have their default charset set to something else
(I suspect Windows-1252), so it was interpreting the UTF-8 bytes as
Windows-1252 bytes for the byte arrays in our PCollection, and then they
were being read back as UTF-8.

We resolved the issue by combing our code for all uses of String#getBytes()
and making sure we always pass in StandardCharsets.UTF_8.
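
The effect can be reproduced without Dataflow, as in the sketch below. It
assumes a worker whose default charset is US-ASCII (the value reported in a
log elsewhere in this thread) and simulates it by passing
StandardCharsets.US_ASCII explicitly, since the default charset cannot be
portably changed at runtime. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

class GetBytesDemo {

    /** What happens when getBytes() falls back to an ASCII default charset. */
    static String roundTripThroughAscii(String s) {
        // Characters that ASCII cannot represent are replaced with '?'.
        byte[] bytes = s.getBytes(StandardCharsets.US_ASCII);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** The fix: always name the charset on both sides of the conversion. */
    static String roundTripThroughUtf8(String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Unicode escapes for "öüä", so the source file encoding does not matter.
        String input = "\u00f6\u00fc\u00e4";
        System.out.println(roundTripThroughAscii(input));              // prints ???
        System.out.println(input.equals(roundTripThroughUtf8(input))); // prints true
    }
}
```

The same applies to the reverse direction: new String(bytes) without a charset
also uses the platform default, so both calls need the explicit argument.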

On Thu, Oct 31, 2019 at 5:26 AM Leonardo Campos <
leonardo.campos@gameduell.de> wrote:

> Hello,
>
> Problem: Special characters such as öüä are being saved to our sinks as
> "?".
> Setup: We read from Kafka using KafkaIO, run the pipeline with the
> Dataflow Runner, and save the results to BigQuery and Elasticsearch.
>
> We checked that data is being written to Kafka in UTF-8 (code check). We
> also checked that the special characters appear correctly in
> kafka-console-consumer.
> In addition, in a local setup with Kafka in Docker* and the Direct
> Runner, the characters were encoded correctly. *The event was written
> using kafka-console-producer.
>
> Reading from Kafka:
>
> pipeline
>     .apply(
>         "ReadInput",
>         KafkaIO.<String, String>read()
>             .withBootstrapServers(...)
>             .withTopics(...)
>             .updateConsumerProperties(...) // only "group.id" and "auto.offset.reset"
>             .withValueDeserializer(StringDeserializer.class)
>             .withCreateTime(Duration.standardMinutes(10))
>             .commitOffsetsInFinalize())
>
> So, any clues on where to investigate? In the meantime, I'm going to add
> more logging to the application to see if I can detect where the
> characters get "lost" in the pipeline. I will also try writing to a local
> Kafka using a Java Kafka producer, where I can be sure the data is
> written in UTF-8.
>
> Thank you for the support.
>
>