Posted to user@beam.apache.org by 基勇 <25...@qq.com> on 2017/06/12 09:28:44 UTC

Input/Output data to kafka exception

Hi,
    I used the Beam API to write a program that reads data from Kafka and runs on Flink, but at runtime it throws the following exception:
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper
ClassLoader info: URL ClassLoader:
    file: '/var/folders/vq/887txtm92w508tj7gknjpflc0000gp/T/blobStore-78ed2c9c-03e5-438b-b2ec-514ac394eb11/cache/blob_7123d3372a6822ba044c5ea153ee6801cd5377ff' (valid JAR)
Class not resolvable through given classloader.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:210)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:81)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
	at java.lang.Thread.run(Thread.java:745)



code:

import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.kafka.common.serialization.StringDeserializer;

KafkaOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaOptions.class);
options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");
options.setStreaming(true);
options.setCheckpointingInterval(1000L);
options.setNumberOfExecutionRetries(5);
options.setExecutionRetryDelay(3000L);
options.setRunner(FlinkRunner.class);

Pipeline pipeline = Pipeline.create(options);

// Read from Kafka and keep only the record values.
pipeline.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic(KAFKA_TOPIC)  // use withTopics(List<String>) to read from multiple topics.
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata())      // PCollection<KV<String, String>>
    .apply(Values.<String>create());
// Writing back to Kafka (commented out in the original post):
//     .apply(KafkaIO.<Void, String>write()
//             .withBootstrapServers("localhost:9092")
//             .withTopic(KAFKA_OUTPUT_TOPIC)
//             .withValueSerializer(StringSerializer.class)
//             .values());

pipeline.run(); // .waitUntilFinish();
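
(KafkaOptions is a custom options interface not shown in the post. A minimal sketch of what it might look like — assuming it extends the Flink runner's FlinkPipelineOptions, which supplies the streaming/checkpointing/retry setters used above, and adds the windowSize option referenced in the job name:)

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;

// Hypothetical sketch; the poster's actual KafkaOptions is not shown.
public interface KafkaOptions extends FlinkPipelineOptions {
  @Description("Fixed window size, in seconds")
  @Default.Integer(10)
  Integer getWindowSize();
  void setWindowSize(Integer value);
}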




How do I fix it?




Thank you!

Re: Input/Output data to kafka exception

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Did you also specify the “-Pflink-runner” flag while building? From your mail it seems you’re just building with “mvn clean install -DskipTests”. You also need to specify the flag at build time so that the Flink dependencies are bundled into the resulting fat jar.
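
For example — a sketch assuming the standard Beam quickstart POM layout, where the Flink runner dependencies sit behind a flink-runner Maven profile:

mvn clean package -Pflink-runner -DskipTests

That way the jar under target/ that you stage with --filesToStage actually contains the Flink runner classes.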

Best,
Aljoscha
> On 13. Jun 2017, at 10:25, 基勇 <25...@qq.com> wrote:
> 
> Beam 2.0.0 and flink 1.2.1


Re: Input/Output data to kafka exception

Posted by 基勇 <25...@qq.com>.
Beam 2.0.0 and flink 1.2.1




------------------ Original Message ------------------
From: "Aljoscha Krettek" <al...@apache.org>
Sent: Monday, June 12, 2017, 5:42 PM
To: "user" <us...@beam.apache.org>
Subject: Re: Input/Output data to kafka exception




Re: Input/Output data to kafka exception

Posted by 基勇 <25...@qq.com>.
@Aljoscha Krettek
The execution command is:
mvn exec:java -Dexec.mainClass=com.exmind.beam.App \
    -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner \
      --flinkMaster=localhost:6123 \
      --filesToStage=target/com.exmind.beam-0.0.1-SNAPSHOT.jar"
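
(For comparison, a sketch of submitting the same jar through the Flink CLI instead of mvn exec — assuming it was built as a fat jar with the flink-runner profile active; the main class and jar path here just mirror the command above:)

bin/flink run -c com.exmind.beam.App \
    target/com.exmind.beam-0.0.1-SNAPSHOT.jar \
    --runner=FlinkRunner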





------------------ Original Message ------------------
From: "真心の傻子楓" <25...@qq.com>
Sent: Monday, June 12, 2017, 11:24 PM
To: "user" <us...@beam.apache.org>
Subject: Re: Input/Output data to kafka exception




Re: Input/Output data to kafka exception

Posted by 基勇 <25...@qq.com>.
Yes!
I build the jar file with "mvn clean install -DskipTests" and then execute it through "mvn exec:java".




------------------ Original Message ------------------
From: "Aljoscha Krettek" <al...@apache.org>
Sent: Monday, June 12, 2017, 5:42 PM
To: "user" <us...@beam.apache.org>
Subject: Re: Input/Output data to kafka exception




Re: Input/Output data to kafka exception

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

How are you bundling your program for execution? Are you, for example, building a fat-jar using Maven? How are you executing the program? Using bin/flink or by executing the program using mvn exec? Also, which Beam/Flink versions are you using?

Best,
Aljoscha
> On 12. Jun 2017, at 11:28, 基勇 <25...@qq.com> wrote: