Posted to user@flink.apache.org by Steve Whelan <sw...@jwplayer.com> on 2020/03/19 16:13:37 UTC

java.lang.AbstractMethodError when implementing KafkaSerializationSchema

Hi,

I am attempting to create a Key/Value serializer for the Kafka table
connector. I forked `KafkaTableSourceSinkFactoryBase`[1] and other relevant
classes, updating the serializer.

First, I created `JsonRowKeyedSerializationSchema` which implements
`KeyedSerializationSchema`[2], which is deprecated. The way it works is you
provide a list of indices in your Row output that are the Key. This works
successfully.
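
A minimal sketch of the key-by-indices idea described above, for readers who want something concrete. It is not the actual `JsonRowKeyedSerializationSchema`: the `KeyProjection` class is hypothetical, a plain `Object[]` stands in for Flink's `Row`, and the bracketed, comma-separated encoding is a simplification rather than real JSON.

```java
import java.nio.charset.StandardCharsets;
import java.util.StringJoiner;

class KeyProjection {
    // Builds the key bytes from the fields at keyIndices.
    // Assumption: Object[] stands in for org.apache.flink.types.Row.
    static byte[] serializeKey(Object[] row, int[] keyIndices) {
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        for (int index : keyIndices) {
            joiner.add(String.valueOf(row[index]));
        }
        return joiner.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Object[] row = {"a", 42, "x"};
        // Fields 0 and 2 form the key; field 1 only appears in the value.
        byte[] key = serializeKey(row, new int[] {0, 2});
        System.out.println(new String(key, StandardCharsets.UTF_8)); // prints [a,x]
    }
}
```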

When I tried migrating my `JsonRowKeyedSerializationSchema` to implement
`KafkaSerializationSchema`[3], I got a `java.lang.AbstractMethodError`
exception. Normally, this would mean I'm using an old interface; however, all
my Flink dependencies are version 1.9. I cannot find this abstract
`serialize()` function in the Flink codebase. Has anyone come across this
before?

When I print the methods of my `JsonRowKeyedSerializationSchema` class, I do
see the method below, which seems to be the one called by the
FlinkKafkaProducer, but I do not see it in `KafkaSerializationSchema`:

public abstract org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema.serialize(java.lang.Object,java.lang.Long)
  java.lang.Object
  java.lang.Long
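
A listing like the one above can be produced with plain reflection. The sketch below is only a guess at how such output might be generated, not the code actually used; `MethodLister` is a hypothetical helper, and `String.compareTo` (from `Comparable<String>`) stands in for the schema's `serialize` method so that the example runs without Flink on the classpath.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class MethodLister {
    // Collects every public method called `name` that is visible on `type`
    // (including inherited and compiler-generated ones), each followed by
    // its parameter types on separate indented lines.
    static List<String> list(Class<?> type, String name) {
        List<String> out = new ArrayList<>();
        for (Method m : type.getMethods()) {
            if (!m.getName().equals(name)) {
                continue;
            }
            out.add(m.toString());
            for (Class<?> parameter : m.getParameterTypes()) {
                out.add("  " + parameter.getName());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Stand-in target; in a Flink job one would pass the schema class
        // and "serialize" instead.
        list(String.class, "compareTo").forEach(System.out::println);
    }
}
```

Because `getMethods()` also reports inherited interface methods that no concrete method overrides, an entry marked `abstract` in such a listing hints that the class does not implement the method under that exact signature.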


*`JsonRowKeyedSerializationSchema` class*

import javax.annotation.Nullable;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.producer.ProducerRecord;

public class JsonRowKeyedSerializationSchema implements KafkaSerializationSchema<Row> {

  // constructors and helpers

  @Override
  public ProducerRecord<byte[], byte[]> serialize(Row row, @Nullable Long aLong) {
    // aLong is the record timestamp passed in by the producer; it is unused here.
    return new ProducerRecord<>("some_topic", serializeKey(row), serializeValue(row));
  }
}


*Stacktrace:*

Caused by: java.lang.AbstractMethodError: Method com/mypackage/flink/serialization/json/JsonRowKeyedSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord; is abstract
  at com.mypackage.flink.serialization.json.JsonRowKeyedSerializationSchema.serialize(JsonRowKeyedSerializationSchema.java)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:816)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:98)
  at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
  at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
  at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
  at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
  at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
  at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
  at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
  at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
  at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)


[1] https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
[2] https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
[3] https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java

Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema

Posted by Arvid Heise <ar...@ververica.com>.
Hi Steve,

I just noticed an inconsistency: your class correctly contains the bridge
method (the last method in the javap output).
Your stack trace, however, mentions
org/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord
(note the org/apache/flink/kafka/shaded prefix) instead of
org.apache.kafka.clients.producer.ProducerRecord.
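
Why the package of the return type matters: the JVM resolves a call by method name plus descriptor, and the descriptor includes the return type. The sketch below illustrates this with `java.lang.invoke.MethodType`; the two nested classes are hypothetical stand-ins for the original and the relocated `ProducerRecord`, not the real Kafka classes.

```java
import java.lang.invoke.MethodType;

class DescriptorDemo {
    // Hypothetical stand-ins for one class as seen before and after relocation.
    static final class ProducerRecord {}
    static final class ShadedProducerRecord {}

    // JVM descriptor of a method taking (Object, Long) and returning returnType.
    static String descriptor(Class<?> returnType) {
        return MethodType.methodType(returnType, Object.class, Long.class)
                .toMethodDescriptorString();
    }

    public static void main(String[] args) {
        // Same name and parameters, different return type: the descriptors
        // differ, so the JVM treats these as two unrelated methods.
        System.out.println(descriptor(ProducerRecord.class));
        System.out.println(descriptor(ShadedProducerRecord.class));
    }
}
```

A class compiled against one return type therefore does not implement an interface method that was compiled against the other, which is consistent with the method being reported as abstract.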

Did you perform any relocations yourself (quite unlikely)? If so, please
add an exclusion for org.apache.kafka.

If not, then we may be looking at inconsistencies between the Flink versions
in your setup. Could you double-check all Flink versions, ideally down to the
minor version, especially on EMR (dashboard) but also in your POMs
(mvn dependency:tree)?
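
In addition to comparing version numbers, it can help to log where a class is actually loaded from at runtime. The following is a sketch only: `ClassOrigin` is a hypothetical helper, and in a real job one would pass classes such as `ProducerRecord.class` or `KafkaSerializationSchema.class` rather than the stand-ins used here.

```java
import java.security.CodeSource;

class ClassOrigin {
    // Returns the jar or directory a class was loaded from. Classes from the
    // JDK's bootstrap loader report no code source, hence the fallback marker.
    static String of(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "(bootstrap or unknown)";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) {
        System.out.println(of(String.class));      // a JDK class
        System.out.println(of(ClassOrigin.class)); // an application class
    }
}
```

Running this once locally and once on the cluster would show whether the two environments pick up the class from different jars.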

On Wed, Mar 25, 2020 at 2:35 AM Steve Whelan <sw...@jwplayer.com> wrote:

> [...]

Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema

Posted by Steve Whelan <sw...@jwplayer.com>.
Hi Arvid,

Interestingly, my job runs successfully in a Docker container (image
*flink:1.9.0-scala_2.11*) but fails with the
*java.lang.AbstractMethodError* on AWS EMR (non-Docker). I am compiling
with Java version OpenJDK 1.8.0_242, which is the same version my EMR
cluster is running. Since the job runs successfully locally in a Docker
container, this points to an issue in our AWS environment setup. Oddly,
we have been running Flink on EMR for over 2 years and have never come across
this until now.

Results of javap are:

public class com.jwplayer.flink.serialization.json.JsonRowKeyedSerializationSchema implements org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema<org.apache.flink.types.Row> {
  public static org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema<org.apache.flink.types.Row> create(com.jwplayer.flink.config.serde.SerDeConfig);
  public byte[] serializeKey(org.apache.flink.types.Row);
  public byte[] serializeValue(org.apache.flink.types.Row);
  public org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]> serialize(org.apache.flink.types.Row, java.lang.Long);
  public org.apache.kafka.clients.producer.ProducerRecord serialize(java.lang.Object, java.lang.Long);
}


On Mon, Mar 23, 2020 at 9:55 AM Arvid Heise <ar...@ververica.com> wrote:

> [...]

Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema

Posted by Arvid Heise <ar...@ververica.com>.
Hi Steve,

For some reason, it seems as if the Java compiler is not generating the
bridge method [1].

Could you double-check that the Java version of your build process and your
cluster match?

Could you run javap on your generated class file and report back?

[1] https://docs.oracle.com/javase/tutorial/java/generics/bridgeMethods.html
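
For readers unfamiliar with bridge methods, here is a small self-contained illustration. `Schema` and `IntSchema` are hypothetical stand-ins for `KafkaSerializationSchema<T>` and an implementation of it; nothing here depends on Flink.

```java
import java.lang.reflect.Method;

// Hypothetical generic interface, playing the role of KafkaSerializationSchema<T>.
interface Schema<T> {
    String serialize(T element, Long timestamp);
}

// After type erasure the interface method is serialize(Object, Long), so javac
// adds a synthetic bridge with that signature which casts its argument and
// delegates to the typed method below.
class IntSchema implements Schema<Integer> {
    @Override
    public String serialize(Integer element, Long timestamp) {
        return element + "@" + timestamp;
    }
}

class BridgeDemo {
    // Counts the bridge methods called `name` declared directly on `type`.
    static int countBridges(Class<?> type, String name) {
        int count = 0;
        for (Method m : type.getDeclaredMethods()) {
            if (m.getName().equals(name) && m.isBridge()) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countBridges(IntSchema.class, "serialize")); // prints 1
        Schema<Integer> schema = new IntSchema();
        // A call through the interface is dispatched via the bridge.
        System.out.println(schema.serialize(7, 1L)); // prints 7@1
    }
}
```

In `javap` output the bridge shows up as a second `serialize` method taking `java.lang.Object`, which is what to look for in the generated class file.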

On Thu, Mar 19, 2020 at 5:13 PM Steve Whelan <sw...@jwplayer.com> wrote:

> [...]