Posted to user@flink.apache.org by nsengupta <se...@gmail.com> on 2017/02/23 13:36:29 UTC
Compilation Error in WindowStream.fold()
For reasons I cannot grasp, I am unable to move ahead.
Here's the code:
---------------------------------------------------------------------------------------------------------------------------------------------
import org.apache.flink.api.common.functions.FoldFunction
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
import org.apache.flink.util.Collector
import org.nirmalya.exercise.Elements.{EWayCoordinates, PositionReport, RawMITSIMTuple, VehicleID}
case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int)

case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos: Int)

case class PositionReport(
  // tupletype: Int,
  timeOfReport: Int,
  eWayCoordinates: EWayCoordinates,
  vehicleDetails: VehicleDetails
)
// ....
val envDefault = StreamExecutionEnvironment.createLocalEnvironment(4)
envDefault.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val readings = IndexedSeq[RawMITSIMTuple](
  RawMITSIMTuple(0,0,107,32,0,0,0,10, 53320,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,0,109,20,0,0,0,10,100644,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,0,106,28,0,0,0,10,137745,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,1,107,32,0,0,0,67,354281,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,1,105,30,0,0,1,94,501089,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,1,110,26,0,0,1,95,495898,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,1,102,30,0,0,1,85,453562,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,2,104,32,0,0,1,80,427144,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,2,102,20,0,0,1,73,390383,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,2,111,27,1,0,1,69,369135,-1,-1,-1,-1,-1,-1),
  RawMITSIMTuple(0,2,112,28,1,0,0, 1, 5757,-1,-1,-1,-1,-1,-1)
)
val folder = new FoldFunction[PositionReport, Map[EWayCoordinates, Set[Int]]] {
  override def fold(
      t: Map[EWayCoordinates, Set[VehicleID]], o: PositionReport
  ): Map[EWayCoordinates, Set[VehicleID]] = {
    t + (o.eWayCoordinates -> (t.getOrElse(o.eWayCoordinates, Set.empty) + (o.vehicleDetails.vehicleID)))
  }
}
val windower = new AllWindowFunction[Map[EWayCoordinates, Set[VehicleID]], (EWayCoordinates, Int), Window] {
  override def apply(
      w: Window,
      bunch: Iterable[Map[EWayCoordinates, Set[VehicleID]]],
      collector: Collector[(EWayCoordinates, VehicleID)]): Unit = {
    val allVehiclesInLast30Mins = bunch.iterator().next.mapValues(e => e.size)
    allVehiclesInLast30Mins.foreach(e => println(e))
    collector.collect((EWayCoordinates(-1, -1, -1, -1), 0))
  }
}
val uniqueVehicles = envDefault
  .fromCollection(readings)
  .map(e => MITSIMUtils.preparePositionReport(e))
  .assignAscendingTimestamps(e => e.timeOfReport)
  .keyBy(e => (
    e.eWayCoordinates.eWayID,
    e.eWayCoordinates.eWayDir,
    e.eWayCoordinates.eWaySegment,
    e.vehicleDetails.vehicleID))
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
  .fold(
    // Seed
    Map[EWayCoordinates, Set[VehicleID]](),
    // FoldFunction
    folder,
    // WindowFunction
    windower,
    // Satisfying the compiler
    new TupleTypeInfo[Map[EWayCoordinates, Set[VehicleID]]](),
    new TupleTypeInfo[(EWayCoordinates, Int)]
  )
-----------------------------------------------------------------------------------------
The compiler is unhappy:
[ERROR] /home/nirmalya/Workspace-Flink/LinearRoad/src/test/scala/org/nirmalya/exercise/StreamStateTest.scala:136: error: missing argument list for method fold in class AllWindowedStream
[ERROR] Unapplied methods are only converted to functions when a function type is expected.
[ERROR] You can make this conversion explicit by writing `fold _` or `fold(_)(_)(_)` instead of `fold`.
[ERROR] .fold(
[ERROR] ^
[ERROR] one error found
----------------------------------------------------------------------------------------
I understand why the compiler is unhappy, but I am unsure whether I really have to go
through all this *devilry*. I have not seen anything like it prescribed in any
Flink example. Then again, perhaps I am missing an important point.
Before adding those type hints, I went through this comment
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Incremental-aggregations-Example-not-working-td10581.html#a10585>
by *Yassine Marzougui*. However, I am using *Flink 1.2.0*.
I know this sounds silly, but I simply cannot find my way out of this.
All help appreciated.
-- Nirmalya
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Compilation-Error-in-WindowStream-fold-tp11830.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
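A note on the first error: in the Scala API, the three-argument `fold` overloads of `AllWindowedStream` declare `ACC` and `R` with `TypeInformation` context bounds, which the compiler turns into a separate implicit parameter list (the later error message in this thread prints it as `(implicit evidence$5: ...TypeInformation[ACC], implicit evidence$6: ...TypeInformation[R])`). Type information is therefore either found implicitly or passed in a second argument list; appending it to the first list matches no overload. The plain-Scala sketch below mimics that shape without Flink. `TypeInfoLike` and `describeFold` are hypothetical stand-ins, not Flink APIs.

```scala
// Hypothetical stand-in for Flink's TypeInformation; not a Flink API.
trait TypeInfoLike[T] {
  def name: String
}

object TypeInfoLike {
  // Implicit instances play the role of the TypeInformation that
  // `import org.apache.flink.streaming.api.scala._` derives in Flink code.
  implicit val intInfo: TypeInfoLike[Int] = new TypeInfoLike[Int] { val name = "Int" }
  implicit val stringInfo: TypeInfoLike[String] = new TypeInfoLike[String] { val name = "String" }
}

object ContextBoundDemo {
  // `[ACC: TypeInfoLike, R: TypeInfoLike]` desugars to a second, implicit
  // parameter list: (implicit ev1: TypeInfoLike[ACC], ev2: TypeInfoLike[R]).
  def describeFold[ACC: TypeInfoLike, R: TypeInfoLike](initial: ACC, finish: ACC => R): String = {
    val acc = implicitly[TypeInfoLike[ACC]].name
    val out = implicitly[TypeInfoLike[R]].name
    s"$acc -> $out: ${finish(initial)}"
  }

  // 1. Let the compiler find the evidence (the usual case).
  val implicitCall: String = describeFold(21, (n: Int) => (n * 2).toString)

  // 2. Supply the evidence explicitly, in the SECOND argument list.
  val explicitCall: String =
    describeFold(21, (n: Int) => (n * 2).toString)(TypeInfoLike.intInfo, TypeInfoLike.stringInfo)

  // 3. Appending the evidence to the FIRST list does not compile:
  // describeFold(21, (n: Int) => (n * 2).toString, TypeInfoLike.intInfo, TypeInfoLike.stringInfo)
}
```

By analogy (not verified against Flink 1.2), the explicit form of the pipeline call would be `.fold(seed, folder, windower)(accInfo, outInfo)`, with `accInfo` and `outInfo` being `TypeInformation` values, for example obtained from `createTypeInformation[...]` in `org.apache.flink.streaming.api.scala._`. `TupleTypeInfo` is declared for Java `Tuple` subclasses, so it is unlikely to fit a Scala `Map` or a Scala tuple.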
Re: Compilation Error in WindowStream.fold()
Posted by Aljoscha Krettek <al...@apache.org>.
It seems the type of your initial accumulator, which
is Map[EWayCoordinates,Set[VehicleID]], does not match the accumulator type
of your FoldFunction, which is Map[EWayCoordinates,Set[Int]]. Could you
change that?
On Sat, 25 Feb 2017 at 04:09 nsengupta <se...@gmail.com> wrote:
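Aljoscha's suggestion comes down to making the seed, the `FoldFunction` and the window function agree on a single accumulator type `ACC`. Whether `Set[VehicleID]` and `Set[Int]` really differ depends on how `VehicleID` is declared in `org.nirmalya.exercise.Elements`, which the thread does not show; naming the accumulator type once and reusing that name everywhere removes the question. The plain-Scala sketch below assumes `type VehicleID = Int` and writes the fold step in the `(ACC, PositionReport) => ACC` shape that the Scala `fold` also accepts. `AccumulatorTypes`, `Accumulator`, `foldStep` and `countPerSegment` are illustrative names.

```scala
// Case classes as posted in the thread.
case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int)
case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos: Int)
case class PositionReport(timeOfReport: Int, eWayCoordinates: EWayCoordinates, vehicleDetails: VehicleDetails)

object AccumulatorTypes {
  // Assumption: VehicleID is an alias for Int (its real definition is not in the thread).
  type VehicleID = Int
  // One name for the accumulator, reused by the seed, the fold step and the window logic.
  type Accumulator = Map[EWayCoordinates, Set[VehicleID]]

  val seed: Accumulator = Map.empty

  // Same logic as the thread's FoldFunction, written as (ACC, IN) => ACC.
  val foldStep: (Accumulator, PositionReport) => Accumulator = (acc, report) => {
    val coords = report.eWayCoordinates
    acc + (coords -> (acc.getOrElse(coords, Set.empty[VehicleID]) + report.vehicleDetails.vehicleID))
  }

  // What the window function would derive: distinct vehicles per coordinate.
  def countPerSegment(acc: Accumulator): Map[EWayCoordinates, Int] =
    acc.map { case (coords, vehicles) => coords -> vehicles.size }
}
```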
Re: Compilation Error in WindowStream.fold()
Posted by nsengupta <se...@gmail.com>.
Hello Aljoscha,
Many thanks for taking this up.
This is the modified code:
----------------------------------------------------------------------------------
val uniqueVehicles = envDefault
  .fromCollection(readings)
  .map(e => MITSIMUtils.preparePositionReport(e))
  .assignAscendingTimestamps(e => e.timeOfReport)
  .keyBy(e => (
    e.eWayCoordinates.eWayID,
    e.eWayCoordinates.eWayDir,
    e.eWayCoordinates.eWaySegment,
    e.vehicleDetails.vehicleID))
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
  .fold(
    // Seed
    Map[EWayCoordinates, Set[VehicleID]](),
    // FoldFunction
    folder,
    // WindowFunction
    windower
    // I have taken the TupleTypeInfo out, to see what the compiler says!
    // Satisfying the compiler:
    /*new TupleTypeInfo[Map[EWayCoordinates,Set[VehicleID]]](),
    new TupleTypeInfo[(EWayCoordinates,Int)]*/
  )
----------------------------------------------------------------------------------
And this is what the compiler says:
----------------------------------------------------------------------------------
[INFO] Compiling 3 source files to /home/nirmalya/Workspace-Flink/LinearRoad/target/test-classes at 1487991829901
[ERROR] /home/nirmalya/Workspace-Flink/LinearRoad/src/test/scala/org/nirmalya/exercise/StreamStateTest.scala:137: error: overloaded method value fold with alternatives:
[ERROR] [ACC, R](initialValue: ACC, preAggregator: (ACC, org.nirmalya.exercise.Elements.PositionReport) => ACC, windowFunction: (org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[ACC], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$8: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>
[ERROR] [ACC, R](initialValue: ACC, preAggregator: org.apache.flink.api.common.functions.FoldFunction[org.nirmalya.exercise.Elements.PositionReport,ACC], windowFunction: org.apache.flink.streaming.api.scala.function.AllWindowFunction[ACC,R,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$5: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$6: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
[ERROR] cannot be applied to (scala.collection.immutable.Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[org.nirmalya.exercise.Elements.VehicleID]], org.apache.flink.api.common.functions.FoldFunction[org.nirmalya.exercise.Elements.PositionReport,Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[Int]]], org.apache.flink.streaming.api.functions.windowing.AllWindowFunction[Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[org.nirmalya.exercise.Elements.VehicleID]],(org.nirmalya.exercise.Elements.EWayCoordinates, Int),org.apache.flink.streaming.api.windowing.windows.Window])
[ERROR] .fold(
[ERROR] ^
[ERROR] one error found
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
----------------------------------------------------------------------------------
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Compilation-Error-in-WindowStream-fold-tp11830p11910.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
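Reading the second error: both `fold` alternatives take a window function over `TimeWindow`, and the `FoldFunction` alternative expects the Scala trait `org.apache.flink.streaming.api.scala.function.AllWindowFunction`. `windower`, by contrast, is built from the Java interface `org.apache.flink.streaming.api.functions.windowing.AllWindowFunction` (imported in the first message) and is parameterized with `Window`. That third argument looks like the likelier culprit, though this is an interpretation not confirmed in the thread; if `VehicleID` is declared as `type VehicleID = Int`, then `Set[VehicleID]` and `Set[Int]` are the same type. The sketch below is a folder/windower pair whose types line up with the `FoldFunction` alternative. It assumes Flink 1.2's Scala API and `VehicleID` as an alias for `Int`, and it replaces the `println` placeholder with one `(coordinates, distinct vehicle count)` record per entry. `UniqueVehicleFunctions` and `Acc` are illustrative names.

```scala
import org.apache.flink.api.common.functions.FoldFunction
// The Scala trait, not org.apache.flink.streaming.api.functions.windowing.AllWindowFunction.
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int)
case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos: Int)
case class PositionReport(timeOfReport: Int, eWayCoordinates: EWayCoordinates, vehicleDetails: VehicleDetails)

object UniqueVehicleFunctions {
  type VehicleID = Int // assumption: matches the thread's VehicleID
  type Acc = Map[EWayCoordinates, Set[VehicleID]]

  // Pre-aggregation: input PositionReport, accumulator Acc.
  val folder: FoldFunction[PositionReport, Acc] = new FoldFunction[PositionReport, Acc] {
    override def fold(acc: Acc, report: PositionReport): Acc = {
      val coords = report.eWayCoordinates
      acc + (coords -> (acc.getOrElse(coords, Set.empty[VehicleID]) + report.vehicleDetails.vehicleID))
    }
  }

  // Window function: input Acc, output (EWayCoordinates, Int), window type TimeWindow,
  // which is the window type produced by TumblingEventTimeWindows.
  val windower: AllWindowFunction[Acc, (EWayCoordinates, Int), TimeWindow] =
    new AllWindowFunction[Acc, (EWayCoordinates, Int), TimeWindow] {
      override def apply(window: TimeWindow, input: Iterable[Acc], out: Collector[(EWayCoordinates, Int)]): Unit = {
        // With an incremental fold, `input` holds the folded accumulator(s) for this window.
        input.foreach { acc =>
          acc.foreach { case (coords, vehicles) => out.collect((coords, vehicles.size)) }
        }
      }
    }
}
```

With these two values, `.fold(Map.empty[EWayCoordinates, Set[Int]], folder, windower)` should type-check against the second alternative, with the `TypeInformation` evidence supplied implicitly by `import org.apache.flink.streaming.api.scala._`.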
Re: Compilation Error in WindowStream.fold()
Posted by Aljoscha Krettek <al...@apache.org>.
Hi Nirmalya,
what does the compiler say if you use the variant without explicit
TypeInfo? Like this:
.fold(
  // Seed
  Map[EWayCoordinates,Set[VehicleID]](),
  // FoldFunction
  folder,
  // WindowFunction
  windower
)
Best,
Aljoscha
On Thu, 23 Feb 2017 at 14:41 nsengupta <se...@gmail.com> wrote:
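Following Aljoscha's suggestion of dropping the explicit type information, here is a minimal end-to-end sketch in which `import org.apache.flink.streaming.api.scala._` derives both `TypeInformation` instances implicitly. It is not verified and rests on several assumptions: Flink 1.2 with Scala 2.11; the function-literal `fold` alternative listed in the compiler output, `(ACC, PositionReport) => ACC` plus `(TimeWindow, Iterable[ACC], Collector[R]) => Unit`; a few hand-written, hypothetical `PositionReport` values in place of `RawMITSIMTuple` and `MITSIMUtils.preparePositionReport`, which the thread does not show; and `timeOfReport` being in seconds, hence the multiplication by 1000, because Flink event-time timestamps are milliseconds. `UniqueVehiclesJob` and `build` are illustrative names.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
// Brings StreamExecutionEnvironment, DataStream and the implicit
// createTypeInformation macro into scope.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int)
case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos: Int)
case class PositionReport(timeOfReport: Int, eWayCoordinates: EWayCoordinates, vehicleDetails: VehicleDetails)

object UniqueVehiclesJob {
  type VehicleID = Int // assumption: alias for Int
  type Acc = Map[EWayCoordinates, Set[VehicleID]]

  def build(env: StreamExecutionEnvironment): DataStream[(EWayCoordinates, Int)] = {
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Hypothetical sample input, already in ascending timeOfReport order.
    val seg = EWayCoordinates(0, 0, 0, 10)
    val reports = env.fromElements(
      PositionReport(0, seg, VehicleDetails(107, 32, 53320)),
      PositionReport(0, seg, VehicleDetails(109, 20, 100644)),
      PositionReport(30, seg, VehicleDetails(107, 32, 354281)))

    reports
      // Assumption: timeOfReport is in seconds; Flink expects milliseconds.
      .assignAscendingTimestamps(r => r.timeOfReport * 1000L)
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
      .fold(
        // Seed: its type fixes ACC for the other two arguments.
        Map.empty[EWayCoordinates, Set[VehicleID]],
        // Pre-aggregation: add the vehicle to the set for its coordinates.
        (acc: Acc, r: PositionReport) =>
          acc + (r.eWayCoordinates ->
            (acc.getOrElse(r.eWayCoordinates, Set.empty[VehicleID]) + r.vehicleDetails.vehicleID)),
        // Window function: emit one (coordinates, distinct count) per entry.
        (window: TimeWindow, folded: Iterable[Acc], out: Collector[(EWayCoordinates, Int)]) =>
          folded.foreach(_.foreach { case (coords, vehicles) => out.collect((coords, vehicles.size)) })
      ) // TypeInformation[Acc] and TypeInformation[(EWayCoordinates, Int)] come from implicits
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    build(env).print()
    env.execute("unique vehicles per segment")
  }
}
```

`keyBy` is omitted because `windowAll` sends every element to a single, non-parallel window whatever keys were assigned before it; per-key windows would use `.keyBy(...).window(...)` instead.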