Posted to user@flink.apache.org by Srikanth <sr...@gmail.com> on 2016/05/03 01:18:26 UTC

Scala compilation error

Hello,

I'm fac

val stream = env.addSource(
  new FlinkKafkaConsumer09[String]("test-topic", new SimpleStringSchema(), properties))
val bidderStream: KeyedStream[BidderRawLogs, Int] =
  stream.flatMap(b => BidderRawLogs(b)).keyBy(b => b.strategyId)

val metaStrategy: KeyedStream[(Int, String), Int] =
  env.readTextFile("path").name("Strategy")
    .map((1, _)).keyBy(_._1)

val staticTypeInfo = new TypeHint[(Int, String)]() {}.getTypeInfo()
val dynamicTypeInfo = new TypeHint[BidderRawLogs]() {}.getTypeInfo()
val joinOperator: TwoInputStreamOperator[BidderRawLogs, (Int, String), (Int, BidderRawLogs, (Int, String))] =
  new JoinOperator[Int, BidderRawLogs, (Int, String)](dynamicTypeInfo, staticTypeInfo)
val outTypeInfo = new TypeHint[(Int, BidderRawLogs, (Int, String))]() {}.getTypeInfo()

val funName = "test"
val joinedStream = bidderStream.connect(metaStrategy)
  .transform(funName, joinOperator, outTypeInfo)

Re: Scala compilation error

Posted by Srikanth <sr...@gmail.com>.
Yes, I did notice the use of implicits in ConnectedStreams.scala.
Better Scaladoc would be helpful, especially when the compiler errors are
not clear.

Thanks


Re: Scala compilation error

Posted by Aljoscha Krettek <al...@apache.org>.
There is a Scaladoc, but unfortunately it does not cover all packages. In
the Scala API you can call transform without specifying a TypeInformation;
it works using implicits/context bounds.
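
To illustrate the context-bound mechanism mentioned here, below is a minimal, self-contained Scala 2 sketch. TypeInfo, ContextBoundDemo and the String => R operator are hypothetical stand-ins invented for this example; they are not Flink classes. Only the shape of transform[R: ...] mirrors the Scala signature quoted in this thread.

```scala
// Hypothetical stand-in for a type-information class; NOT Flink's TypeInformation.
trait TypeInfo[T] { def name: String }

object TypeInfo {
  // Implicit instances live in the companion object, so the compiler
  // finds them without any import.
  implicit val intInfo: TypeInfo[Int] = new TypeInfo[Int] { val name = "Int" }
  implicit val stringInfo: TypeInfo[String] = new TypeInfo[String] { val name = "String" }
}

object ContextBoundDemo {
  // The context bound [R: TypeInfo] desugars to a second, implicit parameter
  // list: (functionName, operator)(implicit ev: TypeInfo[R]).
  // That is why the explicit parameter list has two arguments, not three.
  def transform[R: TypeInfo](functionName: String, operator: String => R): String =
    s"$functionName -> ${implicitly[TypeInfo[R]].name}"

  def main(args: Array[String]): Unit = {
    // Usual form: the compiler supplies TypeInfo[Int] implicitly.
    println(transform("len", (s: String) => s.length))
    // Explicit form (Scala 2 syntax): the type info goes in its own
    // argument list, after the two regular arguments.
    println(transform("len", (s: String) => s.length)(TypeInfo.intInfo))
  }
}
```

Passing the type information as a third argument in the first parameter list, as in the Java signature, does not match a method declared this way. Going by the Scala signature quoted in this thread, the corresponding call there would take only the function name and the operator.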


Re: Scala compilation error

Posted by Srikanth <sr...@gmail.com>.
Sorry for the previous incomplete email. Didn't realize I hit send!

I was facing a weird compilation error in Scala when I did
val joinedStream = stream1.connect(stream2)
.transform("funName", outTypeInfo, joinOperator)

It turned out to be due to a difference in the method signature between the
Scala and Java APIs. I was referring to the Javadoc. Is there a Scaladoc?

Java API has
public <R> SingleOutputStreamOperator<R> transform(
      String functionName,
      TypeInformation<R> outTypeInfo,
      TwoInputStreamOperator<IN1, IN2, R> operator)

Scala API has
def transform[R: TypeInformation](
      functionName: String,
      operator: TwoInputStreamOperator[IN1, IN2, R])

Srikanth
