Posted to user@flink.apache.org by Lian Jiang <ji...@gmail.com> on 2020/07/11 02:16:20 UTC

Flink 1.11 Table API cannot process Avro

Hi,

According to
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html,
Avro is supported by the Table API, but the code below failed:

tEnv.executeSql("CREATE TABLE people (\n" +
        "    id  INT,\n" +
        "    name STRING\n" +
        ") WITH (\n" +
        "    'connector' = 'filesystem',\n" +
        "    'path'     = 'file:///data/test.avro',\n" +
        "    'format'    = 'avro',\n" +
        "    'record-class'    = 'avro.Person',\n" +
        "    'property-version'    = '1',\n" +
        "    'properties.bootstrap.servers' = 'kafka:9092'\n" +
        ")");
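As a side note, 'record-class', 'property-version', and 'properties.bootstrap.servers' look like options from other connectors ('properties.bootstrap.servers' is Kafka-specific), so the filesystem connector may reject or ignore them. A pared-down DDL using only the documented filesystem options, as a sketch not verified against this setup, would be:

```sql
CREATE TABLE people (
    id   INT,
    name STRING
) WITH (
    'connector' = 'filesystem',
    'path'      = 'file:///data/test.avro',
    'format'    = 'avro'
);
```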

But got:

 Caused by: org.apache.flink.client.program.ProgramInvocationException: The
main method caused an error: Could not find any factory for identifier
'avro' that implements
'org.apache.flink.table.factories.FileSystemFormatFactory' in the classpath.
jobmanager_1      |
jobmanager_1      | Available factory identifiers are:
jobmanager_1      |
jobmanager_1      | csv
jobmanager_1      | json
jobmanager_1      | parquet
jobmanager_1      |     at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     ... 10 more
jobmanager_1      | Caused by:
org.apache.flink.table.api.ValidationException: Could not find any factory
for identifier 'avro' that implements
'org.apache.flink.table.factories.FileSystemFormatFactory' in the classpath.

Any idea? Thanks!

Regards
Leon

Re: Flink 1.11 Table API cannot process Avro

Posted by Lian Jiang <ji...@gmail.com>.
Thanks Leonard and Jark.

Here is my repo for your repro:
https://bitbucket.org/jiangok/flink-playgrounds/src/0d242a51f02083711218d3810267117e6ce4260c/table-walkthrough/pom.xml#lines-131.
As you can see, my pom.xml already includes the flink-avro and avro
dependencies.

You can run this repro by:

git clone git@bitbucket.org:jiangok/flink-playgrounds.git
cd flink-playgrounds/table-walkthrough
. scripts/ops.sh   # this script defines some helper commands
rebuild            # builds the artifacts and the Docker images, then runs them
log jobmanager     # prints the job manager log, which contains the exception

I very much appreciate your help!





On Sun, Jul 12, 2020 at 8:00 PM Leonard Xu <xb...@gmail.com> wrote:

> Hi, Jiang
>
> Is there an uber jar or a list of runtime dependencies so that developers can easily make the above Flink SQL Avro example work? Thanks.
>
>
> The dependency list for using Avro in Flink SQL is simple, and there is no
> uber jar AFAIK; we only need to add the `flink-avro` and `avro` dependencies. The
> `avro` dependency is managed, which means you do not need to add it if your
> dependency list already contains an `avro` dependency. I wrote a simple
> demo[1]; hope it helps you.
>
> Best,
> Leonard Xu
> [1]
> https://github.com/leonardBang/flink-sql-etl/blob/master/sql-avro/pom.xml#L32
>
>
>
>
>



Re: Flink 1.11 Table API cannot process Avro

Posted by Leonard Xu <xb...@gmail.com>.
Hi, Jiang
> Is there an uber jar or a list of runtime dependencies so that developers can easily make the above Flink SQL Avro example work? Thanks.

The dependency list for using Avro in Flink SQL is simple, and there is no uber jar AFAIK; we only need to add the `flink-avro` and `avro` dependencies. The `avro` dependency is managed, which means you do not need to add it if your dependency list already contains an `avro` dependency. I wrote a simple demo[1]; hope it helps you.

Best,
Leonard Xu
[1] https://github.com/leonardBang/flink-sql-etl/blob/master/sql-avro/pom.xml#L32 <https://github.com/leonardBang/flink-sql-etl/blob/master/sql-avro/pom.xml#L32>
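Concretely, the dependency block Leonard describes could look like this (versions are assumptions: 1.11.0 to match the cluster, and avro 1.8.2 as used elsewhere in this thread):

```xml
<!-- Avro format support for the Table API / SQL -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.11.0</version>
</dependency>
<!-- Apache Avro itself; flink-avro does not bundle it -->
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.8.2</version>
</dependency>
```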



 

Re: Flink 1.11 Table API cannot process Avro

Posted by Jark Wu <im...@gmail.com>.
From the latest exception message, it seems that the avro factory problem
has been resolved.
The new exception indicates that you don't have the proper Apache Avro
dependencies (because flink-avro doesn't bundle Apache Avro),
so you have to add Apache Avro to your project dependencies, or
export HADOOP_CLASSPATH if Hadoop is installed in your environment.

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>
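For the HADOOP_CLASSPATH alternative mentioned above, the usual command is the following sketch, assuming a local Hadoop installation with the `hadoop` binary on the PATH:

```shell
# Make Hadoop's jars (including its bundled Avro) visible to Flink;
# run this before starting the cluster or the client
export HADOOP_CLASSPATH=$(hadoop classpath)
```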

Best,
Jark

On Mon, 13 Jul 2020 at 03:04, Lian Jiang <ji...@gmail.com> wrote:

> Thanks guys.
>
> I missed the runtime dependencies. After adding the lines below to
> https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/Dockerfile,
> the original issue of "Could not find any factory for identifier" is gone.
>
> wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.11.0/flink-avro-1.11.0.jar; \
> wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar; \
> wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar; \
> wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar; \
> wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar;
>
>
> However, I got various NoSuchMethodExceptions related to JsonNode/JsonNull/GenericRecord/...  The most recent exception is:
>
>  java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.avro.generic.GenericRecord.<init>()
> jobmanager_1      |     at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353) ~[avro-1.8.2.jar:1.8.2]
> jobmanager_1      |     at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369) ~[avro-1.8.2.jar:1.8.2]
> jobmanager_1      |     at org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901) ~[avro-1.8.2.jar:1.8.2]
> jobmanager_1      |     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212) ~[avro-1.8.2.jar:1.8.2]
> jobmanager_1      |     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175) ~[avro-1.8.2.jar:1.8.2]
> jobmanager_1      |     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153) ~[avro-1.8.2.jar:1.8.2]
> jobmanager_1      |     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145) ~[avro-1.8.2.jar:1.8.2]
> jobmanager_1      |     at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233) ~[avro-1.8.2.jar:1.8.2]
> jobmanager_1      |     at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165) ~[flink-avro-1.11.0.jar:1.11.0]
> jobmanager_1      |     at org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroInputFormat.nextRecord(AvroFileSystemFormatFactory.java:200) ~[flink-avro-1.11.0.jar:1.11.0]
> jobmanager_1      |     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> jobmanager_1      |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> jobmanager_1      |     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> jobmanager_1      |     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>
> Is there an uber jar or a list of runtime dependencies so that developers can easily make the above Flink SQL Avro example work? Thanks.
>
>
>
>
>
> On Sat, Jul 11, 2020 at 11:39 PM Leonard Xu <xb...@gmail.com> wrote:
>
>> Hi, Jiang
>>
>>
>> jobmanager_1      | Available factory identifiers are:
>> jobmanager_1      |
>> jobmanager_1      | csv
>> jobmanager_1      | json
>> jobmanager_1      | parquet
>>
>>
>> After adding the flink-avro dependency, did you restart your
>> cluster/sql-client? From the log, it looks like the flink-avro dependency
>> was not loaded properly.
>>
>>
>> Best,
>> Leonard Xu
>>
>
>

Re: Flink 1.11 Table API cannot process Avro

Posted by Lian Jiang <ji...@gmail.com>.
Thanks guys.

I missed the runtime dependencies. After adding the lines below to
https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/Dockerfile,
the original issue of "Could not find any factory for identifier" is gone.

wget -P /opt/flink/lib/
https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.11.0/flink-avro-1.11.0.jar;
\
wget -P /opt/flink/lib/
https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar;
\
wget -P /opt/flink/lib/
https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar;
\
wget -P /opt/flink/lib/
https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar;
\
wget -P /opt/flink/lib/
https://repo1.maven.org/maven2/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar;


However, I got various NoSuchMethodExceptions related to
JsonNode/JsonNull/GenericRecord/...  The most recent exception is:

 java.lang.RuntimeException: java.lang.NoSuchMethodException:
org.apache.avro.generic.GenericRecord.<init>()
jobmanager_1      |     at
org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:353)
~[avro-1.8.2.jar:1.8.2]
jobmanager_1      |     at
org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:369)
~[avro-1.8.2.jar:1.8.2]
jobmanager_1      |     at
org.apache.avro.reflect.ReflectData.newRecord(ReflectData.java:901)
~[avro-1.8.2.jar:1.8.2]
jobmanager_1      |     at
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:212)
~[avro-1.8.2.jar:1.8.2]
jobmanager_1      |     at
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
~[avro-1.8.2.jar:1.8.2]
jobmanager_1      |     at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
~[avro-1.8.2.jar:1.8.2]
jobmanager_1      |     at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
~[avro-1.8.2.jar:1.8.2]
jobmanager_1      |     at
org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
~[avro-1.8.2.jar:1.8.2]
jobmanager_1      |     at
org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
~[flink-avro-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroInputFormat.nextRecord(AvroFileSystemFormatFactory.java:200)
~[flink-avro-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
~[flink-dist_2.11-1.11.0.jar:1.11.0]

Is there an uber jar or a list of runtime dependencies so that
developers can easily make the above Flink SQL Avro example
work? Thanks.





On Sat, Jul 11, 2020 at 11:39 PM Leonard Xu <xb...@gmail.com> wrote:

> Hi, Jiang
>
>
> jobmanager_1      | Available factory identifiers are:
> jobmanager_1      |
> jobmanager_1      | csv
> jobmanager_1      | json
> jobmanager_1      | parquet
>
>
> After adding the flink-avro dependency, did you restart your
> cluster/sql-client? From the log, it looks like the flink-avro dependency
> was not loaded properly.
>
>
> Best,
> Leonard Xu
>



Re: Flink 1.11 Table API cannot process Avro

Posted by Leonard Xu <xb...@gmail.com>.
Hi, Jiang

> 
> jobmanager_1      | Available factory identifiers are:
> jobmanager_1      | 
> jobmanager_1      | csv
> jobmanager_1      | json
> jobmanager_1      | parquet

After adding the flink-avro dependency, did you restart your cluster/sql-client? From the log, it looks like the flink-avro dependency was not loaded properly.
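For the docker-compose based playground, checking the loaded jars and restarting could look like the following sketch (service names are assumptions taken from the flink-playgrounds setup):

```shell
# Confirm the avro jars actually ended up in Flink's lib directory
docker-compose exec jobmanager ls /opt/flink/lib | grep -i avro

# Jars in /opt/flink/lib are only picked up at process start, so restart
docker-compose restart jobmanager taskmanager
```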


Best,
Leonard Xu

Re: Flink 1.11 Table API cannot process Avro

Posted by 方盛凯 <fs...@gmail.com>.
It seems that you didn't add the additional dependency:

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>

Lian Jiang <ji...@gmail.com> wrote on Sun, Jul 12, 2020 at 1:08 PM:

> I am using the Flink playground as the base:
>
> https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/pom.xml
>
> I observed "PhysicalLegacyTableSourceScan". Not sure whether this is
> related. Thanks. Regards!
>
> On Sat, Jul 11, 2020 at 3:43 PM Lian Jiang <ji...@gmail.com> wrote:
>
>> Thanks Jörn!
>>
>> I added the documented dependency in my pom.xml file:
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-avro</artifactId>
>>   <version>1.11.0</version></dependency>
>>
>> The newly generated jar does have:
>>
>> $ jar tf target//spend-report-1.0.0.jar | grep FileSystemFormatFactory
>> org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.class
>>
>> org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory$ParquetInputFormat.class
>>
>> org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.class
>>
>> org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroWriterFactory.class
>>
>> org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroInputFormat.class
>> org/apache/flink/formats/avro/AvroFileSystemFormatFactory.class
>> org/apache/flink/formats/avro/AvroFileSystemFormatFactory$1.class
>>
>> But I still got the same error. Is anything else missing? Thanks. Regards!
>>
>>
>> More detailed exception:
>> jobmanager_1      | Caused by:
>> org.apache.flink.client.program.ProgramInvocationException: The main method
>> caused an error: Could not find any factory for identifier 'avro' that
>> implements 'org.apache.flink.table.factories.FileSystemFormatFactory' in
>> the classpath.
>> jobmanager_1      |
>> jobmanager_1      | Available factory identifiers are:
>> jobmanager_1      |
>> jobmanager_1      | csv
>> jobmanager_1      | json
>> jobmanager_1      | parquet
>> jobmanager_1      |     at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     ... 10 more
>> jobmanager_1      | Caused by:
>> org.apache.flink.table.api.ValidationException: Could not find any factory
>> for identifier 'avro' that implements
>> 'org.apache.flink.table.factories.FileSystemFormatFactory' in the classpath.
>> jobmanager_1      |
>> jobmanager_1      | Available factory identifiers are:
>> jobmanager_1      |
>> jobmanager_1      | csv
>> jobmanager_1      | json
>> jobmanager_1      | parquet
>> jobmanager_1      |     at
>> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.filesystem.FileSystemTableFactory.createFormatFactory(FileSystemTableFactory.java:112)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.filesystem.FileSystemTableSource.getInputFormat(FileSystemTableSource.java:143)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.filesystem.FileSystemTableSource.getDataStream(FileSystemTableSource.java:127)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.planner.plan.nodes.physical.PhysicalLegacyTableSourceScan.getSourceTransformation(PhysicalLegacyTableSourceScan.scala:82)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:98)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:63)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlan(StreamExecLegacyTableSourceScan.scala:63)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
>> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.playgrounds.spendreport.SpendReport.localavro_mysql(SpendReport.java:220)
>> ~[?:?]
>> jobmanager_1      |     at
>> org.apache.flink.playgrounds.spendreport.SpendReport.main(SpendReport.java:31)
>> ~[?:?]
>> jobmanager_1      |     at
>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_252]
>> jobmanager_1      |     at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> ~[?:1.8.0_252]
>> jobmanager_1      |     at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[?:1.8.0_252]
>> jobmanager_1      |     at
>> java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_252]
>> jobmanager_1      |     at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> jobmanager_1      |     at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>>
>> On Sat, Jul 11, 2020 at 12:33 AM Jörn Franke <jo...@gmail.com>
>> wrote:
>>
>>> You are missing additional dependencies
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html
>>>
>>> Am 11.07.2020 um 04:16 schrieb Lian Jiang <ji...@gmail.com>:
>>>
>>> 
>>> Hi,
>>>
>>> According to
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html,
>>> Avro is supported by the Table API, but the code below failed:
>>>
>>> tEnv.executeSql("CREATE TABLE people (\n" +
>>>         "    id  INT,\n" +
>>>         "    name STRING\n" +
>>>         ") WITH (\n" +
>>>         "    'connector' = 'filesystem',\n" +
>>>         "    'path'     = 'file:///data/test.avro',\n" +
>>>         "    'format'    = 'avro',\n" +
>>>         "    'record-class'    = 'avro.Person',\n" +
>>>         "    'property-version'    = '1',\n" +
>>>         "    'properties.bootstrap.servers' = 'kafka:9092'\n" +
>>>         ")");
>>>
>>> But got:
>>>
>>>  Caused by: org.apache.flink.client.program.ProgramInvocationException:
>>> The main method caused an error: Could not find any factory for identifier
>>> 'avro' that implements
>>> 'org.apache.flink.table.factories.FileSystemFormatFactory' in the classpath.
>>> jobmanager_1      |
>>> jobmanager_1      | Available factory identifiers are:
>>> jobmanager_1      |
>>> jobmanager_1      | csv
>>> jobmanager_1      | json
>>> jobmanager_1      | parquet
>>> jobmanager_1      |     at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>>> jobmanager_1      |     at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>>> jobmanager_1      |     at
>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>>> jobmanager_1      |     at
>>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
>>> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>>> jobmanager_1      |     ... 10 more
>>> jobmanager_1      | Caused by:
>>> org.apache.flink.table.api.ValidationException: Could not find any factory
>>> for identifier 'avro' that implements
>>> 'org.apache.flink.table.factories.FileSystemFormatFactory' in the classpath.
>>>
>>> Any idea? Thanks!
>>>
>>> Regards
>>> Leon
>>>
>>>
>>>
>>>
>>>
>>
>
>

Re: Flink 1.11 Table API cannot process Avro

Posted by Lian Jiang <ji...@gmail.com>.
I am using the Flink playground as the base:
https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/pom.xml

I observed "PhysicalLegacyTableSourceScan". Not sure whether this is
related. Thanks. Regards!
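A common cause of this symptom (the factory class is present in the fat jar, as the `jar tf` output quoted below shows, yet is still not discovered) is that the Maven Shade plugin lets one jar's `META-INF/services` file overwrite another's; Flink discovers format factories through those SPI files. Merging them with the `ServicesResourceTransformer` is the usual fix, sketched here under the assumption that the fat jar is built with maven-shade-plugin:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <transformers>
      <!-- Merge META-INF/services entries from all jars instead of letting
           one jar's file overwrite another's; Flink's FactoryUtil relies
           on these SPI files to find format factories such as avro -->
      <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    </transformers>
  </configuration>
</plugin>
```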

On Sat, Jul 11, 2020 at 3:43 PM Lian Jiang <ji...@gmail.com> wrote:

> Thanks Jörn!
>
> I added the documented dependency in my pom.xml file:
>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-avro</artifactId>
>   <version>1.11.0</version></dependency>
>
> The newly generated jar does have:
>
> $ jar tf target//spend-report-1.0.0.jar | grep FileSystemFormatFactory
> org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.class
>
> org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory$ParquetInputFormat.class
>
> org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.class
>
> org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroWriterFactory.class
>
> org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroInputFormat.class
> org/apache/flink/formats/avro/AvroFileSystemFormatFactory.class
> org/apache/flink/formats/avro/AvroFileSystemFormatFactory$1.class
>
> But I still got the same error. Is anything else missing? Thanks. Regards!
>
>
> More detailed exception:
> jobmanager_1      | Caused by:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: Could not find any factory for identifier 'avro' that
> implements 'org.apache.flink.table.factories.FileSystemFormatFactory' in
> the classpath.
> jobmanager_1      |
> jobmanager_1      | Available factory identifiers are:
> jobmanager_1      |
> jobmanager_1      | csv
> jobmanager_1      | json
> jobmanager_1      | parquet
> jobmanager_1      |     at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> jobmanager_1      |     at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> jobmanager_1      |     at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> jobmanager_1      |     at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
> ~[flink-dist_2.11-1.11.0.jar:1.11.0]
> jobmanager_1      |     ... 10 more
> jobmanager_1      | Caused by:
> org.apache.flink.table.api.ValidationException: Could not find any factory
> for identifier 'avro' that implements
> 'org.apache.flink.table.factories.FileSystemFormatFactory' in the classpath.
> jobmanager_1      |
> jobmanager_1      | Available factory identifiers are:
> jobmanager_1      |
> jobmanager_1      | csv
> jobmanager_1      | json
> jobmanager_1      | parquet
> jobmanager_1      |     at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> jobmanager_1      |     at
> org.apache.flink.table.filesystem.FileSystemTableFactory.createFormatFactory(FileSystemTableFactory.java:112)
> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> jobmanager_1      |     at
> org.apache.flink.table.filesystem.FileSystemTableSource.getInputFormat(FileSystemTableSource.java:143)
> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> jobmanager_1      |     at
> org.apache.flink.table.filesystem.FileSystemTableSource.getDataStream(FileSystemTableSource.java:127)
> ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
> jobmanager_1      |     at
Re: Flink 1.11 Table API cannot process Avro

Posted by Lian Jiang <ji...@gmail.com>.
Thanks Jörn!

I added the documented dependency in my pom.xml file:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.11.0</version>
</dependency>

The newly generated jar does contain the factory classes:

$ jar tf target//spend-report-1.0.0.jar | grep FileSystemFormatFactory
org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.class
org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory$ParquetInputFormat.class
org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.class
org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroWriterFactory.class
org/apache/flink/formats/avro/AvroFileSystemFormatFactory$RowDataAvroInputFormat.class
org/apache/flink/formats/avro/AvroFileSystemFormatFactory.class
org/apache/flink/formats/avro/AvroFileSystemFormatFactory$1.class
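Having the factory class inside the jar is not by itself enough: Flink discovers format factories through Java's `ServiceLoader`, which only returns providers listed in a `META-INF/services` file. A minimal, self-contained illustration of that mechanism (the interface and class names here are hypothetical, not Flink's):

```java
import java.util.ServiceLoader;

// Hedged sketch: a provider class can sit on the classpath yet never be
// "found", because ServiceLoader only sees providers named in a
// META-INF/services entry.
public class SpiDemo {
    public interface FormatFactory {
        String identifier();
    }

    // Present on the classpath, but no META-INF/services file names it,
    // so ServiceLoader never discovers it -- mirroring the error above.
    public static class AvroFormatFactory implements FormatFactory {
        @Override
        public String identifier() { return "avro"; }
    }

    public static void main(String[] args) {
        int found = 0;
        for (FormatFactory f : ServiceLoader.load(FormatFactory.class)) {
            found++;
        }
        // Prints 0: the class exists but is invisible to service discovery.
        System.out.println("factories discovered: " + found);
    }
}
```

So the question becomes whether the fat jar's service files actually list the Avro factory, not just whether the class files are present.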

But I still got the same error. Is anything else missing? Thanks. Regards!
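A common cause of this symptom, when the factory class is demonstrably inside the fat jar, is that maven-shade-plugin keeps only one `META-INF/services` file per name instead of merging them, so the Avro factory's service entry from flink-avro is lost. A hedged sketch of the usual fix, assuming the jar is built with maven-shade-plugin (the plugin version and surrounding build config are assumptions; adapt to the actual pom):

```xml
<!-- Sketch: merge META-INF/services entries when shading, so Flink's
     ServiceLoader-based factory discovery can still find the Avro format.
     The version shown is an assumption; keep whatever the project uses. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.2.4</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <transformers>
          <!-- Concatenates service files from all dependencies instead
               of letting the last one win. -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```

Alternatively, placing the flink-avro jar directly into the cluster's `lib/` directory sidesteps the shading question entirely.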


More detailed exception:
jobmanager_1      | Caused by:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Could not find any factory for identifier 'avro' that
implements 'org.apache.flink.table.factories.FileSystemFormatFactory' in
the classpath.
jobmanager_1      |
jobmanager_1      | Available factory identifiers are:
jobmanager_1      |
jobmanager_1      | csv
jobmanager_1      | json
jobmanager_1      | parquet
jobmanager_1      |     at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     ... 10 more
jobmanager_1      | Caused by:
org.apache.flink.table.api.ValidationException: Could not find any factory
for identifier 'avro' that implements
'org.apache.flink.table.factories.FileSystemFormatFactory' in the classpath.
jobmanager_1      |
jobmanager_1      | Available factory identifiers are:
jobmanager_1      |
jobmanager_1      | csv
jobmanager_1      | json
jobmanager_1      | parquet
jobmanager_1      |     at
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.filesystem.FileSystemTableFactory.createFormatFactory(FileSystemTableFactory.java:112)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.filesystem.FileSystemTableSource.getInputFormat(FileSystemTableSource.java:143)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.filesystem.FileSystemTableSource.getDataStream(FileSystemTableSource.java:127)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.planner.plan.nodes.physical.PhysicalLegacyTableSourceScan.getSourceTransformation(PhysicalLegacyTableSourceScan.scala:82)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:98)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:63)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlan(StreamExecLegacyTableSourceScan.scala:63)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
scala.collection.Iterator$class.foreach(Iterator.scala:891)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
scala.collection.AbstractTraversable.map(Traversable.scala:104)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.playgrounds.spendreport.SpendReport.localavro_mysql(SpendReport.java:220)
~[?:?]
jobmanager_1      |     at
org.apache.flink.playgrounds.spendreport.SpendReport.main(SpendReport.java:31)
~[?:?]
jobmanager_1      |     at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_252]
jobmanager_1      |     at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_252]
jobmanager_1      |     at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_252]
jobmanager_1      |     at java.lang.reflect.Method.invoke(Method.java:498)
~[?:1.8.0_252]
jobmanager_1      |     at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
jobmanager_1      |     at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
~[flink-dist_2.11-1.11.0.jar:1.11.0]

On Sat, Jul 11, 2020 at 12:33 AM Jörn Franke <jo...@gmail.com> wrote:

> You are missing additional dependencies
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html
>


Re: Flink 1.11 Table API cannot process Avro

Posted by Jörn Franke <jo...@gmail.com>.
You are missing additional dependencies:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html
