Posted to user-zh@flink.apache.org by Others <41...@qq.com> on 2020/01/15 06:59:51 UTC

Help request: Flink job with a Kafka source fails when deployed to the cluster

I am using Flink 1.9.1.
The job runs fine when debugged locally, but fails with the following error when started on the cluster:
2020-01-15 11:57:44,255 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
    at com.doumob.flink.BuoyDataJob.main(BuoyDataJob.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
    ... 9 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: No factory implements 'org.apache.flink.table.factories.DeserializationSchemaFactory'.

The following properties are requested:
connector.properties.0.key=group.id
connector.properties.0.value=consumer_flink_etl_test
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=172.16.0.148:9092,172.16.0.149:9092,172.16.0.150:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_etl_pro
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=cTime
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=2000
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=event
schema.1.type=VARCHAR
schema.2.name=adSpaceKey
schema.2.type=VARCHAR
schema.3.name=appkey
schema.3.type=VARCHAR
schema.4.name=build
schema.4.type=VARCHAR
schema.5.name=buoyId
schema.5.type=BIGINT
schema.6.name=gameHtmlId
schema.6.type=BIGINT
schema.7.name=uid
schema.7.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:243)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:186)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:144)
    at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:65)
    ... 17 more


Below is the pom.xml content:


<name>Flink Quickstart Job</name>
<url>http://www.myorganization.org</url>
<profiles>
   <profile>
      <id>dev</id>
      <activation>
         <activeByDefault>true</activeByDefault>
      </activation>
      <properties>
         <project.scope>compile</project.scope>
      </properties>
   </profile>
   <profile>
      <id>pro</id>
      <properties>
         <project.scope>provided</project.scope>
      </properties>
   </profile>
</profiles>
<properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <flink.version>1.9.1</flink.version>
   <java.version>1.8</java.version>
   <scala.binary.version>2.11</scala.binary.version>
   <maven.compiler.source>${java.version}</maven.compiler.source>
   <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<repositories>
   <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
         <enabled>false</enabled>
      </releases>
      <snapshots>
         <enabled>true</enabled>
      </snapshots>
   </repository>
</repositories>

<dependencies>
   <!-- Apache Flink dependencies -->
   <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>${project.scope}</scope>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${project.scope}</scope>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-jdbc_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <!--mysql-->
   <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.48</version>
   </dependency>
   <!-- Gson-->
   <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.5</version>
   </dependency>
   <!-- Add logging framework, to produce console output when running in the IDE. -->
   <!-- These dependencies are excluded from the application JAR by default. -->
   <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
   </dependency>
   <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>runtime</scope>
   </dependency>
</dependencies>

<build>
   <plugins>

      <!-- Java Compiler -->
      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.1</version>
         <configuration>
            <source>${java.version}</source>
            <target>${java.version}</target>
         </configuration>
      </plugin>
      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
         <version>3.0.0</version>
         <executions>
            <!-- Run shade goal on package phase -->
            <execution>
               <phase>package</phase>
               <goals>
                  <goal>shade</goal>
               </goals>
               <configuration>
                  <artifactSet>
                     <excludes>
                        <exclude>org.apache.flink:force-shading</exclude>
                        <exclude>com.google.code.findbugs:jsr305</exclude>
                        <exclude>org.slf4j:*</exclude>
                        <exclude>log4j:*</exclude>
                     </excludes>
                  </artifactSet>
                  <filters>
                     <filter>
                        <!-- Do not copy the signatures in the META-INF folder.
                        Otherwise, this might cause SecurityExceptions when using the JAR. -->
                        <artifact>*:*</artifact>
                        <excludes>
                           <exclude>META-INF/*.SF</exclude>
                           <exclude>META-INF/*.DSA</exclude>
                           <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                     </filter>
                  </filters>
                  <transformers>
                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.doumob.flink.BuoyDataJob</mainClass>
                     </transformer>
                  </transformers>
               </configuration>
            </execution>
         </executions>
      </plugin>
   </plugins>
</build>
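An aside on the shade configuration above: even though flink-json is on the compile classpath and gets packaged into the fat jar, the maven-shade-plugin by default does not merge META-INF/services files. When several jars each ship a org.apache.flink.table.factories.TableFactory service file, only one copy survives shading, so the registration of the JSON format factory can be lost, which would produce exactly this NoMatchingTableFactoryException. A possible fix (a sketch, not part of the original thread) is to add a ServicesResourceTransformer next to the existing ManifestResourceTransformer:

```xml
<!-- Sketch: inside the maven-shade <configuration> shown above, extend
     <transformers> so service files are concatenated instead of overwritten. -->
<transformers>
   <!-- Merges all META-INF/services entries from the input jars. -->
   <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
   <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
      <mainClass>com.doumob.flink.BuoyDataJob</mainClass>
   </transformer>
</transformers>
```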


How should this be fixed? Thanks.

Re: Help request: Flink job with a Kafka source fails when deployed to the cluster

Posted by Others <41...@qq.com>.
After putting the jars under lib, the job starts successfully. Thanks for the help!




------------------ Original message ------------------
From: "AS" <allensu213@163.com>;
Sent: Wednesday, January 15, 2020, 4:19 PM
To: "user-zh@flink.apache.org" <user-zh@flink.apache.org>;

Subject: Re: Help request: Flink job with a Kafka source fails when deployed to the cluster



Hi:
    I have run into a similar situation before; in my case the Kafka factory could not be found.
    Put the jars listed under https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#dependencies into Flink's lib directory (you may need to restart the cluster) and it should work.
    Give it a try.







Re: Help request: Flink job with a Kafka source fails when deployed to the cluster

Posted by AS <al...@163.com>.
Hi:
    I have run into a similar situation before; in my case the Kafka factory could not be found.
    Put the jars listed under https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#dependencies into Flink's lib directory (you may need to restart the cluster) and it should work.
    Give it a try.
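The lib-directory fix, sketched as shell commands. The jar names below are assumptions for Flink 1.9.1 with Scala 2.11; check the dependencies page linked above for the jars matching your build. For the sake of a self-contained sketch, the install directory and the downloaded jars are stubbed out; on a real cluster, use the actual Flink install path and the real downloads.

```shell
# Stand-ins so the sketch runs anywhere: a throwaway "install dir" and
# empty files in place of the jars downloaded from the dependencies page.
FLINK_HOME="$(mktemp -d)"
mkdir -p "$FLINK_HOME/lib"
touch flink-sql-connector-kafka_2.11-1.9.1.jar flink-json-1.9.1.jar

# The actual fix: drop the connector and format jars into Flink's lib/
# directory so every cluster process has them on its classpath.
cp flink-sql-connector-kafka_2.11-1.9.1.jar flink-json-1.9.1.jar "$FLINK_HOME/lib/"

# Then restart the cluster so the new jars are picked up, e.g.:
# "$FLINK_HOME/bin/stop-cluster.sh" && "$FLINK_HOME/bin/start-cluster.sh"
ls "$FLINK_HOME/lib"
```

Note that jars in lib/ are loaded by Flink's parent classloader, so they should then be removed from the fat jar (e.g. via the provided scope) to avoid classloading conflicts.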






在2020年01月15日 14:59,Others<41...@qq.com> 写道:
我使用的flink 版本 是1.9.1
本地调试正常。部署集群启动时报一下错误
2020-01-15 11:57:44,255 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
&nbsp;&nbsp;&nbsp;&nbsp;at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
&nbsp;&nbsp;&nbsp;&nbsp;at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
&nbsp;&nbsp;&nbsp;&nbsp;at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
&nbsp;&nbsp;&nbsp;&nbsp;at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
&nbsp;&nbsp;&nbsp;&nbsp;at com.doumob.flink.BuoyDataJob.main(BuoyDataJob.java:86)
&nbsp;&nbsp;&nbsp;&nbsp;at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
&nbsp;&nbsp;&nbsp;&nbsp;at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
&nbsp;&nbsp;&nbsp;&nbsp;at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
&nbsp;&nbsp;&nbsp;&nbsp;at java.lang.reflect.Method.invoke(Method.java:498)
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
&nbsp;&nbsp;&nbsp;&nbsp;... 9 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: No factory implements 'org.apache.flink.table.factories.DeserializationSchemaFactory'.

The following properties are requested:
connector.properties.0.key=group.id
connector.properties.0.value=consumer_flink_etl_test
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=172.16.0.148:9092,172.16.0.149:9092,172.16.0.150:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_etl_pro
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=cTime
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=2000
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=event
schema.1.type=VARCHAR
schema.2.name=adSpaceKey
schema.2.type=VARCHAR
schema.3.name=appkey
schema.3.type=VARCHAR
schema.4.name=build
schema.4.type=VARCHAR
schema.5.name=buoyId
schema.5.type=BIGINT
schema.6.name=gameHtmlId
schema.6.type=BIGINT
schema.7.name=uid
schema.7.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:243)
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:186)
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259)
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:144)
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
&nbsp;&nbsp;&nbsp;&nbsp;at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:65)
&nbsp;&nbsp;&nbsp;&nbsp;... 17 more


一下是pom内容


<name>Flink Quickstart Job</name>
<url>http://www.myorganization.org</url>
<profiles>
    <profile>
        <id>dev</id>
        <activation>
            <activeByDefault>true</activeByDefault>
        </activation>
        <properties>
            <project.scope>compile</project.scope>
        </properties>
    </profile>
    <profile>
        <id>pro</id>
        <properties>
            <project.scope>provided</project.scope>
        </properties>
    </profile>
</profiles>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.9.1</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<repositories>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

<dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>${project.scope}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${project.scope}</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- mysql -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.48</version>
    </dependency>
    <!-- Gson -->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.5</version>
    </dependency>
    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>

<build>
    <plugins>

        <!-- Java Compiler -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>${java.version}</source>
                <target>${java.version}</target>
            </configuration>
        </plugin>
        <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
        <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <!-- Run shade goal on package phase -->
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.doumob.flink.BuoyDataJob</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>


How should I solve this? Thanks.

Reply: Re: Re: Re: Help: Flink Kafka source job fails on cluster deployment

Posted by Others <41...@qq.com>.
After putting them under lib, the job starts successfully. Thanks for the help.







------------------ Original Message ------------------
From: "JingsongLee"<lzljs3620320@aliyun.com.INVALID>;
Sent: Wed, Jan 15, 2020, 4:06 PM
To: "JingsongLee"<lzljs3620320@aliyun.com>;"Others"<41486661@qq.com>;
Cc: "user-zh"<user-zh@flink.apache.org>;
Subject: Re: Re: Re: Help: Flink Kafka source job fails on cluster deployment



+user-zh


------------------------------------------------------------------
From:JingsongLee <lzljs3620320@aliyun.com>
Send Time: Wed, Jan 15, 2020, 16:05
To:Others <41486661@qq.com>
Subject: Re: Re: Re: Help: Flink Kafka source job fails on cluster deployment

Yes.
Another option is to use the classpath option from [1] to add multiple jars.

BTW, please keep user-zh in the recipients when replying.

Best,
Jingsong Lee

[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#usage
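For illustration, the classpath option from [1] can be passed once per jar. A sketch only; the paths below are placeholders, and each URL must be reachable from every node in the cluster:

```
# Hypothetical paths; -C adds each URL to the user-code classpath on all nodes.
./bin/flink run \
  -C file:///opt/flink-extra/flink-json-1.9.1.jar \
  -C file:///opt/flink-extra/flink-connector-kafka_2.11-1.9.1.jar \
  ./flinkjob-1.0-SNAPSHOT.jar
```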


------------------------------------------------------------------
From:Others <41486661@qq.com>
Send Time: Wed, Jan 15, 2020, 15:54
To:user-zh@flink.apache.org JingsongLee <lzljs3620320@aliyun.com>
Subject: Reply: Re: Re: Help: Flink Kafka source job fails on cluster deployment

Hi,
My cluster is deployed in Standalone mode. Should the jars be added under lib on the Flink master only, or on every machine? And does the cluster need to be restarted after adding them?
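As the reply above confirms, the jars go into lib on every node, followed by a restart. A rough sketch of that (hostnames and the install path are hypothetical placeholders):

```
# Copy the jars to lib/ on each node; hostnames and paths are hypothetical.
for host in flink-master flink-worker-1 flink-worker-2; do
  scp flink-json-1.9.1.jar flink-connector-kafka_2.11-1.9.1.jar \
      "$host":/opt/flink-1.9.1/lib/
done
# Restart the standalone cluster so the new jars are picked up.
/opt/flink-1.9.1/bin/stop-cluster.sh
/opt/flink-1.9.1/bin/start-cluster.sh
```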


------------------ Original Message ------------------
From: "JingsongLee"<lzljs3620320@aliyun.com.INVALID>;
Sent: Wed, Jan 15, 2020, 3:46 PM
To: "Others"<41486661@qq.com>;"user-zh"<user-zh@flink.apache.org>;
Subject: Re: Re: Help: Flink Kafka source job fails on cluster deployment

Hi,

I suspect that packaging this way causes the META-INF/services files in the shaded jar to overwrite each other.
Try putting the flink-json and flink-kafka jars directly under flink/lib.
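If you still want to ship a single fat jar, another commonly used fix is to let the shade plugin merge the META-INF/services files instead of overwriting them. A minimal sketch, placed inside the existing <transformers> block of the pom above:

```xml
<!-- Merges every META-INF/services file from the input jars instead of
     keeping only one copy, so the TableFactory entries from both
     flink-json and flink-connector-kafka survive shading. -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
```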

Best,
Jingsong Lee


------------------------------------------------------------------
From:Others <41486661@qq.com>
Send Time: Wed, Jan 15, 2020, 15:27
To:user-zh@flink.apache.org JingsongLee <lzljs3620320@aliyun.com>
Subject: Reply: Re: Help: Flink Kafka source job fails on cluster deployment


In a cluster environment, which lib directory should they go into?

Below is the log from the packaging step:
[INFO] --- maven-shade-plugin:3.0.0:shade (default) @ flinkjob ---
[INFO] Including org.apache.flink:flink-core:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-annotations:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-metrics-core:jar:1.9.1 in the shaded jar.
[INFO] Including com.esotericsoftware.kryo:kryo:jar:2.24.0 in the shaded jar.
[INFO] Including com.esotericsoftware.minlog:minlog:jar:1.2 in the shaded jar.
[INFO] Including org.objenesis:objenesis:jar:2.1 in the shaded jar.
[INFO] Including commons-collections:commons-collections:jar:3.2.2 in the shaded jar.
[INFO] Including org.apache.commons:commons-compress:jar:1.18 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-asm-6:jar:6.2.1-7.0 in the shaded jar.
[INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
[INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded jar.
[INFO] Excluding org.apache.flink:force-shading:jar:1.9.1 from the shaded jar.
[INFO] Including org.javassist:javassist:jar:3.19.0-GA in the shaded jar.
[INFO] Including org.scala-lang:scala-library:jar:2.11.12 in the shaded jar.
[INFO] Including org.xerial.snappy:snappy-java:jar:1.1.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-guava:jar:18.0-7.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-java-bridge_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-java:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-planner-blink_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.scala-lang:scala-reflect:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang:scala-compiler:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-xml_2.11:jar:1.0.5 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala-bridge_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-runtime-blink_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.codehaus.janino:janino:jar:3.0.9 in the shaded jar.
[INFO] Including org.codehaus.janino:commons-compiler:jar:3.0.9 in the shaded jar.
[INFO] Including org.apache.calcite.avatica:avatica-core:jar:1.15.0 in the shaded jar.
[INFO] Including org.reflections:reflections:jar:0.9.10 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka-base_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.kafka:kafka-clients:jar:2.2.0 in the shaded jar.
[INFO] Including com.github.luben:zstd-jni:jar:1.3.8-1 in the shaded jar.
[INFO] Including org.lz4:lz4-java:jar:1.5.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-common:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-jdbc_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including mysql:mysql-connector-java:jar:5.1.48 in the shaded jar.
[INFO] Including com.google.code.gson:gson:jar:2.8.5 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
[INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
[WARNING] janino-3.0.9.jar, flink-table-planner-blink_2.11-1.9.1.jar define 440 overlapping classes: 
[WARNING]   - org.codehaus.janino.util.resource.ResourceCreator
[WARNING]   - org.codehaus.janino.ReflectionIClass$ReflectionIField
[WARNING]   - org.codehaus.janino.IClass$1
[WARNING]   - org.codehaus.janino.UnitCompiler$35
[WARNING]   - org.codehaus.janino.Java$CompilationUnit$SingleStaticImportDeclaration
[WARNING]   - org.codehaus.janino.Java$PackageMemberEnumDeclaration
[WARNING]   - org.codehaus.janino.UnitCompiler$13$1
[WARNING]   - org.codehaus.janino.Unparser
[WARNING]   - org.codehaus.janino.CodeContext$Branch
[WARNING]   - org.codehaus.janino.UnitCompiler$33$2
[WARNING]   - 430 more...
[WARNING] avatica-core-1.15.0.jar, flink-table-planner-blink_2.11-1.9.1.jar define 605 overlapping classes: 
[WARNING]   - org.apache.calcite.avatica.AvaticaParameter
[WARNING]   - org.apache.calcite.avatica.Meta$ExecuteResult
[WARNING]   - org.apache.calcite.avatica.ConnectStringParser
[WARNING]   - org.apache.calcite.avatica.ConnectionConfigImpl$3
[WARNING]   - org.apache.calcite.avatica.AvaticaDatabaseMetaData$2
[WARNING]   - org.apache.calcite.avatica.remote.RemoteMeta$11
[WARNING]   - org.apache.calcite.avatica.proto.Common$1
[WARNING]   - org.apache.calcite.avatica.remote.JsonService
[WARNING]   - org.apache.calcite.avatica.util.Spaces$SpaceString
[WARNING]   - org.apache.calcite.avatica.proto.Responses$DatabasePropertyResponseOrBuilder
[WARNING]   - 595 more...
[WARNING] flink-table-planner-blink_2.11-1.9.1.jar, commons-compiler-3.0.9.jar define 28 overlapping classes: 
[WARNING]   - org.codehaus.commons.compiler.package-info
[WARNING]   - org.codehaus.commons.compiler.ICookable
[WARNING]   - org.codehaus.commons.compiler.samples.ScriptDemo
[WARNING]   - org.codehaus.commons.compiler.Sandbox
[WARNING]   - org.codehaus.commons.compiler.CompileException
[WARNING]   - org.codehaus.commons.compiler.Sandbox$1
[WARNING]   - org.codehaus.commons.compiler.WarningHandler
[WARNING]   - org.codehaus.commons.compiler.CompilerFactoryFactory
[WARNING]   - org.codehaus.commons.compiler.AbstractCompilerFactory
[WARNING]   - org.codehaus.commons.compiler.Cookable
[WARNING]   - 18 more...
[WARNING] maven-shade-plugin has detected that some class files are
[WARNING] present in two or more JARs. When this happens, only one
[WARNING] single version of the class is copied to the uber jar.
[WARNING] Usually this is not harmful and you can skip these warnings,
[WARNING] otherwise try to manually exclude artifacts based on
[WARNING] mvn dependency:tree -Ddetail=true and the above output.
[WARNING] See http://maven.apache.org/plugins/maven-shade-plugin/
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /Users/lidonghao/Documents/doumob/flinkjob/target/flinkjob-1.0-SNAPSHOT.jar with /Users/lidonghao/Documents/doumob/flinkjob/target/flinkjob-1.0-SNAPSHOT-shaded.jar
[INFO] Dependency-reduced POM written at: /Users/lidonghao/Documents/doumob/flinkjob/dependency-reduced-pom.xml
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.447 s
[INFO] Finished at: 2020-01-15T15:24:56+08:00
[INFO] Final Memory: 69M/781M
[INFO] ------------------------------------------------------------------------

Process finished with exit code 0

It shows [INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.
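"Including" only means the class files were copied in; each of those jars also ships its own copy of META-INF/services/org.apache.flink.table.factories.TableFactory, and a naive merge keeps just one of them. A toy shell simulation of that overwrite (the factory class names are representative examples, taken from the error above):

```shell
# Two fake "jar" directories, each shipping the same service file name.
svc=org.apache.flink.table.factories.TableFactory
mkdir -p json/META-INF/services kafka/META-INF/services merged
echo "org.apache.flink.formats.json.JsonRowFormatFactory" > "json/META-INF/services/$svc"
echo "org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory" > "kafka/META-INF/services/$svc"
# Naive merging copies one file over the other: the JSON entry is lost,
# which is why no DeserializationSchemaFactory can be found at runtime.
cp -r json/META-INF merged/
cp -r kafka/META-INF merged/
cat "merged/META-INF/services/$svc"
# -> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
```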

------------------ Original Message ------------------
From: "JingsongLee"<lzljs3620320@aliyun.com.INVALID>;
Sent: Wed, Jan 15, 2020, 3:19 PM
To: "user-zh"<user-zh@flink.apache.org>;
Subject: Re: Help: Flink Kafka source job fails on cluster deployment

Hi,

Did you forget to put the JSON jar into lib? It also looks like your user jar was not built with jar-with-dependencies, so it would not contain the json jar either.

Best,
Jingsong Lee


------------------------------------------------------------------
From:Others <41486661@qq.com>
Send Time: Wed, Jan 15, 2020, 15:03
To:user-zh <user-zh@flink.apache.org>
Subject: Help: Flink Kafka source job fails on cluster deployment

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; <mainClass&amp;gt;com.doumob.flink.BuoyDataJob</mainClass&amp;gt;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </transformer&amp;gt;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </transformers&amp;gt;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </configuration&amp;gt;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </execution&amp;gt;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </executions&amp;gt;
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; </plugin&amp;gt;
&nbsp;&nbsp; </plugins&amp;gt;


</build&amp;gt;


How should this be resolved? Thanks.

Re: Re: Re: Help request: error deploying a Flink job with a Kafka source to a cluster

Posted by JingsongLee <lz...@aliyun.com.INVALID>.
+user-zh


------------------------------------------------------------------
From:JingsongLee <lz...@aliyun.com>
Send Time:Wednesday, January 15, 2020 16:05
To:Others <41...@qq.com>
Subject:Re: Re: Re: Help request: error deploying a Flink job with a Kafka source to a cluster

Yes.
Another option is to use the classpath option from [1] to add multiple jars.

BTW, please keep user-zh in the recipients when replying.

Best,
Jingsong Lee

[1] https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#usage
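The two options (copying the jars into lib/ on every node, or passing them with the CLI's -C option at submit time) can be sketched roughly as follows. FLINK_HOME, the /opt/flink/extra paths, and the jar file names are assumptions for illustration, not taken from this thread:

```shell
# Option 1 (assumed paths): copy the format/connector jars into lib/ on every
# node of the standalone cluster, then restart the cluster:
#   cp flink-json-1.9.1.jar flink-connector-kafka_2.11-1.9.1.jar "$FLINK_HOME/lib/"
#   "$FLINK_HOME/bin/stop-cluster.sh" && "$FLINK_HOME/bin/start-cluster.sh"

# Option 2 (assumed URLs): add the jars to the user classpath at submission
# with -C; the URLs must be resolvable from all nodes. The command is built
# as a string and echoed so it can be inspected without a running cluster:
CMD="flink run -C file:///opt/flink/extra/flink-json-1.9.1.jar -C file:///opt/flink/extra/flink-connector-kafka_2.11-1.9.1.jar -c com.doumob.flink.BuoyDataJob flinkjob-1.0-SNAPSHOT.jar"
echo "$CMD"
```

In real use, drop the `echo` and run the command directly; with -C the jars do not need to be baked into the shaded user jar at all.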


------------------------------------------------------------------
From:Others <41...@qq.com>
Send Time:Wednesday, January 15, 2020 15:54
To:user-zh@flink.apache.org JingsongLee <lz...@aliyun.com>
Subject:Re: Re: Re: Help request: error deploying a Flink job with a Kafka source to a cluster

Hi,
My cluster is deployed in standalone mode. Should the jars be added only on the Flink master machine, or on every machine? Does the cluster need to be restarted afterwards?


------------------ Original Message ------------------
From: "JingsongLee"<lz...@aliyun.com.INVALID>;
Send Time: Wednesday, January 15, 2020 3:46 PM
To: "Others"<41...@flink.apache.org>;
Subject:  Re: Re: Help request: error deploying a Flink job with a Kafka source to a cluster

Hi,

I suspect that packaging this way causes the files under META-INF/services to overwrite each other.
Try putting the flink-json and flink-kafka jars directly into flink/lib instead.
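A minimal simulation of how files under META-INF/services can overwrite each other when jars are merged naively (the factory names are hypothetical; the real service file in this thread is org.apache.flink.table.factories.TableFactory):

```shell
# Two jars each ship the SAME service file path, listing their own factory.
workdir=$(mktemp -d)
mkdir -p "$workdir/a/META-INF/services" "$workdir/b/META-INF/services" "$workdir/merged"
echo "org.example.JsonFactory"  > "$workdir/a/META-INF/services/org.apache.flink.table.factories.TableFactory"
echo "org.example.KafkaFactory" > "$workdir/b/META-INF/services/org.apache.flink.table.factories.TableFactory"
# Copying one tree over the other replaces the service file wholesale:
cp -r "$workdir/a/." "$workdir/merged/"
cp -r "$workdir/b/." "$workdir/merged/"
cat "$workdir/merged/META-INF/services/org.apache.flink.table.factories.TableFactory"
# Only org.example.KafkaFactory survives; the JSON entry is gone, which is
# exactly why no DeserializationSchemaFactory can be found at runtime.
```

In the real build, the maven-shade-plugin's ServicesResourceTransformer concatenates such service files instead of letting one replace the other; adding it next to the existing ManifestResourceTransformer is another way to avoid the problem.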

Best,
Jingsong Lee


------------------------------------------------------------------
From:Others <41...@qq.com>
Send Time:Wednesday, January 15, 2020 15:27
To:user-zh@flink.apache.org JingsongLee <lz...@aliyun.com>
Subject:Re: Re: Help request: error deploying a Flink job with a Kafka source to a cluster


In a cluster environment, which lib directory should they go into?

Below is the log from the packaging process:
[INFO] --- maven-shade-plugin:3.0.0:shade (default) @ flinkjob ---
[INFO] Including org.apache.flink:flink-core:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-annotations:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-metrics-core:jar:1.9.1 in the shaded jar.
[INFO] Including com.esotericsoftware.kryo:kryo:jar:2.24.0 in the shaded jar.
[INFO] Including com.esotericsoftware.minlog:minlog:jar:1.2 in the shaded jar.
[INFO] Including org.objenesis:objenesis:jar:2.1 in the shaded jar.
[INFO] Including commons-collections:commons-collections:jar:3.2.2 in the shaded jar.
[INFO] Including org.apache.commons:commons-compress:jar:1.18 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-asm-6:jar:6.2.1-7.0 in the shaded jar.
[INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
[INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded jar.
[INFO] Excluding org.apache.flink:force-shading:jar:1.9.1 from the shaded jar.
[INFO] Including org.javassist:javassist:jar:3.19.0-GA in the shaded jar.
[INFO] Including org.scala-lang:scala-library:jar:2.11.12 in the shaded jar.
[INFO] Including org.xerial.snappy:snappy-java:jar:1.1.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-guava:jar:18.0-7.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-java-bridge_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-java:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-planner-blink_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.scala-lang:scala-reflect:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang:scala-compiler:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-xml_2.11:jar:1.0.5 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala-bridge_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-runtime-blink_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.codehaus.janino:janino:jar:3.0.9 in the shaded jar.
[INFO] Including org.codehaus.janino:commons-compiler:jar:3.0.9 in the shaded jar.
[INFO] Including org.apache.calcite.avatica:avatica-core:jar:1.15.0 in the shaded jar.
[INFO] Including org.reflections:reflections:jar:0.9.10 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka-base_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.kafka:kafka-clients:jar:2.2.0 in the shaded jar.
[INFO] Including com.github.luben:zstd-jni:jar:1.3.8-1 in the shaded jar.
[INFO] Including org.lz4:lz4-java:jar:1.5.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-common:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-jdbc_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including mysql:mysql-connector-java:jar:5.1.48 in the shaded jar.
[INFO] Including com.google.code.gson:gson:jar:2.8.5 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
[INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
[WARNING] janino-3.0.9.jar, flink-table-planner-blink_2.11-1.9.1.jar define 440 overlapping classes: 
[WARNING]   - org.codehaus.janino.util.resource.ResourceCreator
[WARNING]   - org.codehaus.janino.ReflectionIClass$ReflectionIField
[WARNING]   - org.codehaus.janino.IClass$1
[WARNING]   - org.codehaus.janino.UnitCompiler$35
[WARNING]   - org.codehaus.janino.Java$CompilationUnit$SingleStaticImportDeclaration
[WARNING]   - org.codehaus.janino.Java$PackageMemberEnumDeclaration
[WARNING]   - org.codehaus.janino.UnitCompiler$13$1
[WARNING]   - org.codehaus.janino.Unparser
[WARNING]   - org.codehaus.janino.CodeContext$Branch
[WARNING]   - org.codehaus.janino.UnitCompiler$33$2
[WARNING]   - 430 more...
[WARNING] avatica-core-1.15.0.jar, flink-table-planner-blink_2.11-1.9.1.jar define 605 overlapping classes: 
[WARNING]   - org.apache.calcite.avatica.AvaticaParameter
[WARNING]   - org.apache.calcite.avatica.Meta$ExecuteResult
[WARNING]   - org.apache.calcite.avatica.ConnectStringParser
[WARNING]   - org.apache.calcite.avatica.ConnectionConfigImpl$3
[WARNING]   - org.apache.calcite.avatica.AvaticaDatabaseMetaData$2
[WARNING]   - org.apache.calcite.avatica.remote.RemoteMeta$11
[WARNING]   - org.apache.calcite.avatica.proto.Common$1
[WARNING]   - org.apache.calcite.avatica.remote.JsonService
[WARNING]   - org.apache.calcite.avatica.util.Spaces$SpaceString
[WARNING]   - org.apache.calcite.avatica.proto.Responses$DatabasePropertyResponseOrBuilder
[WARNING]   - 595 more...
[WARNING] flink-table-planner-blink_2.11-1.9.1.jar, commons-compiler-3.0.9.jar define 28 overlapping classes: 
[WARNING]   - org.codehaus.commons.compiler.package-info
[WARNING]   - org.codehaus.commons.compiler.ICookable
[WARNING]   - org.codehaus.commons.compiler.samples.ScriptDemo
[WARNING]   - org.codehaus.commons.compiler.Sandbox
[WARNING]   - org.codehaus.commons.compiler.CompileException
[WARNING]   - org.codehaus.commons.compiler.Sandbox$1
[WARNING]   - org.codehaus.commons.compiler.WarningHandler
[WARNING]   - org.codehaus.commons.compiler.CompilerFactoryFactory
[WARNING]   - org.codehaus.commons.compiler.AbstractCompilerFactory
[WARNING]   - org.codehaus.commons.compiler.Cookable
[WARNING]   - 18 more...
[WARNING] maven-shade-plugin has detected that some class files are
[WARNING] present in two or more JARs. When this happens, only one
[WARNING] single version of the class is copied to the uber jar.
[WARNING] Usually this is not harmful and you can skip these warnings,
[WARNING] otherwise try to manually exclude artifacts based on
[WARNING] mvn dependency:tree -Ddetail=true and the above output.
[WARNING] See http://maven.apache.org/plugins/maven-shade-plugin/
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /Users/lidonghao/Documents/doumob/flinkjob/target/flinkjob-1.0-SNAPSHOT.jar with /Users/lidonghao/Documents/doumob/flinkjob/target/flinkjob-1.0-SNAPSHOT-shaded.jar
[INFO] Dependency-reduced POM written at: /Users/lidonghao/Documents/doumob/flinkjob/dependency-reduced-pom.xml
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.447 s
[INFO] Finished at: 2020-01-15T15:24:56+08:00
[INFO] Final Memory: 69M/781M
[INFO] ------------------------------------------------------------------------

Process finished with exit code 0

It shows that flink-json is included: [INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.
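The Maven log alone does not show whether the factory's service entry survived the merge; a sketch for inspecting the shaded jar's service file directly (the jar path is taken from the log above, the service file name is the SPI file Flink's TableFactoryService reads):

```shell
# Assumed jar path from the Maven log; run from the project directory.
JAR=target/flinkjob-1.0-SNAPSHOT.jar
SERVICE=META-INF/services/org.apache.flink.table.factories.TableFactory
if [ -f "$JAR" ]; then
  # Prints one factory class name per line that survived shading.
  unzip -p "$JAR" "$SERVICE"
else
  echo "jar not found: $JAR"
fi
```

If only the Kafka factory is listed there, the JSON format factory entry was lost during shading, matching the NoMatchingTableFactoryException below.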

------------------ Original Message ------------------
From: "JingsongLee"<lz...@aliyun.com.INVALID>;
Send Time: Wednesday, January 15, 2020 3:19 PM
To: "user-zh"<us...@flink.apache.org>;
Subject:  Re: Help request: error deploying a Flink job with a Kafka source to a cluster

Hi,

Did you perhaps not put the JSON jar into lib? It also looks like your user jar was not built with jar-with-dependencies, so it would not contain the JSON jar either.

Best,
Jingsong Lee


------------------------------------------------------------------
From:Others <41...@qq.com>
Send Time:Wednesday, January 15, 2020 15:03
To:user-zh <us...@flink.apache.org>
Subject:Help request: error deploying a Flink job with a Kafka source to a cluster

I am using Flink 1.9.1.
Local debugging works fine, but the following error is reported when starting after deploying to the cluster:
2020-01-15 11:57:44,255 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
    at com.doumob.flink.BuoyDataJob.main(BuoyDataJob.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
    ... 9 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: No factory implements 'org.apache.flink.table.factories.DeserializationSchemaFactory'.

The following properties are requested:
connector.properties.0.key=group.id
connector.properties.0.value=consumer_flink_etl_test
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=172.16.0.148:9092,172.16.0.149:9092,172.16.0.150:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_etl_pro
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=cTime
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=2000
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=event
schema.1.type=VARCHAR
schema.2.name=adSpaceKey
schema.2.type=VARCHAR
schema.3.name=appkey
schema.3.type=VARCHAR
schema.4.name=build
schema.4.type=VARCHAR
schema.5.name=buoyId
schema.5.type=BIGINT
schema.6.name=gameHtmlId
schema.6.type=BIGINT
schema.7.name=uid
schema.7.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:243)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:186)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:144)
    at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:65)
    ... 17 more


Below is the pom content:


<name>Flink Quickstart Job</name>
<url>http://www.myorganization.org</url>
<profiles>
   <profile>
      <id>dev</id>
      <activation>
         <activeByDefault>true</activeByDefault>
      </activation>
      <properties>
         <project.scope>compile</project.scope>
      </properties>
   </profile>
   <profile>
      <id>pro</id>
      <properties>
         <project.scope>provided</project.scope>
      </properties>
   </profile>
</profiles>
<properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <flink.version>1.9.1</flink.version>
   <java.version>1.8</java.version>
   <scala.binary.version>2.11</scala.binary.version>
   <maven.compiler.source>${java.version}</maven.compiler.source>
   <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<repositories>
   <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
         <enabled>false</enabled>
      </releases>
      <snapshots>
         <enabled>true</enabled>
      </snapshots>
   </repository>
</repositories>

<dependencies>
   <!-- Apache Flink dependencies -->
   <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>${project.scope}</scope>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${project.scope}</scope>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-jdbc_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>



   <!--mysql-->
   <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.48</version>
   </dependency>
   <!-- Gson-->
   <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.5</version>
   </dependency>
   <!-- Add logging framework, to produce console output when running in the IDE. -->
   <!-- These dependencies are excluded from the application JAR by default. -->
   <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
   </dependency>
   <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>runtime</scope>
   </dependency>
</dependencies>

<build>
   <plugins>

      <!-- Java Compiler -->
      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.1</version>
         <configuration>
            <source>${java.version}</source>
            <target>${java.version}</target>
         </configuration>
      </plugin>
      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
         <version>3.0.0</version>
         <executions>
            <!-- Run shade goal on package phase -->
            <execution>
               <phase>package</phase>
               <goals>
                  <goal>shade</goal>
               </goals>
               <configuration>
                  <artifactSet>
                     <excludes>
                        <exclude>org.apache.flink:force-shading</exclude>
                        <exclude>com.google.code.findbugs:jsr305</exclude>
                        <exclude>org.slf4j:*</exclude>
                        <exclude>log4j:*</exclude>
                     </excludes>
                  </artifactSet>
                  <filters>
                     <filter>
                        <!-- Do not copy the signatures in the META-INF folder.
                        Otherwise, this might cause SecurityExceptions when using the JAR. -->
                        <artifact>*:*</artifact>
                        <excludes>
                           <exclude>META-INF/*.SF</exclude>
                           <exclude>META-INF/*.DSA</exclude>
                           <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                     </filter>
                  </filters>
                  <transformers>
                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.doumob.flink.BuoyDataJob</mainClass>
                     </transformer>
                  </transformers>
               </configuration>
            </execution>
         </executions>
      </plugin>
   </plugins>


</build>


How should this be resolved? Thanks.

Re: Re: Help request: error deploying a Flink job with a Kafka source to a cluster

Posted by JingsongLee <lz...@aliyun.com.INVALID>.
Hi,

I suspect that packaging this way causes the files under META-INF/services to overwrite each other.
Try putting the flink-json and flink-kafka jars directly into flink/lib instead.

Best,
Jingsong Lee


------------------------------------------------------------------
From:Others <41...@qq.com>
Send Time:Wednesday, January 15, 2020 15:27
To:user-zh@flink.apache.org JingsongLee <lz...@aliyun.com>
Subject:Re: Re: Help request: error deploying a Flink job with a Kafka source to a cluster


In a cluster environment, which lib directory should they go into?

Below is the log from the packaging process:
[INFO] --- maven-shade-plugin:3.0.0:shade (default) @ flinkjob ---
[INFO] Including org.apache.flink:flink-core:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-annotations:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-metrics-core:jar:1.9.1 in the shaded jar.
[INFO] Including com.esotericsoftware.kryo:kryo:jar:2.24.0 in the shaded jar.
[INFO] Including com.esotericsoftware.minlog:minlog:jar:1.2 in the shaded jar.
[INFO] Including org.objenesis:objenesis:jar:2.1 in the shaded jar.
[INFO] Including commons-collections:commons-collections:jar:3.2.2 in the shaded jar.
[INFO] Including org.apache.commons:commons-compress:jar:1.18 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-asm-6:jar:6.2.1-7.0 in the shaded jar.
[INFO] Including org.apache.commons:commons-lang3:jar:3.3.2 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
[INFO] Excluding com.google.code.findbugs:jsr305:jar:1.3.9 from the shaded jar.
[INFO] Excluding org.apache.flink:force-shading:jar:1.9.1 from the shaded jar.
[INFO] Including org.javassist:javassist:jar:3.19.0-GA in the shaded jar.
[INFO] Including org.scala-lang:scala-library:jar:2.11.12 in the shaded jar.
[INFO] Including org.xerial.snappy:snappy-java:jar:1.1.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-shaded-guava:jar:18.0-7.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-java-bridge_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-java:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-planner-blink_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.scala-lang:scala-reflect:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang:scala-compiler:jar:2.11.12 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-xml_2.11:jar:1.0.5 in the shaded jar.
[INFO] Including org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-api-scala-bridge_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-runtime-blink_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.codehaus.janino:janino:jar:3.0.9 in the shaded jar.
[INFO] Including org.codehaus.janino:commons-compiler:jar:3.0.9 in the shaded jar.
[INFO] Including org.apache.calcite.avatica:avatica-core:jar:1.15.0 in the shaded jar.
[INFO] Including org.reflections:reflections:jar:0.9.10 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-connector-kafka-base_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.kafka:kafka-clients:jar:2.2.0 in the shaded jar.
[INFO] Including com.github.luben:zstd-jni:jar:1.3.8-1 in the shaded jar.
[INFO] Including org.lz4:lz4-java:jar:1.5.0 in the shaded jar.
[INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-table-common:jar:1.9.1 in the shaded jar.
[INFO] Including org.apache.flink:flink-jdbc_2.11:jar:1.9.1 in the shaded jar.
[INFO] Including mysql:mysql-connector-java:jar:5.1.48 in the shaded jar.
[INFO] Including com.google.code.gson:gson:jar:2.8.5 in the shaded jar.
[INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
[INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
[WARNING] janino-3.0.9.jar, flink-table-planner-blink_2.11-1.9.1.jar define 440 overlapping classes: 
[WARNING]   - org.codehaus.janino.util.resource.ResourceCreator
[WARNING]   - org.codehaus.janino.ReflectionIClass$ReflectionIField
[WARNING]   - org.codehaus.janino.IClass$1
[WARNING]   - org.codehaus.janino.UnitCompiler$35
[WARNING]   - org.codehaus.janino.Java$CompilationUnit$SingleStaticImportDeclaration
[WARNING]   - org.codehaus.janino.Java$PackageMemberEnumDeclaration
[WARNING]   - org.codehaus.janino.UnitCompiler$13$1
[WARNING]   - org.codehaus.janino.Unparser
[WARNING]   - org.codehaus.janino.CodeContext$Branch
[WARNING]   - org.codehaus.janino.UnitCompiler$33$2
[WARNING]   - 430 more...
[WARNING] avatica-core-1.15.0.jar, flink-table-planner-blink_2.11-1.9.1.jar define 605 overlapping classes: 
[WARNING]   - org.apache.calcite.avatica.AvaticaParameter
[WARNING]   - org.apache.calcite.avatica.Meta$ExecuteResult
[WARNING]   - org.apache.calcite.avatica.ConnectStringParser
[WARNING]   - org.apache.calcite.avatica.ConnectionConfigImpl$3
[WARNING]   - org.apache.calcite.avatica.AvaticaDatabaseMetaData$2
[WARNING]   - org.apache.calcite.avatica.remote.RemoteMeta$11
[WARNING]   - org.apache.calcite.avatica.proto.Common$1
[WARNING]   - org.apache.calcite.avatica.remote.JsonService
[WARNING]   - org.apache.calcite.avatica.util.Spaces$SpaceString
[WARNING]   - org.apache.calcite.avatica.proto.Responses$DatabasePropertyResponseOrBuilder
[WARNING]   - 595 more...
[WARNING] flink-table-planner-blink_2.11-1.9.1.jar, commons-compiler-3.0.9.jar define 28 overlapping classes: 
[WARNING]   - org.codehaus.commons.compiler.package-info
[WARNING]   - org.codehaus.commons.compiler.ICookable
[WARNING]   - org.codehaus.commons.compiler.samples.ScriptDemo
[WARNING]   - org.codehaus.commons.compiler.Sandbox
[WARNING]   - org.codehaus.commons.compiler.CompileException
[WARNING]   - org.codehaus.commons.compiler.Sandbox$1
[WARNING]   - org.codehaus.commons.compiler.WarningHandler
[WARNING]   - org.codehaus.commons.compiler.CompilerFactoryFactory
[WARNING]   - org.codehaus.commons.compiler.AbstractCompilerFactory
[WARNING]   - org.codehaus.commons.compiler.Cookable
[WARNING]   - 18 more...
[WARNING] maven-shade-plugin has detected that some class files are
[WARNING] present in two or more JARs. When this happens, only one
[WARNING] single version of the class is copied to the uber jar.
[WARNING] Usually this is not harmful and you can skip these warnings,
[WARNING] otherwise try to manually exclude artifacts based on
[WARNING] mvn dependency:tree -Ddetail=true and the above output.
[WARNING] See http://maven.apache.org/plugins/maven-shade-plugin/
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /Users/lidonghao/Documents/doumob/flinkjob/target/flinkjob-1.0-SNAPSHOT.jar with /Users/lidonghao/Documents/doumob/flinkjob/target/flinkjob-1.0-SNAPSHOT-shaded.jar
[INFO] Dependency-reduced POM written at: /Users/lidonghao/Documents/doumob/flinkjob/dependency-reduced-pom.xml
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.447 s
[INFO] Finished at: 2020-01-15T15:24:56+08:00
[INFO] Final Memory: 69M/781M
[INFO] ------------------------------------------------------------------------

Process finished with exit code 0

The build output shows: [INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.

------------------ Original Message ------------------
From: "JingsongLee"<lz...@aliyun.com.INVALID>;
Sent: Wednesday, January 15, 2020, 3:19 PM
To: "user-zh"<us...@flink.apache.org>;
Subject: Re: Help: Flink job with a Kafka source fails when deployed to the cluster

Hi,

Did you forget to put the JSON jar into Flink's lib directory? It also looks like your user jar was not built with jar-with-dependencies, so it does not include the flink-json jar either.

Best,
Jingsong Lee


------------------------------------------------------------------
From:Others <41...@qq.com>
Send Time: Wednesday, January 15, 2020, 15:03
To: user-zh <us...@flink.apache.org>
Subject: Help: Flink job with a Kafka source fails when deployed to the cluster

I am using Flink 1.9.1.
The job runs fine when debugged locally, but the following error is thrown when it is deployed to the cluster:
2020-01-15 11:57:44,255 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
    at com.doumob.flink.BuoyDataJob.main(BuoyDataJob.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
    ... 9 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: No factory implements 'org.apache.flink.table.factories.DeserializationSchemaFactory'.

The following properties are requested:
connector.properties.0.key=group.id
connector.properties.0.value=consumer_flink_etl_test
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=172.16.0.148:9092,172.16.0.149:9092,172.16.0.150:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_etl_pro
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=cTime
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=2000
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=event
schema.1.type=VARCHAR
schema.2.name=adSpaceKey
schema.2.type=VARCHAR
schema.3.name=appkey
schema.3.type=VARCHAR
schema.4.name=build
schema.4.type=VARCHAR
schema.5.name=buoyId
schema.5.type=BIGINT
schema.6.name=gameHtmlId
schema.6.type=BIGINT
schema.7.name=uid
schema.7.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:243)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:186)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259)
    at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:144)
    at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:65)
    ... 17 more
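A possible explanation for why the JSON format factory is missing from the list above (this is an assumption about the build, not something confirmed by the logs): Flink discovers TableFactory implementations via Java's ServiceLoader, reading META-INF/services/org.apache.flink.table.factories.TableFactory from the classpath. Several Flink jars each ship their own copy of that file, and plain shading keeps only one copy, so the entries from flink-json can be silently dropped. A minimal sketch of the overwrite, using plain files in place of jars (class names taken from the factory list above):

```shell
mkdir -p json-jar kafka-jar uber
SVC=org.apache.flink.table.factories.TableFactory
# Each source jar ships its own copy of the same service file.
echo "org.apache.flink.formats.json.JsonRowFormatFactory" > json-jar/$SVC
echo "org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory" > kafka-jar/$SVC
# Plain shading keeps a single copy of a duplicated resource: the last one
# wins, and the JSON entry disappears from the uber jar.
cp json-jar/$SVC uber/$SVC
cp kafka-jar/$SVC uber/$SVC
cat uber/$SVC    # only the Kafka factory remains
# What the shade plugin's ServicesResourceTransformer does instead:
# concatenate all copies so every factory stays registered.
cat json-jar/$SVC kafka-jar/$SVC > uber/$SVC
cat uber/$SVC    # both factories listed
```

This would explain why the jar visibly contains flink-json classes yet no DeserializationSchemaFactory is found at runtime: the classes are present, but the service registration file is not.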


Below is the POM content:


<name>Flink Quickstart Job</name>
<url>http://www.myorganization.org</url>
<profiles>
   <profile>
      <id>dev</id>
      <activation>
         <activeByDefault>true</activeByDefault>
      </activation>
      <properties>
         <project.scope>compile</project.scope>
      </properties>
   </profile>
   <profile>
      <id>pro</id>
      <properties>
         <project.scope>provided</project.scope>
      </properties>
   </profile>
</profiles>
<properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <flink.version>1.9.1</flink.version>
   <java.version>1.8</java.version>
   <scala.binary.version>2.11</scala.binary.version>
   <maven.compiler.source>${java.version}</maven.compiler.source>
   <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<repositories>
   <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
         <enabled>false</enabled>
      </releases>
      <snapshots>
         <enabled>true</enabled>
      </snapshots>
   </repository>
</repositories>

<dependencies>
   <!-- Apache Flink dependencies -->
   <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>${project.scope}</scope>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${project.scope}</scope>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-jdbc_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>

   <!-- MySQL -->
   <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.48</version>
   </dependency>
   <!-- Gson -->
   <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.5</version>
   </dependency>
   <!-- Add logging framework, to produce console output when running in the IDE. -->
   <!-- These dependencies are excluded from the application JAR by default. -->
   <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
   </dependency>
   <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>runtime</scope>
   </dependency>
</dependencies>

<build>
   <plugins>

      <!-- Java Compiler -->
      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
         <version>3.1</version>
         <configuration>
            <source>${java.version}</source>
            <target>${java.version}</target>
         </configuration>
      </plugin>
      <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
      <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
         <version>3.0.0</version>
         <executions>
            <!-- Run shade goal on package phase -->
            <execution>
               <phase>package</phase>
               <goals>
                  <goal>shade</goal>
               </goals>
               <configuration>
                  <artifactSet>
                     <excludes>
                        <exclude>org.apache.flink:force-shading</exclude>
                        <exclude>com.google.code.findbugs:jsr305</exclude>
                        <exclude>org.slf4j:*</exclude>
                        <exclude>log4j:*</exclude>
                     </excludes>
                  </artifactSet>
                  <filters>
                     <filter>
                        <!-- Do not copy the signatures in the META-INF folder.
                        Otherwise, this might cause SecurityExceptions when using the JAR. -->
                        <artifact>*:*</artifact>
                        <excludes>
                           <exclude>META-INF/*.SF</exclude>
                           <exclude>META-INF/*.DSA</exclude>
                           <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                     </filter>
                  </filters>
                  <transformers>
                     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.doumob.flink.BuoyDataJob</mainClass>
                     </transformer>
                  </transformers>
               </configuration>
            </execution>
         </executions>
      </plugin>
   </plugins>


</build>
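One detail worth noting in this shade configuration (a guess, not verified against the actual build): it merges jars but registers no transformer for META-INF/services files, so the service file that registers flink-json's format factory can be overwritten by another jar's copy. Adding a ServicesResourceTransformer next to the existing ManifestResourceTransformer tells the shade plugin to concatenate those files instead, along these lines:

```xml
<transformers>
   <!-- Merge META-INF/services files instead of letting one jar's copy win,
        so TableFactory registrations from flink-json survive shading. -->
   <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
   <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
      <mainClass>com.doumob.flink.BuoyDataJob</mainClass>
   </transformer>
</transformers>
```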


How should I resolve this? Thanks.

Re: Help: Flink job with a Kafka source fails when deployed to the cluster

Posted by JingsongLee <lz...@aliyun.com.INVALID>.
Hi,

Did you forget to put the JSON jar into Flink's lib directory? It also looks like your user jar was not built with jar-with-dependencies, so it does not include the flink-json jar either.

Best,
Jingsong Lee

