You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Lisheng Wang <wa...@gmail.com> on 2020/01/06 07:31:12 UTC

about default value of struct of connect api

hello kafka devs

i'm facing a problem that how to set a default value of struct.

i'm following https://docs.confluent.io/current/connect/devguide.html

Schema schema = SchemaBuilder.struct().name(NAME)
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT_SCHEMA)
    .field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
    .build();

Struct struct = new Struct(schema)
    .put("name", "Barbara Liskov")
    .put("age", 75)
    .build();

below is my code, i dont know how to set default value when schema type is
struct

Schema schema1 = SchemaBuilder.struct().name("info")
                .field("address", Schema.STRING_SCHEMA)
                .field("code",
Schema.STRING_SCHEMA).defaultValue("").build();

Thanks!





Best,
Lisheng

Re: about default value of struct of connect api

Posted by Lisheng Wang <wa...@gmail.com>.
update my findings,

below is code of method 'defaultValue':

public SchemaBuilder defaultValue(Object value) {
        checkCanSet(DEFAULT_FIELD, defaultValue, value);
        checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD);
        try {
            ConnectSchema.validateValue(this, value);
        } catch (DataException e) {
            throw new SchemaBuilderException("Invalid default value", e);
        }
        defaultValue = value;
        return this;
    }

seems will be ok if i change ConnectSchema.validateValue(this, value) to
ConnectSchema.validateValue(this.builder(), value), i dont know if it's ok
for other case.

Thanks.

Best,
Lisheng


Lisheng Wang <wa...@gmail.com> 于2020年1月8日周三 上午11:44写道:

> hello guys,
>
> update my question,
>
> i made a test, code as below, i want to set a default value of address to
> person
>
>         SchemaBuilder schemaBuilder =
> SchemaBuilder.struct().name("address")
>                 .field("province", SchemaBuilder.STRING_SCHEMA)
>                 .field("city", SchemaBuilder.STRING_SCHEMA);
>         Struct defaultValue = new Struct(schemaBuilder.build())
>                 .put("province", "aaaa")
>                 .put("city", "bbbb");
>         Schema dataSchema = SchemaBuilder.struct().name("person")
>                 .field("address",
> schemaBuilder.defaultValue(defaultValue).build()).build();
>         Struct struct = new Struct(dataSchema);
>         System.out.println(struct.toString());
>
> i got exception as below
>
> Exception in thread "main"
> org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default
> value
> at
> org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
>
> Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas
> do not match.
> at
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:251)
> at
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
> at
> org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
> ... 1 more
>
> i digged code of ConnectSchema.validateValue and found when type is
> STRUCT, then will check class of schema, but one is  SchemaBuilder, another
> is ConnectSchema
>
>          case STRUCT:
>                 Struct struct = (Struct) value;
>                 if (!struct.schema().equals(schema))
>                     throw new DataException("Struct schemas do not
> match.");
>                 struct.validate();
>                 break;
>
> the method of equals is
>
> @Override
>     public boolean equals(Object o) {
>         if (this == o) return true;
>         if (o == null || getClass() != o.getClass()) return false;
>         ConnectSchema schema = (ConnectSchema) o;
>         return Objects.equals(optional, schema.optional) &&
>                 Objects.equals(version, schema.version) &&
>                 Objects.equals(name, schema.name) &&
>                 Objects.equals(doc, schema.doc) &&
>                 Objects.equals(type, schema.type) &&
>                 Objects.deepEquals(defaultValue, schema.defaultValue) &&
>                 Objects.equals(fields, schema.fields) &&
>                 Objects.equals(keySchema, schema.keySchema) &&
>                 Objects.equals(valueSchema, schema.valueSchema) &&
>                 Objects.equals(parameters, schema.parameters);
>     }
>
> can anyone help how to set default value of "STRUCT" type with connect api?
>
> Thanks
>
> Best,
> Lisheng
>
>
> Lisheng Wang <wa...@gmail.com> 于2020年1月6日周一 下午3:31写道:
>
>> hello kafka devs
>>
>> i'm facing a problem that how to set a default value of struct.
>>
>> i'm following https://docs.confluent.io/current/connect/devguide.html
>>
>> Schema schema = SchemaBuilder.struct().name(NAME)
>>     .field("name", Schema.STRING_SCHEMA)
>>     .field("age", Schema.INT_SCHEMA)
>>     .field("admin", new
>> SchemaBuilder.boolean().defaultValue(false).build())
>>     .build();
>>
>> Struct struct = new Struct(schema)
>>     .put("name", "Barbara Liskov")
>>     .put("age", 75)
>>     .build();
>>
>> below is my code, i dont know how to set default value when schema type
>> is struct
>>
>> Schema schema1 = SchemaBuilder.struct().name("info")
>>                 .field("address", Schema.STRING_SCHEMA)
>>                 .field("code",
>> Schema.STRING_SCHEMA).defaultValue("").build();
>>
>> Thanks!
>>
>>
>>
>>
>>
>> Best,
>> Lisheng
>>
>

Re: about default value of struct of connect api

Posted by Lisheng Wang <wa...@gmail.com>.
update my findings,

below is code of method 'defaultValue':

public SchemaBuilder defaultValue(Object value) {
        checkCanSet(DEFAULT_FIELD, defaultValue, value);
        checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD);
        try {
            ConnectSchema.validateValue(this, value);
        } catch (DataException e) {
            throw new SchemaBuilderException("Invalid default value", e);
        }
        defaultValue = value;
        return this;
    }

