Posted to user@spark.apache.org by anshu shukla <an...@gmail.com> on 2015/07/08 23:21:54 UTC
Requirement failed: Some of the DStreams have different slide durations
Hi all,
I want to create a union of two DStreams. In one of them an *RDD is created
every second*; the other has its RDDs generated by reduceByKeyAndWindow
with the *window duration set to 60 seconds* (the slide duration is also 60 seconds).
- The main idea is to run some analysis on each minute of data and emit the
union of the input data (per second) and the transformed data (per minute).
*The code is:*
// Sum the message values per key over a 60-second window that slides every
// 60 seconds, tagging the result with the larger of the two message ids.
JavaPairDStream<String, String> windowedGridCounts =
    GridtoPair.reduceByKeyAndWindow(new Function2<String, String, String>() {
        @Override
        public String call(String i1, String i2) {
            long id1 = MsgIdAddandRemove.getMessageId(i1);
            long id2 = MsgIdAddandRemove.getMessageId(i2);
            float v1 = Float.parseFloat(MsgIdAddandRemove.getMessageContent(i1));
            // The second operand is parsed from i2.
            float v2 = Float.parseFloat(MsgIdAddandRemove.getMessageContent(i2));
            String res = String.valueOf(v1 + v2);
            return MsgIdAddandRemove.addMessageId(res, Math.max(id1, id2));
        }
    }, Durations.seconds(60), Durations.seconds(60));
JavaDStream<String> UnionStream = tollPercent.union(underPay).union(taxSumPercent);
I am getting the following error:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Some of the DStreams have different slide durations
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.streaming.dstream.UnionDStream.<init>(UnionDStream.scala:33)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$union$1.apply(DStream.scala:849)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$union$1.apply(DStream.scala:849)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
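
For reference, here is a minimal sketch in plain Java (not Spark code) of what the failing requirement seems to check, judging from the message and the UnionDStream frame in the trace: every stream passed to union must report the same slide duration. The class SlideDurationCheck and the method requireSameSlide are hypothetical names used only for illustration, and the durations stand in for the 1-second input stream and the 60-second windowed stream described above.

```java
import java.time.Duration;
import java.util.Arrays;

public class SlideDurationCheck {

    // Hypothetical helper: throws unless every slide duration is identical.
    public static void requireSameSlide(Duration... slides) {
        long distinct = Arrays.stream(slides).distinct().count();
        if (distinct != 1) {
            throw new IllegalArgumentException(
                "requirement failed: Some of the DStreams have different slide durations");
        }
    }

    public static void main(String[] args) {
        Duration perSecond = Duration.ofSeconds(1);   // slide of the raw input stream
        Duration perMinute = Duration.ofSeconds(60);  // slide of the windowed stream

        // Same slide on both sides: the check passes silently.
        requireSameSlide(perMinute, perMinute);

        // Mixed slides: the check rejects the combination.
        try {
            requireSameSlide(perSecond, perMinute);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this reading is right, the per-second stream would first need a 60-second slide of its own, for example through window(Durations.seconds(60), Durations.seconds(60)), before it could be unioned with the per-minute stream. I have not verified that this gives the output I want.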
--
Thanks & Regards,
Anshu Shukla