You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Chesnay Schepler (JIRA)" <ji...@apache.org> on 2017/05/21 11:19:04 UTC
[jira] [Closed] (FLINK-4061) about flink jdbc connect oracle db
exists a crital bug
[ https://issues.apache.org/jira/browse/FLINK-4061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chesnay Schepler closed FLINK-4061.
-----------------------------------
Resolution: Cannot Reproduce
> about flink jdbc connect oracle db exists a crital bug
> -------------------------------------------------------
>
> Key: FLINK-4061
> URL: https://issues.apache.org/jira/browse/FLINK-4061
> Project: Flink
> Issue Type: Bug
> Components: DataSet API
> Affects Versions: 1.1.0
> Environment: ubuntu ,jdk1.8.0 ,Start a Local Flink Cluster
> Reporter: dengchangfu
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> I use flink-jdbc to connect oracle db for etl, so i write a demo to test the feature. the code is simple,but after I submit this app ,a exception happen.
> exception info like this:
> Caused by: java.lang.NullPointerException
> at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:231)
> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> my code like this:
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
> import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
> import org.apache.flink.api.table.Row;
> import org.apache.flink.api.table.typeutils.RowTypeInfo;
> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder;
> import java.sql.ResultSet;
> import java.sql.Types;
> /**
> * Skeleton for a Flink Job.
> *
> * For a full example of a Flink Job, see the WordCountJob.java file in the
> * same package/directory or have a look at the website.
> *
> * You can also generate a .jar file that you can submit on your Flink
> * cluster.
> * Just type
> * mvn clean package
> * in the projects root directory.
> * You will find the jar in
> * target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
> *
> */
> public class Job {
> public static final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[]{
> BasicTypeInfo.STRING_TYPE_INFO,
> BasicTypeInfo.FLOAT_TYPE_INFO
> };
> public static final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
> public static void main(String[] args) {
> // set up the execution environment
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
> .setDrivername("oracle.jdbc.driver.OracleDriver")
> .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest")
> .setUsername("crmii")
> .setPassword("crmii")
> .setQuery("select CLIENT_ID,OCCUR_BALANCE from HS_ASSET.FUNDJOUR@OTC")
> .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
> .setRowTypeInfo(rowTypeInfo);
> DataSet<Row> source = env.createInput(inputBuilder.finish());
> source.output(JDBCOutputFormat.buildJDBCOutputFormat()
> .setDrivername("oracle.jdbc.driver.OracleDriver")
> .setDBUrl("jdbc:oracle:thin:@10.2.121.128:1521:jgjtest")
> .setUsername("crmii")
> .setPassword("crmii")
> .setQuery("insert into dengabc (client_id,salary) values(?,?)")
> .setSqlTypes(new int[]{Types.VARCHAR, Types.DOUBLE})
> .finish());
> //source.print();
> //source.first(20).print();
> //dbData.print();
> /**
> * Here, you can start creating your execution plan for Flink.
> *
> * Start with getting some data from the environment, like
> * env.readTextFile(textPath);
> *
> * then, transform the resulting DataSet<String> using operations
> * like
> * .filter()
> * .flatMap()
> * .join()
> * .coGroup()
> * and many more.
> * Have a look at the programming guide for the Java API:
> *
> * http://flink.apache.org/docs/latest/apis/batch/index.html
> *
> * and the examples
> *
> * http://flink.apache.org/docs/latest/apis/batch/examples.html
> *
> */
> // execute program
> try {
> env.execute("Flink Java API Skeleton");
> } catch (Exception e) {
> e.getMessage();
> }
> }
> }
> my pom.xml like this:
> <!--
> Licensed to the Apache Software Foundation (ASF) under one
> or more contributor license agreements. See the NOTICE file
> distributed with this work for additional information
> regarding copyright ownership. The ASF licenses this file
> to you under the Apache License, Version 2.0 (the
> "License"); you may not use this file except in compliance
> with the License. You may obtain a copy of the License at
> http://www.apache.org/licenses/LICENSE-2.0
> Unless required by applicable law or agreed to in writing,
> software distributed under the License is distributed on an
> "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> KIND, either express or implied. See the License for the
> specific language governing permissions and limitations
> under the License.
> -->
> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
> <modelVersion>4.0.0</modelVersion>
>
> <groupId>com.gf.flink</groupId>
> <artifactId>mot</artifactId>
> <version>1.0-SNAPSHOT</version>
> <packaging>jar</packaging>
> <name>Flink Quickstart Job</name>
> <url>http://www.myorganization.org</url>
> <properties>
> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
> <flink.version>1.0.3</flink.version>
> </properties>
> <repositories>
> <repository>
> <id>apache.snapshots</id>
> <name>Apache Development Snapshot Repository</name>
> <url>https://repository.apache.org/content/repositories/snapshots/</url>
> <releases>
> <enabled>false</enabled>
> </releases>
> <snapshots>
> <enabled>true</enabled>
> </snapshots>
> </repository>
> <!--<repository>-->
> <!--<id>mvnrepository</id>-->
> <!--<name>mvnrepository</name>-->
> <!--<url>http://mvnrepository.com/</url>-->
> <!--<releases>-->
> <!--<enabled>true</enabled>-->
> <!--</releases>-->
> <!--<snapshots>-->
> <!--<enabled>true</enabled>-->
> <!--</snapshots>-->
> <!--</repository>-->
> <repository>
> <id>nexus</id>
> <name>local private nexus</name>
> <url>http://10.2.110.202:8081/nexus/content/groups/public/</url>
> <releases><enabled>true</enabled></releases>
> <snapshots><enabled>false</enabled></snapshots>
> </repository>
> <repository>
> <id>nexus-snapshots</id>
> <name>local private nexus</name>
> <url>http://10.2.110.202:8081/nexus/content/groups/public-snapshots/</url>
> <releases><enabled>false</enabled></releases>
> <snapshots><enabled>true</enabled></snapshots>
> </repository>
> </repositories>
>
> <!--
>
> Execute "mvn clean package -Pbuild-jar"
> to build a jar file out of this project!
> How to use the Flink Quickstart pom:
> a) Adding new dependencies:
> You can add dependencies to the list below.
> Please check if the maven-shade-plugin below is filtering out your dependency
> and remove the exclude from there.
> b) Build a jar for running on the cluster:
> There are two options for creating a jar from this project
> b.1) "mvn clean package" -> this will create a fat jar which contains all
> dependencies necessary for running the jar created by this pom in a cluster.
> The "maven-shade-plugin" excludes everything that is provided on a running Flink cluster.
> b.2) "mvn clean package -Pbuild-jar" -> This will also create a fat-jar, but with much
> nicer dependency exclusion handling. This approach is preferred and leads to
> much cleaner jar files.
> -->
> <dependencies>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-java</artifactId>
> <version>1.1-SNAPSHOT</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-java_2.10</artifactId>
> <version>1.1-SNAPSHOT</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-clients_2.10</artifactId>
> <version>1.1-SNAPSHOT</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table_2.10</artifactId>
> <version>1.1-SNAPSHOT</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-core</artifactId>
> <version>1.1-SNAPSHOT</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-jdbc</artifactId>
> <version>1.1-SNAPSHOT</version>
> </dependency>
> <dependency>
> <groupId>com.oracle</groupId>
> <artifactId>com.oracle.ojdbc</artifactId>
> <version>14</version>
> </dependency>
> </dependencies>
> <profiles>
> <profile>
> <!-- Profile for packaging correct JAR files -->
> <id>build-jar</id>
> <activation>
> <activeByDefault>false</activeByDefault>
> </activation>
> <dependencies>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-java</artifactId>
> <version>${flink.version}</version>
> <scope>provided</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-java_2.10</artifactId>
> <version>${flink.version}</version>
> <scope>provided</scope>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-clients_2.10</artifactId>
> <version>${flink.version}</version>
> <scope>provided</scope>
> </dependency>
> </dependencies>
> <build>
> <plugins>
> <!-- disable the exclusion rules -->
> <plugin>
> <groupId>org.apache.maven.plugins</groupId>
> <artifactId>maven-shade-plugin</artifactId>
> <version>2.4.1</version>
> <executions>
> <execution>
> <phase>package</phase>
> <goals>
> <goal>shade</goal>
> </goals>
> <configuration>
> <artifactSet>
> <excludes combine.self="override"></excludes>
> </artifactSet>
> </configuration>
> </execution>
> </executions>
> </plugin>
> </plugins>
> </build>
> </profile>
> </profiles>
> <build>
> <plugins>
> <!-- We use the maven-shade plugin to create a fat jar that contains all dependencies
> except flink and it's transitive dependencies. The resulting fat-jar can be executed
> on a cluster. Change the value of Program-Class if your program entry point changes. -->
> <plugin>
> <groupId>org.apache.maven.plugins</groupId>
> <artifactId>maven-shade-plugin</artifactId>
> <version>2.4.1</version>
> <executions>
> <!-- Run shade goal on package phase -->
> <execution>
> <phase>package</phase>
> <goals>
> <goal>shade</goal>
> </goals>
> <configuration>
> <artifactSet>
> <excludes>
> <!-- This list contains all dependencies of flink-dist
> Everything else will be packaged into the fat-jar
> -->
> <exclude>org.apache.flink:flink-annotations</exclude>
> <exclude>org.apache.flink:flink-shaded-hadoop1</exclude>
> <exclude>org.apache.flink:flink-shaded-hadoop2</exclude>
> <exclude>org.apache.flink:flink-shaded-curator-recipes</exclude>
> <exclude>org.apache.flink:flink-core</exclude>
> <exclude>org.apache.flink:flink-java</exclude>
> <exclude>org.apache.flink:flink-scala_2.10</exclude>
> <exclude>org.apache.flink:flink-runtime_2.10</exclude>
> <exclude>org.apache.flink:flink-optimizer_2.10</exclude>
> <exclude>org.apache.flink:flink-clients_2.10</exclude>
> <exclude>org.apache.flink:flink-avro_2.10</exclude>
> <exclude>org.apache.flink:flink-examples-batch_2.10</exclude>
> <exclude>org.apache.flink:flink-examples-streaming_2.10</exclude>
> <exclude>org.apache.flink:flink-streaming-java_2.10</exclude>
> <!-- Also exclude very big transitive dependencies of Flink
> WARNING: You have to remove these excludes if your code relies on other
> versions of these dependencies.
> -->
> <exclude>org.scala-lang:scala-library</exclude>
> <exclude>org.scala-lang:scala-compiler</exclude>
> <exclude>org.scala-lang:scala-reflect</exclude>
> <exclude>com.amazonaws:aws-java-sdk</exclude>
> <exclude>com.typesafe.akka:akka-actor_*</exclude>
> <exclude>com.typesafe.akka:akka-remote_*</exclude>
> <exclude>com.typesafe.akka:akka-slf4j_*</exclude>
> <exclude>io.netty:netty-all</exclude>
> <exclude>io.netty:netty</exclude>
> <exclude>commons-fileupload:commons-fileupload</exclude>
> <exclude>org.apache.avro:avro</exclude>
> <exclude>commons-collections:commons-collections</exclude>
> <exclude>org.codehaus.jackson:jackson-core-asl</exclude>
> <exclude>org.codehaus.jackson:jackson-mapper-asl</exclude>
> <exclude>com.thoughtworks.paranamer:paranamer</exclude>
> <exclude>org.xerial.snappy:snappy-java</exclude>
> <exclude>org.apache.commons:commons-compress</exclude>
> <exclude>org.tukaani:xz</exclude>
> <exclude>com.esotericsoftware.kryo:kryo</exclude>
> <exclude>com.esotericsoftware.minlog:minlog</exclude>
> <exclude>org.objenesis:objenesis</exclude>
> <exclude>com.twitter:chill_*</exclude>
> <exclude>com.twitter:chill-java</exclude>
> <exclude>com.twitter:chill-avro_*</exclude>
> <exclude>com.twitter:chill-bijection_*</exclude>
> <exclude>com.twitter:bijection-core_*</exclude>
> <exclude>com.twitter:bijection-avro_*</exclude>
> <exclude>commons-lang:commons-lang</exclude>
> <exclude>junit:junit</exclude>
> <exclude>de.javakaffee:kryo-serializers</exclude>
> <exclude>joda-time:joda-time</exclude>
> <exclude>org.apache.commons:commons-lang3</exclude>
> <exclude>org.slf4j:slf4j-api</exclude>
> <exclude>org.slf4j:slf4j-log4j12</exclude>
> <exclude>log4j:log4j</exclude>
> <exclude>org.apache.commons:commons-math</exclude>
> <exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
> <exclude>commons-logging:commons-logging</exclude>
> <exclude>commons-codec:commons-codec</exclude>
> <exclude>com.fasterxml.jackson.core:jackson-core</exclude>
> <exclude>com.fasterxml.jackson.core:jackson-databind</exclude>
> <exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
> <exclude>stax:stax-api</exclude>
> <exclude>com.typesafe:config</exclude>
> <exclude>org.uncommons.maths:uncommons-maths</exclude>
> <exclude>com.github.scopt:scopt_*</exclude>
> <exclude>commons-io:commons-io</exclude>
> <exclude>commons-cli:commons-cli</exclude>
> </excludes>
> </artifactSet>
> <filters>
> <filter>
> <artifact>org.apache.flink:*</artifact>
> <excludes>
> <!-- exclude shaded google but include shaded curator -->
> <exclude>org/apache/flink/shaded/com/**</exclude>
> <exclude>web-docs/**</exclude>
> </excludes>
> </filter>
> <filter>
> <!-- Do not copy the signatures in the META-INF folder.
> Otherwise, this might cause SecurityExceptions when using the JAR. -->
> <artifact>*:*</artifact>
> <excludes>
> <exclude>META-INF/*.SF</exclude>
> <exclude>META-INF/*.DSA</exclude>
> <exclude>META-INF/*.RSA</exclude>
> </excludes>
> </filter>
> </filters>
> <transformers>
> <!-- add Main-Class to manifest file -->
> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> <mainClass>com.gf.flink.Job</mainClass>
> </transformer>
> </transformers>
> <createDependencyReducedPom>false</createDependencyReducedPom>
> </configuration>
> </execution>
> </executions>
> </plugin>
> <plugin>
> <groupId>org.apache.maven.plugins</groupId>
> <artifactId>maven-compiler-plugin</artifactId>
> <version>3.1</version>
> <configuration>
> <source>1.7</source> <!-- If you want to use Java 8, change this to "1.8" -->
> <target>1.7</target> <!-- If you want to use Java 8, change this to "1.8" -->
> </configuration>
> </plugin>
> </plugins>
>
>
> <!-- If you want to use Java 8 Lambda Expressions uncomment the following lines -->
> <!--
> <pluginManagement>
> <plugins>
> <plugin>
> <artifactId>maven-compiler-plugin</artifactId>
> <configuration>
> <source>1.8</source>
> <target>1.8</target>
> <compilerId>jdt</compilerId>
> </configuration>
> <dependencies>
> <dependency>
> <groupId>org.eclipse.tycho</groupId>
> <artifactId>tycho-compiler-jdt</artifactId>
> <version>0.21.0</version>
> </dependency>
> </dependencies>
> </plugin>
>
> <plugin>
> <groupId>org.eclipse.m2e</groupId>
> <artifactId>lifecycle-mapping</artifactId>
> <version>1.0.0</version>
> <configuration>
> <lifecycleMappingMetadata>
> <pluginExecutions>
> <pluginExecution>
> <pluginExecutionFilter>
> <groupId>org.apache.maven.plugins</groupId>
> <artifactId>maven-assembly-plugin</artifactId>
> <versionRange>[2.4,)</versionRange>
> <goals>
> <goal>single</goal>
> </goals>
> </pluginExecutionFilter>
> <action>
> <ignore/>
> </action>
> </pluginExecution>
> <pluginExecution>
> <pluginExecutionFilter>
> <groupId>org.apache.maven.plugins</groupId>
> <artifactId>maven-compiler-plugin</artifactId>
> <versionRange>[3.1,)</versionRange>
> <goals>
> <goal>testCompile</goal>
> <goal>compile</goal>
> </goals>
> </pluginExecutionFilter>
> <action>
> <ignore/>
> </action>
> </pluginExecution>
> </pluginExecutions>
> </lifecycleMappingMetadata>
> </configuration>
> </plugin>
> </plugins>
> </pluginManagement>
> -->
>
> </build>
> </project>
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)