Posted to users@kafka.apache.org by Peter Vandenabeele <pe...@vandenabeele.com> on 2015/12/20 15:20:56 UTC

Minimal KafkaConsumer in Scala fails compilation with `could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]`

Hi,

I am trying to write a minimal Kafka consumer in Scala and got
this far:

➜  scala git:(kafka_exp_001) ✗ cat KafkaConsumer.scala
package io.allthingsdata.kafkaConsumer

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.api.common.typeinfo._
//import org.apache.flink.streaming.api.windowing.time.Time

object KafkaConsumer {
  def main(args: Array[String]) {

    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val valueDeserializer = new SimpleStringSchema()
    val props = new java.util.Properties()

    // create a Kafka Consumer
    val kafkaConsumer: FlinkKafkaConsumer082[String] =
      new FlinkKafkaConsumer082(
        "Topic1",
        valueDeserializer,
        props
      )

    // get input data
    val messageStream: DataStream[String] = env.addSource(kafkaConsumer) // supply typeInfo ?

    // do something with it
    val messages = messageStream.
      map ( s => "Kafka and Flink say: $s" )

    // execute and print result
    messages.print()
  }
}
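
(Aside, unrelated to the compile error: the `map` body above will not substitute `$s`. Scala only interpolates strings that carry the `s` prefix; a plain double-quoted string keeps `$s` as literal text. A standalone sketch, independent of Flink:)

```scala
// In a plain double-quoted string, "$s" is just two characters;
// interpolation only happens with the `s` prefix.
object InterpolationDemo {
  val s = "hello"
  val literal      = "Kafka and Flink say: $s"   // no substitution
  val interpolated = s"Kafka and Flink say: $s"  // substitutes the value of `s`

  def main(args: Array[String]): Unit = {
    println(literal)       // Kafka and Flink say: $s
    println(interpolated)  // Kafka and Flink say: hello
  }
}
```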

/*  based on this Java example code
ParameterTool parameterTool = ParameterTool.fromArgs(args);

DataStream<String> messageStream = env
  .addSource(new FlinkKafkaConsumer082<>(
    parameterTool.getRequired("topic"),
    new SimpleStringSchema(),
    parameterTool.getProperties()));

Once a DataStream is created, you can transform it as you like. For
example, let us pad every word with a fixed prefix, and print to stdout:

messageStream
  .rebalance()
  .map ( s -> "Kafka and Flink says: " + s)
  .print();
*/


When trying to compile in sbt I get these error messages:

```
> compile
[info] Compiling 1 Scala source to /Users/peter_v/data/github/petervandenabeele/flink-sbt/target/scala-2.10/classes...
[error] /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:28: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
[error]     val messageStream: DataStream[String] = env.addSource(kafkaConsumer) // supply typeInfo ?
[error]                                                          ^
[error] /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:32: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
[error]       map ( s => "Kafka and Flink say: $s" )
[error]           ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 0 s, completed Dec 19, 2015 5:11:56 PM
```

When inspecting `DataStreamSource.addSource`, I read:

/**
 * Ads a data source with a custom type information thus opening a
 * {@link DataStream}. Only in very special cases does the user need to
 * support type information. Otherwise use
 * {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
 */

I did try to supply a `BasicTypeInfo.STRING_TYPE_INFO` as typeInfo
argument, but that does not solve it.

When trying:

`val messageStream: DataStream[String] = env.addSource(kafkaConsumer, BasicTypeInfo.STRING_TYPE_INFO) // supply typeInfo ?`

I get:

```
> compile
[info] Compiling 1 Scala source to /Users/peter_v/data/github/petervandenabeele/flink-sbt/target/scala-2.10/classes...
[error] /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:28: overloaded method value addSource with alternatives:
[error]   [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T] => Unit)(implicit evidence$17: scala.reflect.ClassTag[T], implicit evidence$18: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
<and>
[error]   [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit evidence$15: scala.reflect.ClassTag[T], implicit evidence$16: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
[error]  cannot be applied to (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082[String], org.apache.flink.api.common.typeinfo.BasicTypeInfo[String])
[error]     val messageStream: DataStream[String] = env.addSource(kafkaConsumer, BasicTypeInfo.STRING_TYPE_INFO) // supply typeInfo ?
[error]                                                 ^
[error] /Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:32: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
[error]       map ( s => "Kafka and Flink say: $s" )
[error]           ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 0 s, completed Dec 20, 2015 3:13:04 PM
>
```
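
(The overload error above shows there is no two-argument `addSource(source, typeInfo)` in the Scala API: the `TypeInformation[T]` is an implicit evidence parameter, not a second argument. A sketch of how it could be supplied explicitly, assuming Flink 0.10's Scala streaming API and reusing the `env` and `kafkaConsumer` values from the program above:)

```scala
import scala.reflect.ClassTag
import org.apache.flink.api.common.typeinfo.BasicTypeInfo

// Pass the evidence in the explicit second parameter list instead
// of as a second argument to addSource.
val messageStream: DataStream[String] =
  env.addSource(kafkaConsumer)(
    implicitly[ClassTag[String]],
    BasicTypeInfo.STRING_TYPE_INFO
  )
```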

The build.sbt I am using is:

```
➜  flink-sbt git:(kafka_exp_001) ✗ cat build.sbt
val flink_scala_0_10_0 = "org.apache.flink" % "flink-scala" % "0.10.0"
val flink_clients_0_10_0 = "org.apache.flink" % "flink-clients" % "0.10.0"
val flink_streaming_scala_0_10_0 = "org.apache.flink" % "flink-streaming-scala" % "0.10.0"
val flink_connector_kafka_0_10_0 = "org.apache.flink" % "flink-connector-kafka" % "0.10.0"

lazy val commonSettings = Seq(
  organization := "io.allthingsdata",
  version := "0.1.0",
  scalaVersion := "2.10.6"
)

lazy val root = (project in file(".")).
  settings(commonSettings: _*).
  settings(
    name := "flink-sbt",
    fork in run := true,
    libraryDependencies += flink_scala_0_10_0,
    libraryDependencies += flink_clients_0_10_0,
    libraryDependencies += flink_streaming_scala_0_10_0,
    libraryDependencies += flink_connector_kafka_0_10_0
  )
```

Any hints on how to make this minimal example work would be very much
appreciated.

Many thanks :-)

Peter

Re: Minimal KafkaConsumer in Scala fails compilation with `could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]`

Posted by Robert Metzger <rm...@apache.org>.
Hi Peter,

The problem is that you have both the DataSet and the DataStream package
imports. Each brings its own implicit TypeInformation factory into scope,
which makes the implicit resolution ambiguous. Remove the DataSet API import
(import org.apache.flink.api.scala._) to make the example work.
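
(With that change applied, a sketch of the compiling program, assuming the same Flink 0.10 APIs as above; note two further details not in the original: the `s` interpolator prefix so `$s` is substituted, and the `env.execute()` call that a streaming job needs to actually run:)

```scala
package io.allthingsdata.kafkaConsumer

// DataStream API imports only: without org.apache.flink.api.scala._,
// the implicit TypeInformation comes from a single package.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082

object KafkaConsumer {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val kafkaConsumer = new FlinkKafkaConsumer082[String](
      "Topic1",
      new SimpleStringSchema(),
      new java.util.Properties()
    )

    val messageStream: DataStream[String] = env.addSource(kafkaConsumer)

    messageStream
      .map(s => s"Kafka and Flink say: $s")  // `s` prefix substitutes $s
      .print()

    env.execute("Minimal Kafka consumer")  // required to start the job
  }
}
```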
