You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2018/11/30 16:02:00 UTC

[jira] [Closed] (FLINK-11031) How to consume the Hive data in Flink using Scala

     [ https://issues.apache.org/jira/browse/FLINK-11031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske closed FLINK-11031.
---------------------------------
       Resolution: Invalid
    Fix Version/s:     (was: 1.6.2)

Hi [~Preethi@C], please post questions like yours to the Flink user mailing list.
We use Jira to track bugs, improvements, and new features. 

Thank you,
Fabian

> How to consume the Hive data in Flink using Scala
> -------------------------------------------------
>
>                 Key: FLINK-11031
>                 URL: https://issues.apache.org/jira/browse/FLINK-11031
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.6.2
>            Reporter: Preethi.C
>            Priority: Major
>
> It is showing the below error
> "could not find implicit value for evidence parameter of type" when I call the class. Below is my code.
> Object HiveMainMethod {
> def main(args: Array[String]): Unit = {
>  
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> implicit val dataStream: DataStream[Hivedata] = env.addSource(new JDBC_Connection)
> }
> }
> class JDBC_Connection extends RichSourceFunction[Hivedata] {
>  private var connection: Connection = null
>  private var ps: PreparedStatement = null
>  
>  override def open(parameters: Configuration): Unit = {
>  super.open(parameters)
>  val driver = "com.mysql.jdbc.Driver"
>  val url = "jdbc:mysql://qingcheng11:3306/flinktest"
>  val username = "root"
>  val password = "qingcheng"
>  Class.forName(driver)
>  connection = DriverManager.getConnection(url, username, password)
>  val sql = "select stuid,stuname,stuaddr,stusex from Student;"
>  ps = connection.prepareStatement(sql)
>  }
>  override def run(implicit sourceContext: SourceContext[Hivedata]): Unit = {
>  try {
>  val resultSet = ps.executeQuery()
>  while (resultSet.next()) {
>  val student = Hivedata(resultSet.getInt("ID"), resultSet.getString("Name").trim)
>  sourceContext.collect(student)
>  }
>  } 
>  catch {
>  case e: Exception => println(e.getMessage)
>  }
>  }
> override def cancel(): Unit = {
>  }
>  override def close(): Unit = {
>  super.close()
>  if (connection != null) {
>  connection.close()
>  }
>  if (ps != null) {
>  ps.close()
>  }
>  }
> }
> Referred to the below link
> https://github.com/liguohua-bigdata/simple-flink/blob/master/book/stream/customSource/customSourceScala.md



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)