Posted to user@flink.apache.org by Ankur Sharma <an...@stud.uni-saarland.de> on 2016/03/06 20:17:28 UTC

SourceFunction Scala

Hello,

I am trying to use a custom source function (declaration given below) with a DataStream.
When I add the source to the stream using addSource:

val stream = env.addSource(new QueryOneSource(args))

I get the following error. Any explanations or help?

Error:(14, 31) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple]
    val stream = env.addSource(new QueryOneSource(args))
                              ^
Error:(14, 31) not enough arguments for method addSource: (implicit evidence$15: scala.reflect.ClassTag[org.mpi.debs.Tuple], implicit evidence$16: org.apache.flink.api.common.typeinfo.TypeInformation[org.mpi.debs.Tuple])org.apache.flink.streaming.api.scala.DataStream[org.mpi.debs.Tuple].
Unspecified value parameter evidence$16.
    val stream = env.addSource(new QueryOneSource(args))
                              ^

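// Imports the snippet relies on (not shown in the original post)
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

case class Tuple(id: Long, value: Int)

class QueryOneSource(filenames: Array[String]) extends SourceFunction[Tuple] {
  // Most recently read record; updated by nextRecord()
  private var nextTuple: Tuple = _
  // Cleared by cancel() so that run() can terminate
  @volatile private var running = true

  override def run(ctx: SourceContext[Tuple]): Unit = {
    while (running) {
      nextRecord()
      ctx.collect(nextTuple)
    }
  }

  override def cancel(): Unit = { running = false }

  // Reads the next record into nextTuple (body omitted in the original post)
  private def nextRecord(): Unit = {
  }
}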

Best,
Ankur Sharma
Information Systems Group
3.15 E1.1 Universität des Saarlandes
66123, Saarbrücken Germany
Email: ankur.sharma@mpi-inf.mpg.de
       ankur@stud.uni-saarland.de

Re: SourceFunction Scala

Posted by Stefano Baghino <st...@radicalbit.io>.
Hi Ankur,

I'm catching up with this week's mailing list right now. I hope you have
already solved the issue, but if you haven't: this kind of problem happens
when you use a version of Scala that your Flink dependencies have not been
compiled for. Make sure you append the correct Scala version suffix to the
dependencies you're using, matching the Scala version of your
project.

You can find more details here:
https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit
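
The advice above about Scala version suffixes could look roughly like this in an sbt build. This is only a sketch: the thread does not say which build tool, Scala version, or Flink version the project uses, so `scalaVersion` and `flinkVersion` below are assumptions to adjust. With Maven, the same idea applies by writing the suffix (for example `_2.11`) into the `artifactId` by hand.

```scala
// build.sbt (sketch; the version numbers are assumptions, not taken from the thread)
scalaVersion := "2.11.7"

val flinkVersion = "1.0.0" // assumed Flink version

libraryDependencies ++= Seq(
  // %% appends the project's Scala binary version (here _2.11) to the artifact name,
  // so the Flink artifacts match the Scala version the project is compiled with
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-connector-rabbitmq" % flinkVersion
)
```

Using `%%` instead of hard-coding the suffix keeps all dependencies on the same Scala binary version when `scalaVersion` changes.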

Re: SourceFunction Scala

Posted by Ankur Sharma <an...@stud.uni-saarland.de>.
Hi,

I am getting the following error when executing the project's fat jar. Any help?


Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/util/serialization/DeserializationSchema
        at org.mpi.debs.Main.main(Main.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.util.serialization.DeserializationSchema
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 1 more


Main.scala: 

import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.util.serialization.SimpleStringSchema


object Main {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    val stream = env.addSource(new RMQSource[String]("localhost","query-one", new SimpleStringSchema))
    stream.addSink(new SinkFunction[String] {
      override def invoke(value: String) = {
        println(value)
      }
    })
    env.execute("QueryOneExecutor")
  }
}

Best,
Ankur Sharma
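
One way to narrow down a NoClassDefFoundError like the one above is to check, from inside the same JVM, whether the class can be loaded at all. The following is a minimal diagnostic sketch using only the JDK; `ClasspathCheck` and `isOnClasspath` are hypothetical names introduced for illustration and are not part of Flink.

```scala
object ClasspathCheck {
  // Returns true if the named class can be loaded by this object's class loader.
  // The class is looked up without being initialized.
  def isOnClasspath(className: String): Boolean =
    try {
      Class.forName(className, false, getClass.getClassLoader)
      true
    } catch {
      case _: ClassNotFoundException => false
      case _: NoClassDefFoundError   => false
    }

  def main(args: Array[String]): Unit = {
    val names = Seq(
      "java.lang.String",
      // The class reported as missing in the stack trace above
      "org.apache.flink.streaming.util.serialization.DeserializationSchema"
    )
    names.foreach(n => println(s"$n -> ${isOnClasspath(n)}"))
  }
}
```

If this is run with the fat jar on the classpath and reports false for the Flink class, the jar most likely does not contain the corresponding Flink dependency.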



Re: SourceFunction Scala

Posted by Márton Balassi <ba...@gmail.com>.
Hey Ankur,

Add the following line to your imports, and have a look at the referenced
FAQ. [1]

import org.apache.flink.streaming.api.scala._

[1]
https://flink.apache.org/faq.html#in-scala-api-i-get-an-error-about-implicit-values-and-evidence-parameters

Best,

Marton
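
To illustrate why a single import can resolve the "could not find implicit value for evidence parameter" error, here is a small Flink-free sketch of the same mechanism. Everything in it (`TypeInfo`, `ScalaApi`, `Env`, `Record`) is a hypothetical stand-in, not Flink's actual implementation: `TypeInfo` plays the role of `TypeInformation`, and `ScalaApi` plays the role of the package whose members the wildcard import brings into scope.

```scala
import scala.reflect.ClassTag

// Hypothetical stand-in for TypeInformation[T]
trait TypeInfo[T] { def typeName: String }

// Hypothetical stand-in for the package that provides the implicit
object ScalaApi {
  implicit def createTypeInfo[T](implicit tag: ClassTag[T]): TypeInfo[T] =
    new TypeInfo[T] { def typeName: String = tag.runtimeClass.getName }
}

final case class Record(id: Long, value: Int)

object Env {
  // The context bound `T: TypeInfo` adds a hidden implicit "evidence" parameter,
  // comparable to evidence$16 in the compiler error quoted in this thread.
  def addSource[T: TypeInfo](elements: Seq[T]): String =
    s"${implicitly[TypeInfo[T]].typeName} source with ${elements.size} element(s)"
}

object ImplicitEvidenceDemo {
  // Without this import, the call in describe() does not compile:
  // "could not find implicit value for evidence parameter of type TypeInfo[Record]"
  import ScalaApi._

  def describe(): String = Env.addSource(Seq(Record(1L, 10), Record(2L, 20)))

  def main(args: Array[String]): Unit = println(describe())
}
```

The implicit lives in `ScalaApi` rather than in the companion object of `TypeInfo`, so the compiler only finds it once it has been imported. That is the situation the wildcard import above addresses.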
