Posted to user@flink.apache.org by Nirmalya Sengupta <se...@gmail.com> on 2016/02/26 02:44:15 UTC
Need some help to understand the cause of the error
Hello Flinksters,
I am trying to use Flinkspector in a Scala code snippet of mine and Flink
is complaining. The code is here:
---------------------------------------------------------------------------------------------------------------
case class Reading(field1:String,field2:String,field3:Int)
object MultiWindowing {
def main(args: Array[String]) {}
// WindowFunction<IN,OUT,KEY,W extends Window>
class WindowPrinter extends WindowFunction[Reading, String, String, TimeWindow] {
// .....
}
}
val env = DataStreamTestEnvironment.createTestEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val input: EventTimeInput[Reading] =
EventTimeInputBuilder
.startWith(Reading("hans", "elephant", 15))
.emit(Reading("susi", "arctic", 20), After.period(30, TimeUnit.SECONDS))
.emit(Reading("pete", "elephant", 40), After.period(20, TimeUnit.SECONDS))
//acquire data source from input
val stream = env.fromInput(input)
//apply transformation
val k = stream.keyBy(new KeySelector [Reading,String] {
def getKey(r:Reading) = r.field2
})
.timeWindow(Time.of(5, TimeUnit.MINUTES), Time.of(1, TimeUnit.MINUTES))
k.sum(3)
.print()
env.execute()
}
---------------------------------------------------------------------------------------------------------------
And at runtime, I get this error:
----------------------------------------------------------------------------------------------------------------
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing.main(MultiWindowing.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.IndexOutOfBoundsException: Not 0th field selected for a simple type (non-tuple, non-array).
at org.apache.flink.streaming.util.FieldAccessor.create(FieldAccessor.java:76)
at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:37)
at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:373)
at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<init>(MultiWindowing.scala:63)
at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<clinit>(MultiWindowing.scala)
... 6 more
---------------------------------------------------------------------------------------------------------------
Can someone help me by pointing out the mistake I am making?
-- Nirmalya
--
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."
Re: Need some help to understand the cause of the error
Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
As far as I can see, the problem is in this line:
k.sum(3)
Using field indices is only valid for tuple types. In your case you should be able to use this:
k.sum("field3")
because this is a field of your Reading type.
Cheers,
Aljoscha
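To make the index-versus-name distinction concrete, here is a minimal plain-Scala sketch. It does not use Flink or Flinkspector at all, and the names FieldAccessSketch, sumByKey and fieldAt are hypothetical helpers made up for illustration. It computes what the job appears to be aiming for (the sum of field3 per field2 key, ignoring windowing entirely) by field name, and it shows that positions on a case class are zero-based, so field3 sits at position 2 and there is no position 3.

```scala
// Plain Scala, no Flink: illustrates by-name versus positional field access.
// FieldAccessSketch, sumByKey and fieldAt are hypothetical names for this sketch.
final case class Reading(field1: String, field2: String, field3: Int)

object FieldAccessSketch {
  // Same three readings as in the original post.
  val readings: Seq[Reading] = Seq(
    Reading("hans", "elephant", 15),
    Reading("susi", "arctic", 20),
    Reading("pete", "elephant", 40)
  )

  // By name: group on field2 and sum field3 (windowing is ignored here).
  def sumByKey(rs: Seq[Reading]): Map[String, Int] =
    rs.groupBy(_.field2).map { case (key, group) => key -> group.map(_.field3).sum }

  // By position: a case class is a Product with zero-based positions,
  // so field3 is at position 2 and position 3 is out of range.
  def fieldAt(r: Reading, pos: Int): Option[Any] =
    if (pos >= 0 && pos < r.productArity) Some(r.productElement(pos)) else None

  def main(args: Array[String]): Unit = {
    println(sumByKey(readings))
    println(fieldAt(readings.head, 2))
    println(fieldAt(readings.head, 3))
  }
}
```

With the three sample readings, sumByKey yields 55 for "elephant" and 20 for "arctic", fieldAt(..., 2) finds the value 15, and fieldAt(..., 3) finds nothing. Flink's tuple field positions are zero-based as well, so even on a three-field tuple type the third field would be addressed as position 2 rather than 3.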