seems will be ok if i change ConnectSchema.validateValue(this, value) to
ConnectSchema.validateValue(this.builder(), value), i dont know if it's ok
for other case.

Thanks.

Best,
Lisheng


Lisheng Wang <wa...@gmail.com> 于2020年1月8日周三 上午11:44写道:

> hello guys,
>
> update my question,
>
> i made a test, code as below, i want to set a default value of address to
> person
>
>         SchemaBuilder schemaBuilder =
> SchemaBuilder.struct().name("address")
>                 .field("province", SchemaBuilder.STRING_SCHEMA)
>                 .field("city", SchemaBuilder.STRING_SCHEMA);
>         Struct defaultValue = new Struct(schemaBuilder.build())
>                 .put("province", "aaaa")
>                 .put("city", "bbbb");
>         Schema dataSchema = SchemaBuilder.struct().name("person")
>                 .field("address",
> schemaBuilder.defaultValue(defaultValue).build()).build();
>         Struct struct = new Struct(dataSchema);
>         System.out.println(struct.toString());
>
> i got exception as below
>
> Exception in thread "main"
> org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default
> value
> at
> org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
>
> Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas
> do not match.
> at
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:251)
> at
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
> at
> org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
> ... 1 more
>
> i digged code of ConnectSchema.validateValue and found when type is
> STRUCT, then will check class of schema, but one is  SchemaBuilder, another
> is ConnectSchema
>
>          case STRUCT:
>                 Struct struct = (Struct) value;
>                 if (!struct.schema().equals(schema))
>                     throw new DataException("Struct schemas do not
> match.");
>                 struct.validate();
>                 break;
>
> the method of equals is
>
> @Override
>     public boolean equals(Object o) {
>         if (this == o) return true;
>         if (o == null || getClass() != o.getClass()) return false;
>         ConnectSchema schema = (ConnectSchema) o;
>         return Objects.equals(optional, schema.optional) &&
>                 Objects.equals(version, schema.version) &&
>                 Objects.equals(name, schema.name) &&
>                 Objects.equals(doc, schema.doc) &&
>                 Objects.equals(type, schema.type) &&
>                 Objects.deepEquals(defaultValue, schema.defaultValue) &&
>                 Objects.equals(fields, schema.fields) &&
>                 Objects.equals(keySchema, schema.keySchema) &&
>                 Objects.equals(valueSchema, schema.valueSchema) &&
>                 Objects.equals(parameters, schema.parameters);
>     }
>
> can anyone help how to set default value of "STRUCT" type with connect api?
>
> Thanks
>
> Best,
> Lisheng
>
>
> Lisheng Wang <wa...@gmail.com> 于2020年1月6日周一 下午3:31写道:
>
>> hello kafka devs
>>
>> i'm facing a problem that how to set a default value of struct.
>>
>> i'm following https://docs.confluent.io/current/connect/devguide.html
>>
>> Schema schema = SchemaBuilder.struct().name(NAME)
>>     .field("name", Schema.STRING_SCHEMA)
>>     .field("age", Schema.INT_SCHEMA)
>>     .field("admin", new
>> SchemaBuilder.boolean().defaultValue(false).build())
>>     .build();
>>
>> Struct struct = new Struct(schema)
>>     .put("name", "Barbara Liskov")
>>     .put("age", 75)
>>     .build();
>>
>> below is my code, i dont know how to set default value when schema type
>> is struct
>>
>> Schema schema1 = SchemaBuilder.struct().name("info")
>>                 .field("address", Schema.STRING_SCHEMA)
>>                 .field("code",
>> Schema.STRING_SCHEMA).defaultValue("").build();
>>
>> Thanks!
>>
>>
>>
>>
>>
>> Best,
>> Lisheng
>>
>

Re: about default value of struct of connect api

Posted by Lisheng Wang <wa...@gmail.com>.
hello guys,

update my question,

i made a test, code as below, i want to set a default value of address to
person

        SchemaBuilder schemaBuilder = SchemaBuilder.struct().name("address")
                .field("province", SchemaBuilder.STRING_SCHEMA)
                .field("city", SchemaBuilder.STRING_SCHEMA);
        Struct defaultValue = new Struct(schemaBuilder.build())
                .put("province", "aaaa")
                .put("city", "bbbb");
        Schema dataSchema = SchemaBuilder.struct().name("person")
                .field("address",
schemaBuilder.defaultValue(defaultValue).build()).build();
        Struct struct = new Struct(dataSchema);
        System.out.println(struct.toString());

i got exception as below

Exception in thread "main"
org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default
value
at
org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)

Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do
not match.
at
org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:251)
at
org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
at
org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
... 1 more

