Posted to issues@beam.apache.org by "Tom Underhill (Jira)" <ji...@apache.org> on 2021/03/11 19:24:00 UTC

[jira] [Comment Edited] (BEAM-11851) ConfluentSchemaRegistryProvider fails when authentication is required

    [ https://issues.apache.org/jira/browse/BEAM-11851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17299822#comment-17299822 ] 

Tom Underhill edited comment on BEAM-11851 at 3/11/21, 7:23 PM:
----------------------------------------------------------------

 

Hi Alexey,

I'm having the same issue trying to connect to a Confluent Schema Registry (CSR) that requires basic authentication. In this case I'm using Confluent Cloud and the Confluent Cloud Schema Registry. Connecting to Confluent Cloud itself is no problem via .withConsumerConfigUpdates(props), but it's unclear where, or whether, you can pass separate API keys and secrets for the Schema Registry. Any pointers on how to make this work, or does this need a PR?

Many Thanks!
{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.LongDeserializer;

import java.util.HashMap;
import java.util.Map;

public class DataflowJob {
    public static void main(String[] args) {

        // Broker/consumer authentication for Confluent Cloud -- this part works.
        Map<String, Object> props = new HashMap<>();
        props.put("auto.offset.reset", "earliest");
        props.put("ssl.endpoint.identification.algorithm", "https");
        props.put("sasl.mechanism", "PLAIN");
        props.put("request.timeout.ms", 20000);
        props.put("retry.backoff.ms", 500);
        props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CC API KEY>\" password=\"<CC API SECRET>\";");
        props.put("security.protocol", "SASL_SSL");

        // Schema Registry authentication -- these properties appear to be
        // ignored when the value coder is resolved, which triggers the 401 below.
        props.put("schema.registry.url", "<CC-SR-URL>");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "<CC-SR-KEY:CC-SR-SECRET>");

        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);

        // Fails at pipeline construction time: KafkaIO resolves the value
        // coder eagerly, which contacts the Schema Registry (see stack trace below).
        p.apply(KafkaIO.<Long, String>read()
                .withBootstrapServers("<CC-URL>")
                .withTopic("orders")
                .withConsumerConfigUpdates(props)
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of("<CC-SR-URL>", "orders-value"))
                .withoutMetadata()
        );

        p.run().waitUntilFinish();
    }
}
{code}
fails with
{code:java}
Exception in thread "main" java.lang.RuntimeException: Unable to get latest schema metadata for subject: orders-value
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaMetadata(ConfluentSchemaRegistryDeserializerProvider.java:119)
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getAvroSchema(ConfluentSchemaRegistryDeserializerProvider.java:110)
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getCoder(ConfluentSchemaRegistryDeserializerProvider.java:106)
	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.getValueCoder(KafkaIO.java:1192)
	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:1096)
	at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:486)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:481)
	at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
	at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.expand(KafkaIO.java:1256)
	at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.expand(KafkaIO.java:1245)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:481)
	at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
	at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:176)
	at DataflowJob.main(DataflowJob.java:30)
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:515)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:507)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:275)
	at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaMetadata(ConfluentSchemaRegistryDeserializerProvider.java:116)
	... 15 more
{code}
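
For reference, here is a minimal standalone check of the Schema Registry credentials, kept outside of Beam. It is only a sketch, assuming the CachedSchemaRegistryClient constructor that accepts a config map; <CC-SR-URL> and <CC-SR-KEY:CC-SR-SECRET> are the same placeholders as above. If this also returns a 401, the credentials are at fault; if it succeeds, the provider is likely dropping the basic.auth.* properties:
{code:java}
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

import java.util.HashMap;
import java.util.Map;

public class SchemaRegistryAuthCheck {
    public static void main(String[] args) throws Exception {
        // Same Schema Registry auth settings as in the pipeline above.
        Map<String, Object> srConfig = new HashMap<>();
        srConfig.put("basic.auth.credentials.source", "USER_INFO");
        srConfig.put("basic.auth.user.info", "<CC-SR-KEY:CC-SR-SECRET>");

        // Constructor variant that takes client configs, so the basic auth
        // properties are applied to the underlying REST client.
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("<CC-SR-URL>", 100, srConfig);

        // Equivalent to the lookup that fails inside the provider.
        SchemaMetadata metadata = client.getLatestSchemaMetadata("orders-value");
        System.out.println(metadata.getSchema());
    }
}
{code}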
 


> ConfluentSchemaRegistryProvider fails when authentication is required
> ---------------------------------------------------------------------
>
>                 Key: BEAM-11851
>                 URL: https://issues.apache.org/jira/browse/BEAM-11851
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.27.0
>            Reporter: Maxim Ivanov
>            Priority: P2
>
> When configuring KafkaIO.read() with ConfluentSchemaRegistryDeserializerProvider as the value deserializer and the `basic.auth.credentials.source=USER_INFO` and `basic.auth.user.info=user:password` properties set, it fails to start the pipeline:
> {code:java}
> [error] java.lang.RuntimeException: Unable to get latest schema metadata for subject: identity_users_v2-value
> [error]   at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaMetadata(ConfluentSchemaRegistryDeserializerProvider.java:119)
> [error]   at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getAvroSchema(ConfluentSchemaRegistryDeserializerProvider.java:110)
> [error]   at org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getCoder(ConfluentSchemaRegistryDeserializerProvider.java:106)
> [error]   at org.apache.beam.sdk.io.kafka.KafkaIO$Read.getValueCoder(KafkaIO.java:1147)
> [error]   at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:1052)
> [error]   at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:481)
> [error]   at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:547)
> [error]   at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:498)
> [error]   at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
> [error]   at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:191)
> {code}
>  
> I suspect this is because the provider applies the configuration properties to the KafkaAvroDeserializer only in getDeserializer, but does not apply them to the schema registry client it creates in getCoder. A sketch of the suspected asymmetry follows.
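>  
> For illustration only, a rough sketch of that asymmetry (simplified, hypothetical method bodies; not the actual Beam source):
> {code:java}
> // getDeserializer: the consumer configs (including basic.auth.*) are
> // forwarded to the KafkaAvroDeserializer, so authentication works here.
> public Deserializer<T> getDeserializer(Map<String, ?> configs, boolean isKey) {
>     KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
>     deserializer.configure(configs, isKey);
>     return (Deserializer<T>) deserializer;
> }
>
> // getCoder: a schema registry client is built from the URL alone, so the
> // basic.auth.* properties never reach it -- hence the 401 above.
> public Coder<T> getCoder(CoderRegistry coderRegistry) {
>     SchemaRegistryClient client =
>         new CachedSchemaRegistryClient(schemaRegistryUrl, 100); // no configs passed
>     return (Coder<T>) AvroCoder.of(getAvroSchema(client)); // getAvroSchema: hypothetical helper
> }
> {code}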



--
This message was sent by Atlassian Jira
(v8.3.4#803005)