You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Chesnay Schepler (JIRA)" <ji...@apache.org> on 2017/05/21 11:19:04 UTC

[jira] [Closed] (FLINK-4061) about flink jdbc connect oracle db exists a critical bug

     [ https://issues.apache.org/jira/browse/FLINK-4061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chesnay Schepler closed FLINK-4061.
-----------------------------------
    Resolution: Cannot Reproduce

>  about flink jdbc connect oracle db exists a critical bug
> ---------------------------------------------------------
>
>                 Key: FLINK-4061
>                 URL: https://issues.apache.org/jira/browse/FLINK-4061
>             Project: Flink
>          Issue Type: Bug
>          Components: DataSet API
>    Affects Versions: 1.1.0
>         Environment: ubuntu ,jdk1.8.0  ,Start a Local Flink Cluster
>            Reporter: dengchangfu
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I use flink-jdbc to connect to an Oracle DB for ETL, so I wrote a demo to test the feature. The code is simple, but after I submit this app, an exception happens.
> exception info like this:
> Caused by: java.lang.NullPointerException
> 	at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:231)
> 	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> 	at java.lang.Thread.run(Thread.java:745)
> my code like this:
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
> import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
> import org.apache.flink.api.table.Row;
> import org.apache.flink.api.table.typeutils.RowTypeInfo;
> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
> import java.sql.ResultSet;
> import java.sql.Types;
> /**
>  * Skeleton for a Flink Job.
>  *
>  * For a full example of a Flink Job, see the WordCountJob.java file in the
>  * same package/directory or have a look at the website.
>  *
>  * You can also generate a .jar file that you can submit on your Flink
>  * cluster.
>  * Just type
>  * 		mvn clean package
>  * in the projects root directory.
>  * You will find the jar in
>  * 		target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
>  *
>  */
> public class Job {
>     public static final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[]{
>             BasicTypeInfo.STRING_TYPE_INFO,
>             BasicTypeInfo.FLOAT_TYPE_INFO
>     };
>     public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
>     public static void main(String[] args) {
>         // set up the execution environment
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
>                 .setDrivername("oracle.jdbc.driver.OracleDriver")
>                 .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest")
>                 .setUsername("crmii")
>                 .setPassword("crmii")
>                 .setQuery("select CLIENT_ID,OCCUR_BALANCE from HS_ASSET.FUNDJOUR@OTC")
>                 .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
>                 .setRowTypeInfo(rowTypeInfo);
>         DataSet<Row> source = env.createInput(inputBuilder.finish());
>         source.output(JDBCOutputFormat.buildJDBCOutputFormat()
>                 .setDrivername("oracle.jdbc.driver.OracleDriver")
>                 .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest")
>                 .setUsername("crmii")
>                 .setPassword("crmii")
>                 .setQuery("insert into dengabc (client_id,salary) values(?,?)")
>                 .setSqlTypes(new int[]{Types.VARCHAR, Types.DOUBLE})
>                 .finish());
>         //source.print();
>         //source.first(20).print();
>         //dbData.print();
>         /**
>          * Here, you can start creating your execution plan for Flink.
>          *
>          * Start with getting some data from the environment, like
>          * 	env.readTextFile(textPath);
>          *
>          * then, transform the resulting DataSet<String> using operations
>          * like
>          * 	.filter()
>          * 	.flatMap()
>          * 	.join()
>          * 	.coGroup()
>          * and many more.
>          * Have a look at the programming guide for the Java API:
>          *
>          * http://flink.apache.org/docs/latest/apis/batch/index.html
>          *
>          * and the examples
>          *
>          * http://flink.apache.org/docs/latest/apis/batch/examples.html
>          *
>          */
>         // execute program
>         try {
>             env.execute("Flink Java API Skeleton");
>         } catch (Exception e) {
>             e.getMessage();
>         }
>     }
> }
> my pom.xml like this:
> <!--
> Licensed to the Apache Software Foundation (ASF) under one
> or more contributor license agreements.  See the NOTICE file
> distributed with this work for additional information
> regarding copyright ownership.  The ASF licenses this file
> to you under the Apache License, Version 2.0 (the
> "License"); you may not use this file except in compliance
> with the License.  You may obtain a copy of the License at
>   http://www.apache.org/licenses/LICENSE-2.0
> Unless required by applicable law or agreed to in writing,
> software distributed under the License is distributed on an
> "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> KIND, either express or implied.  See the License for the
> specific language governing permissions and limitations
> under the License.
> -->
> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> 	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
> 	<modelVersion>4.0.0</modelVersion>
> 	
> 	<groupId>com.gf.flink</groupId>
> 	<artifactId>mot</artifactId>
> 	<version>1.0-SNAPSHOT</version>
> 	<packaging>jar</packaging>
> 	<name>Flink Quickstart Job</name>
> 	<url>http://www.myorganization.org</url>
> 	<properties>
> 		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
> 		<flink.version>1.0.3</flink.version>
> 	</properties>
> 	<repositories>
> 		<repository>
> 			<id>apache.snapshots</id>
> 			<name>Apache Development Snapshot Repository</name>
> 			<url>https://repository.apache.org/content/repositories/snapshots/</url>
> 			<releases>
> 				<enabled>false</enabled>
> 			</releases>
> 			<snapshots>
> 				<enabled>true</enabled>
> 			</snapshots>
> 		</repository>
> 		<!--<repository>-->
> 			<!--<id>mvnrepository</id>-->
> 			<!--<name>mvnrepository</name>-->
> 			<!--<url>http://mvnrepository.com/</url>-->
> 			<!--<releases>-->
> 				<!--<enabled>true</enabled>-->
> 			<!--</releases>-->
> 			<!--<snapshots>-->
> 				<!--<enabled>true</enabled>-->
> 			<!--</snapshots>-->
> 		<!--</repository>-->
> 		<repository>
> 			<id>nexus</id>
> 			<name>local private nexus</name>
> 			<url>http://10.2.110.202:8081/nexus/content/groups/public/</url>
> 			<releases><enabled>true</enabled></releases>
> 			<snapshots><enabled>false</enabled></snapshots>
> 		</repository>
> 		<repository>
> 			<id>nexus-snapshots</id>
> 			<name>local private nexus</name>
> 			<url>http://10.2.110.202:8081/nexus/content/groups/public-snapshots/</url>
> 			<releases><enabled>false</enabled></releases>
> 			<snapshots><enabled>true</enabled></snapshots>
> 		</repository>
> 	</repositories>
> 	
> 	<!-- 
> 		
> 		Execute "mvn clean package -Pbuild-jar"
> 		to build a jar file out of this project!
> 		How to use the Flink Quickstart pom:
> 		a) Adding new dependencies:
> 			You can add dependencies to the list below.
> 			Please check if the maven-shade-plugin below is filtering out your dependency
> 			and remove the exclude from there.
> 		b) Build a jar for running on the cluster:
> 			There are two options for creating a jar from this project
> 			b.1) "mvn clean package" -> this will create a fat jar which contains all
> 					dependencies necessary for running the jar created by this pom in a cluster.
> 					The "maven-shade-plugin" excludes everything that is provided on a running Flink cluster.
> 			b.2) "mvn clean package -Pbuild-jar" -> This will also create a fat-jar, but with much
> 					nicer dependency exclusion handling. This approach is preferred and leads to
> 					much cleaner jar files.
> 	-->
> 	<dependencies>
> 		<dependency>
> 			<groupId>org.apache.flink</groupId>
> 			<artifactId>flink-java</artifactId>
> 			<version>1.1-SNAPSHOT</version>
> 		</dependency>
> 		<dependency>
> 			<groupId>org.apache.flink</groupId>
> 			<artifactId>flink-streaming-java_2.10</artifactId>
> 			<version>1.1-SNAPSHOT</version>
> 		</dependency>
> 		<dependency>
> 			<groupId>org.apache.flink</groupId>
> 			<artifactId>flink-clients_2.10</artifactId>
> 			<version>1.1-SNAPSHOT</version>
> 		</dependency>
> 		<dependency>
> 			<groupId>org.apache.flink</groupId>
> 			<artifactId>flink-table_2.10</artifactId>
> 			<version>1.1-SNAPSHOT</version>
> 		</dependency>
> 		<dependency>
> 			<groupId>org.apache.flink</groupId>
> 			<artifactId>flink-core</artifactId>
> 			<version>1.1-SNAPSHOT</version>
> 		</dependency>
> 		<dependency>
> 			<groupId>org.apache.flink</groupId>
> 			<artifactId>flink-jdbc</artifactId>
> 			<version>1.1-SNAPSHOT</version>
> 		</dependency>
> 		<dependency>
> 			<groupId>com.oracle</groupId>
> 			<artifactId>com.oracle.ojdbc</artifactId>
> 			<version>14</version>
> 		</dependency>
> 	</dependencies>
> 	<profiles>
> 		<profile>
> 			<!-- Profile for packaging correct JAR files -->
> 			<id>build-jar</id>
> 			<activation>
> 				<activeByDefault>false</activeByDefault>
> 			</activation>
> 			<dependencies>
> 				<dependency>
> 					<groupId>org.apache.flink</groupId>
> 					<artifactId>flink-java</artifactId>
> 					<version>${flink.version}</version>
> 					<scope>provided</scope>
> 				</dependency>
> 				<dependency>
> 					<groupId>org.apache.flink</groupId>
> 					<artifactId>flink-streaming-java_2.10</artifactId>
> 					<version>${flink.version}</version>
> 					<scope>provided</scope>
> 				</dependency>
> 				<dependency>
> 					<groupId>org.apache.flink</groupId>
> 					<artifactId>flink-clients_2.10</artifactId>
> 					<version>${flink.version}</version>
> 					<scope>provided</scope>
> 				</dependency>
> 			</dependencies>
> 			<build>
> 				<plugins>
> 					<!-- disable the exclusion rules -->
> 					<plugin>
> 						<groupId>org.apache.maven.plugins</groupId>
> 						<artifactId>maven-shade-plugin</artifactId>
> 						<version>2.4.1</version>
> 						<executions>
> 							<execution>
> 								<phase>package</phase>
> 								<goals>
> 									<goal>shade</goal>
> 								</goals>
> 								<configuration>
> 									<artifactSet>
> 										<excludes combine.self="override"></excludes>
> 									</artifactSet>
> 								</configuration>
> 							</execution>
> 						</executions>
> 					</plugin>
> 				</plugins>
> 			</build>
> 		</profile>
> 	</profiles>
> 	<build>
> 		<plugins>
> 			<!-- We use the maven-shade plugin to create a fat jar that contains all dependencies
> 			except flink and it's transitive dependencies. The resulting fat-jar can be executed
> 			on a cluster. Change the value of Program-Class if your program entry point changes. -->
> 			<plugin>
> 				<groupId>org.apache.maven.plugins</groupId>
> 				<artifactId>maven-shade-plugin</artifactId>
> 				<version>2.4.1</version>
> 				<executions>
> 					<!-- Run shade goal on package phase -->
> 					<execution>
> 						<phase>package</phase>
> 						<goals>
> 							<goal>shade</goal>
> 						</goals>
> 						<configuration>
> 							<artifactSet>
> 								<excludes>
> 									<!-- This list contains all dependencies of flink-dist
> 									Everything else will be packaged into the fat-jar
> 									-->
> 									<exclude>org.apache.flink:flink-annotations</exclude>
> 									<exclude>org.apache.flink:flink-shaded-hadoop1</exclude>
> 									<exclude>org.apache.flink:flink-shaded-hadoop2</exclude>
> 									<exclude>org.apache.flink:flink-shaded-curator-recipes</exclude>
> 									<exclude>org.apache.flink:flink-core</exclude>
> 									<exclude>org.apache.flink:flink-java</exclude>
> 									<exclude>org.apache.flink:flink-scala_2.10</exclude>
> 									<exclude>org.apache.flink:flink-runtime_2.10</exclude>
> 									<exclude>org.apache.flink:flink-optimizer_2.10</exclude>
> 									<exclude>org.apache.flink:flink-clients_2.10</exclude>
> 									<exclude>org.apache.flink:flink-avro_2.10</exclude>
> 									<exclude>org.apache.flink:flink-examples-batch_2.10</exclude>
> 									<exclude>org.apache.flink:flink-examples-streaming_2.10</exclude>
> 									<exclude>org.apache.flink:flink-streaming-java_2.10</exclude>
> 									<!-- Also exclude very big transitive dependencies of Flink
> 									WARNING: You have to remove these excludes if your code relies on other
> 									versions of these dependencies.
> 									-->
> 									<exclude>org.scala-lang:scala-library</exclude>
> 									<exclude>org.scala-lang:scala-compiler</exclude>
> 									<exclude>org.scala-lang:scala-reflect</exclude>
> 									<exclude>com.amazonaws:aws-java-sdk</exclude>
> 									<exclude>com.typesafe.akka:akka-actor_*</exclude>
> 									<exclude>com.typesafe.akka:akka-remote_*</exclude>
> 									<exclude>com.typesafe.akka:akka-slf4j_*</exclude>
> 									<exclude>io.netty:netty-all</exclude>
> 									<exclude>io.netty:netty</exclude>
> 									<exclude>commons-fileupload:commons-fileupload</exclude>
> 									<exclude>org.apache.avro:avro</exclude>
> 									<exclude>commons-collections:commons-collections</exclude>
> 									<exclude>org.codehaus.jackson:jackson-core-asl</exclude>
> 									<exclude>org.codehaus.jackson:jackson-mapper-asl</exclude>
> 									<exclude>com.thoughtworks.paranamer:paranamer</exclude>
> 									<exclude>org.xerial.snappy:snappy-java</exclude>
> 									<exclude>org.apache.commons:commons-compress</exclude>
> 									<exclude>org.tukaani:xz</exclude>
> 									<exclude>com.esotericsoftware.kryo:kryo</exclude>
> 									<exclude>com.esotericsoftware.minlog:minlog</exclude>
> 									<exclude>org.objenesis:objenesis</exclude>
> 									<exclude>com.twitter:chill_*</exclude>
> 									<exclude>com.twitter:chill-java</exclude>
> 									<exclude>com.twitter:chill-avro_*</exclude>
> 									<exclude>com.twitter:chill-bijection_*</exclude>
> 									<exclude>com.twitter:bijection-core_*</exclude>
> 									<exclude>com.twitter:bijection-avro_*</exclude>
> 									<exclude>commons-lang:commons-lang</exclude>
> 									<exclude>junit:junit</exclude>
> 									<exclude>de.javakaffee:kryo-serializers</exclude>
> 									<exclude>joda-time:joda-time</exclude>
> 									<exclude>org.apache.commons:commons-lang3</exclude>
> 									<exclude>org.slf4j:slf4j-api</exclude>
> 									<exclude>org.slf4j:slf4j-log4j12</exclude>
> 									<exclude>log4j:log4j</exclude>
> 									<exclude>org.apache.commons:commons-math</exclude>
> 									<exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
> 									<exclude>commons-logging:commons-logging</exclude>
> 									<exclude>commons-codec:commons-codec</exclude>
> 									<exclude>com.fasterxml.jackson.core:jackson-core</exclude>
> 									<exclude>com.fasterxml.jackson.core:jackson-databind</exclude>
> 									<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
> 									<exclude>stax:stax-api</exclude>
> 									<exclude>com.typesafe:config</exclude>
> 									<exclude>org.uncommons.maths:uncommons-maths</exclude>
> 									<exclude>com.github.scopt:scopt_*</exclude>
> 									<exclude>commons-io:commons-io</exclude>
> 									<exclude>commons-cli:commons-cli</exclude>
> 								</excludes>
> 							</artifactSet>
> 							<filters>
> 								<filter>
> 									<artifact>org.apache.flink:*</artifact>
> 									<excludes>
> 										<!-- exclude shaded google but include shaded curator -->
> 										<exclude>org/apache/flink/shaded/com/**</exclude>
> 										<exclude>web-docs/**</exclude>
> 									</excludes>
> 								</filter>
> 								<filter>
> 									<!-- Do not copy the signatures in the META-INF folder.
> 									Otherwise, this might cause SecurityExceptions when using the JAR. -->
> 									<artifact>*:*</artifact>
> 									<excludes>
> 										<exclude>META-INF/*.SF</exclude>
> 										<exclude>META-INF/*.DSA</exclude>
> 										<exclude>META-INF/*.RSA</exclude>
> 									</excludes>
> 								</filter>
> 							</filters>
> 							<transformers>
> 								<!-- add Main-Class to manifest file -->
> 								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> 									<mainClass>com.gf.flink.Job</mainClass>
> 								</transformer>
> 							</transformers>
> 							<createDependencyReducedPom>false</createDependencyReducedPom>
> 						</configuration>
> 					</execution>
> 				</executions>
> 			</plugin>
> 			<plugin>
> 				<groupId>org.apache.maven.plugins</groupId>
> 				<artifactId>maven-compiler-plugin</artifactId>
> 				<version>3.1</version>
> 				<configuration>
> 					<source>1.7</source> <!-- If you want to use Java 8, change this to "1.8" -->
> 					<target>1.7</target> <!-- If you want to use Java 8, change this to "1.8" -->
> 				</configuration>
> 			</plugin>
> 		</plugins>
> 		
> 			
> 		<!-- If you want to use Java 8 Lambda Expressions uncomment the following lines -->
> 		<!--
> 		<pluginManagement>
> 			<plugins>
> 				<plugin>
> 					<artifactId>maven-compiler-plugin</artifactId>
> 					<configuration>
> 						<source>1.8</source>
> 						<target>1.8</target>
> 						<compilerId>jdt</compilerId>
> 					</configuration>
> 					<dependencies>
> 						<dependency>
> 							<groupId>org.eclipse.tycho</groupId>
> 							<artifactId>tycho-compiler-jdt</artifactId>
> 							<version>0.21.0</version>
> 						</dependency>
> 					</dependencies>
> 				</plugin>
> 		
> 				<plugin>
> 					<groupId>org.eclipse.m2e</groupId>
> 					<artifactId>lifecycle-mapping</artifactId>
> 					<version>1.0.0</version>
> 					<configuration>
> 						<lifecycleMappingMetadata>
> 							<pluginExecutions>
> 								<pluginExecution>
> 									<pluginExecutionFilter>
> 										<groupId>org.apache.maven.plugins</groupId>
> 										<artifactId>maven-assembly-plugin</artifactId>
> 										<versionRange>[2.4,)</versionRange>
> 										<goals>
> 											<goal>single</goal>
> 										</goals>
> 									</pluginExecutionFilter>
> 									<action>
> 										<ignore/>
> 									</action>
> 								</pluginExecution>
> 								<pluginExecution>
> 									<pluginExecutionFilter>
> 										<groupId>org.apache.maven.plugins</groupId>
> 										<artifactId>maven-compiler-plugin</artifactId>
> 										<versionRange>[3.1,)</versionRange>
> 										<goals>
> 											<goal>testCompile</goal>
> 											<goal>compile</goal>
> 										</goals>
> 									</pluginExecutionFilter>
> 									<action>
> 										<ignore/>
> 									</action>
> 								</pluginExecution>
> 							</pluginExecutions>
> 						</lifecycleMappingMetadata>
> 					</configuration>
> 				</plugin>
> 			</plugins>
> 		</pluginManagement>
> 		-->
> 		
> 	</build>
> </project>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)