Posted to user@flink.apache.org by Nirmalya Sengupta <se...@gmail.com> on 2016/02/26 02:44:15 UTC

Need some help to understand the cause of the error

Hello Flinksters,

I am trying to use Flinkspector in a Scala code snippet of mine and Flink
is complaining. The code is here:

---------------------------------------------------------------------------------------------------------------

case class Reading(field1:String,field2:String,field3:Int)

object MultiWindowing {

  def main(args: Array[String]) {}

  //  WindowFunction<IN,OUT,KEY,W extends Window>

  class WindowPrinter extends WindowFunction[Reading, String, String, TimeWindow] {

      //  .....
    }
  }

  val env = DataStreamTestEnvironment.createTestEnvironment(1)

  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val input: EventTimeInput[Reading]  =
    EventTimeInputBuilder
    .startWith(Reading("hans", "elephant", 15))
    .emit(Reading("susi", "arctic", 20), After.period(30, TimeUnit.SECONDS))
    .emit(Reading("pete", "elephant", 40), After.period(20, TimeUnit.SECONDS))

  //acquire data source from input
  val stream = env.fromInput(input)

  //apply transformation
  val k = stream.keyBy(new KeySelector [Reading,String] {
    def getKey(r:Reading) =  r.field2
  })
    .timeWindow(Time.of(5, TimeUnit.MINUTES), Time.of(1, TimeUnit.MINUTES))

    k.sum(3)
    .print()

  env.execute()

}

---------------------------------------------------------------------------------------------------------------

And at runtime, I get this error:

----------------------------------------------------------------------------------------------------------------

Exception in thread "main" java.lang.ExceptionInInitializerError
	at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing.main(MultiWindowing.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.IndexOutOfBoundsException: Not 0th field selected for a simple type (non-tuple, non-array).
	at org.apache.flink.streaming.util.FieldAccessor.create(FieldAccessor.java:76)
	at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:37)
	at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:373)
	at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<init>(MultiWindowing.scala:63)
	at org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<clinit>(MultiWindowing.scala)
	... 6 more


---------------------------------------------------------------------------------------------------------------

Can someone help me by pointing out the mistake I am making?

-- Nirmalya

-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."

Re: Need some help to understand the cause of the error

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
As far as I can see, the problem is in this line:
k.sum(3)

Using field indices is only valid for tuple types. In your case you should be able to use this:
k.sum("field3")

because this is a field of your Reading type.
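
For reference, here is a minimal sketch of the field-name variant. It assumes the Scala DataStream API (org.apache.flink.streaming.api.scala._) with a plain StreamExecutionEnvironment instead of the Flinkspector test environment from the original post. The object name SumByFieldName, the job name, and the use of fromElements are made up for illustration. Note also that positional indices are 0-based, so even on a tuple type the third field would be index 2, not 3.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class Reading(field1: String, field2: String, field3: Int)

// Hypothetical object name, used only for this sketch.
object SumByFieldName {

  def main(args: Array[String]): Unit = {
    // Assumption: the regular Scala environment, not Flinkspector's
    // DataStreamTestEnvironment used in the original post.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream: DataStream[Reading] = env.fromElements(
      Reading("hans", "elephant", 15),
      Reading("susi", "arctic", 20),
      Reading("pete", "elephant", 40))

    stream
      .keyBy(_.field2)                               // key by the second field
      .timeWindow(Time.minutes(5), Time.minutes(1))  // sliding window, as in the post
      .sum("field3")                                 // address the field by name, not by index
      .print()

    // With such a small bounded input the job may finish before any
    // window fires; the point here is only the shape of the sum call.
    env.execute("sum-by-field-name")
  }
}
```

The pipeline is built inside main rather than in the object body, so a failure while building it surfaces as the original exception instead of being wrapped in an ExceptionInInitializerError.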

Cheers,
Aljoscha