You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Fábio Dias (JIRA)" <ji...@apache.org> on 2017/02/09 11:38:41 UTC

[jira] [Created] (FLINK-5755) Flink with Kafka connection

Fábio Dias created FLINK-5755:
---------------------------------

             Summary: Flink with Kafka connection
                 Key: FLINK-5755
                 URL: https://issues.apache.org/jira/browse/FLINK-5755
             Project: Flink
          Issue Type: Bug
          Components: DataStream API, Kafka Connector
    Affects Versions: 1.1.3
         Environment: Ubuntu 16.04.1 LTS
Flink 1.1.3
Kafka 0.10.1.1
            Reporter: Fábio Dias


I'm trying to connect Flink with Kafka (Flink 1.1.3, Kafka 0.10.1.1).

I have already tried all the fixes that I could find, but none of them worked.

pom.xml :

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<!-- Coordinates of this job artifact. The version below is the job's own
	     version and is independent of the Flink version it is built against. -->
	<groupId>ux</groupId>
	<artifactId>logs</artifactId>
	<version>1.3-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>Flink Quickstart Job</name>
	<url>http://www.myorganization.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<!-- Single source of truth for the version of every org.apache.flink
		     dependency in this file. It is deliberately NOT derived from
		     ${project.version}, so bumping the job version cannot silently
		     change the Flink libraries.
		     NOTE(review): this should match the Flink version of the cluster
		     the job is submitted to, and the chosen Flink release must actually
		     ship flink-connector-kafka-0.10 - confirm before changing. -->
		<flink.version>1.3-SNAPSHOT</flink.version>
		<slf4j.version>1.7.7</slf4j.version>
		<log4j.version>1.2.17</log4j.version>
	</properties>

	<!-- Needed only while flink.version is a -SNAPSHOT build. -->
	<repositories>
		<repository>
			<id>apache.snapshots</id>
			<name>Apache Development Snapshot Repository</name>
			<url>https://repository.apache.org/content/repositories/snapshots/</url>
			<releases>
				<enabled>false</enabled>
			</releases>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</repository>
	</repositories>

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_2.10</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_2.10</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- Provides FlinkKafkaConsumer010. It is not re-declared as "provided"
		     in the build-jar profile below, so it keeps compile scope there and
		     is bundled into the shaded jar. -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka-0.10_2.10</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>${slf4j.version}</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>${log4j.version}</version>
		</dependency>
	</dependencies>

	<profiles>
		<!-- Activate with "mvn package -Pbuild-jar" to produce a shaded jar.
		     The dependencies re-declared here switch to "provided" scope and are
		     therefore left out of the shaded jar; everything else is packaged
		     into it. -->
		<profile>
			<id>build-jar</id>
			<activation>
				<activeByDefault>false</activeByDefault>
			</activation>
			<dependencies>
				<dependency>
					<groupId>org.apache.flink</groupId>
					<artifactId>flink-java</artifactId>
					<version>${flink.version}</version>
					<scope>provided</scope>
				</dependency>
				<dependency>
					<groupId>org.apache.flink</groupId>
					<artifactId>flink-streaming-java_2.10</artifactId>
					<version>${flink.version}</version>
					<scope>provided</scope>
				</dependency>
				<dependency>
					<groupId>org.apache.flink</groupId>
					<artifactId>flink-clients_2.10</artifactId>
					<version>${flink.version}</version>
					<scope>provided</scope>
				</dependency>
				<dependency>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-log4j12</artifactId>
					<version>${slf4j.version}</version>
					<scope>provided</scope>
				</dependency>
				<dependency>
					<groupId>log4j</groupId>
					<artifactId>log4j</artifactId>
					<version>${log4j.version}</version>
					<scope>provided</scope>
				</dependency>
			</dependencies>

			<build>
				<plugins>
					<plugin>
						<groupId>org.apache.maven.plugins</groupId>
						<artifactId>maven-shade-plugin</artifactId>
						<version>2.4.1</version>
						<executions>
							<execution>
								<phase>package</phase>
								<goals>
									<goal>shade</goal>
								</goals>
								<configuration>
									<artifactSet>
										<!-- Empty override: no artifact is excluded explicitly;
										     only "provided" scope keeps things out of the jar. -->
										<excludes combine.self="override"></excludes>
									</artifactSet>
								</configuration>
							</execution>
						</executions>
					</plugin>
				</plugins>
			</build>
		</profile>
	</profiles>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.1</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

My Java code:

import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class App 
{
    public static void main(String[] args) throws Exception {
		
		System.out.println("Hello World!");
    
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties properties = new Properties();
	    properties.setProperty("bootstrap.servers", "localhost:9092");
	    properties.setProperty("zookeeper.connect", "localhost:2181");
	    properties.setProperty("group.id", "flink_consumer");

		DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<>
			("ux_logs", new SimpleStringSchema(), properties));

		messageStream.rebalance().map(new MapFunction<String, String>() {

			private static final long serialVersionUID = -6867736771747690202L;

			public String map(String value) throws Exception {
				return "Kafka and Flink says: " + value;
			}
		}).print();
		
	    env.execute();
    }
}

And when I compile it and submit the job, I get the following error:

java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010
        at ux.App.main(App.java:28)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

Do I need to remove my Kafka and run an older version?

Thanks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)