Posted to user@flink.apache.org by sri hari kali charan Tummala <ka...@gmail.com> on 2019/07/16 15:00:42 UTC
Fwd: org.apache.flink.table.api.TableException: Only tables that
originate from Scala DataStreams can be converted to Scala DataStreams.
>
>
> Hi All,
>
> I am trying to select distinct values from a SQL query result and write
> them to CSV, but it fails with the error below.
>
> *Exception in thread "main" org.apache.flink.table.api.TableException:
> Only tables that originate from Scala DataStreams can be converted to Scala
> DataStreams.*
>
>
> * at
> org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:145)
> at
> com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer$.main(KinesisConsumer.scala:153)
> at
> com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer.main(KinesisConsumer.scala)*
>
> Code Example:-
>
> val data = kinesis.map(mapFunction)
> tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
> val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
> val table = tEnv.sqlQuery(query)
> import org.apache.flink.streaming.api.scala._
> tEnv.sqlQuery(query).distinct().toRetractStream[Row]
> .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4",
> FileSystem.WriteMode.NO_OVERWRITE,"~","|")
>
>
>
Thanks & Regards
Sri Tummala
Re: org.apache.flink.table.api.TableException: Only tables that
originate from Scala DataStreams can be converted to Scala DataStreams.
Posted by Hequn Cheng <ch...@gmail.com>.
Hi Sri,
For Scala jobs, you should import the corresponding Scala environment and
DataStream classes, e.g.:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.scala.StreamTableEnvironment
See the example here [1].
Best,
Hequn
[1]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
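To illustrate the point above, here is a minimal, hypothetical sketch (assuming Flink 1.8 with the Scala API on the classpath; the object name and the toy two-element source standing in for the Kinesis stream are illustrative, not from the original job). The fix is to use the Scala environments end to end, so the registered table originates from a Scala DataStream and toRetractStream[Row] applies:

```scala
// Hypothetical sketch: Scala environments end to end (Flink 1.8 Table API).
import org.apache.flink.streaming.api.scala._ // Scala StreamExecutionEnvironment, DataStream, implicit TypeInformation
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._     // Scala StreamTableEnvironment, table conversions, expression DSL
import org.apache.flink.types.Row

object ScalaTableSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Scala StreamTableEnvironment, NOT org.apache.flink.table.api.java.StreamTableEnvironment:
    // getTableEnvironment on a Scala StreamExecutionEnvironment returns the Scala variant.
    val tEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

    // Toy Scala DataStream in place of the Kinesis source.
    val data: DataStream[(String, String)] =
      env.fromElements(("4444", "alice"), ("5555", "bob"))
    tEnv.registerDataStream("transactions", data, 'cc_num, 'first_column)

    // No TableException here: the table originates from a Scala DataStream.
    tEnv.sqlQuery("SELECT DISTINCT cc_num, first_column FROM transactions")
      .toRetractStream[Row]
      .print()

    env.execute()
  }
}
```

With the Java StreamTableEnvironment (as in the original code), the same toRetractStream call fails with the "Only tables that originate from Scala DataStreams" exception, because the Scala TableConversions checks which kind of environment the table was created from.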
On Tue, Jul 16, 2019 at 11:03 PM sri hari kali charan Tummala <
kali.tummala@gmail.com> wrote:
> is this a Bug in Flink Scala?
>
> Full code and Maven POM:-
>
> package com.aws.examples.kinesis.consumer.TransactionExample
>
> import java.lang
> import java.util.Properties
>
> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
> import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
> import com.google.gson.{Gson, JsonObject}
> import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
> import java.sql.{DriverManager, Time}
>
> import com.aws.SchemaJavaClasses.Row1
> import org.apache.flink.types.Row
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.sinks.CsvTableSink
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.table.sinks.TableSink
> import org.apache.flink.core.fs.{FileSystem, Path}
>
> import scala.collection.JavaConversions._
> import org.apache.flink.table.sources.CsvTableSource
> import org.apache.flink.table.api.Table
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.streaming.api.datastream.DataStream
> import org.apache.flink.streaming.api.functions.sink.SinkFunction
> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
> import com.aws.customSinks.CsvCustomSink
> import org.apache.flink.api.java.tuple
> import org.apache.flink.api.java.typeutils.TupleTypeInfo
> import org.apache.flink.table.sinks.AppendStreamTableSink
> import org.apache.flink.table.sinks.RetractStreamTableSink
> import org.apache.flink.api.java.DataSet
>
>
>
> object KinesisConsumer extends RetractStreamTableSink[Row] {
>
> override def configure(strings: Array[String], typeInformations: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = ???
>
> override def getFieldNames: Array[String] = ???
>
> override def getFieldTypes: Array[TypeInformation[_]] = ???
>
> override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = ???
>
> override def getOutputType(): TupleTypeInfo[tuple.Tuple2[lang.Boolean, Row]] = super.getOutputType()
>
> override def getRecordType: TypeInformation[Row] = ???
>
>
> def main(args: Array[String]): Unit = {
>
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> //env.enableCheckpointing(10)
>
> val tEnv: org.apache.flink.table.api.java.StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
>
> // Get AWS credentials
> val credentialsProvider = new DefaultAWSCredentialsProviderChain
> val credentials = credentialsProvider.getCredentials
>
> // Configure Flink Kinesis consumer
> val consumerConfig = new Properties
> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")
>
> // Create Kinesis stream
> val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), consumerConfig))
>
> val mapFunction: MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]] =
> new MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]]() {
>
> override def map(s: String): Tuple10[String, String, String, String, String, String, String, String, String, String] = {
>
> val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
> val csvData = data.getCc_num + "," +
> data.getFirst + "," +
> data.getLast + "," +
> data.getTrans_num + "," +
> data.getTrans_time + "," +
> data.getCategory + "," +
> data.getMerchant + "," +
> data.getAmt + "," +
> data.getMerch_lat + "," +
> data.getMerch_long
>
> //println(csvData)
>
> val p: Array[String] = csvData.split(",")
> var cc_num: String = p(0)
> var first: String = p(1)
> var last: String = p(2)
> var trans_num: String = p(3)
> var trans_time: String = p(4)
> var category: String = p(5)
> var merchant: String = p(6)
> var amt: String = p(7)
> var merch_lat: String = p(8)
> var merch_long: String = p(9)
>
> val creationDate: Time = new Time(System.currentTimeMillis())
> return new Tuple10(cc_num, first, last, trans_num, trans_time, category, merchant, amt, merch_lat, merch_long)
> }
> }
>
> val data = kinesis.map(mapFunction)
> tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
> val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
> val table = tEnv.sqlQuery(query)
> import org.apache.flink.streaming.api.scala._
> tEnv.sqlQuery(query).distinct().toRetractStream[Row]
> .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4",
> FileSystem.WriteMode.NO_OVERWRITE, "~", "|")
>
> env.execute()
> }
> }
>
> *POM:-*
>
> <?xml version="1.0" encoding="UTF-8"?>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
> <modelVersion>4.0.0</modelVersion>
>
> <groupId>FlinkStreamAndSql</groupId>
> <artifactId>FlinkStreamAndSql</artifactId>
> <version>1.0-SNAPSHOT</version>
> <build>
> <sourceDirectory>src/main/scala</sourceDirectory>
> <testSourceDirectory>src/test/scala</testSourceDirectory>
> <plugins>
> <plugin>
> <!-- see http://davidb.github.com/scala-maven-plugin -->
> <groupId>net.alchim31.maven</groupId>
> <artifactId>scala-maven-plugin</artifactId>
> <version>3.1.3</version>
> <executions>
> <execution>
> <goals>
> <goal>compile</goal>
> <goal>testCompile</goal>
> </goals>
> <configuration>
> </configuration>
> </execution>
> </executions>
> </plugin>
> <plugin>
> <groupId>org.apache.maven.plugins</groupId>
> <artifactId>maven-surefire-plugin</artifactId>
> <version>2.13</version>
> <configuration>
> <useFile>false</useFile>
> <disableXmlReport>true</disableXmlReport>
> <!-- If you have classpath issue like NoDefClassError,... -->
> <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
> <includes>
> <include>**/*Test.*</include>
> <include>**/*Suite.*</include>
> </includes>
> </configuration>
> </plugin>
>
> <!-- "package" command plugin -->
> <plugin>
> <artifactId>maven-assembly-plugin</artifactId>
> <version>2.4.1</version>
> <configuration>
> <descriptorRefs>
> <descriptorRef>jar-with-dependencies</descriptorRef>
> </descriptorRefs>
> </configuration>
> <executions>
> <execution>
> <id>make-assembly</id>
> <phase>package</phase>
> <goals>
> <goal>single</goal>
> </goals>
> </execution>
> </executions>
> </plugin>
> </plugins>
> </build>
> <dependencies>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-core</artifactId>
> <version>1.8.1</version>
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-clients_2.11</artifactId>
> <version>1.8.1</version>
> </dependency>
>
> <dependency>
> <groupId>org.apache.derby</groupId>
> <artifactId>derby</artifactId>
> <version>10.13.1.1</version>
> </dependency>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-jdbc_2.11</artifactId>
> <version>1.8.1</version>
> </dependency>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-api-scala_2.11</artifactId>
> <version>1.8.1</version>
> </dependency>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-api-java</artifactId>
> <version>1.8.1</version>
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table</artifactId>
> <version>1.8.1</version>
> </dependency>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-planner_2.11</artifactId>
> <version>1.8.1</version>
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-json</artifactId>
> <version>1.8.1</version>
> </dependency>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-scala_2.11</artifactId>
> <version>1.8.1</version>
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-scala_2.11</artifactId>
> <version>1.8.1</version>
> </dependency>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-kinesis_2.11</artifactId>
> <version>1.8.0</version>
> </dependency>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
> <version>1.8.1</version>
> </dependency>
>
> <dependency>
> <groupId>com.amazonaws</groupId>
> <artifactId>amazon-kinesis-client</artifactId>
> <version>1.8.8</version>
> </dependency>
>
> <dependency>
> <groupId>com.amazonaws</groupId>
> <artifactId>aws-java-sdk-kinesis</artifactId>
> <version>1.11.579</version>
> </dependency>
>
> <dependency>
> <groupId>commons-dbcp</groupId>
> <artifactId>commons-dbcp</artifactId>
> <version>1.2.2</version>
> </dependency>
> <dependency>
> <groupId>com.google.code.gson</groupId>
> <artifactId>gson</artifactId>
> <version>2.1</version>
> </dependency>
>
> <dependency>
> <groupId>commons-cli</groupId>
> <artifactId>commons-cli</artifactId>
> <version>1.4</version>
> </dependency>
>
> <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
> <dependency>
> <groupId>org.apache.commons</groupId>
> <artifactId>commons-csv</artifactId>
> <version>1.7</version>
> </dependency>
>
> <dependency>
> <groupId>org.apache.commons</groupId>
> <artifactId>commons-compress</artifactId>
> <version>1.4.1</version>
> </dependency>
>
> <dependency>
> <groupId>com.amazonaws</groupId>
> <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
> <version>1.4.0</version>
> </dependency>
>
>
> <dependency>
> <groupId>com.amazonaws</groupId>
> <artifactId>aws-java-sdk</artifactId>
> <version>1.11.579</version>
> </dependency>
>
> </dependencies>
>
> </project>
>
>
>
> --
> Thanks & Regards
> Sri Tummala
>
>
Re: org.apache.flink.table.api.TableException: Only tables that
originate from Scala DataStreams can be converted to Scala DataStreams.
Posted by sri hari kali charan Tummala <ka...@gmail.com>.
is this a Bug in Flink Scala?
(Full code and Maven POM quoted in full in the reply above.)
--
Thanks & Regards
Sri Tummala