Posted to user@beam.apache.org by Leonardo Campos <le...@gameduell.de> on 2019/10/31 09:17:16 UTC
Encoding Problem: Kafka - DataFlow
Hello,
Problem: Special characters such as öüä are being saved to our sinks as "?".
Set up: We read from Kafka using Kafka IO, run the Pipeline with
DataFlow Runner and save the results to BigQuery and ElasticSearch.
We verified (via a code check) that data is being written to Kafka in UTF-8, and
we also confirmed that the special characters appear correctly using
kafka-console-consumer.
Something else: in a local setup, with Kafka in Docker* and using the
Direct Runner, the characters were correctly encoded. *the event was
written using kafka-console-producer.
Reading from Kafka:
pipeline
    .apply(
        "ReadInput",
        KafkaIO.<String, String>read()
            .withBootstrapServers(...)
            .withTopics(...)
            .updateConsumerProperties(...) // only "group.id" and "auto.offset.reset"
            .withValueDeserializer(StringDeserializer.class)
            .withCreateTime(Duration.standardMinutes(10))
            .commitOffsetsInFinalize())
So, any clues on where to investigate? In the meantime I'm going to add
more logging to the application to see if I can detect where the
characters get "lost" in the pipeline. I'll also try writing to the local
Kafka using a Java Kafka producer, where I can be sure the data is written in UTF-8.
Thank you for the support.
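For what it's worth, the "?" substitution itself can be reproduced with plain JDK calls: encoding characters like öüä under a charset that cannot represent them silently replaces each one with '?'. A minimal sketch (the class name is illustrative):

```java
import java.nio.charset.StandardCharsets;

public class QuestionMarkDemo {
    public static void main(String[] args) {
        String s = "öüä";
        // With an explicit UTF-8 charset, each of these characters is two bytes.
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        System.out.println(utf8.length); // prints 6
        // Encoding to US-ASCII (which cannot represent these characters)
        // silently replaces each unmappable character with '?' (0x3F).
        byte[] ascii = s.getBytes(StandardCharsets.US_ASCII);
        System.out.println(new String(ascii, StandardCharsets.US_ASCII)); // prints ???
    }
}
```

If the sink shows "?", some step in between very likely called an encoding method without an explicit charset on a JVM whose default cannot represent the characters.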
Re: Encoding Problem: Kafka - DataFlow
Posted by Luke Cwik <lc...@google.com>.
The only combination that I can think of is to use this hack[1] combined
with a JvmInitializer[2].
1: https://stackoverflow.com/a/14987992/4368200
2:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/harness/JvmInitializer.java
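A rough sketch of what that combination might look like (untested; the reflection trick from [1] touches a private JDK field and is known to work on Java 8, and the class name and @AutoService registration here are illustrative, not from the thread):

```java
import java.lang.reflect.Field;
import java.nio.charset.Charset;

import com.google.auto.service.AutoService;
import org.apache.beam.sdk.harness.JvmInitializer;

// Registered via @AutoService so the SDK harness discovers it at worker startup.
@AutoService(JvmInitializer.class)
public class Utf8CharsetInitializer implements JvmInitializer {
    @Override
    public void onStartup() {
        try {
            System.setProperty("file.encoding", "UTF-8");
            // The hack from [1]: null out the cached default charset so the
            // JVM re-reads it from the (now updated) file.encoding property.
            Field defaultCharset = Charset.class.getDeclaredField("defaultCharset");
            defaultCharset.setAccessible(true);
            defaultCharset.set(null, null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Failed to reset default charset", e);
        }
    }
}
```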
Re: Encoding Problem: Kafka - DataFlow
Posted by Leonardo Campos | GameDuell <le...@gameduell.de>.
Thanks, Eddie.
Just to add to the discussion, I logged the following information:
Charset.defaultCharset(): US-ASCII
System.getProperty("file.encoding"): ANSI_X3.4-1968
new OutputStreamWriter(new ByteArrayOutputStream()).getEncoding(): ASCII
In our case, a JSON library seems to be messing things up: on first
glance I already found in its internals a String.getBytes() call with no
way to specify the encoding.
I really wonder if there is any way to change this default in Dataflow.
Cheers
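For anyone who wants to reproduce these probes, they boil down to a few JDK calls (class name illustrative; the printed values depend on the JVM's environment, so no particular output is implied):

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;

public class DefaultCharsetProbe {
    public static void main(String[] args) {
        // The three probes logged above, in one place.
        System.out.println(Charset.defaultCharset());
        System.out.println(System.getProperty("file.encoding"));
        OutputStreamWriter writer = new OutputStreamWriter(new ByteArrayOutputStream());
        System.out.println(writer.getEncoding());
    }
}
```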
Re: Encoding Problem: Kafka - DataFlow
Posted by Eddy G <ka...@gmail.com>.
Adding to what Jeff pointed out previously: I'm dealing with the same issue writing Parquet files with the ParquetIO module in Dataflow, and the same thing happens even when forcing UTF-8 on all String objects. It may be related to behind-the-scenes decoding/encoding within that module causing those characters to be wrongly encoded in the output. So in case you are doing some Parquet processing, or using any other module at the end of the pipeline, it may show similar behavior.
Re: Encoding Problem: Kafka - DataFlow
Posted by Jeff Klukas <jk...@mozilla.com>.
I ran into exactly this same problem of finding some accented characters
getting replaced with "?" in a pipeline only when running on Dataflow and
not when using the Direct Runner. KafkaIO was not involved, but I'd bet the
root cause is the same.
In my case, the input turned out to be properly UTF-8 encoded and the
problem was that we were calling String#getBytes() without specifying a
charset. Locally, the default charset was UTF-8, but the Dataflow workers
must have the default charset set to something else (I suspect
Windows-1252), so the byte arrays in our PCollection were produced with
that charset and then read back as UTF-8.
We resolved the issue by combing our code for all uses of String#getBytes()
and making sure we always pass in StandardCharsets.UTF_8.
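The fix can be checked with a small JDK-only round trip (class name illustrative): with an explicit charset on both sides, the bytes survive regardless of the worker's default.

```java
import java.nio.charset.StandardCharsets;

public class ExplicitCharsetRoundTrip {
    public static void main(String[] args) {
        String original = "öüä";
        // Pass the charset explicitly in both directions instead of relying
        // on the String#getBytes() / new String(byte[]) platform defaults.
        byte[] bytes = original.getBytes(StandardCharsets.UTF_8);
        String decoded = new String(bytes, StandardCharsets.UTF_8);
        System.out.println(original.equals(decoded)); // prints "true"
    }
}
```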