i digged code of ConnectSchema.validateValue and found when type is STRUCT,
then will check class of schema, but one is  SchemaBuilder, another
is ConnectSchema

         case STRUCT:
                Struct struct = (Struct) value;
                if (!struct.schema().equals(schema))
                    throw new DataException("Struct schemas do not match.");
                struct.validate();
                break;

the method of equals is

@Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ConnectSchema schema = (ConnectSchema) o;
        return Objects.equals(optional, schema.optional) &&
                Objects.equals(version, schema.version) &&
                Objects.equals(name, schema.name) &&
                Objects.equals(doc, schema.doc) &&
                Objects.equals(type, schema.type) &&
                Objects.deepEquals(defaultValue, schema.defaultValue) &&
                Objects.equals(fields, schema.fields) &&
                Objects.equals(keySchema, schema.keySchema) &&
                Objects.equals(valueSchema, schema.valueSchema) &&
                Objects.equals(parameters, schema.parameters);
    }

can anyone help how to set default value of "STRUCT" type with connect api?

Thanks

Best,
Lisheng


Lisheng Wang <wa...@gmail.com> 于2020年1月6日周一 下午3:31写道:

> hello kafka devs
>
> i'm facing a problem that how to set a default value of struct.
>
> i'm following https://docs.confluent.io/current/connect/devguide.html
>
> Schema schema = SchemaBuilder.struct().name(NAME)
>     .field("name", Schema.STRING_SCHEMA)
>     .field("age", Schema.INT_SCHEMA)
>     .field("admin", new
> SchemaBuilder.boolean().defaultValue(false).build())
>     .build();
>
> Struct struct = new Struct(schema)
>     .put("name", "Barbara Liskov")
>     .put("age", 75)
>     .build();
>
> below is my code, i dont know how to set default value when schema type is
> struct
>
> Schema schema1 = SchemaBuilder.struct().name("info")
>                 .field("address", Schema.STRING_SCHEMA)
>                 .field("code",
> Schema.STRING_SCHEMA).defaultValue("").build();
>
> Thanks!
>
>
>
>
>
> Best,
> Lisheng
>

Re: about default value of struct of connect api

Posted by Lisheng Wang <wa...@gmail.com>.
hello guys,

update my question,

i made a test, code as below, i want to set a default value of address to
person

        SchemaBuilder schemaBuilder = SchemaBuilder.struct().name("address")
                .field("province", SchemaBuilder.STRING_SCHEMA)
                .field("city", SchemaBuilder.STRING_SCHEMA);
        Struct defaultValue = new Struct(schemaBuilder.build())
                .put("province", "aaaa")
                .put("city", "bbbb");
        Schema dataSchema = SchemaBuilder.struct().name("person")
                .field("address",
schemaBuilder.defaultValue(defaultValue).build()).build();
        Struct struct = new Struct(dataSchema);
        System.out.println(struct.toString());

i got exception as below

Exception in thread "main"
org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default
value
at
org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)

Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do
not match.
at
org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:251)
at
org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
at
org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
... 1 more

i digged code of ConnectSchema.validateValue and found when type is STRUCT,
then will check class of schema, but one is  SchemaBuilder, another
is ConnectSchema

         case STRUCT:
                Struct struct = (Struct) value;
                if (!struct.schema().equals(schema))
                    throw new DataException("Struct schemas do not match.");
                struct.validate();
                break;

the method of equals is

@Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ConnectSchema schema = (ConnectSchema) o;
        return Objects.equals(optional, schema.optional) &&
                Objects.equals(version, schema.version) &&
                Objects.equals(name, schema.name) &&
                Objects.equals(doc, schema.doc) &&
                Objects.equals(type, schema.type) &&
                Objects.deepEquals(defaultValue, schema.defaultValue) &&
                Objects.equals(fields, schema.fields) &&
                Objects.equals(keySchema, schema.keySchema) &&
                Objects.equals(valueSchema, schema.valueSchema) &&
                Objects.equals(parameters, schema.parameters);
    }

can anyone help how to set default value of "STRUCT" type with connect api?

Thanks

Best,
Lisheng


Lisheng Wang <wa...@gmail.com> 于2020年1月6日周一 下午3:31写道:

> hello kafka devs
>
> i'm facing a problem that how to set a default value of struct.
>
> i'm following https://docs.confluent.io/current/connect/devguide.html
>
> Schema schema = SchemaBuilder.struct().name(NAME)
>     .field("name", Schema.STRING_SCHEMA)
>     .field("age", Schema.INT_SCHEMA)
>     .field("admin", new
> SchemaBuilder.boolean().defaultValue(false).build())
>     .build();
>
> Struct struct = new Struct(schema)
>     .put("name", "Barbara Liskov")
>     .put("age", 75)
>     .build();
>
> below is my code, i dont know how to set default value when schema type is
> struct
>
> Schema schema1 = SchemaBuilder.struct().name("info")
>                 .field("address", Schema.STRING_SCHEMA)
>                 .field("code",
> Schema.STRING_SCHEMA).defaultValue("").build();
>
> Thanks!
>
>
>
>
>
> Best,
> Lisheng
>