Posted to users@kafka.apache.org by Peter Vandenabeele <pe...@vandenabeele.com> on 2015/12/20 15:20:56 UTC
Minimal KafkaConsumer in Scala fails compilation with `could not find
implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]`
Hi,
I am trying to write a minimal Kafka consumer in Scala and got
this far:
➜ scala git:(kafka_exp_001) ✗ cat KafkaConsumer.scala
package io.allthingsdata.kafkaConsumer
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.api.common.typeinfo._
//import org.apache.flink.streaming.api.windowing.time.Time
object KafkaConsumer {
def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val valueDeserializer = new SimpleStringSchema()
val props = new java.util.Properties()
// create a Kafka Consumer
val kafkaConsumer: FlinkKafkaConsumer082[String] =
new FlinkKafkaConsumer082(
"Topic1",
valueDeserializer,
props
)
// get input data
val messageStream: DataStream[String] = env.addSource(kafkaConsumer) // supply typeInfo ?
// do something with it
val messages = messageStream.
map ( s => "Kafka and Flink say: $s" )
// execute and print result
messages.print()
}
}
/* based on this Java example code
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<String> messageStream = env
.addSource(new FlinkKafkaConsumer082<>(
parameterTool.getRequired("topic"),
new SimpleStringSchema(),
parameterTool.getProperties()));
Once a DataStream is created, you can transform it as you like. For
example, let us pad every word with a fixed prefix, and print to stdout:
messageStream
.rebalance()
.map ( s -> “Kafka and Flink says: ” + s)
.print();
*/
When trying to compile in sbt I get these error messages:
```
> compile
[info] Compiling 1 Scala source to
/Users/peter_v/data/github/petervandenabeele/flink-sbt/target/scala-2.10/classes...
[error]
/Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:28:
could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[String]
[error] val messageStream: DataStream[String] =
env.addSource(kafkaConsumer) // supply typeInfo ?
[error] ^
[error]
/Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:32:
could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[String]
[error] map ( s => "Kafka and Flink say: $s" )
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 0 s, completed Dec 19, 2015 5:11:56 PM
```
When inspecting DataStreamSource addSource, I read:
/**
* Ads a data source with a custom type information thus opening a
* {@link DataStream}. Only in very special cases does the user need to
* support type information. Otherwise use
* {@link #addSource(org.apache.flink.streaming.api.functions.source.SourceFunction)}
*
I did try to supply a `BasicTypeInfo.STRING_TYPE_INFO` as typeInfo
argument, but that does not solve it.
When trying:
`val messageStream: DataStream[String] = env.addSource(kafkaConsumer,
BasicTypeInfo.STRING_TYPE_INFO) // supply typeInfo ?`
I get:
> compile
[info] Compiling 1 Scala source to
/Users/peter_v/data/github/petervandenabeele/flink-sbt/target/scala-2.10/classes...
[error]
/Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:28:
overloaded method value addSource with alternatives:
[error] [T](function:
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T]
=> Unit)(implicit evidence$17: scala.reflect.ClassTag[T], implicit
evidence$18:
org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
<and>
[error] [T](function:
org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit
evidence$15: scala.reflect.ClassTag[T], implicit evidence$16:
org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
[error] cannot be applied to
(org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082[String],
org.apache.flink.api.common.typeinfo.BasicTypeInfo[String])
[error] val messageStream: DataStream[String] =
env.addSource(kafkaConsumer, BasicTypeInfo.STRING_TYPE_INFO) // supply
typeInfo ?
[error] ^
[error]
/Users/peter_v/data/github/petervandenabeele/flink-sbt/src/main/scala/KafkaConsumer.scala:32:
could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[String]
[error] map ( s => "Kafka and Flink say: $s" )
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 0 s, completed Dec 20, 2015 3:13:04 PM
>
The build.sbt I am using is:
```
➜ flink-sbt git:(kafka_exp_001) ✗ cat build.sbt
val flink_scala_0_10_0 = "org.apache.flink" % "flink-scala" % "0.10.0"
val flink_clients_0_10_0 = "org.apache.flink" % "flink-clients" % "0.10.0"
val flink_streaming_scala_0_10_0 = "org.apache.flink" %
"flink-streaming-scala" % "0.10.0"
val flink_connector_kafka_0_10_0 = "org.apache.flink" %
"flink-connector-kafka" % "0.10.0"
lazy val commonSettings = Seq(
organization := "io.allthingsdata",
version := "0.1.0",
scalaVersion := "2.10.6"
)
lazy val root = (project in file(".")).
settings(commonSettings: _*).
settings(
name := "flink-sbt",
fork in run := true,
libraryDependencies += flink_scala_0_10_0,
libraryDependencies += flink_clients_0_10_0,
libraryDependencies += flink_streaming_scala_0_10_0,
libraryDependencies += flink_connector_kafka_0_10_0
)
```
Any hints on how to make this minimal example work would be very much
appreciated.
Many thanks :-)
Peter
Re: Minimal KafkaConsumer in Scala fails compilation with `could not
find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]`
Posted by Robert Metzger <rm...@apache.org>.
Hi Peter,
The problem is that you import both the DataSet and the DataStream Scala API
packages. Remove the DataSet API import (`import org.apache.flink.api.scala._`)
to make the example work.
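For illustration, here is a minimal, Flink-free sketch of why importing both packages can break implicit lookup. It assumes that both Flink package objects export an implicit under the same name, so that wildcard-importing both makes that name ambiguous and unusable as an implicit. `TypeInfo`, `DataSetApi`, `DataStreamApi` and `ImplicitImportDemo` are hypothetical stand-ins, not Flink classes.

```scala
// Hypothetical stand-in for Flink's TypeInformation; for illustration only.
trait TypeInfo[T] { def describe: String }

// Two hypothetical objects that each export an implicit under the SAME name,
// mimicking (by assumption) the DataSet and DataStream Scala API packages.
object DataSetApi {
  implicit def createTypeInformation: TypeInfo[String] =
    new TypeInfo[String] { def describe = "String (from DataSetApi)" }
}

object DataStreamApi {
  implicit def createTypeInformation: TypeInfo[String] =
    new TypeInfo[String] { def describe = "String (from DataStreamApi)" }
}

object ImplicitImportDemo {
  // Uncommenting the next line makes the name `createTypeInformation`
  // ambiguous, so neither implicit is eligible and compilation of
  // `typeOf("hello")` below fails with a missing-implicit error.
  // import DataSetApi._
  import DataStreamApi._

  // The context bound `T: TypeInfo` desugars to an implicit evidence
  // parameter, the same shape as the `evidence$16` parameter in the
  // `addSource` signature quoted in the error output.
  def typeOf[T: TypeInfo](value: T): String = implicitly[TypeInfo[T]].describe

  def main(args: Array[String]): Unit =
    println(typeOf("hello")) // prints "String (from DataStreamApi)"
}
```

With a single wildcard import the evidence parameter resolves. With both, a Scala 2 compiler should report the missing implicit rather than the name clash, which would match the error in the original post. Independently of the compile error, two details in the posted program matter at run time: `"Kafka and Flink say: $s"` needs the `s` prefix (`s"Kafka and Flink say: $s"`) to interpolate the value, and a streaming job only starts once `env.execute()` is called.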